import { EventEmitter } from 'node:events'
import WebSocket, { RawData } from 'ws'

import type { TapeTrade } from '../../core/types.js'

interface DydxTrade {
  id: string
  size: string
  price: string
  side: 'BUY' | 'SELL'
  createdAt: string
}

interface DydxMessage {
  channel?: string
  type?: string
  contents?: {
    trades?: DydxTrade[]
  }
}

const DYDX_URL = 'wss://indexer.dydx.trade/v4/ws'

export class DydxTradeStreamer extends EventEmitter {
  readonly name = 'dydx'

  private socket?: WebSocket
  private reconnectTimer?: NodeJS.Timeout
  private stopped = false
  private skippedInitialSnapshot = false

  start(): void {
    this.stopped = false
    this.skippedInitialSnapshot = false
    const socket = new WebSocket(DYDX_URL)
    this.socket = socket

    socket.on('open', () => {
      socket.send(
        JSON.stringify({
          type: 'subscribe',
          channel: 'v4_trades',
          id: 'BTC-USD'
        })
      )

      this.emit('discover', { stream: 'perp', count: 1, symbols: ['BTC-USD'] })
      this.emit('ready', { stream: 'perp', symbolCount: 1, url: DYDX_URL })
    })

    socket.on('message', (payload: RawData) => {
      try {
        const message = JSON.parse(payload.toString()) as DydxMessage
        if (message.channel !== 'v4_trades' || !Array.isArray(message.contents?.trades)) {
          return
        }

        if (!this.skippedInitialSnapshot) {
          this.skippedInitialSnapshot = true
          this.emit('warn', 'Skipped initial dYdX snapshot batch for cleaner live tape')
          return
        }

        for (const update of message.contents.trades) {
          const price = Number(update.price)
          const size = Number(update.size)
          const timestamp = Date.parse(update.createdAt)

          if (!Number.isFinite(price) || !Number.isFinite(size) || !Number.isFinite(timestamp) || price <= 0 || size <= 0) {
            continue
          }

          const trade: TapeTrade = {
            exchange: 'DYDX',
            exchangeClass: 'dex',
            instrumentType: 'perp',
            symbol: 'BTC-USD',
            timestamp,
            price,
            size,
            notionalUsd: price * size,
            side: update.side === 'BUY' ? 'buy' : 'sell',
            tradeId: update.id,
            raw: update
          }

          this.emit('trade', trade)
        }
      } catch (error) {
        this.emit('error', error)
      }
    })

    socket.on('error', (error: Error) => this.emit('error', error))
    socket.on('close', () => this.handleClose())
  }

  stop(): void {
    this.stopped = true

    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = undefined
    }

    this.socket?.close()
    this.socket = undefined
  }

  private handleClose(): void {
    this.socket = undefined

    if (this.stopped) {
      return
    }

    this.reconnectTimer = setTimeout(() => this.start(), 3_000)
    this.emit('warn', 'Reconnecting perp stream in 3s')
  }
}
