diff --git a/docker-compose.yml b/docker-compose.yml index 2f2e8f2..51839f5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,7 @@ services: - BOT_TOKEN=${BOT_TOKEN:-} - BOT_USERNAME=${BOT_USERNAME:-} - LOG_LEVEL=${LOG_LEVEL:-info} + - WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60} depends_on: db: condition: service_healthy diff --git a/src/app/(app)/telegram/_components/telegram-admin.tsx b/src/app/(app)/telegram/_components/telegram-admin.tsx index d83e239..9632dc2 100644 --- a/src/app/(app)/telegram/_components/telegram-admin.tsx +++ b/src/app/(app)/telegram/_components/telegram-admin.tsx @@ -16,6 +16,7 @@ interface TelegramAdminProps { ingestionStatus: IngestionAccountStatus[]; globalDestination: GlobalDestination; sendHistory: SendHistoryRow[]; + workerIntervalMinutes: number; } export function TelegramAdmin({ @@ -24,6 +25,7 @@ export function TelegramAdmin({ ingestionStatus, globalDestination, sendHistory, + workerIntervalMinutes, }: TelegramAdminProps) { return (
@@ -32,7 +34,7 @@ export function TelegramAdmin({ description="Manage Telegram accounts, channels, and ingestion" /> - + diff --git a/src/app/(app)/telegram/_components/worker-status-panel.tsx b/src/app/(app)/telegram/_components/worker-status-panel.tsx index ef7601d..215ecc3 100644 --- a/src/app/(app)/telegram/_components/worker-status-panel.tsx +++ b/src/app/(app)/telegram/_components/worker-status-panel.tsx @@ -1,6 +1,6 @@ "use client"; -import { useEffect, useState, useCallback } from "react"; +import { useEffect, useState, useCallback, useTransition } from "react"; import { Loader2, CheckCircle2, @@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { cn } from "@/lib/utils"; +import { toast } from "sonner"; +import { triggerIngestion } from "../actions"; import type { IngestionAccountStatus } from "@/lib/telegram/types"; interface WorkerStatusPanelProps { initialStatus: IngestionAccountStatus[]; + initialIntervalMinutes?: number; } const AUTH_STATE_CONFIG: Record< @@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record< EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" }, }; -export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { +export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) { const [accounts, setAccounts] = useState(initialStatus); const [error, setError] = useState(false); const [nextRunCountdown, setNextRunCountdown] = useState(null); + const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes); + const [isPending, startTransition] = useTransition(); // Find active run const activeRun = accounts.find((a) => a.currentRun); const isRunning = !!activeRun; + const handleSyncNow = useCallback(() => { + startTransition(async () => { + const result = await triggerIngestion(); + if (result.success) { + toast.success("Sync triggered — worker will start shortly"); + } else { + toast.error(result.error ?? "Failed to trigger sync"); + } + }); + }, []); + // Poll for status useEffect(() => { let timer: ReturnType; @@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { const data = await res.json(); if (mounted) { setAccounts(data.accounts ?? []); + if (data.workerIntervalMinutes) { + setWorkerIntervalMinutes(data.workerIntervalMinutes); + } setError(false); } } catch { @@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { return; } - // Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter) + // Estimate next run based on last run finish time + configured interval + up to 5 min jitter const lastFinished = accounts .filter((a) => a.lastRun?.finishedAt) .map((a) => new Date(a.lastRun!.finishedAt!).getTime()) @@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { return; } - const intervalMs = 5 * 60 * 1000; // 5 min base + const intervalMs = workerIntervalMinutes * 60 * 1000; const estimatedNext = lastFinished + intervalMs; const tick = () => { @@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { tick(); const interval = setInterval(tick, 1_000); return () => clearInterval(interval); - }, [isRunning, accounts]); + }, [isRunning, accounts, workerIntervalMinutes]); if (accounts.length === 0 && !error) { return ( @@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { ) : isRunning && activeRun?.currentRun ? ( ) : ( - + )} @@ -256,9 +280,13 @@ function RunningStatus({ function IdleStatus({ accounts, nextRunCountdown, + onSyncNow, + isSyncing, }: { accounts: IngestionAccountStatus[]; nextRunCountdown: string | null; + onSyncNow: () => void; + isSyncing: boolean; }) { const lastRun = accounts .filter((a) => a.lastRun) @@ -321,14 +349,32 @@ function IdleStatus({ )}
- {nextRunCountdown && hasAuthenticated && ( -
- - - Next: {nextRunCountdown} - -
- )} +
+ {nextRunCountdown && hasAuthenticated && ( +
+ + + Next: {nextRunCountdown} + +
+ )} + {hasAuthenticated && ( + + )} +
); } diff --git a/src/app/(app)/telegram/page.tsx b/src/app/(app)/telegram/page.tsx index a0f575d..2100d40 100644 --- a/src/app/(app)/telegram/page.tsx +++ b/src/app/(app)/telegram/page.tsx @@ -42,6 +42,7 @@ export default async function TelegramPage() { ingestionStatus={ingestionStatus} globalDestination={globalDestination} sendHistory={serializedHistory} + workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)} /> ); } diff --git a/src/app/api/ingestion/status/route.ts b/src/app/api/ingestion/status/route.ts index dc8c76d..239fa59 100644 --- a/src/app/api/ingestion/status/route.ts +++ b/src/app/api/ingestion/status/route.ts @@ -9,5 +9,9 @@ export async function GET(request: Request) { if ("error" in authResult) return authResult.error; const accounts = await getIngestionStatus(); - return NextResponse.json({ accounts }); + const workerIntervalMinutes = parseInt( + process.env.WORKER_INTERVAL_MINUTES ?? "60", + 10 + ); + return NextResponse.json({ accounts, workerIntervalMinutes }); } diff --git a/src/app/api/ingestion/trigger/route.ts b/src/app/api/ingestion/trigger/route.ts index ebf9562..5586b6b 100644 --- a/src/app/api/ingestion/trigger/route.ts +++ b/src/app/api/ingestion/trigger/route.ts @@ -45,33 +45,20 @@ export async function POST(request: Request) { ); } - // Create ingestion runs marked as RUNNING — the worker will pick these up - // when it next polls, or we use pg_notify for immediate pickup - for (const account of accounts) { - // Only create if no run is already RUNNING for this account - const existing = await prisma.ingestionRun.findFirst({ - where: { accountId: account.id, status: "RUNNING" }, - }); - if (!existing) { - await prisma.ingestionRun.create({ - data: { accountId: account.id, status: "RUNNING" }, - }); - } - } - - // Send pg_notify for immediate worker pickup + // Send pg_notify for immediate worker pickup. + // The worker creates its own IngestionRun records with proper activity tracking. try { await prisma.$queryRawUnsafe( `SELECT pg_notify('ingestion_trigger', $1)`, accounts.map((a) => a.id).join(",") ); } catch { - // pg_notify is best-effort — worker will pick up on next cycle anyway + // pg_notify is best-effort — worker will pick up on next scheduled cycle anyway } return NextResponse.json({ triggered: true, accountIds: accounts.map((a) => a.id), - message: `Ingestion queued for ${accounts.length} account(s)`, + message: `Ingestion triggered for ${accounts.length} account(s)`, }); } diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index 681979f..3d49e38 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -18,6 +18,10 @@ import { const log = childLogger("fetch-listener"); let pgClient: pg.PoolClient | null = null; +let stopped = false; + +/** Delay (ms) before attempting to reconnect after a connection loss. */ +const RECONNECT_DELAY_MS = 5_000; /** * Start listening for pg_notify signals from the web app. @@ -27,30 +31,75 @@ let pgClient: pg.PoolClient | null = null; * - `generate_invite` — payload = channelId → generate invite link for destination * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `ingestion_trigger` — trigger an immediate ingestion cycle + * + * If the underlying connection is lost, the listener automatically reconnects + * so that pg_notify signals are never silently dropped. */ export async function startFetchListener(): Promise { - pgClient = await pool.connect(); - await pgClient.query("LISTEN channel_fetch"); - await pgClient.query("LISTEN generate_invite"); - await pgClient.query("LISTEN create_destination"); - await pgClient.query("LISTEN ingestion_trigger"); + stopped = false; + await connectListener(); +} - pgClient.on("notification", (msg) => { - if (msg.channel === "channel_fetch" && msg.payload) { - handleChannelFetch(msg.payload); - } else if (msg.channel === "generate_invite" && msg.payload) { - handleGenerateInvite(msg.payload); - } else if (msg.channel === "create_destination" && msg.payload) { - handleCreateDestination(msg.payload); - } else if (msg.channel === "ingestion_trigger") { - handleIngestionTrigger(); +async function connectListener(): Promise { + try { + pgClient = await pool.connect(); + await pgClient.query("LISTEN channel_fetch"); + await pgClient.query("LISTEN generate_invite"); + await pgClient.query("LISTEN create_destination"); + await pgClient.query("LISTEN ingestion_trigger"); + + pgClient.on("notification", (msg) => { + if (msg.channel === "channel_fetch" && msg.payload) { + handleChannelFetch(msg.payload); + } else if (msg.channel === "generate_invite" && msg.payload) { + handleGenerateInvite(msg.payload); + } else if (msg.channel === "create_destination" && msg.payload) { + handleCreateDestination(msg.payload); + } else if (msg.channel === "ingestion_trigger") { + handleIngestionTrigger(); + } + }); + + // Reconnect automatically when the connection ends unexpectedly + pgClient.on("end", () => { + if (!stopped) { + log.warn("Fetch listener connection lost — reconnecting"); + pgClient = null; + scheduleReconnect(); + } + }); + + pgClient.on("error", (err) => { + log.error({ err }, "Fetch listener connection error"); + if (!stopped && pgClient) { + try { + pgClient.release(true); + } catch (releaseErr) { + log.debug({ err: releaseErr }, "Failed to release pg client after error"); + } + pgClient = null; + scheduleReconnect(); + } + }); + + log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); + } catch (err) { + log.error({ err }, "Failed to start fetch listener — retrying"); + scheduleReconnect(); + } +} + +function scheduleReconnect(): void { + if (stopped) return; + setTimeout(() => { + if (!stopped) { + connectListener(); } - }); - - log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); + }, RECONNECT_DELAY_MS); } export function stopFetchListener(): void { + stopped = true; if (pgClient) { pgClient.release(); pgClient = null; diff --git a/worker/src/worker.ts b/worker/src/worker.ts index e61c64c..f72955e 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -351,6 +351,10 @@ export async function runWorkerForAccount( const totalChannels = channelMappings.length; + if (totalChannels === 0) { + accountLog.info("No active source channels linked to this account — nothing to ingest"); + } + for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { const mapping = channelMappings[chIdx]; const channel = mapping.channel; @@ -451,8 +455,8 @@ export async function runWorkerForAccount( counters.messagesScanned += scanResult.totalScanned; if (scanResult.archives.length === 0) { - accountLog.debug( - { channelId: channel.id, topic: topic.name }, + accountLog.info( + { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned }, "No new archives in topic" ); continue; @@ -525,7 +529,7 @@ export async function runWorkerForAccount( counters.messagesScanned += scanResult.totalScanned; if (scanResult.archives.length === 0) { - accountLog.debug({ channelId: channel.id }, "No new archives"); + accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel"); continue; }