/** CultRoll Scraper Worker — queue-native adapter orchestration
 *
 * Single cron trigger "0 * * * *" enqueues due source targets and stale-title
 * metadata refresh jobs into Cloudflare Queues. Source jobs submit
 * pipeline jobs, and pipeline jobs can chain other pipeline jobs.
 *
 * HTTP endpoints:
 *   GET  /health         — liveness check
 *   POST /run?target=<id>&kind=source|pipeline — enqueue one job
 *   POST /work           — enqueue all currently due source targets now
 *   GET  /status         — list source targets + queue pipeline job graph
 *   GET  /stats          — source queue/schedule statistics
 */

import type {
  ScraperEnv,
  SourceTargetRow,
  SourceTargetCinemaRow,
  AdapterConfig,
} from './types';
import { ADAPTERS } from './adapters/registry';

/** `source_target_cinemas` row joined with the linked cinema's English name
 * and city (both null when `cinema_id` has no match in `cinemas`). */
interface ScopedSourceCinemaRow extends SourceTargetCinemaRow {
  cinema_name_en: string | null;
  city_id: string | null;
}

// Name of the table that drives source scheduling (single source of truth).
type TargetTable = 'source_targets';
// 'direct' = synchronous HTTP run; 'queued' = executed by the queue consumer.
type RunMode = 'direct' | 'queued';
// Internal pipeline adapter ids as registered in ADAPTERS.
type PipelineAdapterId = 'tmdb-enricher' | 'chain-posters' | 'dedup-tmdb';
// Externally published aliases for the same adapters (see the *_PIPELINE_ID_MAP consts).
type PublicPipelineAdapterId = 'metadata-enricher' | 'chain-posters' | 'metadata-dedup';

/** Queue payload for one scheduled or manual source-target scrape. */
interface SourceQueueMessage {
  targetId: string;
  kind: 'source';
  reason: 'scheduled' | 'manual';
  enqueuedAt: string; // ISO timestamp set at enqueue time
}

/** Queue payload for one pipeline adapter run (enrichment / posters / dedup). */
interface PipelineQueueMessage {
  adapterId: PipelineAdapterId;
  kind: 'pipeline';
  reason: 'refresh' | 'manual' | 'chained';
  config?: AdapterConfig; // layered over PIPELINE_JOB_DEFAULT_CONFIG at run time
  enqueuedAt: string;
}

// Discriminated union over `kind`; normalizeQueueMessage validates raw payloads into this.
type QueueJobMessage = SourceQueueMessage | PipelineQueueMessage;

/** Per-run fetch throttling: a default inter-request delay plus per-host overrides. */
interface RateLimitPolicy {
  defaultDelayMs: number;
  hostDelayMs: Record<string, number>; // keys are lowercased hostnames
}

const SOURCE_TABLE: TargetTable = 'source_targets';
// All pipeline adapters; execution order is defined by PIPELINE_JOB_CHAIN, not this list.
const PIPELINE_ADAPTERS: PipelineAdapterId[] = ['tmdb-enricher', 'chain-posters', 'dedup-tmdb'];
// First job of the pipeline chain; source batches and cron refreshes enqueue this one.
const PIPELINE_ENTRY_ADAPTER: PipelineAdapterId = 'tmdb-enricher';
// Titles whose tmdb_enriched_at is older than this many hours count as stale.
const TMDB_REFRESH_INTERVAL_HOURS = 36;
// Stale title ids packed into a single refresh queue message.
const TMDB_REFRESH_BATCH_SIZE = 200;
// Upper bound on stale titles examined per cron sweep.
const TMDB_REFRESH_SWEEP_LIMIT = 1000;

// Fallback inter-request delay (ms) per adapter when the target config sets none.
const ADAPTER_DEFAULT_RATE_LIMIT_MS: Record<string, number> = {
  'grand-cinemas': 300,
  'empire-cinemas': 300,
  'taj-cinemas': 300,
  'vox-cinemas': 350,
  cinemall: 350,
  'tmdb-enricher': 250,
  'chain-posters': 400,
  'dedup-tmdb': 0,
};

// Baseline config per pipeline adapter; job-level config overrides these keys.
const PIPELINE_JOB_DEFAULT_CONFIG: Record<PipelineAdapterId, AdapterConfig> = {
  'tmdb-enricher': { limit: 15, refresh_hours: 36, cast_limit: 10, include_existing: false },
  'chain-posters': { limit: 10 },
  'dedup-tmdb': {},
};

// Follow-up jobs enqueued after each pipeline adapter completes (queued mode only).
const PIPELINE_JOB_CHAIN: Record<PipelineAdapterId, PipelineAdapterId[]> = {
  'tmdb-enricher': ['chain-posters'],
  'chain-posters': ['dedup-tmdb'],
  'dedup-tmdb': [],
};

// Internal → public adapter id aliases (used in HTTP responses).
const PUBLIC_PIPELINE_ID_MAP: Record<PipelineAdapterId, PublicPipelineAdapterId> = {
  'tmdb-enricher': 'metadata-enricher',
  'chain-posters': 'chain-posters',
  'dedup-tmdb': 'metadata-dedup',
};

// Public → internal reverse mapping (used when parsing HTTP params).
const INTERNAL_PIPELINE_ID_MAP: Record<PublicPipelineAdapterId, PipelineAdapterId> = {
  'metadata-enricher': 'tmdb-enricher',
  'chain-posters': 'chain-posters',
  'metadata-dedup': 'dedup-tmdb',
};

export default {
  // ---------------------------------------------------------------------------
  // Cron handler — runs every hour, enqueues due source adapters
  // ---------------------------------------------------------------------------
  async scheduled(_event: ScheduledEvent, env: ScraperEnv, ctx: ExecutionContext) {
    // waitUntil keeps the isolate alive until all enqueueing work completes.
    ctx.waitUntil(runScheduledOrchestration(env));
  },

  // ---------------------------------------------------------------------------
  // Queue consumer — processes target jobs
  // ---------------------------------------------------------------------------
  async queue(batch: MessageBatch<QueueJobMessage>, env: ScraperEnv) {
    let sourceSucceeded = false;
    let newTitlesDetected = 0;
    // Successful source messages are NOT acked inline. Their ack/retry decision
    // is deferred until after the enrichment enqueue below, so that a failed
    // enqueue can retry exactly the source jobs that produced new titles.
    const deferredSourceMessages = [] as Array<{
      message: (typeof batch.messages)[number];
      newTitles: number;
    }>;
    for (const message of batch.messages) {
      const job = normalizeQueueMessage(message.body);
      if (!job) {
        // Malformed payloads are acked (dropped) to avoid poison-message loops.
        console.warn('Dropping malformed queue message', { body: message.body });
        message.ack();
        continue;
      }
      try {
        const result = job.kind === 'source'
          ? await runTarget(env, job.targetId, 'queued')
          : await runPipelineJob(env, job, 'queued');
        if (job.kind === 'source') {
          // Accumulate new-title counts across the whole batch; one enrichment
          // job is enqueued afterwards, sized to the total.
          const newTitles = toResultNumber((result as Record<string, unknown>).new_titles);
          sourceSucceeded = true;
          newTitlesDetected += newTitles;
          deferredSourceMessages.push({ message, newTitles });
        } else {
          // Pipeline jobs have no deferred follow-up decision; ack immediately.
          message.ack();
        }
        console.log(`Queue job complete`, { ...job, ...result });
      } catch (err) {
        const msg = err instanceof Error ? err.message : String(err);
        console.error(`Queue job failed`, { ...job, error: msg });
        if (isTerminalQueueError(msg)) {
          // Permanent failures (missing target/adapter, bad credentials) will
          // not succeed on retry — drop the message.
          message.ack();
        } else {
          message.retry();
        }
      }
    }
    let retryNewTitleSources = false;
    if (sourceSucceeded && newTitlesDetected > 0) {
      try {
        const queued = await enqueuePipelineJob(
          env,
          PIPELINE_ENTRY_ADAPTER,
          'refresh',
          buildTmdbNewTitleConfig(newTitlesDetected)
        );
        if (queued) {
          console.log(`Queued metadata enrichment for ${newTitlesDetected} newly detected title(s) after source batch`);
        }
      } catch (err) {
        // Enqueue failed: flag the deferred source messages so the titles get
        // re-detected (and re-enqueued for enrichment) on the next delivery.
        console.error('Failed to enqueue metadata enrichment after source batch:', err);
        retryNewTitleSources = true;
      }
    }
    // Settle the deferred source messages: retry only the ones that actually
    // found new titles when the enrichment enqueue failed; ack everything else.
    for (const { message, newTitles } of deferredSourceMessages) {
      if (retryNewTitleSources && newTitles > 0) {
        message.retry();
      } else {
        message.ack();
      }
    }
  },

  // ---------------------------------------------------------------------------
  // HTTP handler — admin / health / manual triggers
  // ---------------------------------------------------------------------------
  async fetch(request: Request, env: ScraperEnv): Promise<Response> {
    const url = new URL(request.url);
    // Bearer-token auth guards every endpoint, including /health.
    const authError = requireScraperAdminAuth(request, env);
    if (authError) return authError;

    if (url.pathname === '/health') {
      return Response.json({
        status: 'ok',
        source_adapters: Object.keys(ADAPTERS).filter((id) => !isPipelineAdapterId(id)),
        pipeline_adapters: PIPELINE_ADAPTERS.map(toPublicPipelineAdapterId),
        queue_enabled: Boolean(env.SCRAPER_QUEUE),
      });
    }

    if (url.pathname === '/run' && request.method === 'POST') {
      const targetId = url.searchParams.get('target');
      const kindParam = url.searchParams.get('kind');
      const sync = url.searchParams.get('sync') === '1';
      // Optional JSON body; only applied to pipeline jobs below.
      const config = await readPipelineConfig(request);
      if (!targetId) return Response.json({ error: 'target param required' }, { status: 400 });
      if (kindParam && kindParam !== 'source' && kindParam !== 'pipeline') {
        return Response.json({ error: 'kind must be source or pipeline' }, { status: 400 });
      }
      const kind = kindParam === 'pipeline' ? 'pipeline' : 'source';
      try {
        if (kind === 'pipeline') {
          // Accepts both internal and public adapter ids.
          const pipelineTargetId = toInternalPipelineAdapterId(targetId);
          if (!pipelineTargetId) {
            return Response.json({ error: 'target must be a known pipeline adapter id' }, { status: 400 });
          }
          if (sync) {
            // 'direct' mode runs inline and does NOT enqueue chained follow-ups.
            const result = await runPipelineJob(
              env,
              createPipelineJob(pipelineTargetId, 'manual', config),
              'direct'
            );
            return Response.json({ status: 'ok', kind, adapter: toPublicPipelineAdapterId(pipelineTargetId), result: sanitizePipelineResponse(result) });
          }
          const queued = await enqueuePipelineJob(env, pipelineTargetId, 'manual', config);
          return Response.json({ status: 'queued', kind, adapter: toPublicPipelineAdapterId(pipelineTargetId), queued });
        }

        if (sync) {
          const result = await runTarget(env, targetId, 'direct');
          return Response.json({ status: 'ok', kind, target: targetId, result });
        }
        const enqueued = await enqueueManualSourceTarget(env, targetId);
        return Response.json({ status: 'queued', ...enqueued });
      } catch (err) {
        return Response.json({ error: String(err) }, { status: 500 });
      }
    }

    if (url.pathname === '/work' && request.method === 'POST') {
      // Manual "run everything due now" trigger.
      try {
        const sources = await enqueueDueSourceTargets(env);
        return Response.json({ status: 'queued', sources });
      } catch (err) {
        return Response.json({ error: String(err) }, { status: 500 });
      }
    }

    if (url.pathname === '/status') {
      const sources = await listSourceTargets(env);
      return Response.json({
        sources,
        pipeline_jobs: PIPELINE_ADAPTERS.map((adapter) => ({
          adapter: toPublicPipelineAdapterId(adapter),
          defaults: PIPELINE_JOB_DEFAULT_CONFIG[adapter],
          next: PIPELINE_JOB_CHAIN[adapter].map(toPublicPipelineAdapterId),
        })),
        queue_enabled: Boolean(env.SCRAPER_QUEUE),
      });
    }

    if (url.pathname === '/stats') {
      const stats = await collectSourceStats(env);
      return Response.json(stats);
    }

    return Response.json({ error: 'Not found' }, { status: 404 });
  },
};

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Filters for listSourceTargets; an omitted flag means "no filter". */
interface ListSourceTargetsOptions {
  enabledOnly?: boolean;
  dueOnly?: boolean;
}

/**
 * Load source target rows, optionally restricted to enabled targets and/or
 * targets whose next_run_at has elapsed (or was never set).
 */
async function listSourceTargets(
  env: ScraperEnv,
  options: ListSourceTargetsOptions = {}
): Promise<SourceTargetRow[]> {
  const conditions: string[] = [];
  if (options.enabledOnly) conditions.push('enabled = 1');
  if (options.dueOnly) conditions.push("(next_run_at IS NULL OR next_run_at <= datetime('now'))");
  const whereSql = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
  const query = `SELECT * FROM ${SOURCE_TABLE}
     ${whereSql}
     ORDER BY adapter_id, country_code, id`;
  const { results } = await env.DB.prepare(query).all<SourceTargetRow>();
  return results ?? [];
}

/**
 * Aggregate schedule and run statistics for the /stats endpoint.
 * Issues two D1 batches: one over source_targets, one over scraper_runs.
 */
async function collectSourceStats(env: ScraperEnv): Promise<Record<string, unknown>> {
  // NOTE: destructuring order must match the statement order inside the batch.
  const [totalRes, enabledRes, dueRes, runsRes] = await env.DB.batch([
    env.DB.prepare(`SELECT COUNT(*) AS n FROM ${SOURCE_TABLE}`),
    env.DB.prepare(`SELECT COUNT(*) AS n FROM ${SOURCE_TABLE} WHERE enabled = 1`),
    env.DB.prepare(
      `SELECT COUNT(*) AS n FROM ${SOURCE_TABLE}
       WHERE enabled = 1
         AND (next_run_at IS NULL OR next_run_at <= datetime('now'))`
    ),
    env.DB.prepare(
      `SELECT
         MAX(last_run_at) AS latest_last_run_at,
         MIN(next_run_at) AS next_due_at
       FROM ${SOURCE_TABLE}
       WHERE enabled = 1`
    ),
  ]);

  // Run counts over the trailing 24 hours, total and failed.
  const [runs24Res, failed24Res] = await env.DB.batch([
    env.DB.prepare(
      `SELECT COUNT(*) AS n
         FROM scraper_runs
        WHERE started_at >= datetime('now', '-24 hours')`
    ),
    env.DB.prepare(
      `SELECT COUNT(*) AS n
         FROM scraper_runs
        WHERE started_at >= datetime('now', '-24 hours')
          AND status = 'failed'`
    ),
  ]);

  // Fourth statement above: MAX(last_run_at) / MIN(next_run_at) aggregates.
  const runsRow = (runsRes.results[0] ?? {}) as Record<string, unknown>;
  return {
    sources_total: toDbNumber(totalRes.results[0]),
    sources_enabled: toDbNumber(enabledRes.results[0]),
    sources_due_now: toDbNumber(dueRes.results[0]),
    latest_last_run_at: asNonEmptyString(runsRow.latest_last_run_at) ?? null,
    next_due_at: asNonEmptyString(runsRow.next_due_at) ?? null,
    runs_last_24h: toDbNumber(runs24Res.results[0]),
    failed_runs_last_24h: toDbNumber(failed24Res.results[0]),
    pipeline_entry: toPublicPipelineAdapterId(PIPELINE_ENTRY_ADAPTER),
    pipeline_jobs_total: PIPELINE_ADAPTERS.length,
    queue_enabled: Boolean(env.SCRAPER_QUEUE),
  };
}

/** Fetch a single source target row by primary key; null when the id is unknown. */
async function getSourceTargetById(
  env: ScraperEnv,
  targetId: string
): Promise<SourceTargetRow | null> {
  const stmt = env.DB.prepare(`SELECT * FROM ${SOURCE_TABLE} WHERE id = ?`);
  return stmt.bind(targetId).first<SourceTargetRow>();
}

/** Enqueue every enabled source target whose next_run_at has elapsed; returns the count. */
async function enqueueDueSourceTargets(env: ScraperEnv): Promise<number> {
  const dueTargets = await listSourceTargets(env, { enabledOnly: true, dueOnly: true });
  return enqueueSourceTargets(env, dueTargets, 'scheduled');
}

/**
 * Send one source-scrape job to the queue, then immediately push the target's
 * next_run_at forward by its interval. Rescheduling at enqueue time (rather
 * than on completion) stops the hourly sweep from re-enqueueing a target
 * whose job is still in flight. Always returns true on success; a queue or
 * DB failure propagates as an exception.
 */
async function enqueueSourceTargetJob(
  env: ScraperEnv,
  target: SourceTargetRow,
  reason: SourceQueueMessage['reason']
): Promise<boolean> {
  await env.SCRAPER_QUEUE.send({
    targetId: target.id,
    kind: 'source',
    reason,
    enqueuedAt: new Date().toISOString(),
  } satisfies SourceQueueMessage);
  // interval_hours comes from the row itself, so each target keeps its own cadence.
  await env.DB.prepare(
    `UPDATE ${SOURCE_TABLE}
        SET next_run_at = datetime('now', '+' || interval_hours || ' hours')
      WHERE id = ?`
  ).bind(target.id).run();
  return true;
}

/** Enqueue the given targets one by one; returns how many were queued. */
async function enqueueSourceTargets(
  env: ScraperEnv,
  targets: SourceTargetRow[],
  reason: SourceQueueMessage['reason']
): Promise<number> {
  let queuedCount = 0;
  for (const row of targets) {
    if (await enqueueSourceTargetJob(env, row, reason)) {
      queuedCount += 1;
    }
  }
  return queuedCount;
}

/** Enqueue a single source target by id (manual trigger); throws when the id is unknown. */
async function enqueueManualSourceTarget(
  env: ScraperEnv,
  targetId: string
): Promise<Record<string, unknown>> {
  const row = await getSourceTargetById(env, targetId);
  if (!row) throw new Error(`Target not found: ${targetId}`);
  const queued = await enqueueSourceTargetJob(env, row, 'manual');
  return { target: row.id, kind: 'source', adapter: row.adapter_id, queued };
}

/** Hourly cron body: enqueue due source jobs and stale-metadata refresh jobs in parallel. */
async function runScheduledOrchestration(env: ScraperEnv): Promise<void> {
  const [dueCount, refresh] = await Promise.all([
    enqueueDueSourceTargets(env),
    maybeEnqueueTmdbMetadataRefresh(env),
  ]);
  if (dueCount > 0) {
    console.log(`Scheduled ${dueCount} due source job(s)`);
  }
  if (refresh.jobs > 0) {
    console.log(`Scheduled ${refresh.jobs} metadata refresh job(s) for ${refresh.titles} stale title(s)`);
  }
}

/** Enricher config sized to the number of freshly discovered titles (limit clamped to 15..200). */
function buildTmdbNewTitleConfig(newTitlesDetected: number): AdapterConfig {
  const limit = Math.min(200, Math.max(15, newTitlesDetected * 2));
  return {
    ...(PIPELINE_JOB_DEFAULT_CONFIG['tmdb-enricher'] ?? {}),
    include_existing: false,
    refresh_hours: TMDB_REFRESH_INTERVAL_HOURS,
    limit,
  };
}

/** Enricher config that pins the exact set of stale title ids to refresh. */
function buildTmdbMetadataRefreshConfig(titleIds: string[]): AdapterConfig {
  const defaults = PIPELINE_JOB_DEFAULT_CONFIG['tmdb-enricher'] ?? {};
  return {
    ...defaults,
    include_existing: false,
    refresh_hours: TMDB_REFRESH_INTERVAL_HOURS,
    limit: titleIds.length,
    title_ids: titleIds,
  };
}

/**
 * Find titles with stale/missing TMDB metadata and enqueue one refresh job
 * per batch of TMDB_REFRESH_BATCH_SIZE ids. Returns job and title counts.
 */
async function maybeEnqueueTmdbMetadataRefresh(
  env: ScraperEnv
): Promise<{ jobs: number; titles: number }> {
  const staleIds = await loadStaleTmdbTitleIds(env);
  if (staleIds.length === 0) return { jobs: 0, titles: 0 };

  const batches = chunkArray(staleIds, TMDB_REFRESH_BATCH_SIZE);
  for (const batch of batches) {
    await enqueuePipelineJob(
      env,
      PIPELINE_ENTRY_ADAPTER,
      'refresh',
      buildTmdbMetadataRefreshConfig(batch)
    );
  }

  return { jobs: batches.length, titles: staleIds.length };
}

/**
 * Ids of TMDB-linked titles with missing fields (language, original title,
 * imdb id) or metadata older than TMDB_REFRESH_INTERVAL_HOURS, oldest first,
 * capped at TMDB_REFRESH_SWEEP_LIMIT.
 *
 * NOTE(review): the trailer_url branch is logically subsumed by the final
 * `tmdb_enriched_at` staleness OR ((A AND B) OR B ⇒ B) — confirm whether a
 * missing trailer was meant to force a refresh regardless of enriched_at.
 */
async function loadStaleTmdbTitleIds(env: ScraperEnv): Promise<string[]> {
  const { results } = await env.DB
    .prepare(
      `SELECT id
         FROM titles
        WHERE tmdb_id IS NOT NULL
          AND (
            language IS NULL
            OR TRIM(language) = ''
            OR title_original IS NULL
            OR imdb_id IS NULL
            OR (
              (trailer_url IS NULL OR TRIM(trailer_url) = '')
              AND (
                tmdb_enriched_at IS NULL
                OR tmdb_enriched_at <= datetime('now', ?)
              )
            )
            OR
            tmdb_enriched_at IS NULL
            OR tmdb_enriched_at <= datetime('now', ?)
          )
          ORDER BY COALESCE(tmdb_enriched_at, '1970-01-01') ASC
          LIMIT ?`
     )
    .bind(
      `-${TMDB_REFRESH_INTERVAL_HOURS} hours`,
      `-${TMDB_REFRESH_INTERVAL_HOURS} hours`,
      TMDB_REFRESH_SWEEP_LIMIT,
    )
    .all<{ id: string }>();
  // Defensive: drop rows whose id is missing or blank.
  return (results ?? [])
    .map((row) => asNonEmptyString(row.id))
    .filter((value): value is string => Boolean(value));
}

/** Split `items` into consecutive slices of at most `size`; empty result for non-positive size. */
function chunkArray<T>(items: T[], size: number): T[][] {
  if (size <= 0 || items.length === 0) return [];
  const chunkCount = Math.ceil(items.length / size);
  return Array.from({ length: chunkCount }, (_, i) =>
    items.slice(i * size, (i + 1) * size)
  );
}

/** Build and send one pipeline job message; returns true once the send succeeds. */
async function enqueuePipelineJob(
  env: ScraperEnv,
  adapterId: PipelineAdapterId,
  reason: PipelineQueueMessage['reason'],
  config?: AdapterConfig
): Promise<boolean> {
  const message = createPipelineJob(adapterId, reason, config);
  await env.SCRAPER_QUEUE.send(message);
  return true;
}

/** Assemble a pipeline queue message, stamping the enqueue time. */
function createPipelineJob(
  adapterId: PipelineAdapterId,
  reason: PipelineQueueMessage['reason'],
  config?: AdapterConfig
): PipelineQueueMessage {
  const enqueuedAt = new Date().toISOString();
  return { adapterId, kind: 'pipeline', reason, config, enqueuedAt };
}

/**
 * Validate and coerce a raw queue payload into a typed job message.
 * Returns null for anything that cannot be safely interpreted.
 */
function normalizeQueueMessage(body: unknown): QueueJobMessage | null {
  if (!body || typeof body !== 'object') return null;
  const raw = body as Record<string, unknown>;
  if (raw.kind !== 'source' && raw.kind !== 'pipeline') return null;
  const enqueuedAt = asNonEmptyString(raw.enqueuedAt) ?? new Date().toISOString();

  if (raw.kind === 'source') {
    const targetId = asNonEmptyString(raw.targetId);
    if (!targetId) return null;
    return {
      targetId,
      kind: 'source',
      reason: raw.reason === 'manual' ? 'manual' : 'scheduled',
      enqueuedAt,
    };
  }

  // Pipeline job; accept legacy messages that carried the adapter id in `targetId`.
  const adapterId =
    asNonEmptyString(raw.adapterId) ?? asNonEmptyString(raw.targetId);
  if (!adapterId || !isPipelineAdapterId(adapterId)) return null;
  const reason =
    raw.reason === 'manual' || raw.reason === 'chained' || raw.reason === 'refresh'
      ? raw.reason
      : 'refresh';
  const rawConfig = raw.config;
  const config =
    rawConfig && typeof rawConfig === 'object' && !Array.isArray(rawConfig)
      ? (rawConfig as AdapterConfig)
      : undefined;
  return { adapterId, kind: 'pipeline', reason, config, enqueuedAt };
}

/**
 * Best-effort read of a JSON-object request body as adapter config.
 * Returns undefined for non-JSON content types, invalid JSON, and
 * payloads that are not plain objects.
 */
async function readPipelineConfig(request: Request): Promise<AdapterConfig | undefined> {
  const contentType = (request.headers.get('content-type') ?? '').toLowerCase();
  if (!contentType.includes('application/json')) return undefined;

  try {
    const payload: unknown = await request.json();
    const isPlainObject =
      payload !== null && typeof payload === 'object' && !Array.isArray(payload);
    return isPlainObject ? (payload as AdapterConfig) : undefined;
  } catch {
    return undefined;
  }
}

/** Resolve a source target by id and execute it; throws when the id is unknown. */
async function runTarget(
  env: ScraperEnv,
  targetId: string,
  mode: RunMode = 'direct'
): Promise<Record<string, unknown>> {
  const row = await getSourceTargetById(env, targetId);
  if (row === null) throw new Error(`Target not found: ${targetId}`);
  return runSourceTarget(env, row, mode);
}

async function runSourceTarget(
  env: ScraperEnv,
  target: SourceTargetRow,
  mode: RunMode = 'direct'
): Promise<Record<string, unknown>> {
  const startedAt = Date.now();
  const adapter = ADAPTERS[target.adapter_id];
  if (!adapter) throw new Error(`No adapter registered for: ${target.adapter_id}`);

  const parsedConfig = parseJsonObject(target.config, `${SOURCE_TABLE}.${target.id}.config`);
  const config = await buildSourceScopedConfig(env, target, parsedConfig);
  const rateLimitPolicy = buildRateLimitPolicy(target.adapter_id, config);
  const result = await withRateLimitedFetch(rateLimitPolicy, () => adapter.run(config, env));

  await markSourceTargetRun(env, target.id, mode);

  return {
    ...(result as Record<string, unknown>),
    elapsed_ms: Date.now() - startedAt,
  };
}

/**
 * Execute one pipeline adapter job, then (queued mode only) enqueue its
 * chained follow-up jobs from PIPELINE_JOB_CHAIN.
 *
 * Fix: the adapter result used to be spread LAST, so a result containing keys
 * like `adapter`, `elapsed_ms`, or `queued_next` silently clobbered the
 * orchestrator's metadata. The result is now spread first — metadata wins —
 * matching runSourceTarget's ordering.
 */
async function runPipelineJob(
  env: ScraperEnv,
  job: PipelineQueueMessage,
  mode: RunMode = 'queued'
): Promise<Record<string, unknown>> {
  const startedAt = Date.now();
  const adapter = ADAPTERS[job.adapterId];
  if (!adapter) throw new Error(`No adapter registered for: ${job.adapterId}`);

  // Defaults first so explicit job config keys override them.
  const config: AdapterConfig = {
    ...(PIPELINE_JOB_DEFAULT_CONFIG[job.adapterId] ?? {}),
    ...(job.config ?? {}),
  };
  const rateLimitPolicy = buildRateLimitPolicy(job.adapterId, config);
  const result = await withRateLimitedFetch(rateLimitPolicy, () => adapter.run(config, env));

  // Chain follow-ups only for queued runs; direct (sync HTTP) runs are single-shot.
  let queuedNext = 0;
  if (mode === 'queued') {
    for (const nextAdapterId of PIPELINE_JOB_CHAIN[job.adapterId] ?? []) {
      const queued = await enqueuePipelineJob(env, nextAdapterId, 'chained');
      if (queued) queuedNext++;
    }
  }

  return {
    ...(result as Record<string, unknown>),
    adapter: job.adapterId,
    elapsed_ms: Date.now() - startedAt,
    queued_next: queuedNext,
  };
}

/**
 * Stamp last_run_at on a target. Direct (sync HTTP) runs also push
 * next_run_at forward by interval_hours; queued runs do not, because
 * enqueueSourceTargetJob already advanced next_run_at at enqueue time.
 */
async function markSourceTargetRun(
  env: ScraperEnv,
  targetId: string,
  mode: RunMode
): Promise<void> {
  // Extra SET clause is spliced into the SQL only for direct runs.
  const nextRunClause = mode === 'direct'
    ? `,\n              next_run_at = datetime('now', '+' || interval_hours || ' hours')`
    : '';
  await env.DB.prepare(
    `UPDATE ${SOURCE_TABLE}
        SET last_run_at = datetime('now')${nextRunClause}
      WHERE id = ?`
  ).bind(targetId).run();
}

/**
 * Expand a target's base config with its enabled scoped cinemas from
 * source_target_cinemas, dispatching to an adapter-specific builder where one
 * exists. When no scoped rows exist the base config passes through untouched.
 * As a side effect, records every scoped cinema into source_discovered_cinemas
 * before building the config.
 */
async function buildSourceScopedConfig(
  env: ScraperEnv,
  target: SourceTargetRow,
  baseConfig: AdapterConfig
): Promise<AdapterConfig> {
  // Join pulls the mapped cinema's display name and city for discovery records.
  const { results } = await env.DB.prepare(
    `SELECT stc.*,
            c.name_en AS cinema_name_en,
            c.city_id AS city_id
       FROM source_target_cinemas stc
       LEFT JOIN cinemas c ON c.id = stc.cinema_id
      WHERE stc.source_target_id = ?
        AND stc.enabled = 1
      ORDER BY stc.priority ASC, stc.id ASC`
  ).bind(target.id).all<ScopedSourceCinemaRow>();

  const scoped = results ?? [];
  if (scoped.length === 0) {
    return baseConfig;
  }

  await upsertSourceDiscovery(env, target, scoped);

  if (target.adapter_id === 'grand-cinemas') {
    return buildGrandScopedConfig(target, baseConfig, scoped);
  }
  if (target.adapter_id === 'empire-cinemas') {
    return buildEmpireScopedConfig(target, baseConfig, scoped);
  }
  if (target.adapter_id === 'taj-cinemas') {
    return buildTajScopedConfig(target, baseConfig, scoped);
  }

  // Generic fallback for future chain adapters.
  return {
    ...baseConfig,
    cinemas: scoped.map((row) => {
      const override = parseScopedCinemaOverride(row);
      return {
        externalId: row.external_cinema_id,
        cinemaId: resolveCinemaId(override, row.cinema_id, row.external_cinema_id),
        ...override,
      };
    }),
  };
}

/**
 * Record every scoped cinema into source_discovered_cinemas, keyed by
 * (source_target_id, external_cinema_id). Repeat sightings refresh the mapped
 * fields and bump last_seen_at; a mapped cinema_id promotes status to
 * 'mapped' with confidence 1.0, otherwise existing status/confidence persist.
 */
async function upsertSourceDiscovery(
  env: ScraperEnv,
  target: SourceTargetRow,
  scoped: ScopedSourceCinemaRow[]
): Promise<void> {
  for (const row of scoped) {
    await env.DB.prepare(
      `INSERT INTO source_discovered_cinemas
         (source_target_id, external_cinema_id, external_cinema_name, city_hint, country_code,
          cinema_id, status, confidence, metadata, first_seen_at, last_seen_at)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
       ON CONFLICT(source_target_id, external_cinema_id) DO UPDATE SET
         external_cinema_name = excluded.external_cinema_name,
         city_hint            = excluded.city_hint,
         country_code         = excluded.country_code,
         cinema_id            = excluded.cinema_id,
         status               = CASE
                                  WHEN excluded.cinema_id IS NOT NULL THEN 'mapped'
                                  ELSE source_discovered_cinemas.status
                                END,
         confidence           = CASE
                                  WHEN excluded.cinema_id IS NOT NULL THEN 1.0
                                  ELSE COALESCE(source_discovered_cinemas.confidence, excluded.confidence)
                                END,
         metadata             = excluded.metadata,
         last_seen_at         = datetime('now')`
    ).bind(
      target.id,
      row.external_cinema_id,
      row.external_cinema_name,
      row.city_id,          // city_hint: city of the joined cinema (null when unmapped)
      target.country_code,
      row.cinema_id,
      row.cinema_id ? 'mapped' : 'new',
      row.cinema_id ? 1.0 : 0.5,
      row.config_override   // stored verbatim as metadata
    ).run();
  }
}

/**
 * Grand Cinemas config: each scoped cinema gets a base URL and path prefix,
 * resolved from its override, the seeded config entry, or country defaults
 * (in that precedence order).
 */
function buildGrandScopedConfig(
  target: SourceTargetRow,
  baseConfig: AdapterConfig,
  scoped: ScopedSourceCinemaRow[]
): AdapterConfig {
  const defaultBase = asNonEmptyString(baseConfig.base) ?? inferGrandBase(target.country_code);
  const defaultPathPrefix =
    asNonEmptyString(baseConfig.path_prefix) ??
    asNonEmptyString(baseConfig.pathPrefix) ??
    inferGrandPathPrefix(target.country_code);
  const seededByExternalId = indexByExternalId(baseConfig.cinemas);

  const cinemas = scoped.map((row) => {
    // Per-row override wins over any seeded entry from the base config.
    const merged = {
      ...(seededByExternalId.get(row.external_cinema_id) ?? {}),
      ...parseScopedCinemaOverride(row),
    };
    return {
      externalId: row.external_cinema_id,
      cinemaId: resolveCinemaId(merged, row.cinema_id, row.external_cinema_id),
      base: asNonEmptyString(merged.base) ?? defaultBase,
      pathPrefix:
        asNonEmptyString(merged.pathPrefix) ??
        asNonEmptyString(merged.path_prefix) ??
        defaultPathPrefix,
    };
  });

  return { ...baseConfig, cinemas };
}

/**
 * Empire Cinemas config: resolves the API base per country and converts each
 * scoped cinema's external id to the numeric id the empire API expects.
 */
function buildEmpireScopedConfig(
  target: SourceTargetRow,
  baseConfig: AdapterConfig,
  scoped: ScopedSourceCinemaRow[]
): AdapterConfig {
  const seededByExternalId = indexByExternalId(baseConfig.cinemas);
  const resolvedApiBase =
    asNonEmptyString(baseConfig.api_base) ?? inferEmpireApiBase(target.country_code);

  const cinemas = scoped.map((row) => {
    const merged = {
      ...(seededByExternalId.get(row.external_cinema_id) ?? {}),
      ...parseScopedCinemaOverride(row),
    };
    const rawExternalId =
      asNonEmptyString(merged.externalId) ??
      asNonEmptyString(merged.external_id) ??
      row.external_cinema_id;
    return {
      externalId: toIntegerCinemaId(
        rawExternalId,
        `source_target_cinemas(${row.id}).external_cinema_id`
      ),
      cinemaId: resolveCinemaId(merged, row.cinema_id, row.external_cinema_id),
    };
  });

  return { ...baseConfig, api_base: resolvedApiBase, cinemas };
}

/**
 * Taj Cinemas runs a single venue: take the highest-priority scoped row and
 * set cinema_id from it. Callers (buildSourceScopedConfig) guarantee `scoped`
 * is non-empty before dispatching here.
 */
function buildTajScopedConfig(
  _target: SourceTargetRow,
  baseConfig: AdapterConfig,
  scoped: ScopedSourceCinemaRow[]
): AdapterConfig {
  const [primary] = scoped;
  const override = parseScopedCinemaOverride(primary);
  return {
    ...baseConfig,
    cinema_id: resolveCinemaId(override, primary.cinema_id, primary.external_cinema_id),
  };
}

function buildRateLimitPolicy(adapterId: string, config: AdapterConfig): RateLimitPolicy {
  const fallbackDelay = ADAPTER_DEFAULT_RATE_LIMIT_MS[adapterId] ?? 0;
  const explicitDelay = toPositiveNumberOrZero(config.rate_limit_ms ?? config.rateLimitMs);
  const defaultDelayMs = explicitDelay ?? fallbackDelay;
  const hostDelayMs = toHostDelayMap(config.rate_limit_hosts ?? config.rateLimitHosts);
  return { defaultDelayMs, hostDelayMs };
}

/**
 * Run `run` with globalThis.fetch temporarily replaced by a rate-limited
 * wrapper, restoring the original in a finally block. Skips the patch
 * entirely when the policy imposes no delay.
 *
 * NOTE(review): this swaps the isolate-global fetch; if two jobs ever run
 * concurrently in the same isolate their policies would interleave — confirm
 * jobs are processed one at a time.
 */
async function withRateLimitedFetch<T>(
  policy: RateLimitPolicy,
  run: () => Promise<T>
): Promise<T> {
  if (
    policy.defaultDelayMs <= 0 &&
    Object.keys(policy.hostDelayMs).length === 0
  ) {
    return run();
  }

  const globalObj = globalThis as typeof globalThis & { fetch: typeof fetch };
  const originalFetch = globalObj.fetch.bind(globalObj);
  globalObj.fetch = createRateLimitedFetch(originalFetch, policy);
  try {
    return await run();
  } finally {
    globalObj.fetch = originalFetch;
  }
}

/**
 * Build a fetch wrapper that serializes requests per host and waits a fixed
 * delay before each one. `tails` holds the completion promise of the most
 * recent same-host request; each new call chains behind it, so requests to
 * the same host never overlap. Failures in the previous request don't block
 * the chain (the tail is awaited with a swallow-all catch).
 */
function createRateLimitedFetch(
  baseFetch: typeof fetch,
  policy: RateLimitPolicy
): typeof fetch {
  const tails = new Map<string, Promise<void>>();
  return (async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
    // Unparseable inputs (relative URLs) share a single default lane.
    const host = extractHost(input) ?? '__default__';
    const delayMs = policy.hostDelayMs[host] ?? policy.defaultDelayMs;
    const previous = tails.get(host) ?? Promise.resolve();
    let release: () => void = () => {};
    const current = new Promise<void>((resolve) => {
      release = resolve;
    });
    tails.set(host, current);
    await previous.catch(() => undefined);
    if (delayMs > 0) await sleep(delayMs);
    try {
      return await baseFetch(input as RequestInfo, init);
    } finally {
      // Wake the next queued request; drop the tail entry once we're the last.
      release();
      if (tails.get(host) === current) {
        tails.delete(host);
      }
    }
  }) as typeof fetch;
}

/** Lowercased host of a fetch input, or null when no absolute URL can be derived. */
function extractHost(input: RequestInfo | URL): string | null {
  if (input instanceof URL) return input.host.toLowerCase();
  const raw =
    typeof input === 'string'
      ? input
      : typeof Request !== 'undefined' && input instanceof Request
        ? input.url
        : null;
  if (raw === null) return null;
  try {
    return new URL(raw).host.toLowerCase();
  } catch {
    return null;
  }
}

/** Normalize a config value into {lowercased host → positive delay ms}; invalid input yields {}. */
function toHostDelayMap(value: unknown): Record<string, number> {
  if (!value || typeof value !== 'object' || Array.isArray(value)) return {};
  const delays: Record<string, number> = {};
  for (const [host, raw] of Object.entries(value as Record<string, unknown>)) {
    const delay = toPositiveNumberOrZero(raw);
    if (delay !== null && delay > 0) {
      delays[host.toLowerCase()] = delay;
    }
  }
  return delays;
}

/** Coerce to a truncated non-negative integer; null for missing, non-numeric, or negative input. */
function toPositiveNumberOrZero(value: unknown): number | null {
  if (value == null) return null;
  const parsed = typeof value === 'number' ? value : Number(value);
  return Number.isFinite(parsed) && parsed >= 0 ? Math.trunc(parsed) : null;
}

/** Parse a scoped cinema row's JSON config_override column ({} when empty). */
function parseScopedCinemaOverride(
  row: Pick<SourceTargetCinemaRow, 'id' | 'config_override'>
): AdapterConfig {
  const label = `source_target_cinemas(${row.id}).config_override`;
  return parseJsonObject(row.config_override, label);
}

/** Pick the cinema id: explicit override key, then the row's mapped id, then the external id. */
function resolveCinemaId(
  record: Record<string, unknown>,
  fallbackCinemaId: string | null,
  externalCinemaId: string
): string {
  const explicit =
    asNonEmptyString(record.cinemaId) ?? asNonEmptyString(record.cinema_id);
  if (explicit) return explicit;
  return fallbackCinemaId ?? externalCinemaId;
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

/**
 * Parse an optional JSON-object column. Blank or missing input yields {};
 * invalid JSON or non-object JSON values throw with `label` in the message.
 */
function parseJsonObject(raw: string | null | undefined, label: string): AdapterConfig {
  if (!raw?.trim()) return {};
  let decoded: unknown;
  try {
    decoded = JSON.parse(raw);
  } catch {
    throw new Error(`${label} must be valid JSON`);
  }
  const isPlainObject =
    decoded !== null && typeof decoded === 'object' && !Array.isArray(decoded);
  if (!isPlainObject) {
    throw new Error(`${label} must be a JSON object`);
  }
  return decoded as AdapterConfig;
}

/** Trimmed string value, or null for non-strings and blank strings. */
function asNonEmptyString(value: unknown): string | null {
  if (typeof value !== 'string') return null;
  const text = value.trim();
  return text === '' ? null : text;
}

/**
 * Bearer-token admin gate: 503 when no token is configured, 401 on a missing
 * or mismatched token, null when the request is authorized.
 */
function requireScraperAdminAuth(request: Request, env: ScraperEnv): Response | null {
  const expectedToken = asNonEmptyString(env.SCRAPER_ADMIN_TOKEN);
  if (!expectedToken) {
    return Response.json({ error: 'SCRAPER_ADMIN_TOKEN is not configured' }, { status: 503 });
  }

  const header = request.headers.get('authorization');
  if (!header?.startsWith('Bearer ')) {
    return Response.json({ error: 'Unauthorized' }, { status: 401 });
  }

  const provided = header.slice('Bearer '.length).trim();
  return secureEquals(expectedToken, provided)
    ? null
    : Response.json({ error: 'Unauthorized' }, { status: 401 });
}

/** Constant-time-style string comparison: every position is examined regardless of mismatches. */
function secureEquals(a: string, b: string): boolean {
  let diff = a.length ^ b.length;
  const longest = Math.max(a.length, b.length);
  for (let i = 0; i < longest; i++) {
    // charCodeAt past the end is NaN; `|| 0` keeps the XOR well-defined.
    diff |= (a.charCodeAt(i) || 0) ^ (b.charCodeAt(i) || 0);
  }
  return diff === 0;
}

/** Truncated positive integer from an adapter result field; anything else yields 0. */
function toResultNumber(value: unknown): number {
  const n = typeof value === 'number' ? value : Number(value);
  return Number.isFinite(n) && n > 0 ? Math.trunc(n) : 0;
}

/** Read the COUNT alias `n` out of a D1 result row; 0 when absent or non-numeric. */
function toDbNumber(row: unknown): number {
  const raw = (row as Record<string, unknown> | null)?.n;
  const n = typeof raw === 'number' ? raw : Number(raw ?? 0);
  return Number.isFinite(n) ? n : 0;
}

/** Type guard: is this string one of the internal pipeline adapter ids? */
function isPipelineAdapterId(value: string): value is PipelineAdapterId {
  return (PIPELINE_ADAPTERS as string[]).includes(value);
}

/**
 * Type guard: is this string one of the public pipeline adapter aliases?
 *
 * Fix: `value in INTERNAL_PIPELINE_ID_MAP` also matched inherited properties
 * (e.g. 'toString'), letting toInternalPipelineAdapterId return `undefined`
 * typed as a PipelineAdapterId for crafted /run?target= values. Own-property
 * check only.
 */
function isPublicPipelineAdapterId(value: string): value is PublicPipelineAdapterId {
  return Object.prototype.hasOwnProperty.call(INTERNAL_PIPELINE_ID_MAP, value);
}

/** Translate an internal pipeline adapter id to its externally published alias. */
function toPublicPipelineAdapterId(value: PipelineAdapterId): PublicPipelineAdapterId {
  const publicId = PUBLIC_PIPELINE_ID_MAP[value];
  return publicId;
}

/** Resolve either an internal or a public pipeline adapter id to the internal id; null otherwise. */
function toInternalPipelineAdapterId(value: string): PipelineAdapterId | null {
  if (isPipelineAdapterId(value)) return value;
  return isPublicPipelineAdapterId(value) ? INTERNAL_PIPELINE_ID_MAP[value] : null;
}

/**
 * Recursively rewrite internal pipeline adapter ids to their public aliases
 * anywhere they appear in a response payload; `adapter`/`adapterId` values
 * are additionally trimmed before mapping.
 */
function sanitizePipelineResponse(value: unknown): unknown {
  if (typeof value === 'string') {
    const internal = toInternalPipelineAdapterId(value);
    return internal === null ? value : toPublicPipelineAdapterId(internal);
  }
  if (Array.isArray(value)) {
    return value.map((entry) => sanitizePipelineResponse(entry));
  }
  if (value && typeof value === 'object') {
    const sanitized: Record<string, unknown> = {};
    for (const [key, entry] of Object.entries(value as Record<string, unknown>)) {
      sanitized[key] =
        key === 'adapter' || key === 'adapterId'
          ? sanitizePipelineResponse(asNonEmptyString(entry) ?? entry)
          : sanitizePipelineResponse(entry);
    }
    return sanitized;
  }
  return value;
}

/** True when the error is permanent (missing entity/adapter, bad credentials) and retrying cannot help. */
function isTerminalQueueError(message: string): boolean {
  const terminalMarkers = [
    'not found',
    'no adapter registered',
    'tmdb_api_key is not configured',
    'invalid api key',
    'authentication failed',
    'unauthorized',
    'forbidden',
    'request failed (401)',
    'request failed (403)',
  ];
  const normalized = message.toLowerCase();
  return terminalMarkers.some((marker) => normalized.includes(marker));
}

/** Parse an external cinema id that must be an integer (the empire API uses numeric ids). */
function toIntegerCinemaId(value: string, label: string): number {
  const parsed = Number(value);
  if (!Number.isInteger(parsed)) {
    throw new Error(`${label} must be numeric for empire-cinemas`);
  }
  return parsed;
}

/**
 * Index seeded cinema entries from a base config by their external id.
 * Accepts `externalId` or `external_id`, as a non-empty string or a number.
 *
 * Fix: numeric ids were only recognized on the camelCase `externalId` key;
 * an entry carrying a numeric snake_case `external_id` was silently dropped.
 * Both spellings now get the numeric fallback.
 */
function indexByExternalId(
  cinemas: unknown
): Map<string, Record<string, unknown>> {
  const map = new Map<string, Record<string, unknown>>();
  if (!Array.isArray(cinemas)) return map;
  for (const entry of cinemas) {
    if (!entry || typeof entry !== 'object') continue;
    const record = entry as Record<string, unknown>;
    const ext =
      asNonEmptyString(record.externalId) ??
      asNonEmptyString(record.external_id) ??
      (typeof record.externalId === 'number' ? String(record.externalId) : null) ??
      (typeof record.external_id === 'number' ? String(record.external_id) : null);
    if (ext) map.set(ext, record);
  }
  return map;
}

/** Grand Cinemas site base URL per country (JO has its own host; everything else uses LB). */
function inferGrandBase(countryCode: string): string {
  if (countryCode === 'JO') return 'https://jo.grandcinemasme.com';
  return 'https://lb.grandcinemasme.com';
}

/** Grand Cinemas URL path prefix per country (JO uses /handlers; everything else /en). */
function inferGrandPathPrefix(countryCode: string): string {
  if (countryCode === 'JO') return '/handlers';
  return '/en';
}

/** Empire Cinemas cron API base per country (IQ has its own host; everything else uses LB). */
function inferEmpireApiBase(countryCode: string): string {
  if (countryCode === 'IQ') return 'https://irq-cron.empirecinemas.com';
  return 'https://lb-cron.empirecinemas.com';
}
