import { shareLatest } from '@react-rxjs/core';
import { mergeWithKey } from '@react-rxjs/utils';
import {
  EMPTY,
  merge,
  Observable,
  Subject,
} from 'rxjs';
import {
  catchError,
  filter,
  map,
  scan,
  skip,
  startWith,
  switchMap,
  tap,
  withLatestFrom,
} from 'rxjs/operators';

import { algoWebsocketConnectionNumber$ } from '../../../../shared/algoWebsocket/algoConnectionStatus';
import { getFetchClearing$ } from '../../../../shared/services/fetchService';
import { getToken } from '../../../../shared/websocket/token';
import { futureProductCodeMap$, securityListState$ } from '../../../../shared/services/securitiesService';
import { accountMap$, accountState$ } from '../../../../shared/services/accountsService';
import { FUT } from '../../../orderEntry/services/trade/sendEntry';
import { algoExecutionReportsWithAccountNumber$ } from '../algoOrders/algoOrdersService';
import { ExecutionReport } from '../openOrders/openOrdersReducer';
import { executionReportWithDateAndAccountNumber$ } from '../openOrders/openOrdersService';
import {
  convertPositionToPositionRow,
  HistoricalPositionResponse,
  initialPositionMap,
  PositionResponse,
  positionsReducer,
} from './positionsReducer';

/**
 * Trigger stream for position fetches. Each emission carries the value
 * returned by `getToken()`; `positionCall$` only uses the emission as a
 * signal and does not read the value itself.
 */
const token$ = new Subject<string>();

/** Requests a fresh positions fetch by pushing the current token into `token$`. */
export const getPositions = (): void => {
  token$.next(getToken());
};

/**
 * Type guard for the payload returned by the `positions` REST call.
 *
 * - Anything that is not an array (`null`, `undefined`, error objects, ...)
 *   is rejected instead of throwing or being accepted by accident.
 * - An empty array is accepted (nothing to inspect).
 * - A non-empty array is accepted only when its first element carries an
 *   `account_id`; the remaining elements are not inspected.
 *
 * @param input - Raw response body of unknown shape.
 * @returns `true` when `input` can be treated as `PositionResponse[]`.
 */
const isHistoricalPositions = (
  input: unknown,
): input is PositionResponse[] => {
  if (!Array.isArray(input)) return false;
  if (input.length === 0) return true;

  const [first] = input;
  // `!= null` deliberately covers both null and undefined first elements.
  return first != null && first.account_id !== undefined;
};

/**
 * Performs the `positions` REST call every time `token$` emits and
 * normalises the outcome into a `{ response, error }` envelope.
 *
 * `switchMap` means a new trigger replaces any fetch still in flight.
 * A fetch that reports an error, or whose body fails
 * `isHistoricalPositions`, is turned into `{ response: [], error: true }`.
 */
export const positionCall$: Observable<HistoricalPositionResponse> = token$.pipe(
  switchMap(() => getFetchClearing$({ restCallName: 'positions' })),
  map((result) => {
    // Happy path first: no error flag and a body that looks like positions.
    if (!result.error && isHistoricalPositions(result.response)) {
      return {
        response: result.response,
        error: result.error,
      };
    }

    return {
      response: [],
      error: true,
    } as HistoricalPositionResponse;
  }),
  shareLatest(),
);

/**
 * Successful position fetches only: forwards every `positionCall$` envelope
 * whose `error` flag is not `true`. A stream error ends this observable
 * silently (`EMPTY`) instead of propagating.
 */
export const positionState$: Observable<HistoricalPositionResponse> = positionCall$.pipe(
  filter(({ error }) => error !== true),
  catchError(() => EMPTY),
  shareLatest(),
);

/**
 * Failed position fetches only: forwards every `positionCall$` envelope
 * whose `error` flag is exactly `true`. A stream error ends this observable
 * silently (`EMPTY`) instead of propagating.
 */
export const positionError$: Observable<HistoricalPositionResponse> = positionCall$.pipe(
  filter(({ error }) => error === true),
  catchError(() => EMPTY),
  shareLatest(),
);

// Module-level subscription: positionState$ gets a subscriber as soon as this
// file is imported.
// NOTE(review): presumably this keeps the shareLatest() stream (and its latest
// value) alive while no consumer is subscribed — confirm.
positionState$.subscribe();

/**
 * Position rows built from the latest successful historical fetch.
 *
 * Starts from `initialPositionMap` and adds one row per position of every
 * account in the response, keyed by `row.account + row.contract`. The
 * account for each row is looked up in the latest `accountMap$` value.
 * A stream error ends this observable silently (`EMPTY`).
 */
export const historicalPositionState$ = positionState$.pipe(
  withLatestFrom(accountMap$),
  map(([historical, accounts]) => initialPositionMap.withMutations((draft) => {
    if (!historical) return;

    historical.response.forEach(({ account_id: accountId, positions }) => {
      positions.forEach((position) => {
        const row = convertPositionToPositionRow(position, accounts.get(accountId));
        draft.set(row.account + row.contract, row);
      });
    });
  })),
  catchError(() => EMPTY),
);

/**
 * Flat list of every `position_id` found in the latest successful
 * historical positions fetch.
 *
 * `positionState$` emits the `{ response, error }` envelope rather than a
 * bare array, so the accounts are read from `.response`. Testing the
 * envelope itself with `Array.isArray` is always false and would make this
 * stream emit `[]` forever.
 *
 * NOTE(review): the nested shape (account -> positions -> positions ->
 * position_id) is kept from the original traversal and is not visible in
 * this file — confirm against the `PositionResponse` type. Missing levels
 * are skipped rather than thrown on, so an unexpected shape yields fewer ids
 * instead of terminating the stream through `catchError`.
 */
export const allPositionsState$ = positionState$.pipe(
  map((historical) => {
    const candidate = historical?.response;
    // Elements stay loosely typed, as in the original traversal.
    const accounts: any[] = Array.isArray(candidate) ? candidate : [];

    return accounts
      .flatMap((account) => account?.positions ?? [])
      .flatMap((position) => position?.positions ?? [])
      .map((entry) => entry?.position_id)
      .filter((positionId) => positionId !== undefined);
  }),
  catchError(() => EMPTY),
);

// Module-level subscription: the historical position pipeline starts running
// as soon as this file is imported. The emitted values are discarded here.
// NOTE(review): historicalPositionState$ is not shared, so other subscribers
// run their own copy of the pipeline — confirm this subscription is needed.
historicalPositionState$.subscribe();

/**
 * Side-effect stream: whenever `accountState$` emits a state with at least
 * one account, a positions fetch is requested. The account state itself is
 * passed through unchanged. Nothing happens until something subscribes.
 */
export const getPositionsWhenAccounts$ = accountState$.pipe(
  filter(({ accounts }) => accounts.length > 0),
  tap(() => getPositions()),
);

/**
 * Side-effect stream: requests a positions fetch on every emission of
 * `algoWebsocketConnectionNumber$` except the first one, which `skip(1)`
 * drops. Nothing happens until something subscribes.
 */
export const getPositionsOnAlgoReconnect$ = algoWebsocketConnectionNumber$.pipe(
  skip(1),
  tap(() => getPositions()),
);

/**
 * Execution reports (regular and algo) that should update positions.
 *
 * A report is kept only when:
 *  1. its symbol resolves, in the latest security list, to a security whose
 *     `securityType` is `FUT`, and
 *  2. its `execID` is not already listed in the latest `allPositionsState$`
 *     value (a non-array value lets every report through).
 */
const newPositionUpdate$ = merge(
  executionReportWithDateAndAccountNumber$,
  algoExecutionReportsWithAccountNumber$,
).pipe(
  withLatestFrom(securityListState$),
  filter(([report, securityList]) => {
    const security = securityList.securities.find(({ symbol }) => symbol === report.symbol);
    return security?.securityType === FUT;
  }),
  map(([report]) => report),
  withLatestFrom(allPositionsState$),
  filter(([report, knownIds]) => !(Array.isArray(knownIds) && knownIds.includes(report.execID))),
  map(([report]) => report),
);

/**
 * Adds a `productCode` to each position-relevant execution report. The code
 * comes from the latest `futureProductCodeMap$` entry for the report's
 * symbol; when that lookup is falsy the symbol itself is used.
 */
const newPositionUpdateWithProductCode$ = newPositionUpdate$.pipe(
  withLatestFrom(futureProductCodeMap$),
  map(([report, productCodes]) => {
    const productCode = productCodes.get(report.symbol) || report.symbol;
    return { ...report, productCode };
  }),
) as Observable<ExecutionReport>;

// Both inputs of the reducer, merged under the key of the source they came
// from. positionState$ is looped in here because the new position state is
// reset whenever another historical position fetch happens.
const positionEvents$ = mergeWithKey({
  newPositionUpdate: newPositionUpdateWithProductCode$,
  newHistoricalPositions: positionState$,
});

/**
 * Running position map derived from execution reports received since the
 * last historical fetch. Emits `initialPositionMap` first, then the result of
 * folding every keyed event through `positionsReducer`.
 */
export const newPositionState$ = positionEvents$.pipe(
  scan(positionsReducer, initialPositionMap),
  startWith(initialPositionMap),
);
