import { RFQTicker } from '@shared/protos/ticker';
import { OtcOrder } from '@shared/protos/trading';
import { Stream, StreamEvent, Subscription } from './Stream';

/** Listener invoked with each RFQ ticker delivered to a subscription. */
export type TickerCallback = (ticker: RFQTicker) => void;

/**
 * Per-subscription fan-out object shared by every callback registered for the
 * same RFQ symbol/exchange pair.
 */
interface RFQStream {
  /** Feeds a ticker into the stream so it can be passed on to the registered callbacks. */
  onMessage: TickerCallback;
  /** Registers a callback with the stream. */
  subscribe: (callback: TickerCallback) => void;
  /**
   * The currently registered callbacks. The service removes callbacks from this
   * set directly and checks its size to decide when to drop the subscription.
   */
  callbacks: Set<TickerCallback>;
}

/**
 * Builds the fan-out stream for a single RFQ symbol/exchange pair.
 *
 * Incoming tickers are remembered under a key derived from the symbol and
 * exchange, so a callback that registers after a ticker has already arrived is
 * immediately brought up to date before it starts receiving live updates.
 *
 * @param rfqSymbol RFQ symbol this stream belongs to.
 * @param exchange Exchange this stream belongs to.
 * @returns The message sink, the subscribe function and the live callback set.
 */
const createRfqStream = (rfqSymbol: string, exchange: string): RFQStream => {
  const listeners = new Set<TickerCallback>();
  // Remembered tickers, replayed to callbacks that subscribe after the server snapshot.
  const remembered = new Map<string, RFQTicker>();
  // The key is the same for every message of this stream, so each new ticker
  // replaces the previously remembered one.
  const slot = `${rfqSymbol}-${exchange}`;

  function onMessage(ticker: RFQTicker): void {
    remembered.set(slot, ticker);
    listeners.forEach(listener => listener(ticker));
  }

  function subscribe(callback: TickerCallback): void {
    // Replay first, register second: a callback that throws during replay is
    // never added to the listener set.
    for (const ticker of remembered.values()) {
      callback(ticker);
    }
    listeners.add(callback);
  }

  return {
    onMessage,
    subscribe,
    callbacks: listeners,
  };
};

/**
 * Service for subscribing to RFQ tickers over a shared stream.
 *
 * One RFQ stream is kept per symbol/exchange pair; any number of callbacks can
 * share it. The server-side subscription is dropped once the last callback for
 * a pair has been removed, and every active pair is re-subscribed whenever the
 * underlying stream (re)connects.
 *
 * @param stream Transport used to subscribe, unsubscribe, receive events and place orders.
 * @returns `subscribe`, `unsubscribe` and `placeOrder` functions bound to `stream`.
 */
export const streamRFQService = (stream: Stream) => {
  // Each entry keeps the exact symbol/exchange pair it was created with, so a
  // re-subscription never has to parse them back out of the lookup key (which
  // breaks as soon as the symbol does not contain exactly one '-').
  const entries = new Map<string, { rfqSymbol: string; exchange: string; rfqStream: RFQStream }>();

  const keyOf = (rfqSymbol: string, exchange: string) => `${rfqSymbol}-${exchange}`;

  // Drops the pair and its server-side subscription once no callback is left.
  const rfqUnsubscribe = (rfqSymbol: string, exchange: string) => {
    const key = keyOf(rfqSymbol, exchange);
    const entry = entries.get(key);
    if (entry && entry.rfqStream.callbacks.size === 0) {
      entries.delete(key);
      stream.unsubscribe(Subscription.rfq({ rfqInfo: rfqSymbol, exchange }));
    }
  };

  // Dispatch incoming RFQ events to the active streams.
  // NOTE(review): every ticker of an event is handed to every active stream,
  // regardless of symbol/exchange - confirm whether per-pair filtering is intended.
  stream.onEvent('rfq', (event: StreamEvent) => {
    const tickers = event.asRFQTickers();
    if (!tickers) return;
    // Snapshot the keys so streams added while dispatching are not visited and
    // streams removed while dispatching are skipped by the lookup below.
    const keys = Array.from(entries.keys());
    tickers.forEach(ticker => {
      keys.forEach(key => {
        entries.get(key)?.rfqStream.onMessage(ticker);
      });
    });
  });

  // Restore every active subscription when the stream connects.
  stream.onConnect(() => {
    Array.from(entries.values()).forEach(({ rfqSymbol, exchange }) => {
      stream.subscribe(Subscription.rfq({ rfqInfo: rfqSymbol, exchange }));
    });
  });

  /**
   * Registers `callback` for the given RFQ symbol on the given exchange,
   * creating the shared RFQ stream for the pair on first use.
   */
  const subscribe = (rfqSymbol: string, exchange: string, callback: TickerCallback) => {
    const key = keyOf(rfqSymbol, exchange);
    let entry = entries.get(key);
    if (!entry) {
      entry = { rfqSymbol, exchange, rfqStream: createRfqStream(rfqSymbol, exchange) };
      entries.set(key, entry);
    }
    entry.rfqStream.subscribe(callback);
    // Sent on every call, including when the pair is already subscribed.
    stream.subscribe(Subscription.rfq({ rfqInfo: rfqSymbol, exchange }));
  };

  /**
   * Removes `callback` from the given pair; the pair itself is unsubscribed
   * once its last callback is gone. Unknown pairs are ignored.
   */
  const unsubscribe = (rfqSymbol: string, exchange: string, callback: TickerCallback) => {
    const entry = entries.get(keyOf(rfqSymbol, exchange));
    if (!entry) return;
    entry.rfqStream.callbacks.delete(callback);
    if (entry.rfqStream.callbacks.size === 0) {
      rfqUnsubscribe(rfqSymbol, exchange);
    }
  };

  /** Forwards an OTC order to the underlying stream. */
  const placeOrder = (order: OtcOrder) => {
    stream.placeOtcOrder(order);
  };

  return {
    subscribe,
    placeOrder,
    unsubscribe,
  };
};
