10 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
22419106c1 Fix APP_PORT: make container listen port and healthcheck follow APP_PORT
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:39:26 +00:00
copilot-swe-agent[bot]
e45de85c69 Add Rescan Channel option to channels tab
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:34:53 +00:00
copilot-swe-agent[bot]
71a2e6a5e8 Fix Telegram worker: countdown timer, orphaned runs, fetch-listener reconnection, and logging
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:21:18 +00:00
copilot-swe-agent[bot]
1436b630e2 Initial plan 2026-03-05 20:05:41 +00:00
xCyanGrizzly
43af23d3be Merge pull request #13 from xCyanGrizzly/copilot/fix-docker-sync-issues
Fix worker getting stuck during channel message sync
2026-03-05 15:08:39 +01:00
copilot-swe-agent[bot]
49b82a352b Fix review issues: race condition in invokeWithTimeout and mutex queue entry
- Add settled flag to invokeWithTimeout to prevent double-settling
- Create mutex queue entry with wrapped resolve before pushing to queue

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:17:56 +00:00
copilot-swe-agent[bot]
2e242912af Remove worker/dist build artifacts from git, add to .gitignore
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:15:17 +00:00
copilot-swe-agent[bot]
9adbdb2a77 Fix worker getting stuck during sync: add timeouts, stuck detection, and safety limits
- Add invokeWithTimeout wrapper for TDLib API calls (2min timeout per call)
- Add stuck detection to getChannelMessages: break if from_message_id doesn't advance
- Add stuck detection to getTopicMessages: same protection for topic scanning
- Add stuck detection to getForumTopicList: break if pagination offsets don't advance
- Add max page limit (5000) to all scanning loops to prevent infinite pagination
- Add mutex wait timeout (30min) to prevent indefinite blocking when holder hangs
- Add cycle timeout (4h default, configurable via WORKER_CYCLE_TIMEOUT_MINUTES)
- Fix end-of-page detection to use actual limit value instead of hardcoded 100

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:14:53 +00:00
copilot-swe-agent[bot]
ad71346468 Initial plan 2026-03-05 13:02:41 +00:00
xCyanGrizzly
e19a80897d Merge pull request #12 from xCyanGrizzly/copilot/fix-worker-functionality-visibility
Fix worker activity tracking, add scan progress, default channels to disabled
2026-03-05 09:42:48 +01:00
21 changed files with 469 additions and 102 deletions

View File

@@ -13,6 +13,8 @@ AUTH_GITHUB_ID=""
AUTH_GITHUB_SECRET="" AUTH_GITHUB_SECRET=""
# App # App
# APP_PORT controls the port the container listens on AND how it is exposed on the host.
# If you change APP_PORT, also update NEXT_PUBLIC_APP_URL to match.
NEXT_PUBLIC_APP_URL="http://localhost:3000" NEXT_PUBLIC_APP_URL="http://localhost:3000"
APP_PORT=3000 APP_PORT=3000

1
.gitignore vendored
View File

@@ -18,6 +18,7 @@ worker/node_modules
# production # production
/build /build
worker/dist
# misc # misc
.DS_Store .DS_Store

View File

@@ -54,6 +54,7 @@ RUN chmod +x docker-entrypoint.sh
USER nextjs USER nextjs
# Default port — overridden at runtime by the PORT env var (set via docker-compose APP_PORT)
EXPOSE 3000 EXPOSE 3000
ENV PORT=3000 ENV PORT=3000
ENV HOSTNAME="0.0.0.0" ENV HOSTNAME="0.0.0.0"

View File

@@ -5,7 +5,7 @@ services:
dockerfile: Dockerfile dockerfile: Dockerfile
pull_policy: never pull_policy: never
ports: ports:
- "${APP_PORT:-3000}:3000" - "${APP_PORT:-3000}:${APP_PORT:-3000}"
environment: environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash} - DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env} - AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
@@ -17,11 +17,13 @@ services:
- BOT_TOKEN=${BOT_TOKEN:-} - BOT_TOKEN=${BOT_TOKEN:-}
- BOT_USERNAME=${BOT_USERNAME:-} - BOT_USERNAME=${BOT_USERNAME:-}
- LOG_LEVEL=${LOG_LEVEL:-info} - LOG_LEVEL=${LOG_LEVEL:-info}
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
- PORT=${APP_PORT:-3000}
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
healthcheck: healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health"] test: ["CMD-SHELL", "wget -q --spider http://localhost:$$PORT/api/health || exit 1"]
interval: 30s interval: 30s
timeout: 5s timeout: 5s
retries: 3 retries: 3

2
package-lock.json generated
View File

@@ -49,7 +49,7 @@
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsx": "^4.21.0", "tsx": "^4.21.0",
"tw-animate-css": "^1.4.0", "tw-animate-css": "^1.4.0",
"typescript": "^5" "typescript": "5.9.3"
} }
}, },
"node_modules/@alloc/quick-lru": { "node_modules/@alloc/quick-lru": {

View File

@@ -58,6 +58,6 @@
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsx": "^4.21.0", "tsx": "^4.21.0",
"tw-animate-css": "^1.4.0", "tw-animate-css": "^1.4.0",
"typescript": "^5" "typescript": "5.9.3"
} }
} }

View File

@@ -7,6 +7,7 @@ import {
Power, Power,
ArrowDownToLine, ArrowDownToLine,
ArrowUpFromLine, ArrowUpFromLine,
RefreshCcw,
} from "lucide-react"; } from "lucide-react";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
@@ -23,12 +24,14 @@ interface ChannelColumnsProps {
onToggleActive: (id: string) => void; onToggleActive: (id: string) => void;
onDelete: (id: string) => void; onDelete: (id: string) => void;
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void; onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
onRescan: (id: string) => void;
} }
export function getChannelColumns({ export function getChannelColumns({
onToggleActive, onToggleActive,
onDelete, onDelete,
onSetType, onSetType,
onRescan,
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] { }: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
return [ return [
{ {
@@ -121,6 +124,14 @@ export function getChannelColumns({
Set as Source Set as Source
</DropdownMenuItem> </DropdownMenuItem>
)} )}
{row.original.type === "SOURCE" && (
<DropdownMenuItem
onClick={() => onRescan(row.original.id)}
>
<RefreshCcw className="mr-2 h-3.5 w-3.5" />
Rescan Channel
</DropdownMenuItem>
)}
<DropdownMenuItem <DropdownMenuItem
onClick={() => onToggleActive(row.original.id)} onClick={() => onToggleActive(row.original.id)}
> >

View File

@@ -8,6 +8,7 @@ import {
deleteChannel, deleteChannel,
toggleChannelActive, toggleChannelActive,
setChannelType, setChannelType,
rescanChannel,
} from "../actions"; } from "../actions";
import { DataTable } from "@/components/shared/data-table"; import { DataTable } from "@/components/shared/data-table";
import { DeleteDialog } from "@/components/shared/delete-dialog"; import { DeleteDialog } from "@/components/shared/delete-dialog";
@@ -22,6 +23,7 @@ interface ChannelsTabProps {
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) { export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
const [isPending, startTransition] = useTransition(); const [isPending, startTransition] = useTransition();
const [deleteId, setDeleteId] = useState<string | null>(null); const [deleteId, setDeleteId] = useState<string | null>(null);
const [rescanId, setRescanId] = useState<string | null>(null);
const columns = getChannelColumns({ const columns = getChannelColumns({
onToggleActive: (id) => { onToggleActive: (id) => {
@@ -39,6 +41,7 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
else toast.error(result.error); else toast.error(result.error);
}); });
}, },
onRescan: (id) => setRescanId(id),
}); });
const { table } = useDataTable({ const { table } = useDataTable({
@@ -60,6 +63,19 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
}); });
}; };
const handleRescan = () => {
if (!rescanId) return;
startTransition(async () => {
const result = await rescanChannel(rescanId);
if (result.success) {
toast.success("Channel scan progress reset — it will be fully rescanned on the next sync");
setRescanId(null);
} else {
toast.error(result.error);
}
});
};
return ( return (
<div className="space-y-4"> <div className="space-y-4">
<DestinationCard destination={globalDestination} /> <DestinationCard destination={globalDestination} />
@@ -83,6 +99,16 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
onConfirm={handleDelete} onConfirm={handleDelete}
isLoading={isPending} isLoading={isPending}
/> />
<DeleteDialog
open={!!rescanId}
onOpenChange={(open) => !open && setRescanId(null)}
title="Rescan Channel"
description="This will reset all scan progress for this channel. On the next sync the worker will re-process every message from the beginning. Packages that are already in the library will be skipped (deduplication by hash), but any missing files will be re-downloaded and re-uploaded. This may take a long time for large channels."
confirmLabel="Rescan"
onConfirm={handleRescan}
isLoading={isPending}
/>
</div> </div>
); );
} }

View File

@@ -16,6 +16,7 @@ interface TelegramAdminProps {
ingestionStatus: IngestionAccountStatus[]; ingestionStatus: IngestionAccountStatus[];
globalDestination: GlobalDestination; globalDestination: GlobalDestination;
sendHistory: SendHistoryRow[]; sendHistory: SendHistoryRow[];
workerIntervalMinutes: number;
} }
export function TelegramAdmin({ export function TelegramAdmin({
@@ -24,6 +25,7 @@ export function TelegramAdmin({
ingestionStatus, ingestionStatus,
globalDestination, globalDestination,
sendHistory, sendHistory,
workerIntervalMinutes,
}: TelegramAdminProps) { }: TelegramAdminProps) {
return ( return (
<div className="space-y-4"> <div className="space-y-4">
@@ -32,7 +34,7 @@ export function TelegramAdmin({
description="Manage Telegram accounts, channels, and ingestion" description="Manage Telegram accounts, channels, and ingestion"
/> />
<WorkerStatusPanel initialStatus={ingestionStatus} /> <WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
<Tabs defaultValue="accounts" className="space-y-4"> <Tabs defaultValue="accounts" className="space-y-4">
<TabsList> <TabsList>

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useEffect, useState, useCallback } from "react"; import { useEffect, useState, useCallback, useTransition } from "react";
import { import {
Loader2, Loader2,
CheckCircle2, CheckCircle2,
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils"; import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { triggerIngestion } from "../actions";
import type { IngestionAccountStatus } from "@/lib/telegram/types"; import type { IngestionAccountStatus } from "@/lib/telegram/types";
interface WorkerStatusPanelProps { interface WorkerStatusPanelProps {
initialStatus: IngestionAccountStatus[]; initialStatus: IngestionAccountStatus[];
initialIntervalMinutes?: number;
} }
const AUTH_STATE_CONFIG: Record< const AUTH_STATE_CONFIG: Record<
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" }, EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
}; };
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
const [accounts, setAccounts] = useState(initialStatus); const [accounts, setAccounts] = useState(initialStatus);
const [error, setError] = useState(false); const [error, setError] = useState(false);
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null); const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
const [isPending, startTransition] = useTransition();
// Find active run // Find active run
const activeRun = accounts.find((a) => a.currentRun); const activeRun = accounts.find((a) => a.currentRun);
const isRunning = !!activeRun; const isRunning = !!activeRun;
const handleSyncNow = useCallback(() => {
startTransition(async () => {
const result = await triggerIngestion();
if (result.success) {
toast.success("Sync triggered — worker will start shortly");
} else {
toast.error(result.error ?? "Failed to trigger sync");
}
});
}, []);
// Poll for status // Poll for status
useEffect(() => { useEffect(() => {
let timer: ReturnType<typeof setTimeout>; let timer: ReturnType<typeof setTimeout>;
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
const data = await res.json(); const data = await res.json();
if (mounted) { if (mounted) {
setAccounts(data.accounts ?? []); setAccounts(data.accounts ?? []);
if (data.workerIntervalMinutes) {
setWorkerIntervalMinutes(data.workerIntervalMinutes);
}
setError(false); setError(false);
} }
} catch { } catch {
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter) // Estimate next run based on last run finish time + configured interval + up to 5 min jitter
const lastFinished = accounts const lastFinished = accounts
.filter((a) => a.lastRun?.finishedAt) .filter((a) => a.lastRun?.finishedAt)
.map((a) => new Date(a.lastRun!.finishedAt!).getTime()) .map((a) => new Date(a.lastRun!.finishedAt!).getTime())
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
const intervalMs = 5 * 60 * 1000; // 5 min base const intervalMs = workerIntervalMinutes * 60 * 1000;
const estimatedNext = lastFinished + intervalMs; const estimatedNext = lastFinished + intervalMs;
const tick = () => { const tick = () => {
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
tick(); tick();
const interval = setInterval(tick, 1_000); const interval = setInterval(tick, 1_000);
return () => clearInterval(interval); return () => clearInterval(interval);
}, [isRunning, accounts]); }, [isRunning, accounts, workerIntervalMinutes]);
if (accounts.length === 0 && !error) { if (accounts.length === 0 && !error) {
return ( return (
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
) : isRunning && activeRun?.currentRun ? ( ) : isRunning && activeRun?.currentRun ? (
<RunningStatus run={activeRun.currentRun} /> <RunningStatus run={activeRun.currentRun} />
) : ( ) : (
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} /> <IdleStatus
accounts={accounts}
nextRunCountdown={nextRunCountdown}
onSyncNow={handleSyncNow}
isSyncing={isPending}
/>
)} )}
</CardContent> </CardContent>
</Card> </Card>
@@ -256,9 +280,13 @@ function RunningStatus({
function IdleStatus({ function IdleStatus({
accounts, accounts,
nextRunCountdown, nextRunCountdown,
onSyncNow,
isSyncing,
}: { }: {
accounts: IngestionAccountStatus[]; accounts: IngestionAccountStatus[];
nextRunCountdown: string | null; nextRunCountdown: string | null;
onSyncNow: () => void;
isSyncing: boolean;
}) { }) {
const lastRun = accounts const lastRun = accounts
.filter((a) => a.lastRun) .filter((a) => a.lastRun)
@@ -321,14 +349,32 @@ function IdleStatus({
)} )}
</div> </div>
{nextRunCountdown && hasAuthenticated && ( <div className="flex items-center gap-2 shrink-0">
<div className="flex items-center gap-1.5 shrink-0"> {nextRunCountdown && hasAuthenticated && (
<RefreshCw className="h-3 w-3 text-muted-foreground" /> <div className="flex items-center gap-1.5">
<span className="text-xs text-muted-foreground tabular-nums"> <RefreshCw className="h-3 w-3 text-muted-foreground" />
Next: {nextRunCountdown} <span className="text-xs text-muted-foreground tabular-nums">
</span> Next: {nextRunCountdown}
</div> </span>
)} </div>
)}
{hasAuthenticated && (
<Button
variant="outline"
size="sm"
className="h-7 text-xs px-2"
onClick={onSyncNow}
disabled={isSyncing}
>
{isSyncing ? (
<Loader2 className="h-3 w-3 animate-spin mr-1" />
) : (
<RefreshCw className="h-3 w-3 mr-1" />
)}
Sync Now
</Button>
)}
</div>
</div> </div>
); );
} }

View File

@@ -297,6 +297,52 @@ export async function triggerChannelSync(): Promise<ActionResult> {
} }
} }
/**
* Reset all scan progress for a channel so the worker will re-process it
* from the very beginning on the next ingestion cycle.
*
* This clears:
* - `lastProcessedMessageId` on every AccountChannelMap linked to this channel
* - All TopicProgress records for those maps (for forum channels)
*/
export async function rescanChannel(channelId: string): Promise<ActionResult> {
const admin = await requireAdmin();
if (!admin.success) return admin;
const channel = await prisma.telegramChannel.findUnique({
where: { id: channelId },
});
if (!channel) return { success: false, error: "Channel not found" };
try {
// Find all account-channel maps for this channel
const maps = await prisma.accountChannelMap.findMany({
where: { channelId },
select: { id: true },
});
const mapIds = maps.map((m) => m.id);
// Delete all topic progress records for these maps (forum channels)
if (mapIds.length > 0) {
await prisma.topicProgress.deleteMany({
where: { accountChannelMapId: { in: mapIds } },
});
}
// Reset the scan cursor so the worker re-processes from the start
await prisma.accountChannelMap.updateMany({
where: { channelId },
data: { lastProcessedMessageId: null },
});
revalidatePath(REVALIDATE_PATH);
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to reset channel scan progress" };
}
}
// ── Account-Channel link actions ── // ── Account-Channel link actions ──
export async function linkChannel( export async function linkChannel(
@@ -377,7 +423,7 @@ export async function triggerIngestion(
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",") accounts.map((a: { id: string }) => a.id).join(",")
); );
} catch { } catch {
// Best-effort // Best-effort

View File

@@ -25,7 +25,7 @@ export default async function TelegramPage() {
}), }),
]); ]);
const serializedHistory = sendHistory.map((r) => ({ const serializedHistory = sendHistory.map((r: typeof sendHistory[number]) => ({
id: r.id, id: r.id,
packageName: r.package.fileName, packageName: r.package.fileName,
recipientName: r.telegramLink.telegramName, recipientName: r.telegramLink.telegramName,
@@ -42,6 +42,7 @@ export default async function TelegramPage() {
ingestionStatus={ingestionStatus} ingestionStatus={ingestionStatus}
globalDestination={globalDestination} globalDestination={globalDestination}
sendHistory={serializedHistory} sendHistory={serializedHistory}
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
/> />
); );
} }

View File

@@ -9,5 +9,9 @@ export async function GET(request: Request) {
if ("error" in authResult) return authResult.error; if ("error" in authResult) return authResult.error;
const accounts = await getIngestionStatus(); const accounts = await getIngestionStatus();
return NextResponse.json({ accounts }); const workerIntervalMinutes = parseInt(
process.env.WORKER_INTERVAL_MINUTES ?? "60",
10
);
return NextResponse.json({ accounts, workerIntervalMinutes });
} }

View File

@@ -45,33 +45,20 @@ export async function POST(request: Request) {
); );
} }
// Create ingestion runs marked as RUNNING — the worker will pick these up // Send pg_notify for immediate worker pickup.
// when it next polls, or we use pg_notify for immediate pickup // The worker creates its own IngestionRun records with proper activity tracking.
for (const account of accounts) {
// Only create if no run is already RUNNING for this account
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// Send pg_notify for immediate worker pickup
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",") accounts.map((a: { id: string }) => a.id).join(",")
); );
} catch { } catch {
// pg_notify is best-effort — worker will pick up on next cycle anyway // pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
} }
return NextResponse.json({ return NextResponse.json({
triggered: true, triggered: true,
accountIds: accounts.map((a) => a.id), accountIds: accounts.map((a: { id: string }) => a.id),
message: `Ingestion queued for ${accounts.length} account(s)`, message: `Ingestion triggered for ${accounts.length} account(s)`,
}); });
} }

View File

@@ -18,6 +18,8 @@ interface DeleteDialogProps {
description?: string; description?: string;
onConfirm: () => void; onConfirm: () => void;
isLoading?: boolean; isLoading?: boolean;
confirmLabel?: string;
confirmLoadingLabel?: string;
} }
export function DeleteDialog({ export function DeleteDialog({
@@ -27,6 +29,8 @@ export function DeleteDialog({
description = "This action cannot be undone.", description = "This action cannot be undone.",
onConfirm, onConfirm,
isLoading, isLoading,
confirmLabel = "Delete",
confirmLoadingLabel,
}: DeleteDialogProps) { }: DeleteDialogProps) {
return ( return (
<AlertDialog open={open} onOpenChange={onOpenChange}> <AlertDialog open={open} onOpenChange={onOpenChange}>
@@ -42,7 +46,7 @@ export function DeleteDialog({
disabled={isLoading} disabled={isLoading}
className="bg-destructive text-destructive-foreground hover:bg-destructive/90" className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
> >
{isLoading ? "Deleting..." : "Delete"} {isLoading ? (confirmLoadingLabel ?? `${confirmLabel}...`) : confirmLabel}
</AlertDialogAction> </AlertDialogAction>
</AlertDialogFooter> </AlertDialogFooter>
</AlertDialogContent> </AlertDialogContent>

View File

@@ -18,6 +18,10 @@ import {
const log = childLogger("fetch-listener"); const log = childLogger("fetch-listener");
let pgClient: pg.PoolClient | null = null; let pgClient: pg.PoolClient | null = null;
let stopped = false;
/** Delay (ms) before attempting to reconnect after a connection loss. */
const RECONNECT_DELAY_MS = 5_000;
/** /**
* Start listening for pg_notify signals from the web app. * Start listening for pg_notify signals from the web app.
@@ -27,30 +31,75 @@ let pgClient: pg.PoolClient | null = null;
* - `generate_invite` — payload = channelId → generate invite link for destination * - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle * - `ingestion_trigger` — trigger an immediate ingestion cycle
*
* If the underlying connection is lost, the listener automatically reconnects
* so that pg_notify signals are never silently dropped.
*/ */
export async function startFetchListener(): Promise<void> { export async function startFetchListener(): Promise<void> {
pgClient = await pool.connect(); stopped = false;
await pgClient.query("LISTEN channel_fetch"); await connectListener();
await pgClient.query("LISTEN generate_invite"); }
await pgClient.query("LISTEN create_destination");
await pgClient.query("LISTEN ingestion_trigger");
pgClient.on("notification", (msg) => { async function connectListener(): Promise<void> {
if (msg.channel === "channel_fetch" && msg.payload) { try {
handleChannelFetch(msg.payload); pgClient = await pool.connect();
} else if (msg.channel === "generate_invite" && msg.payload) { await pgClient.query("LISTEN channel_fetch");
handleGenerateInvite(msg.payload); await pgClient.query("LISTEN generate_invite");
} else if (msg.channel === "create_destination" && msg.payload) { await pgClient.query("LISTEN create_destination");
handleCreateDestination(msg.payload); await pgClient.query("LISTEN ingestion_trigger");
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger(); pgClient.on("notification", (msg) => {
if (msg.channel === "channel_fetch" && msg.payload) {
handleChannelFetch(msg.payload);
} else if (msg.channel === "generate_invite" && msg.payload) {
handleGenerateInvite(msg.payload);
} else if (msg.channel === "create_destination" && msg.payload) {
handleCreateDestination(msg.payload);
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger();
}
});
// Reconnect automatically when the connection ends unexpectedly
pgClient.on("end", () => {
if (!stopped) {
log.warn("Fetch listener connection lost — reconnecting");
pgClient = null;
scheduleReconnect();
}
});
pgClient.on("error", (err) => {
log.error({ err }, "Fetch listener connection error");
if (!stopped && pgClient) {
try {
pgClient.release(true);
} catch (releaseErr) {
log.debug({ err: releaseErr }, "Failed to release pg client after error");
}
pgClient = null;
scheduleReconnect();
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
} catch (err) {
log.error({ err }, "Failed to start fetch listener — retrying");
scheduleReconnect();
}
}
function scheduleReconnect(): void {
if (stopped) return;
setTimeout(() => {
if (!stopped) {
connectListener();
} }
}); }, RECONNECT_DELAY_MS);
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
} }
export function stopFetchListener(): void { export function stopFetchListener(): void {
stopped = true;
if (pgClient) { if (pgClient) {
pgClient.release(); pgClient.release();
pgClient = null; pgClient = null;

View File

@@ -10,6 +10,13 @@ let running = false;
let timer: ReturnType<typeof setTimeout> | null = null; let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0; let cycleCount = 0;
/**
* Maximum time for a single ingestion cycle (ms).
* After this, new accounts won't be started (in-progress work finishes).
* Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES.
*/
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
/** /**
* Run one ingestion cycle: * Run one ingestion cycle:
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
@@ -17,6 +24,10 @@ let cycleCount = 0;
* *
* All TDLib operations are wrapped in the mutex to ensure only one client * All TDLib operations are wrapped in the mutex to ensure only one client
* runs at a time (also shared with the fetch listener for on-demand requests). * runs at a time (also shared with the fetch listener for on-demand requests).
*
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
* Once the timeout elapses, no new accounts will be started but any in-progress
* account processing is allowed to finish its current archive set.
*/ */
async function runCycle(): Promise<void> { async function runCycle(): Promise<void> {
if (running) { if (running) {
@@ -26,7 +37,8 @@ async function runCycle(): Promise<void> {
running = true; running = true;
cycleCount++; cycleCount++;
log.info({ cycle: cycleCount }, "Starting ingestion cycle"); const cycleStart = Date.now();
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
try { try {
// ── Phase 1: Authenticate pending accounts ── // ── Phase 1: Authenticate pending accounts ──
@@ -37,6 +49,10 @@ async function runCycle(): Promise<void> {
"Found pending accounts, starting authentication" "Found pending accounts, starting authentication"
); );
for (const account of pendingAccounts) { for (const account of pendingAccounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn("Cycle timeout reached during authentication phase, stopping");
break;
}
await withTdlibMutex(`auth:${account.phone}`, () => await withTdlibMutex(`auth:${account.phone}`, () =>
authenticateAccount(account) authenticateAccount(account)
); );
@@ -54,12 +70,22 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts"); log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) { for (const account of accounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn(
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
"Cycle timeout reached, skipping remaining accounts"
);
break;
}
await withTdlibMutex(`ingest:${account.phone}`, () => await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account) runWorkerForAccount(account)
); );
} }
log.info("Ingestion cycle complete"); log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
"Ingestion cycle complete"
);
} catch (err) { } catch (err) {
log.error({ err }, "Ingestion cycle failed"); log.error({ err }, "Ingestion cycle failed");
} finally { } finally {

View File

@@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js";
const log = childLogger("download"); const log = childLogger("download");
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
export const MAX_SCAN_PAGES = 5000;
/** Timeout for a single TDLib API call (ms) */
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
interface TdPhotoSize { interface TdPhotoSize {
type: string; type: string;
photo: { photo: {
@@ -71,6 +77,44 @@ export interface ChannelScanResult {
export type ScanProgressCallback = (messagesScanned: number) => void; export type ScanProgressCallback = (messagesScanned: number) => void;
/**
* Invoke a TDLib method with a timeout to prevent indefinite hangs.
* If TDLib does not respond within the timeout, the promise rejects.
*/
export async function invokeWithTimeout<T>(
client: Client,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: Record<string, any>,
timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
return new Promise<T>((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
}
}, timeoutMs);
(client.invoke(request) as Promise<T>)
.then((result) => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(result);
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
/** /**
* Fetch messages from a channel, stopping once we've scanned past the * Fetch messages from a channel, stopping once we've scanned past the
* last-processed boundary (with one page of lookback for multipart safety). * last-processed boundary (with one page of lookback for multipart safety).
@@ -80,6 +124,11 @@ export type ScanProgressCallback = (messagesScanned: number) => void;
* When `lastProcessedMessageId` is null (first run), scans everything. * When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets, * The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net. * and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/ */
export async function getChannelMessages( export async function getChannelMessages(
client: Client, client: Client,
@@ -94,17 +143,29 @@ export async function getChannelMessages(
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0; let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
const result = (await client.invoke({ if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, totalScanned },
"Hit max page limit for channel scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
_: "getChatHistory", _: "getChatHistory",
chat_id: Number(chatId), chat_id: Number(chatId),
from_message_id: currentFromId, from_message_id: currentFromId,
offset: 0, offset: 0,
limit: Math.min(limit, 100), limit: Math.min(limit, 100),
only_local: false, only_local: false,
})) as { messages: TdMessage[] }; });
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
@@ -144,17 +205,26 @@ export async function getChannelMessages(
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), currentFromId, totalScanned },
"Pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break; if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break; if (result.messages.length < Math.min(limit, 100)) break;
// Rate limit delay // Rate limit delay
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
log.info( log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned }, { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Channel scan complete" "Channel scan complete"
); );

View File

@@ -5,6 +5,7 @@ import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js"; import type { TelegramPhoto } from "../preview/match.js";
import type { ChannelScanResult, ScanProgressCallback } from "./download.js"; import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
const log = childLogger("topics"); const log = childLogger("topics");
@@ -21,16 +22,16 @@ export async function isChatForum(
chatId: bigint chatId: bigint
): Promise<boolean> { ): Promise<boolean> {
try { try {
const chat = (await client.invoke({ const chat = await invokeWithTimeout<{
_: "getChat",
chat_id: Number(chatId),
})) as {
type?: { type?: {
_: string; _: string;
supergroup_id?: number; supergroup_id?: number;
is_forum?: boolean; is_forum?: boolean;
}; };
}; }>(client, {
_: "getChat",
chat_id: Number(chatId),
});
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) { if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
return true; return true;
@@ -38,10 +39,10 @@ export async function isChatForum(
// Also check via getSupergroup for older TDLib versions // Also check via getSupergroup for older TDLib versions
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) { if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
const sg = (await client.invoke({ const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
_: "getSupergroup", _: "getSupergroup",
supergroup_id: chat.type.supergroup_id, supergroup_id: chat.type.supergroup_id,
})) as { is_forum?: boolean }; });
return sg.is_forum === true; return sg.is_forum === true;
} }
@@ -54,6 +55,7 @@ export async function isChatForum(
/** /**
* Get all forum topics in a supergroup. * Get all forum topics in a supergroup.
* Includes stuck detection and timeout protection on API calls.
*/ */
export async function getForumTopicList( export async function getForumTopicList(
client: Client, client: Client,
@@ -63,18 +65,24 @@ export async function getForumTopicList(
let offsetDate = 0; let offsetDate = 0;
let offsetMessageId = 0; let offsetMessageId = 0;
let offsetMessageThreadId = 0; let offsetMessageThreadId = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
const result = (await client.invoke({ if (pageCount >= MAX_SCAN_PAGES) {
_: "getForumTopics", log.warn(
chat_id: Number(chatId), { chatId: chatId.toString(), pageCount, topicCount: topics.length },
query: "", "Hit max page limit for topic enumeration, stopping"
offset_date: offsetDate, );
offset_message_id: offsetMessageId, break;
offset_message_thread_id: offsetMessageThreadId, }
limit: 100, pageCount++;
})) as {
const prevOffsetDate = offsetDate;
const prevOffsetMessageId = offsetMessageId;
const prevOffsetMessageThreadId = offsetMessageThreadId;
const result = await invokeWithTimeout<{
topics?: { topics?: {
info?: { info?: {
message_thread_id?: number; message_thread_id?: number;
@@ -85,7 +93,15 @@ export async function getForumTopicList(
next_offset_date?: number; next_offset_date?: number;
next_offset_message_id?: number; next_offset_message_id?: number;
next_offset_message_thread_id?: number; next_offset_message_thread_id?: number;
}; }>(client, {
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
});
if (!result.topics || result.topics.length === 0) break; if (!result.topics || result.topics.length === 0) break;
@@ -113,6 +129,19 @@ export async function getForumTopicList(
offsetMessageId = result.next_offset_message_id ?? 0; offsetMessageId = result.next_offset_message_id ?? 0;
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0; offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
// Stuck detection: if offsets didn't advance, break
if (
offsetDate === prevOffsetDate &&
offsetMessageId === prevOffsetMessageId &&
offsetMessageThreadId === prevOffsetMessageThreadId
) {
log.warn(
{ chatId: chatId.toString(), topicCount: topics.length },
"Topic pagination stuck (offsets not advancing), breaking"
);
break;
}
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
@@ -134,6 +163,11 @@ export async function getForumTopicList(
* When `lastProcessedMessageId` is null (first run), scans everything. * When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets, * The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net. * and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/ */
export async function getTopicMessages( export async function getTopicMessages(
client: Client, client: Client,
@@ -149,22 +183,23 @@ export async function getTopicMessages(
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0; let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
"Hit max page limit for topic scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({ const result = await invokeWithTimeout<{
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
})) as {
messages?: { messages?: {
id: number; id: number;
date: number; date: number;
@@ -188,7 +223,18 @@ export async function getTopicMessages(
caption?: { text?: string }; caption?: { text?: string };
}; };
}[]; }[];
}; }>(client, {
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
});
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
@@ -228,16 +274,25 @@ export async function getTopicMessages(
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
"Topic pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break; if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break; if (result.messages.length < Math.min(limit, 100)) break;
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
log.info( log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned }, { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Topic scan complete" "Topic scan complete"
); );

View File

@@ -4,12 +4,21 @@ const log = childLogger("mutex");
let locked = false; let locked = false;
let holder = ""; let holder = "";
const queue: Array<{ resolve: () => void; label: string }> = []; const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
/** /**
* Ensures only one TDLib client runs at a time across the entire worker process. * Ensures only one TDLib client runs at a time across the entire worker process.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this * Both the scheduler (auth, ingestion) and the fetch listener acquire this
* before creating any TDLib client. * before creating any TDLib client.
*
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
*/ */
export async function withTdlibMutex<T>( export async function withTdlibMutex<T>(
label: string, label: string,
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
): Promise<T> { ): Promise<T> {
if (locked) { if (locked) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
await new Promise<void>((resolve) => queue.push({ resolve, label })); await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
const idx = queue.indexOf(entry);
if (idx !== -1) {
queue.splice(idx, 1);
reject(new Error(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
`(waiting: ${label}, holder: ${holder})`
));
}
}, MUTEX_WAIT_TIMEOUT_MS);
const entry = {
resolve: () => {
clearTimeout(timer);
resolve();
},
reject,
label,
};
queue.push(entry);
});
} }
locked = true; locked = true;

View File

@@ -351,6 +351,10 @@ export async function runWorkerForAccount(
const totalChannels = channelMappings.length; const totalChannels = channelMappings.length;
if (totalChannels === 0) {
accountLog.info("No active source channels linked to this account — nothing to ingest");
}
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
const mapping = channelMappings[chIdx]; const mapping = channelMappings[chIdx];
const channel = mapping.channel; const channel = mapping.channel;
@@ -451,8 +455,8 @@ export async function runWorkerForAccount(
counters.messagesScanned += scanResult.totalScanned; counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug( accountLog.info(
{ channelId: channel.id, topic: topic.name }, { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
"No new archives in topic" "No new archives in topic"
); );
continue; continue;
@@ -525,7 +529,7 @@ export async function runWorkerForAccount(
counters.messagesScanned += scanResult.totalScanned; counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives"); accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
continue; continue;
} }