import { shareLatest } from '@react-rxjs/core';
import { mergeWithKey } from '@react-rxjs/utils';
import { defer, Observable, timer } from 'rxjs';
import {
  filter,
  map,
  scan,
  share,
  startWith,
  switchMap,
  withLatestFrom,
} from 'rxjs/operators';

import { checkIfDateIsWithin24Hours } from '../../shared/helperFunctions/dateFunctions';
import {
  selectedMarketRequestCorrelation$,
  selectedMarketState$,
} from '../../shared/services/selectedMarketService';
import { subscribeToMessageStream$ } from '../../shared/websocket/transport';
import {
  marketDataToTrades,
  MarketDataTradeMessage,
  getPaginatedRestTradeHistory$,
} from './tradeHistoryServiceHelpers';
import { TradeHistoryData } from './types';

const VOLUME_REFRESH_TIMER = 30000;

type TradeHistoryEvent =
  | {
    type: 'subscriptionData';
    payload: TradeHistoryData;
  }
  | {
    type: 'restData';
    payload: TradeHistoryData;
  }
  | {
    type: 'volumeRefresh';
    payload: number;
  };

export const all24HourVolumeTradesReducer = (
  allTrades: TradeHistoryData,
  event: TradeHistoryEvent,
): TradeHistoryData => {
  // on the volume refresh interval, refilter all trades to remove any
  // that are now outside 24h
  if (event.type === 'volumeRefresh') {
    return {
      ...allTrades,
      trades: allTrades.trades.filter((trade) => checkIfDateIsWithin24Hours(trade.time)),
    };
  }
  const newTrades = event.payload;
  return {
    trades: [...allTrades.trades, ...newTrades.trades]
      .filter((trade) => checkIfDateIsWithin24Hours(trade.time))
      .sort((a, b) => {
        if (a.time.getTime() === b.time.getTime()) {
          if (a.id === b.id) {
            return 0;
          }
          if (a.id < b.id) {
            return 1;
          }
          return -1;
        }
        if (a.time.getTime() < b.time.getTime()) {
          return 1;
        }
        return -1;
      }),
    error: allTrades.error || newTrades.error,
    symbol: allTrades.symbol || newTrades.symbol,
  };
};

export const emptyTradeHistory = {
  trades: [],
  error: false,
  symbol: '',
} as TradeHistoryData;

const getSelectedMarketTradeMessages$ = defer(() => subscribeToMessageStream$('MarketDataIncrementalRefreshTrade').pipe(
  withLatestFrom(selectedMarketRequestCorrelation$),
  filter(([msg, correlation]) => msg.correlation === correlation),
  map(([msg]) => msg),
)) as Observable<MarketDataTradeMessage>;

export const getTradeHistorySubscription$ = () => defer(() => getSelectedMarketTradeMessages$.pipe(
  map((tradeMessage) => marketDataToTrades(tradeMessage)),
  shareLatest(),
));

export const volumeRefreshTimer$ = timer(
  VOLUME_REFRESH_TIMER,
  VOLUME_REFRESH_TIMER,
).pipe(share());

export const mergeTradeHistory$ = (symbol: string) => mergeWithKey({
  subscriptionData: getTradeHistorySubscription$(),
  restData: getPaginatedRestTradeHistory$(symbol),
  volumeRefresh: volumeRefreshTimer$,
}).pipe(
  scan(all24HourVolumeTradesReducer, emptyTradeHistory),
  startWith(emptyTradeHistory),
);

export const all24HourTrades$ = selectedMarketState$.pipe(
  switchMap((product) => mergeTradeHistory$(product.symbol)),
  shareLatest(),
);
