import { EventEmitter } from 'node:events'
import WebSocket, { RawData } from 'ws'

import type { TapeTrade } from '../../core/types.js'

/**
 * Identifies one of the Binance trade streams this module can connect to.
 * The key selects the endpoint in `STREAM_URLS` and is also copied verbatim
 * into each emitted trade's `instrumentType`.
 */
type BinanceStreamKey = 'spot' | 'perp'

/**
 * Shape of the JSON payload read from the Binance trade streams.
 * Field meanings below are described by how this file consumes them.
 */
interface BinanceTradeMessage {
  /** Event type; only messages where this equals 'trade' are processed. */
  e: string
  /** Symbol, forwarded unchanged as the trade's `symbol`. */
  s: string
  /** Trade identifier, stringified into the trade's `tradeId`. */
  t: number
  /** Price as a decimal string; converted with `Number()`. */
  p: string
  /** Quantity as a decimal string; converted with `Number()`. */
  q: string
  /** Trade timestamp (presumably epoch milliseconds — confirm against the Binance docs). */
  T: number
  /**
   * When true the trade is reported with side 'sell', otherwise 'buy'.
   * NOTE(review): Binance documents this flag as "is the buyer the market maker" — confirm.
   */
  m: boolean
}

/**
 * WebSocket endpoint for each stream key. Both entries subscribe to the
 * `btcusdt@trade` stream; they differ only in host (`stream.binance.com`
 * for 'spot', `fstream.binance.com` for 'perp').
 */
const STREAM_URLS: Record<BinanceStreamKey, string> = {
  spot: 'wss://stream.binance.com:9443/ws/btcusdt@trade',
  perp: 'wss://fstream.binance.com/ws/btcusdt@trade'
}

/**
 * Streams BTCUSDT trades from Binance over WebSocket and re-emits them as
 * normalized `TapeTrade` objects.
 *
 * Events emitted:
 * - `discover` — `{ stream, count, symbols }` when a stream's socket opens.
 * - `ready`    — `{ stream, symbolCount, url }` when a stream's socket opens.
 * - `trade`    — a `TapeTrade` for every valid trade message.
 * - `warn`     — a string message when a reconnect is scheduled.
 * - `error`    — socket errors and any exception raised while handling a message.
 *
 * At most one socket per stream key is tracked at a time. A socket that has
 * been removed from the tracking map (by `stop()`) is considered detached:
 * its late `message` and `close` events are ignored.
 */
export class BinanceTradeStreamer extends EventEmitter {
  readonly name = 'binance'

  /** Currently active socket for each stream. */
  private readonly sockets = new Map<BinanceStreamKey, WebSocket>()
  /** Pending reconnect timer for each stream, if one is scheduled. */
  private readonly reconnectTimers = new Map<BinanceStreamKey, NodeJS.Timeout>()
  /** Set by `stop()`; suppresses automatic reconnects. */
  private stopped = false

  /**
   * @param options.includeFutures When true, `start()` also connects the 'perp' stream.
   */
  constructor(
    private readonly options: {
      includeFutures: boolean
    }
  ) {
    super()
  }

  /**
   * Connects the 'spot' stream and, if configured, the 'perp' stream.
   * Safe to call repeatedly: a stream that already has an active socket is
   * left untouched, and a pending reconnect is replaced by an immediate connect.
   */
  start(): void {
    this.stopped = false
    this.connectStream('spot')

    if (this.options.includeFutures) {
      this.connectStream('perp')
    }
  }

  /**
   * Cancels pending reconnects and closes every active socket.
   * No reconnect is scheduled and no further `trade` events are emitted for
   * the sockets closed here.
   */
  stop(): void {
    this.stopped = true

    for (const timer of this.reconnectTimers.values()) {
      clearTimeout(timer)
    }
    this.reconnectTimers.clear()

    // Detach the sockets from the map BEFORE closing them so that their
    // (asynchronous) 'close' and trailing 'message' events are recognised as
    // stale, even if start() is called again before they fire.
    const detached = [...this.sockets.values()]
    this.sockets.clear()

    for (const socket of detached) {
      socket.close()
    }
  }

  /**
   * Opens the socket for `streamKey` and wires up its event handlers.
   * Does nothing when the streamer is stopped or the stream already has an
   * active socket, which prevents duplicate connections (and duplicate trades).
   */
  private connectStream(streamKey: BinanceStreamKey): void {
    // Whether we got here from the reconnect timer or from start(), any timer
    // recorded for this stream is now obsolete.
    const pendingTimer = this.reconnectTimers.get(streamKey)
    if (pendingTimer !== undefined) {
      clearTimeout(pendingTimer)
      this.reconnectTimers.delete(streamKey)
    }

    if (this.stopped || this.sockets.has(streamKey)) {
      return
    }

    const url = STREAM_URLS[streamKey]
    const socket = new WebSocket(url)
    this.sockets.set(streamKey, socket)

    socket.on('open', () => {
      this.emit('discover', { stream: streamKey, count: 1, symbols: ['BTCUSDT'] })
      this.emit('ready', { stream: streamKey, symbolCount: 1, url })
    })

    socket.on('message', (payload: RawData) => {
      // Drop frames that arrive on a socket we have already detached.
      if (this.sockets.get(streamKey) !== socket) {
        return
      }
      this.handleMessage(streamKey, payload)
    })

    socket.on('error', (error: Error) => this.emit('error', error))
    socket.on('close', () => this.handleClose(streamKey, socket))
  }

  /**
   * Parses one raw frame and emits a `trade` event if it is a valid trade.
   * Non-trade events and trades with a non-finite or non-positive price/size
   * (or non-finite timestamp) are silently skipped. Any exception — a JSON
   * parse failure or one thrown by a `trade` listener — is re-emitted as `error`.
   */
  private handleMessage(streamKey: BinanceStreamKey, payload: RawData): void {
    try {
      const message = JSON.parse(payload.toString()) as BinanceTradeMessage
      if (message.e !== 'trade') {
        return
      }

      const price = Number(message.p)
      const size = Number(message.q)
      const timestamp = Number(message.T)

      if (!Number.isFinite(price) || !Number.isFinite(size) || !Number.isFinite(timestamp) || price <= 0 || size <= 0) {
        return
      }

      const trade: TapeTrade = {
        exchange: 'BINANCE',
        exchangeClass: 'cex',
        instrumentType: streamKey,
        symbol: message.s,
        timestamp,
        price,
        size,
        notionalUsd: price * size,
        side: message.m ? 'sell' : 'buy',
        tradeId: String(message.t),
        raw: message
      }

      this.emit('trade', trade)
    } catch (error) {
      this.emit('error', error)
    }
  }

  /**
   * Handles a socket's `close` event: forgets the socket and, unless the
   * streamer was stopped, schedules a reconnect in 3 seconds.
   *
   * @param socket The socket that closed. If it is no longer the tracked
   *   socket for `streamKey` (it was detached by `stop()`), the event is
   *   ignored so it cannot evict a newer socket or trigger a second connection.
   */
  private handleClose(streamKey: BinanceStreamKey, socket: WebSocket): void {
    if (this.sockets.get(streamKey) !== socket) {
      return
    }

    this.sockets.delete(streamKey)

    if (this.stopped) {
      return
    }

    // connectStream() removes this timer's map entry when it runs.
    const reconnectTimer = setTimeout(() => this.connectStream(streamKey), 3_000)
    this.reconnectTimers.set(streamKey, reconnectTimer)
    this.emit('warn', `Reconnecting ${streamKey} stream in 3s`)
  }
}
