import { EventEmitter } from 'node:events'
import WebSocket, { RawData } from 'ws'

import type { TapeTrade } from '../../core/types.js'

interface OkxTradeUpdate {
  instId: string
  tradeId: string
  px: string
  sz: string
  side: 'buy' | 'sell'
  ts: string
}

interface OkxTradeMessage {
  arg?: {
    channel?: string
    instId?: string
  }
  data?: OkxTradeUpdate[]
}

const OKX_URL = 'wss://ws.okx.com:8443/ws/v5/public'
const OKX_INSTRUMENTS = [
  { instId: 'BTC-USDT', instrumentType: 'spot' as const, btcMultiplier: 1 },
  { instId: 'BTC-USDT-SWAP', instrumentType: 'swap' as const, btcMultiplier: 0.01 }
]

export class OkxTradeStreamer extends EventEmitter {
  readonly name = 'okx'

  private socket?: WebSocket
  private pingTimer?: NodeJS.Timeout
  private reconnectTimer?: NodeJS.Timeout
  private stopped = false

  start(): void {
    this.stopped = false
    const socket = new WebSocket(OKX_URL)
    this.socket = socket

    socket.on('open', () => {
      socket.send(
        JSON.stringify({
          op: 'subscribe',
          args: OKX_INSTRUMENTS.map(instrument => ({ channel: 'trades', instId: instrument.instId }))
        })
      )

      this.emit('discover', { stream: 'public', count: OKX_INSTRUMENTS.length, symbols: OKX_INSTRUMENTS.map(entry => entry.instId) })
      this.emit('ready', { stream: 'public', symbolCount: OKX_INSTRUMENTS.length, url: OKX_URL })

      this.pingTimer = setInterval(() => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send('ping')
        }
      }, 20_000)
    })

    socket.on('message', (payload: RawData) => {
      try {
        const text = payload.toString()
        if (!text.startsWith('{')) {
          return
        }

        const message = JSON.parse(text) as OkxTradeMessage
        if (!message.arg || message.arg.channel !== 'trades' || !Array.isArray(message.data)) {
          return
        }

        const instrument = OKX_INSTRUMENTS.find(entry => entry.instId === message.arg?.instId)
        if (!instrument) {
          return
        }

        for (const update of message.data) {
          const price = Number(update.px)
          const rawSize = Number(update.sz)
          const timestamp = Number(update.ts)
          const size = rawSize * instrument.btcMultiplier

          if (!Number.isFinite(price) || !Number.isFinite(size) || !Number.isFinite(timestamp) || price <= 0 || size <= 0) {
            continue
          }

          const trade: TapeTrade = {
            exchange: 'OKX',
            exchangeClass: 'cex',
            instrumentType: instrument.instrumentType,
            symbol: update.instId,
            timestamp,
            price,
            size,
            notionalUsd: price * size,
            side: update.side,
            tradeId: update.tradeId,
            raw: update
          }

          this.emit('trade', trade)
        }
      } catch (error) {
        this.emit('error', error)
      }
    })

    socket.on('error', (error: Error) => this.emit('error', error))
    socket.on('close', () => this.handleClose())
  }

  stop(): void {
    this.stopped = true

    if (this.pingTimer) {
      clearInterval(this.pingTimer)
      this.pingTimer = undefined
    }

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = undefined
    }

    this.socket?.close()
    this.socket = undefined
  }

  private handleClose(): void {
    if (this.pingTimer) {
      clearInterval(this.pingTimer)
      this.pingTimer = undefined
    }

    this.socket = undefined

    if (this.stopped) {
      return
    }

    this.reconnectTimer = setTimeout(() => this.start(), 3_000)
    this.emit('warn', 'Reconnecting public stream in 3s')
  }
}
