Fix Telegram worker: countdown timer, orphaned runs, fetch-listener reconnection, and logging

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-03-05 20:21:18 +00:00
parent 1436b630e2
commit 71a2e6a5e8
8 changed files with 147 additions and 53 deletions

View File

@@ -17,6 +17,7 @@ services:
- BOT_TOKEN=${BOT_TOKEN:-} - BOT_TOKEN=${BOT_TOKEN:-}
- BOT_USERNAME=${BOT_USERNAME:-} - BOT_USERNAME=${BOT_USERNAME:-}
- LOG_LEVEL=${LOG_LEVEL:-info} - LOG_LEVEL=${LOG_LEVEL:-info}
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy

View File

@@ -16,6 +16,7 @@ interface TelegramAdminProps {
ingestionStatus: IngestionAccountStatus[]; ingestionStatus: IngestionAccountStatus[];
globalDestination: GlobalDestination; globalDestination: GlobalDestination;
sendHistory: SendHistoryRow[]; sendHistory: SendHistoryRow[];
workerIntervalMinutes: number;
} }
export function TelegramAdmin({ export function TelegramAdmin({
@@ -24,6 +25,7 @@ export function TelegramAdmin({
ingestionStatus, ingestionStatus,
globalDestination, globalDestination,
sendHistory, sendHistory,
workerIntervalMinutes,
}: TelegramAdminProps) { }: TelegramAdminProps) {
return ( return (
<div className="space-y-4"> <div className="space-y-4">
@@ -32,7 +34,7 @@ export function TelegramAdmin({
description="Manage Telegram accounts, channels, and ingestion" description="Manage Telegram accounts, channels, and ingestion"
/> />
<WorkerStatusPanel initialStatus={ingestionStatus} /> <WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
<Tabs defaultValue="accounts" className="space-y-4"> <Tabs defaultValue="accounts" className="space-y-4">
<TabsList> <TabsList>

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useEffect, useState, useCallback } from "react"; import { useEffect, useState, useCallback, useTransition } from "react";
import { import {
Loader2, Loader2,
CheckCircle2, CheckCircle2,
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils"; import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { triggerIngestion } from "../actions";
import type { IngestionAccountStatus } from "@/lib/telegram/types"; import type { IngestionAccountStatus } from "@/lib/telegram/types";
interface WorkerStatusPanelProps { interface WorkerStatusPanelProps {
initialStatus: IngestionAccountStatus[]; initialStatus: IngestionAccountStatus[];
initialIntervalMinutes?: number;
} }
const AUTH_STATE_CONFIG: Record< const AUTH_STATE_CONFIG: Record<
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" }, EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
}; };
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
const [accounts, setAccounts] = useState(initialStatus); const [accounts, setAccounts] = useState(initialStatus);
const [error, setError] = useState(false); const [error, setError] = useState(false);
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null); const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
const [isPending, startTransition] = useTransition();
// Find active run // Find active run
const activeRun = accounts.find((a) => a.currentRun); const activeRun = accounts.find((a) => a.currentRun);
const isRunning = !!activeRun; const isRunning = !!activeRun;
const handleSyncNow = useCallback(() => {
startTransition(async () => {
const result = await triggerIngestion();
if (result.success) {
toast.success("Sync triggered — worker will start shortly");
} else {
toast.error(result.error ?? "Failed to trigger sync");
}
});
}, []);
// Poll for status // Poll for status
useEffect(() => { useEffect(() => {
let timer: ReturnType<typeof setTimeout>; let timer: ReturnType<typeof setTimeout>;
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
const data = await res.json(); const data = await res.json();
if (mounted) { if (mounted) {
setAccounts(data.accounts ?? []); setAccounts(data.accounts ?? []);
if (data.workerIntervalMinutes) {
setWorkerIntervalMinutes(data.workerIntervalMinutes);
}
setError(false); setError(false);
} }
} catch { } catch {
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter) // Estimate next run based on last run finish time + configured interval + up to 5 min jitter
const lastFinished = accounts const lastFinished = accounts
.filter((a) => a.lastRun?.finishedAt) .filter((a) => a.lastRun?.finishedAt)
.map((a) => new Date(a.lastRun!.finishedAt!).getTime()) .map((a) => new Date(a.lastRun!.finishedAt!).getTime())
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
const intervalMs = 5 * 60 * 1000; // 5 min base const intervalMs = workerIntervalMinutes * 60 * 1000;
const estimatedNext = lastFinished + intervalMs; const estimatedNext = lastFinished + intervalMs;
const tick = () => { const tick = () => {
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
tick(); tick();
const interval = setInterval(tick, 1_000); const interval = setInterval(tick, 1_000);
return () => clearInterval(interval); return () => clearInterval(interval);
}, [isRunning, accounts]); }, [isRunning, accounts, workerIntervalMinutes]);
if (accounts.length === 0 && !error) { if (accounts.length === 0 && !error) {
return ( return (
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
) : isRunning && activeRun?.currentRun ? ( ) : isRunning && activeRun?.currentRun ? (
<RunningStatus run={activeRun.currentRun} /> <RunningStatus run={activeRun.currentRun} />
) : ( ) : (
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} /> <IdleStatus
accounts={accounts}
nextRunCountdown={nextRunCountdown}
onSyncNow={handleSyncNow}
isSyncing={isPending}
/>
)} )}
</CardContent> </CardContent>
</Card> </Card>
@@ -256,9 +280,13 @@ function RunningStatus({
function IdleStatus({ function IdleStatus({
accounts, accounts,
nextRunCountdown, nextRunCountdown,
onSyncNow,
isSyncing,
}: { }: {
accounts: IngestionAccountStatus[]; accounts: IngestionAccountStatus[];
nextRunCountdown: string | null; nextRunCountdown: string | null;
onSyncNow: () => void;
isSyncing: boolean;
}) { }) {
const lastRun = accounts const lastRun = accounts
.filter((a) => a.lastRun) .filter((a) => a.lastRun)
@@ -321,14 +349,32 @@ function IdleStatus({
)} )}
</div> </div>
<div className="flex items-center gap-2 shrink-0">
{nextRunCountdown && hasAuthenticated && ( {nextRunCountdown && hasAuthenticated && (
<div className="flex items-center gap-1.5 shrink-0"> <div className="flex items-center gap-1.5">
<RefreshCw className="h-3 w-3 text-muted-foreground" /> <RefreshCw className="h-3 w-3 text-muted-foreground" />
<span className="text-xs text-muted-foreground tabular-nums"> <span className="text-xs text-muted-foreground tabular-nums">
Next: {nextRunCountdown} Next: {nextRunCountdown}
</span> </span>
</div> </div>
)} )}
{hasAuthenticated && (
<Button
variant="outline"
size="sm"
className="h-7 text-xs px-2"
onClick={onSyncNow}
disabled={isSyncing}
>
{isSyncing ? (
<Loader2 className="h-3 w-3 animate-spin mr-1" />
) : (
<RefreshCw className="h-3 w-3 mr-1" />
)}
Sync Now
</Button>
)}
</div>
</div> </div>
); );
} }

View File

@@ -42,6 +42,7 @@ export default async function TelegramPage() {
ingestionStatus={ingestionStatus} ingestionStatus={ingestionStatus}
globalDestination={globalDestination} globalDestination={globalDestination}
sendHistory={serializedHistory} sendHistory={serializedHistory}
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
/> />
); );
} }

View File

@@ -9,5 +9,9 @@ export async function GET(request: Request) {
if ("error" in authResult) return authResult.error; if ("error" in authResult) return authResult.error;
const accounts = await getIngestionStatus(); const accounts = await getIngestionStatus();
return NextResponse.json({ accounts }); const workerIntervalMinutes = parseInt(
process.env.WORKER_INTERVAL_MINUTES ?? "60",
10
);
return NextResponse.json({ accounts, workerIntervalMinutes });
} }

View File

@@ -45,33 +45,20 @@ export async function POST(request: Request) {
); );
} }
// Create ingestion runs marked as RUNNING — the worker will pick these up // Send pg_notify for immediate worker pickup.
// when it next polls, or we use pg_notify for immediate pickup // The worker creates its own IngestionRun records with proper activity tracking.
for (const account of accounts) {
// Only create if no run is already RUNNING for this account
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// Send pg_notify for immediate worker pickup
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",") accounts.map((a) => a.id).join(",")
); );
} catch { } catch {
// pg_notify is best-effort — worker will pick up on next cycle anyway // pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
} }
return NextResponse.json({ return NextResponse.json({
triggered: true, triggered: true,
accountIds: accounts.map((a) => a.id), accountIds: accounts.map((a) => a.id),
message: `Ingestion queued for ${accounts.length} account(s)`, message: `Ingestion triggered for ${accounts.length} account(s)`,
}); });
} }

View File

@@ -18,6 +18,10 @@ import {
const log = childLogger("fetch-listener"); const log = childLogger("fetch-listener");
let pgClient: pg.PoolClient | null = null; let pgClient: pg.PoolClient | null = null;
let stopped = false;
/** Delay (ms) before attempting to reconnect after a connection loss. */
const RECONNECT_DELAY_MS = 5_000;
/** /**
* Start listening for pg_notify signals from the web app. * Start listening for pg_notify signals from the web app.
@@ -27,8 +31,17 @@ let pgClient: pg.PoolClient | null = null;
* - `generate_invite` — payload = channelId → generate invite link for destination * - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle * - `ingestion_trigger` — trigger an immediate ingestion cycle
*
* If the underlying connection is lost, the listener automatically reconnects
* so that pg_notify signals are never silently dropped.
*/ */
export async function startFetchListener(): Promise<void> { export async function startFetchListener(): Promise<void> {
stopped = false;
await connectListener();
}
async function connectListener(): Promise<void> {
try {
pgClient = await pool.connect(); pgClient = await pool.connect();
await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN channel_fetch");
await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN generate_invite");
@@ -47,10 +60,46 @@ export async function startFetchListener(): Promise<void> {
} }
}); });
// Reconnect automatically when the connection ends unexpectedly
pgClient.on("end", () => {
if (!stopped) {
log.warn("Fetch listener connection lost — reconnecting");
pgClient = null;
scheduleReconnect();
}
});
pgClient.on("error", (err) => {
log.error({ err }, "Fetch listener connection error");
if (!stopped && pgClient) {
try {
pgClient.release(true);
} catch (releaseErr) {
log.debug({ err: releaseErr }, "Failed to release pg client after error");
}
pgClient = null;
scheduleReconnect();
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
} catch (err) {
log.error({ err }, "Failed to start fetch listener — retrying");
scheduleReconnect();
}
}
function scheduleReconnect(): void {
if (stopped) return;
setTimeout(() => {
if (!stopped) {
connectListener();
}
}, RECONNECT_DELAY_MS);
} }
export function stopFetchListener(): void { export function stopFetchListener(): void {
stopped = true;
if (pgClient) { if (pgClient) {
pgClient.release(); pgClient.release();
pgClient = null; pgClient = null;

View File

@@ -351,6 +351,10 @@ export async function runWorkerForAccount(
const totalChannels = channelMappings.length; const totalChannels = channelMappings.length;
if (totalChannels === 0) {
accountLog.info("No active source channels linked to this account — nothing to ingest");
}
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
const mapping = channelMappings[chIdx]; const mapping = channelMappings[chIdx];
const channel = mapping.channel; const channel = mapping.channel;
@@ -451,8 +455,8 @@ export async function runWorkerForAccount(
counters.messagesScanned += scanResult.totalScanned; counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug( accountLog.info(
{ channelId: channel.id, topic: topic.name }, { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
"No new archives in topic" "No new archives in topic"
); );
continue; continue;
@@ -525,7 +529,7 @@ export async function runWorkerForAccount(
counters.messagesScanned += scanResult.totalScanned; counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives"); accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
continue; continue;
} }