import { shareLatest } from '@react-rxjs/core';
import { defer, EMPTY, Observable } from 'rxjs';
import {
  retryWhen,
  tap,
  delay,
  takeUntil,
  filter,
  repeatWhen,
  mergeMap,
  take,
  scan,
  exhaustMap,
} from 'rxjs/operators';
import { split } from '@react-rxjs/utils';
import { logger } from '../logger';
import {
  ConnectionStatus,
  connectionStatusState$,
} from '../websocket/connectionStatus';
import { setConnectionStatus as setAlgoConnectionStatus } from './algoConnectionStatus';
import { algoWebSocketSubject } from './getAlgoWebSocketSubject';
import { logMessages } from '../services/logMessages';
import { MessageMap } from '../websocket/types';

/**
 * Shared stream of the per-type message streams coming off the algo WebSocket.
 *
 * Each emission is an object keyed by message `type`; the value under a key is
 * the stream produced by `split` for that type. A new object is emitted every
 * time `split` emits a stream for a new key.
 *
 * The upstream pipeline is built lazily (`defer`) on first subscription, and
 * `shareLatest()` is applied to the *result* of `defer` so that all subscribers
 * share a single pipeline. Placing `shareLatest()` inside the `defer` factory
 * would create a separate pipeline per subscriber, duplicating error logging,
 * connection-status updates and message logging for every consumer.
 */
const messageStreams$: Observable<MessageMap> = defer(() => algoWebSocketSubject.pipe(
  // On a socket error: log it, wait 5 seconds, flag the algo connection as
  // CONNECTING, then let retryWhen resubscribe to the socket subject.
  retryWhen((errors) => errors.pipe(
    tap((err) => {
      logger.error('Algo Web Socket connection error', err);
    }),
    delay(5000),
    tap(() => {
      setAlgoConnectionStatus(ConnectionStatus.CONNECTING);
    }),
  )),
  // Complete the socket subscription once the main connection, having been
  // AUTHENTICATED at least once, reports any other status.
  takeUntil(
    connectionStatusState$.pipe(
      filter((status) => status === ConnectionStatus.AUTHENTICATED),
      take(1),
      mergeMap(() => connectionStatusState$.pipe(
        filter((status) => status !== ConnectionStatus.AUTHENTICATED),
      )),
    ),
  ),
  // After such a completion, resubscribe when the main connection reports
  // AUTHENTICATED again.
  repeatWhen(() => connectionStatusState$.pipe(
    filter((status) => status === ConnectionStatus.AUTHENTICATED),
  )),
  // NOTE(review): presumably logs every incoming message under this label —
  // confirm against ../services/logMessages.
  logMessages('new algo message'),
  // One inner stream per distinct `msg.type`.
  split((msg) => msg.type),
  // Accumulate the inner streams into a lookup object keyed by message type.
  scan((acc, message$) => ({ ...acc, [message$.key]: message$ }), {}),
)).pipe(
  // Share one upstream pipeline between all subscribers and replay the latest
  // lookup object to late subscribers.
  shareLatest(),
);

/**
 * Builds a stream of algo messages of a single `type`.
 *
 * For every lookup object emitted by `messageStreams$`, the stream registered
 * under `type` is selected. If none is registered yet, `EMPTY` is used, which
 * completes immediately so the next lookup object can be inspected. Because
 * `exhaustMap` is used, later lookup objects are ignored for as long as the
 * selected stream is still active.
 *
 * @param type Message type whose stream should be followed.
 * @returns An observable of the messages published under `type`.
 */
export const subscribeToAlgoMessageStream$ = (type: string) => {
  const selectStreamForType = (streamsByType: MessageMap) => {
    const stream$ = streamsByType[type];
    return stream$ || EMPTY;
  };

  return messageStreams$.pipe(exhaustMap(selectStreamForType));
};

/**
 * Pushes a message onto the algo WebSocket subject.
 *
 * The JSON-serialised message is written to the debug log before the original
 * (unserialised) value is handed to `algoWebSocketSubject.next`.
 *
 * @param msg Message to send.
 */
export const algoSend = (msg: any) => {
  const serialised = JSON.stringify(msg);
  logger.debug(`sending algo msg - ${serialised}`);

  algoWebSocketSubject.next(msg);
};
