import {
  Observable, GroupedObservable, Subject, EMPTY, defer,
} from 'rxjs';
import {
  scan,
  map,
  takeUntil,
  filter,
  exhaustMap,
  finalize,
  pluck,
  distinctUntilChanged,
} from 'rxjs/operators';
import { shareLatest, bind } from '@react-rxjs/core';
import { split } from '@react-rxjs/utils';
import { ProductValue } from '../../../shared/ProductValue';
import {
  subscribeToMessageStream$,
  send,
} from '../../../shared/websocket/transport';
import { correlationId } from '../../../shared/helperFunctions/correlationId';
import { batchUpdates } from '../../../batchUpdates';
import { withAuth } from '../../../shared/services/authStatusService';

/**
 * One entry on a single side (bids or offers) of a top-of-book market data message.
 */
export interface OrderBookSide {
  /** Change marker for the entry; 'DELETED' entries are skipped when the top of book is derived. */
  action: 'DELETED' | 'UPDATE' | 'NO CHANGE' | 'NEW';
  /** NOTE(review): not read in this module; presumably the number of orders — confirm with feed. */
  count: number;
  /** Volume of the entry; surfaced to consumers as `TopBook.quantity`. */
  totalVolume: number;
  /** Price of the entry; surfaced to consumers as `TopBook.price`. */
  price: number;
  /** NOTE(review): not read in this module; presumably a timestamp string — format unconfirmed. */
  lastUpdate: string;
}

/**
 * Shape of the 'TopOfBookMarketData' messages consumed from the shared message stream.
 */
interface OrderBookTopMessage {
  /** Correlation id carried by the message (not read in this module). */
  correlation: string;
  /** Message discriminator; also the stream name this module subscribes to. */
  type: 'TopOfBookMarketData';
  /** Bid-side entries; reduced to a single `TopBook` value. */
  bids: OrderBookSide[];
  /** Offer-side entries; reduced to a single `TopBook` value. */
  offers: OrderBookSide[];
  /** Instrument symbol; used as the key when splitting the stream per instrument. */
  symbol: string;
  /** Optional product tag (not read in this module). */
  product?: ProductValue;
}

// Emits the symbol whose per-symbol stream should end: a symbol is pushed when a
// top-of-book subscription is finalized, and the grouped stream for that symbol
// completes (via takeUntil) when it sees its own symbol here.
const dispose = new Subject<string>();

/**
 * Best price/quantity for one side of the book, or `null` when the message
 * carried no usable (non-deleted) entry for that side.
 */
export type TopBook = {
  price: number;
  quantity: number;
} | null;

/**
 * Collapses one side of an order-book message into a single top-of-book value.
 *
 * The last entry whose action is not 'DELETED' wins. `null` is returned when
 * there is no such entry (empty list or everything deleted) or when the winning
 * entry carries no action at all.
 *
 * @param entries - Entries for one side (bids or offers) of the message.
 * @returns The price and quantity (taken from `totalVolume`) of the winning entry, or `null`.
 */
const getTopBookFromMsg = (entries: OrderBookSide[]): TopBook => {
  let winner: OrderBookSide | undefined;
  for (const entry of entries) {
    if (entry.action !== 'DELETED') {
      // Later live entries override earlier ones.
      winner = entry;
    }
  }

  if (winner === undefined || !winner.action) {
    return null;
  }
  return { price: winner.price, quantity: winner.totalVolume };
};

/** Bid/offer top-of-book pair derived from a single market data message. */
type TopOfBookPair = { bid: TopBook; offer: TopBook };

/** Key selector: routes every message to the group of its instrument symbol. */
const symbolOf = (message: OrderBookTopMessage): string => message.symbol;

/**
 * Per-symbol stream selector: turns the raw messages of one symbol into
 * bid/offer pairs and ends the group once that symbol is pushed on `dispose`.
 */
const toTopOfBookPairs = (
  messages$: Observable<OrderBookTopMessage>,
  symbol: string,
): Observable<TopOfBookPair> => messages$.pipe(
  map((message) => ({
    bid: getTopBookFromMsg(message.bids),
    offer: getTopBookFromMsg(message.offers),
  })),
  takeUntil(dispose.pipe(filter((disposedSymbol) => disposedSymbol === symbol))),
);

/**
 * Emits an accumulating dictionary from symbol to that symbol's grouped
 * top-of-book stream. A new dictionary object is emitted each time the split
 * produces a group; entries are only ever added or overwritten, never removed.
 */
const topOfBookBySymbol: Observable<Record<
string,
GroupedObservable<string, TopOfBookPair>
>> = subscribeToMessageStream$<OrderBookTopMessage>('TopOfBookMarketData').pipe(
  split(symbolOf, toTopOfBookPairs),
  scan(
    (groupsBySymbol, group$) => ({ ...groupsBySymbol, [group$.key]: group$ }),
    {},
  ),
  shareLatest(),
);
// Subscribed eagerly at module load, presumably so the shared pipeline stays
// connected regardless of how many consumers are currently listening.
topOfBookBySymbol.subscribe();

/**
 * Observable factory (second element of the `bind` tuple) yielding the
 * bid/offer top-of-book pairs for one symbol.
 *
 * On subscription a 'TopOfBookMarketDataSubscribe' request with depth 1 is
 * sent, then values are taken from the symbol's grouped stream. On teardown
 * the symbol is pushed on `dispose` and a 'TopOfBookMarketDataUnsubscribe'
 * request is sent. The whole stream is wrapped by `withAuth()`.
 */
const [, getTopOfTheBook] = bind((symbol: string) => {
  // Asks the server to start publishing top-of-book data for this symbol.
  const requestFeed = (): void => {
    send({
      correlation: correlationId(),
      type: 'TopOfBookMarketDataSubscribe',
      symbol,
      topOfBookDepth: 1,
    });
  };

  // Ends the local per-symbol group first, then tells the server to stop publishing.
  const releaseFeed = (): void => {
    dispose.next(symbol);
    send({
      correlation: correlationId(),
      type: 'TopOfBookMarketDataUnsubscribe',
      symbol,
    });
  };

  const whenAuthorised = withAuth();
  return whenAuthorised(
    // defer postpones the subscribe request until something actually subscribes.
    defer(() => {
      requestFeed();
      return topOfBookBySymbol.pipe(
        // Follow this symbol's group once it exists; EMPTY completes at once so
        // the next dictionary emission is examined again.
        exhaustMap((groupsBySymbol) => groupsBySymbol[symbol] || EMPTY),
        finalize(releaseFeed),
        batchUpdates(),
      );
    }),
  );
});

/**
 * Equality comparator for `distinctUntilChanged` over top-of-book values.
 *
 * Two populated values are equal when both price and quantity match. Two
 * empty values (both `null`) are also equal, so repeated "no top of book"
 * emissions are suppressed instead of being reported as changes.
 *
 * @param a - Previously emitted top-of-book value.
 * @param b - Newly emitted top-of-book value.
 * @returns `true` when the two values represent the same top of book.
 */
const compareTopBook = (
  a: TopBook, b: TopBook,
): boolean => {
  if (!a || !b) {
    // At least one side is empty: equal only when both are the same empty value.
    return a === b;
  }
  return a.quantity === b.quantity && a.price === b.price;
};

/**
 * Hook created by `bind` that yields the top-of-book bid for a symbol.
 * Values equal to the previous one (per `compareTopBook`) are dropped, and
 * `null` is supplied to `bind` as the default value.
 */
export const [useTopOfTheBookBid] = bind(
  (symbol: string) => {
    const topOfBook$ = getTopOfTheBook(symbol);
    return topOfBook$.pipe(
      pluck('bid'),
      distinctUntilChanged(compareTopBook),
    );
  },
  null,
);

/**
 * Hook created by `bind` that yields the top-of-book offer for a symbol.
 * Values equal to the previous one (per `compareTopBook`) are dropped, and
 * `null` is supplied to `bind` as the default value.
 */
export const [useTopOfTheBookOffer] = bind(
  (symbol: string) => {
    const topOfBook$ = getTopOfTheBook(symbol);
    return topOfBook$.pipe(
      pluck('offer'),
      distinctUntilChanged(compareTopBook),
    );
  },
  null,
);
