import { EventEmitter } from 'node:events'
import WebSocket, { RawData } from 'ws'

import type { BybitCategory, BybitInstrument, TapeTrade } from '../../core/types.js'
import { BybitRestClient } from './rest.js'

/**
 * One raw trade entry as it appears in the `data` array of a `publicTrade.*`
 * WebSocket message. Numeric fields are typed as `number | string` and are
 * coerced with `Number()` during normalization.
 */
interface BybitPublicTradeRaw {
  /** Trade identifier; copied to `tradeId` on the normalized trade. May be absent. */
  i?: string
  /** Trade timestamp. NOTE(review): presumably epoch milliseconds — confirm against the Bybit docs. */
  T: number | string
  /** Trade price. */
  p: number | string
  /**
   * Trade size. For the `inverse` category the normalizer divides this by the
   * price, which suggests it is quote-denominated there — TODO confirm.
   */
  v: number | string
  /** Side of the trade as sent by the exchange; mapped to lowercase `buy`/`sell` downstream. */
  S: 'Buy' | 'Sell'
  /** Instrument symbol; copied to `symbol` on the normalized trade. */
  s: string
}

/**
 * Minimal shape of a parsed WebSocket message that the streamer cares about.
 * Both fields are optional because other message kinds arriving on the same
 * socket (anything without a `publicTrade.` topic) are parsed with this type
 * too and then skipped.
 */
interface BybitPublicTradeMessage {
  /** Topic name; only topics starting with `publicTrade.` are processed. */
  topic?: string
  /** Raw trades carried by the message; processed only when it is an array. */
  data?: BybitPublicTradeRaw[]
}

/** Prefix shared by every public WebSocket endpoint listed below. */
const PUBLIC_WS_BASE_URL = 'wss://stream.bybit.com/v5/public'

/** Public WebSocket endpoint to connect to for each supported category. */
const PUBLIC_WS_URLS: Record<BybitCategory, string> = {
  spot: `${PUBLIC_WS_BASE_URL}/spot`,
  linear: `${PUBLIC_WS_BASE_URL}/linear`,
  inverse: `${PUBLIC_WS_BASE_URL}/inverse`
}

/**
 * Splits `items` into consecutive sub-arrays of at most `size` elements,
 * preserving order. The input array is not mutated.
 *
 * @param items - Elements to partition.
 * @param size - Maximum chunk length. Fractional values are rounded down;
 *   `Infinity` yields a single chunk holding every item.
 * @returns The chunks in order; an empty array when `items` is empty.
 * @throws RangeError when `size` is NaN or rounds down to less than 1. Such a
 *   step would never advance the loop (0 or negative) or would silently drop
 *   every item (NaN).
 */
function chunk<T>(items: T[], size: number): T[][] {
  const step = Math.floor(size)

  // `!(step >= 1)` also rejects NaN, which a plain `step < 1` would let through.
  if (!(step >= 1)) {
    throw new RangeError(`chunk size must be at least 1, received ${size}`)
  }

  const chunks: T[][] = []
  for (let index = 0; index < items.length; index += step) {
    chunks.push(items.slice(index, index + step))
  }
  return chunks
}

/**
 * Converts one raw public-trade entry into the internal `TapeTrade` shape.
 *
 * The payload comes from unvalidated JSON, so every field that ends up on the
 * tape is checked at runtime and the whole trade is dropped when any of them
 * is unusable. Dropping is preferred over guessing (for example, labelling an
 * unknown side as a sell).
 *
 * @param category - Category of the stream the trade arrived on. It selects
 *   the `instrumentType` and, for `inverse`, the size conversion.
 * @param trade - Raw trade entry from the message's `data` array.
 * @returns The normalized trade, or `null` when price, size or timestamp is
 *   not a finite positive number, the side is neither `'Buy'` nor `'Sell'`,
 *   or the symbol is not a non-empty string.
 */
function normalizeTrade(category: BybitCategory, trade: BybitPublicTradeRaw): TapeTrade | null {
  const price = Number(trade.p)
  const rawSize = Number(trade.v)
  const timestamp = Number(trade.T)

  if (!Number.isFinite(price) || !Number.isFinite(rawSize) || !Number.isFinite(timestamp)) {
    return null
  }

  // `Number('')` and `Number(null)` are 0, so the positivity check is what
  // rejects an empty or missing field, including the timestamp.
  if (price <= 0 || rawSize <= 0 || timestamp <= 0) {
    return null
  }

  // Widen to `unknown`: the declared union is only a compile-time promise.
  const rawSide: unknown = trade.S
  const side = rawSide === 'Buy' ? 'buy' : rawSide === 'Sell' ? 'sell' : null
  if (side === null) {
    return null
  }

  const rawSymbol: unknown = trade.s
  if (typeof rawSymbol !== 'string' || rawSymbol.length === 0) {
    return null
  }

  // Inverse sizes are divided by price before use; other categories are used
  // as reported. For inverse the notional therefore equals the raw size (up
  // to floating-point rounding).
  const size = category === 'inverse' ? rawSize / price : rawSize
  const notionalUsd = size * price

  return {
    exchange: 'BYBIT',
    exchangeClass: 'cex',
    instrumentType: category === 'spot' ? 'spot' : category === 'linear' ? 'perp' : 'inverse',
    symbol: rawSymbol,
    timestamp,
    price,
    size,
    notionalUsd,
    side,
    tradeId: trade.i,
    raw: trade
  }
}

/**
 * Streams public trades from Bybit, using one WebSocket per configured
 * category.
 *
 * Lifecycle: `start()` discovers symbols through the REST client and opens a
 * socket for every category that has symbols; `stop()` closes everything. A
 * socket that closes while the streamer is running is reopened after 3 s.
 *
 * Events emitted:
 * - `'discover'` — `{ stream, count, symbols }` after symbol discovery for a category.
 * - `'ready'` — `{ stream, symbolCount, url }` when a category socket opens.
 * - `'trade'` — a normalized trade for every valid entry of a `publicTrade.*` message.
 * - `'warn'` — a human-readable string (no symbols found, reconnect scheduled).
 * - `'error'` — socket errors and message-handling failures. As with any
 *   `EventEmitter`, an `'error'` event with no listener throws, so attach one.
 *
 * Every socket handler first checks that its socket is still the one
 * registered for the category. Sockets replaced by a newer connection or
 * released by `stop()` therefore cannot emit events, remove the newer
 * socket's bookkeeping, or schedule a reconnect.
 */
export class BybitPublicTradeStreamer extends EventEmitter {
  readonly name = 'bybit'

  /** Socket currently owning each category. */
  private readonly sockets = new Map<BybitCategory, WebSocket>()
  /** Pending reconnect timer per category; at most one at a time. */
  private readonly reconnectTimers = new Map<BybitCategory, NodeJS.Timeout>()
  /** Ping interval of the socket currently owning each category. */
  private readonly pingTimers = new Map<BybitCategory, NodeJS.Timeout>()
  /** Symbols discovered by the most recent `start()` for each category. */
  private readonly symbolsByCategory = new Map<BybitCategory, string[]>()
  private stopped = false
  /** Incremented by every `start()` so an older, still-awaiting run can tell it was superseded. */
  private runId = 0

  /**
   * @param restClient - Client used to discover the instruments to subscribe to.
   * @param options - `categories` to stream, the `symbolPrefix` passed to
   *   discovery, and `subscriptionChunkSize`, the maximum number of topics
   *   per subscribe request.
   */
  constructor(
    private readonly restClient: BybitRestClient,
    private readonly options: {
      categories: BybitCategory[]
      symbolPrefix: string
      subscriptionChunkSize: number
    }
  ) {
    super()
  }

  /**
   * Discovers symbols for every configured category (sequentially) and opens
   * a socket for each category that has at least one symbol.
   *
   * If `stop()` or another `start()` is called while discovery is awaiting,
   * this run ends without opening further sockets.
   *
   * @returns Resolves once every category has been processed; sockets may
   *   still be connecting at that point.
   * @throws Whatever the REST client rejects with. Categories connected
   *   before the failure stay connected.
   */
  async start(): Promise<void> {
    this.stopped = false
    const runId = ++this.runId

    for (const category of this.options.categories) {
      const instruments = await this.restClient.getBtcInstruments(category, this.options.symbolPrefix)

      // The await gives stop()/start() a chance to run; do not connect on
      // behalf of a run that has been stopped or superseded.
      if (this.stopped || runId !== this.runId) {
        return
      }

      const symbols = instruments.map((instrument: BybitInstrument) => instrument.symbol)

      this.symbolsByCategory.set(category, symbols)
      this.emit('discover', { stream: category, count: symbols.length, symbols })

      if (symbols.length === 0) {
        this.emit('warn', `No ${this.options.symbolPrefix} symbols found for ${category}`)
        continue
      }

      this.connectCategory(category)
    }
  }

  /**
   * Stops streaming: cancels pending reconnects and pings and closes every
   * socket. Safe to call repeatedly; `start()` may be called again afterwards.
   */
  stop(): void {
    this.stopped = true

    for (const timer of this.reconnectTimers.values()) {
      clearTimeout(timer)
    }

    for (const timer of this.pingTimers.values()) {
      clearInterval(timer)
    }

    // Release ownership before closing so that any event fired as a result of
    // close() — including the error `ws` reports when a socket is closed
    // before it finished connecting — is recognised as stale and ignored.
    const closing = [...this.sockets.values()]

    this.reconnectTimers.clear()
    this.pingTimers.clear()
    this.sockets.clear()

    for (const socket of closing) {
      socket.close()
    }
  }

  /**
   * Opens the socket for `category`, replacing any socket or pending
   * reconnect that already exists for it, and wires subscription, ping,
   * trade parsing and reconnect handling.
   */
  private connectCategory(category: BybitCategory): void {
    // A reconnect that is still pending would open a second socket later.
    const pendingReconnect = this.reconnectTimers.get(category)
    if (pendingReconnect) {
      clearTimeout(pendingReconnect)
      this.reconnectTimers.delete(category)
    }

    // Release and close a socket left over from an earlier start().
    const previousSocket = this.sockets.get(category)
    if (previousSocket) {
      this.sockets.delete(category)
      previousSocket.close()
    }

    const previousPing = this.pingTimers.get(category)
    if (previousPing) {
      clearInterval(previousPing)
      this.pingTimers.delete(category)
    }

    const symbols = this.symbolsByCategory.get(category) ?? []
    const url = PUBLIC_WS_URLS[category]
    const socket = new WebSocket(url)

    // Kept per socket so the close handler can only ever clear its own timer.
    let pingTimer: NodeJS.Timeout | undefined
    const isCurrent = (): boolean => this.sockets.get(category) === socket

    this.sockets.set(category, socket)

    socket.on('open', () => {
      if (!isCurrent()) {
        return
      }

      this.emit('ready', { stream: category, symbolCount: symbols.length, url })

      // Clamp to at least 1: a zero, negative or NaN chunk size must not be
      // able to stall the subscription loop.
      const requestedSize = Math.floor(this.options.subscriptionChunkSize)
      const chunkSize = requestedSize >= 1 ? requestedSize : 1

      for (const topics of chunk(
        symbols.map(symbol => `publicTrade.${symbol}`),
        chunkSize
      )) {
        socket.send(JSON.stringify({ op: 'subscribe', args: topics }))
      }

      // Application-level ping sent every 20 s while the socket is open.
      pingTimer = setInterval(() => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify({ op: 'ping' }))
        }
      }, 20_000)

      this.pingTimers.set(category, pingTimer)
    })

    socket.on('message', (payload: RawData) => {
      // Frames can still arrive while a released socket finishes closing.
      if (!isCurrent()) {
        return
      }

      try {
        const message = JSON.parse(payload.toString()) as BybitPublicTradeMessage
        // Everything that is not a trade push (acks, pongs, ...) is skipped.
        if (!message.topic || !message.topic.startsWith('publicTrade.') || !Array.isArray(message.data)) {
          return
        }

        for (const rawTrade of message.data) {
          const trade = normalizeTrade(category, rawTrade)
          if (trade) {
            this.emit('trade', trade)
          }
        }
      } catch (error) {
        this.emit('error', error)
      }
    })

    socket.on('error', (error: Error) => {
      // Errors from a released socket are a by-product of shutting it down.
      if (!isCurrent()) {
        return
      }

      this.emit('error', error)
    })

    socket.on('close', () => {
      if (pingTimer) {
        clearInterval(pingTimer)
        if (this.pingTimers.get(category) === pingTimer) {
          this.pingTimers.delete(category)
        }
        pingTimer = undefined
      }

      // A released socket must not touch the newer socket's entry or
      // schedule a reconnect.
      if (!isCurrent()) {
        return
      }

      this.sockets.delete(category)

      if (this.stopped) {
        return
      }

      const reconnectTimer = setTimeout(() => {
        this.reconnectTimers.delete(category)
        if (!this.stopped) {
          this.connectCategory(category)
        }
      }, 3_000)
      this.reconnectTimers.set(category, reconnectTimer)
      this.emit('warn', `Reconnecting ${category} public stream in 3s`)
    })
  }
}
