diff --git a/.gitignore b/.gitignore index 0f0f125..7718c40 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ worker/node_modules # production /build +worker/dist # misc .DS_Store diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index edb02c1..6a5f6f7 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -10,6 +10,13 @@ let running = false; let timer: ReturnType | null = null; let cycleCount = 0; +/** + * Maximum time for a single ingestion cycle (ms). + * After this, new accounts won't be started (in-progress work finishes). + * Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES. + */ +const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000; + /** * Run one ingestion cycle: * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) @@ -17,6 +24,10 @@ let cycleCount = 0; * * All TDLib operations are wrapped in the mutex to ensure only one client * runs at a time (also shared with the fetch listener for on-demand requests). + * + * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h). + * Once the timeout elapses, no new accounts will be started but any in-progress + * account processing is allowed to finish its current archive set. */ async function runCycle(): Promise { if (running) { @@ -26,7 +37,8 @@ async function runCycle(): Promise { running = true; cycleCount++; - log.info({ cycle: cycleCount }, "Starting ingestion cycle"); + const cycleStart = Date.now(); + log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle"); try { // ── Phase 1: Authenticate pending accounts ── @@ -37,6 +49,10 @@ async function runCycle(): Promise { "Found pending accounts, starting authentication" ); for (const account of pendingAccounts) { + if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) { + log.warn("Cycle timeout reached during authentication phase, stopping"); + break; + } await withTdlibMutex(`auth:${account.phone}`, () => authenticateAccount(account) ); @@ -54,12 +70,22 @@ async function runCycle(): Promise { log.info({ accountCount: accounts.length }, "Processing accounts"); for (const account of accounts) { + if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) { + log.warn( + { elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, + "Cycle timeout reached, skipping remaining accounts" + ); + break; + } await withTdlibMutex(`ingest:${account.phone}`, () => runWorkerForAccount(account) ); } - log.info("Ingestion cycle complete"); + log.info( + { elapsed: Math.round((Date.now() - cycleStart) / 1000) }, + "Ingestion cycle complete" + ); } catch (err) { log.error({ err }, "Ingestion cycle failed"); } finally { diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 0597ae3..600c7d7 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js"; const log = childLogger("download"); +/** Maximum number of pages to scan per channel/topic to prevent infinite loops */ +export const MAX_SCAN_PAGES = 5000; + +/** Timeout for a single TDLib API call (ms) */ +export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes + interface TdPhotoSize { type: string; photo: { @@ -71,6 +77,44 @@ export interface ChannelScanResult { export type ScanProgressCallback = (messagesScanned: number) => void; +/** + * Invoke a TDLib method with a timeout to prevent indefinite hangs. + * If TDLib does not respond within the timeout, the promise rejects. + */ +export async function invokeWithTimeout( + client: Client, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + request: Record, + timeoutMs = INVOKE_TIMEOUT_MS +): Promise { + return new Promise((resolve, reject) => { + let settled = false; + + const timer = setTimeout(() => { + if (!settled) { + settled = true; + reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`)); + } + }, timeoutMs); + + (client.invoke(request) as Promise) + .then((result) => { + if (!settled) { + settled = true; + clearTimeout(timer); + resolve(result); + } + }) + .catch((err) => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(err); + } + }); + }); +} + /** * Fetch messages from a channel, stopping once we've scanned past the * last-processed boundary (with one page of lookback for multipart safety). @@ -80,6 +124,11 @@ export type ScanProgressCallback = (messagesScanned: number) => void; * When `lastProcessedMessageId` is null (first run), scans everything. * The worker applies a post-grouping filter to skip fully-processed sets, * and keeps `packageExistsBySourceMessage` as a safety net. + * + * Safety features: + * - Max page limit to prevent infinite loops + * - Stuck detection: breaks if from_message_id stops advancing + * - Timeout on each TDLib API call */ export async function getChannelMessages( client: Client, @@ -94,17 +143,29 @@ export async function getChannelMessages( let currentFromId = 0; let totalScanned = 0; + let pageCount = 0; // eslint-disable-next-line no-constant-condition while (true) { - const result = (await client.invoke({ + if (pageCount >= MAX_SCAN_PAGES) { + log.warn( + { chatId: chatId.toString(), pageCount, totalScanned }, + "Hit max page limit for channel scan, stopping" + ); + break; + } + pageCount++; + + const previousFromId = currentFromId; + + const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, { _: "getChatHistory", chat_id: Number(chatId), from_message_id: currentFromId, offset: 0, limit: Math.min(limit, 100), only_local: false, - })) as { messages: TdMessage[] }; + }); if (!result.messages || result.messages.length === 0) break; @@ -144,17 +205,26 @@ export async function getChannelMessages( currentFromId = result.messages[result.messages.length - 1].id; + // Stuck detection: if from_message_id didn't advance, break to prevent infinite loop + if (currentFromId === previousFromId) { + log.warn( + { chatId: chatId.toString(), currentFromId, totalScanned }, + "Pagination stuck (from_message_id not advancing), breaking" + ); + break; + } + // Stop scanning once we've gone past the boundary (this page is the lookback) if (boundary && currentFromId < boundary) break; - if (result.messages.length < 100) break; + if (result.messages.length < Math.min(limit, 100)) break; // Rate limit delay await sleep(config.apiDelayMs); } log.info( - { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned }, + { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount }, "Channel scan complete" ); diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 107bf37..2f45a6b 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -5,6 +5,7 @@ import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; import type { ChannelScanResult, ScanProgressCallback } from "./download.js"; +import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js"; const log = childLogger("topics"); @@ -21,16 +22,16 @@ export async function isChatForum( chatId: bigint ): Promise { try { - const chat = (await client.invoke({ - _: "getChat", - chat_id: Number(chatId), - })) as { + const chat = await invokeWithTimeout<{ type?: { _: string; supergroup_id?: number; is_forum?: boolean; }; - }; + }>(client, { + _: "getChat", + chat_id: Number(chatId), + }); if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) { return true; @@ -38,10 +39,10 @@ export async function isChatForum( // Also check via getSupergroup for older TDLib versions if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) { - const sg = (await client.invoke({ + const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, { _: "getSupergroup", supergroup_id: chat.type.supergroup_id, - })) as { is_forum?: boolean }; + }); return sg.is_forum === true; } @@ -54,6 +55,7 @@ export async function isChatForum( /** * Get all forum topics in a supergroup. + * Includes stuck detection and timeout protection on API calls. */ export async function getForumTopicList( client: Client, @@ -63,18 +65,24 @@ export async function getForumTopicList( let offsetDate = 0; let offsetMessageId = 0; let offsetMessageThreadId = 0; + let pageCount = 0; // eslint-disable-next-line no-constant-condition while (true) { - const result = (await client.invoke({ - _: "getForumTopics", - chat_id: Number(chatId), - query: "", - offset_date: offsetDate, - offset_message_id: offsetMessageId, - offset_message_thread_id: offsetMessageThreadId, - limit: 100, - })) as { + if (pageCount >= MAX_SCAN_PAGES) { + log.warn( + { chatId: chatId.toString(), pageCount, topicCount: topics.length }, + "Hit max page limit for topic enumeration, stopping" + ); + break; + } + pageCount++; + + const prevOffsetDate = offsetDate; + const prevOffsetMessageId = offsetMessageId; + const prevOffsetMessageThreadId = offsetMessageThreadId; + + const result = await invokeWithTimeout<{ topics?: { info?: { message_thread_id?: number; @@ -85,7 +93,15 @@ export async function getForumTopicList( next_offset_date?: number; next_offset_message_id?: number; next_offset_message_thread_id?: number; - }; + }>(client, { + _: "getForumTopics", + chat_id: Number(chatId), + query: "", + offset_date: offsetDate, + offset_message_id: offsetMessageId, + offset_message_thread_id: offsetMessageThreadId, + limit: 100, + }); if (!result.topics || result.topics.length === 0) break; @@ -113,6 +129,19 @@ export async function getForumTopicList( offsetMessageId = result.next_offset_message_id ?? 0; offsetMessageThreadId = result.next_offset_message_thread_id ?? 0; + // Stuck detection: if offsets didn't advance, break + if ( + offsetDate === prevOffsetDate && + offsetMessageId === prevOffsetMessageId && + offsetMessageThreadId === prevOffsetMessageThreadId + ) { + log.warn( + { chatId: chatId.toString(), topicCount: topics.length }, + "Topic pagination stuck (offsets not advancing), breaking" + ); + break; + } + await sleep(config.apiDelayMs); } @@ -134,6 +163,11 @@ export async function getForumTopicList( * When `lastProcessedMessageId` is null (first run), scans everything. * The worker applies a post-grouping filter to skip fully-processed sets, * and keeps `packageExistsBySourceMessage` as a safety net. + * + * Safety features: + * - Max page limit to prevent infinite loops + * - Stuck detection: breaks if from_message_id stops advancing + * - Timeout on each TDLib API call */ export async function getTopicMessages( client: Client, @@ -149,22 +183,23 @@ export async function getTopicMessages( let currentFromId = 0; let totalScanned = 0; + let pageCount = 0; // eslint-disable-next-line no-constant-condition while (true) { + if (pageCount >= MAX_SCAN_PAGES) { + log.warn( + { chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned }, + "Hit max page limit for topic scan, stopping" + ); + break; + } + pageCount++; + + const previousFromId = currentFromId; + // eslint-disable-next-line @typescript-eslint/no-explicit-any - const result = (await client.invoke({ - _: "searchChatMessages", - chat_id: Number(chatId), - query: "", - message_thread_id: Number(topicId), - from_message_id: currentFromId, - offset: 0, - limit: Math.min(limit, 100), - filter: null, - sender_id: null, - saved_messages_topic_id: 0, - })) as { + const result = await invokeWithTimeout<{ messages?: { id: number; date: number; @@ -188,7 +223,18 @@ export async function getTopicMessages( caption?: { text?: string }; }; }[]; - }; + }>(client, { + _: "searchChatMessages", + chat_id: Number(chatId), + query: "", + message_thread_id: Number(topicId), + from_message_id: currentFromId, + offset: 0, + limit: Math.min(limit, 100), + filter: null, + sender_id: null, + saved_messages_topic_id: 0, + }); if (!result.messages || result.messages.length === 0) break; @@ -228,16 +274,25 @@ export async function getTopicMessages( currentFromId = result.messages[result.messages.length - 1].id; + // Stuck detection: if from_message_id didn't advance, break to prevent infinite loop + if (currentFromId === previousFromId) { + log.warn( + { chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned }, + "Topic pagination stuck (from_message_id not advancing), breaking" + ); + break; + } + // Stop scanning once we've gone past the boundary (this page is the lookback) if (boundary && currentFromId < boundary) break; - if (result.messages.length < 100) break; + if (result.messages.length < Math.min(limit, 100)) break; await sleep(config.apiDelayMs); } log.info( - { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned }, + { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount }, "Topic scan complete" ); diff --git a/worker/src/util/mutex.ts b/worker/src/util/mutex.ts index f35f193..e559318 100644 --- a/worker/src/util/mutex.ts +++ b/worker/src/util/mutex.ts @@ -4,12 +4,21 @@ const log = childLogger("mutex"); let locked = false; let holder = ""; -const queue: Array<{ resolve: () => void; label: string }> = []; +const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = []; + +/** + * Maximum time to wait for the TDLib mutex (ms). + * If the mutex is not available within this time, the operation is rejected. + * Default: 30 minutes (long enough for large downloads, short enough to detect hangs). + */ +const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000; /** * Ensures only one TDLib client runs at a time across the entire worker process. * Both the scheduler (auth, ingestion) and the fetch listener acquire this * before creating any TDLib client. + * + * Includes a wait timeout to prevent indefinite blocking if the current holder hangs. */ export async function withTdlibMutex( label: string, @@ -17,7 +26,28 @@ export async function withTdlibMutex( ): Promise { if (locked) { log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); - await new Promise((resolve) => queue.push({ resolve, label })); + await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + const idx = queue.indexOf(entry); + if (idx !== -1) { + queue.splice(idx, 1); + reject(new Error( + `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` + + `(waiting: ${label}, holder: ${holder})` + )); + } + }, MUTEX_WAIT_TIMEOUT_MS); + + const entry = { + resolve: () => { + clearTimeout(timer); + resolve(); + }, + reject, + label, + }; + queue.push(entry); + }); } locked = true;