import { shareLatest } from '@react-rxjs/core';
import {
  merge,
  Observable,
  Subject,
} from 'rxjs';
import {
  filter,
  map,
  scan,
  skip,
  startWith,
  switchMap,
  tap,
  withLatestFrom,
} from 'rxjs/operators';

import { algoWebsocketConnectionNumber$ } from '../../../../shared/algoWebsocket/algoConnectionStatus';
import { getFetchClearing$ } from '../../../../shared/services/fetchService';
import { getToken } from '../../../../shared/websocket/token';
import { accountMap$, accountState$ } from '../../../../shared/services/accountsService';
import { algoExecutionReportsWithAccountNumber$ } from '../algoOrders/algoOrdersService';
import { executionReportWithDateAndAccountNumber$ } from '../openOrders/openOrdersService';
import {
  convertTradeToTradeRow,
  fillsOrderReducer,
  HistoricalTradeResponse,
  initialTradeMap,
  NewTradeResponse,
} from './fillsReducer';
import { recentBlockTrade$ } from '../blockTrades/blockTradesService';

const token$ = new Subject<string>();

export const getTrades = () => token$.next(getToken());

const isHistoricalTrades = (
  input: any,
): input is HistoricalTradeResponse => input.trades !== undefined;

export const tradeCall$: Observable<NewTradeResponse> = token$.pipe(
  switchMap(() => getFetchClearing$({
    restCallName: 'trades',
    params: new URLSearchParams({ limit: '25' }),
  })),
  map((fetchResponse) => ((!isHistoricalTrades(fetchResponse.response)
    ? { response: {}, error: true } as NewTradeResponse
    : {
      response: fetchResponse.response,
      error: fetchResponse.error,
    }))),
  shareLatest(),
);

export const tradeState$: Observable<HistoricalTradeResponse> = tradeCall$.pipe(
  filter((trade) => trade.error !== true),
  map((data) => data.response),
  shareLatest(),
);

export const tradeError$ = tradeCall$.pipe(
  filter((trade) => trade.error === true),
  shareLatest(),
);

tradeState$.subscribe();
tradeError$.subscribe();

export const historicalTradeState$ = tradeState$.pipe(
  withLatestFrom(accountMap$),
  map(([response, accountMap]) => initialTradeMap.withMutations(
    (m) => response.trades
      .forEach((trade) => m
        .set(trade.trade_id, convertTradeToTradeRow({
          ...trade,
          accountName: accountMap
            .get(trade.account_id) || trade.account_id,
        }))),
  )),
);

historicalTradeState$.subscribe();

export const getTradesWhenAccounts$ = accountState$.pipe(
  filter((accounts) => Boolean(accounts.accounts.length)),
  tap(() => {
    getTrades();
  }),
);

export const getTradesOnAlgoReconnect$ = algoWebsocketConnectionNumber$.pipe(
  skip(1),
  tap(() => {
    getTrades();
  }),
);

export const fillState$ = merge(
  executionReportWithDateAndAccountNumber$,
  algoExecutionReportsWithAccountNumber$,
).pipe(
  scan(fillsOrderReducer, initialTradeMap),
  startWith(initialTradeMap),
);

export const newTradeState$ = merge(
  executionReportWithDateAndAccountNumber$,
  algoExecutionReportsWithAccountNumber$,
).pipe(
  filter((executionReport) => executionReport.execType === 'TRADE'),
  shareLatest(),
);

recentBlockTrade$.pipe(
  tap(() => getTrades()),
).subscribe();
