import {
  switchMap, filter, map, finalize,
} from 'rxjs/operators';
import { shareLatest } from '@react-rxjs/core';
import { defer, EMPTY } from 'rxjs';
import { mergeWithKey } from '@react-rxjs/utils';
import {
  SelectedMarketRequest,
  selectedMarketSubject$,
} from './selectedMarketService';
import { ProductValue } from '../ProductValue';
import {
  ConnectionStatus,
  connectionStatusState$,
} from '../websocket/connectionStatus';
import { send, subscribeToMessageStream$ } from '../websocket/transport';
import { correlationId } from '../helperFunctions/correlationId';

interface MarketDataAckMessage {
  type: 'INFO_MESSAGE';
  information: string;
}

export interface MarketDataSubscriptionAck {
  product: ProductValue;
}

const subscribeToMarket$ = (request: SelectedMarketRequest) => defer(() => {
  send({
    correlation: request.correlation,
    type: 'MarketDataSubscribe',
    symbol: request.product.symbol,
  });

  return subscribeToMessageStream$('INFO_MESSAGE').pipe(
    filter(
      (msg: any): msg is MarketDataAckMessage => msg.information
          === `Subscribed to market data for ${request.product.symbol}.`,
    ),
    map(
      (): MarketDataSubscriptionAck => ({
        product: request.product,
      }),
    ),
    finalize(() => {
      send({
        correlation: correlationId(),
        type: 'MarketDataUnsubscribe',
        symbol: request.product.symbol,
      });
    }),
  );
});

export const subscribedMarketData$ = mergeWithKey({
  newSubscribe: selectedMarketSubject$,
  socketClosed: connectionStatusState$.pipe(
    filter((status) => status === ConnectionStatus.CLOSED),
  ),
}).pipe(
  switchMap((event) => (event.type === 'socketClosed' ? EMPTY : subscribeToMarket$(event.payload))),
  shareLatest(),
);
