diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index 2de1f54..4162cf3 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -1,6 +1,6 @@ import { config } from "./util/config.js"; import { childLogger } from "./util/logger.js"; -import { withTdlibMutex } from "./util/mutex.js"; +import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js"; import { getActiveAccounts, getPendingAccounts } from "./db/queries.js"; import { runWorkerForAccount, authenticateAccount } from "./worker.js"; import { runIntegrityAudit } from "./audit.js"; @@ -90,10 +90,18 @@ async function runCycle(): Promise { for (let i = 0; i < results.length; i++) { if (results[i].status === "rejected") { + const reason = (results[i] as PromiseRejectedResult).reason; log.error( - { phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason }, + { phone: accounts[i].phone, err: reason }, "Account ingestion failed" ); + // If the cycle timed out, force-release the mutex so the next cycle + // (or other operations like fetch-channels) can proceed immediately + // instead of waiting 30 minutes for the mutex timeout. + const errMsg = reason instanceof Error ? reason.message : String(reason); + if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) { + forceReleaseMutex(accounts[i].phone); + } } } diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts index fd8c7fd..d22a0a0 100644 --- a/worker/src/upload/channel.ts +++ b/worker/src/upload/channel.ts @@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js"; const log = childLogger("upload"); +/** + * Custom error class to distinguish upload stalls from other errors. + * When consecutive stalls occur, the caller can use this signal to + * recreate the TDLib client (whose event stream may have degraded). + */ +export class UploadStallError extends Error { + constructor(message: string) { + super(message); + this.name = "UploadStallError"; + } +} + export interface UploadResult { messageId: bigint; messageIds: bigint[]; @@ -109,13 +121,21 @@ async function sendWithRetry( // Stall or timeout — retry with a cooldown const errMsg = err instanceof Error ? err.message : ""; - if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) { - log.warn( - { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES }, - "Upload stalled/timed out — retrying" + if (errMsg.includes("stalled") || errMsg.includes("timed out")) { + if (!isLastAttempt) { + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES }, + "Upload stalled/timed out — retrying" + ); + await sleep(10_000); + continue; + } + // All stall retries exhausted — throw UploadStallError so the caller + // knows the TDLib client's event stream is likely degraded and can + // recreate the client before continuing. + throw new UploadStallError( + `Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}` ); - await sleep(10_000); - continue; } throw err; @@ -166,8 +186,10 @@ async function sendAndWaitForUpload( } }, timeoutMs); - // Stall detection: no progress for 5 minutes after upload started → reject - const STALL_TIMEOUT_MS = 5 * 60_000; + // Stall detection: no progress for 3 minutes after upload started → reject + // (reduced from 5min — once data is fully sent, confirmation should arrive quickly; + // a 3min silence strongly indicates a degraded TDLib event stream) + const STALL_TIMEOUT_MS = 3 * 60_000; const stallChecker = setInterval(() => { if (settled || !uploadStarted) return; const stallMs = Date.now() - lastProgressTime; diff --git a/worker/src/util/mutex.ts b/worker/src/util/mutex.ts index 34e1576..92a494a 100644 --- a/worker/src/util/mutex.ts +++ b/worker/src/util/mutex.ts @@ -11,6 +11,29 @@ const queues = new Map< Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> >(); +/** + * Force-release a stuck mutex. + * This should only be called when the holder is known to be stuck (e.g. after + * a cycle timeout). It releases the lock and lets the next queued waiter proceed. + */ +export function forceReleaseMutex(key: string): void { + if (!locks.has(key)) return; + + const holder = holders.get(key); + log.warn({ key, holder }, "Force-releasing stuck TDLib mutex"); + + locks.delete(key); + holders.delete(key); + const next = queues.get(key)?.shift(); + if (next) { + log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter"); + next.resolve(); + } else { + queues.delete(key); + log.info({ key }, "TDLib mutex force-released (no waiters)"); + } +} + /** * Ensures only one TDLib operation runs at a time FOR THE SAME KEY. * Different keys run concurrently — this allows two accounts to ingest in parallel diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 3e87341..940d58d 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js"; import { readRarContents } from "./archive/rar-reader.js"; import { read7zContents } from "./archive/sevenz-reader.js"; import { byteLevelSplit, concatenateFiles } from "./archive/split.js"; -import { uploadToChannel } from "./upload/channel.js"; +import { uploadToChannel, UploadStallError } from "./upload/channel.js"; import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js"; import { db } from "./db/client.js"; import type { TelegramAccount, TelegramChannel } from "@prisma/client"; @@ -286,6 +286,7 @@ interface PipelineContext { client: Client; runId: string; accountId: string; + accountPhone: string; channelTitle: string; channel: TelegramChannel; destChannelTelegramId: bigint; @@ -303,6 +304,8 @@ interface PipelineContext { sourceTopicId: bigint | null; accountLog: ReturnType; maxUploadSize: bigint; + /** How many consecutive upload stalls have occurred (resets on success). */ + consecutiveStalls: number; } /** @@ -338,7 +341,8 @@ export async function runWorkerForAccount( currentStep: "connecting", }); - const { client, isPremium } = await createTdlibClient({ + // Use let so the client can be replaced on TDLib recreation after stalls + let { client, isPremium } = await createTdlibClient({ id: account.id, phone: account.phone, }); @@ -448,6 +452,7 @@ export async function runWorkerForAccount( client, runId: activeRunId, accountId: account.id, + accountPhone: account.phone, channelTitle: channel.title, channel, destChannelTelegramId: destChannel.telegramId, @@ -458,6 +463,7 @@ export async function runWorkerForAccount( sourceTopicId: null, accountLog, maxUploadSize, + consecutiveStalls: 0, }; if (forum) { @@ -546,6 +552,8 @@ export async function runWorkerForAccount( pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`; const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId); + // Sync client back in case it was recreated during upload stall recovery + client = pipelineCtx.client; // Only advance progress to the highest successfully processed message if (maxProcessedId) { @@ -617,6 +625,8 @@ export async function runWorkerForAccount( pipelineCtx.channelTitle = channel.title; const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId); + // Sync client back in case it was recreated during upload stall recovery + client = pipelineCtx.client; // Only advance progress to the highest successfully processed message if (maxProcessedId) { @@ -760,12 +770,68 @@ async function processArchiveSets( if (setMaxId > (maxProcessedId ?? 0n)) { maxProcessedId = setMaxId; } + + // Reset stall counter on any successful upload + ctx.consecutiveStalls = 0; } catch (setErr) { // If a set fails, do NOT advance the watermark past it accountLog.warn( { err: setErr, baseName: archiveSets[setIdx].baseName }, "Archive set failed, watermark will not advance past this set" ); + + // ── TDLib client recreation on repeated upload stalls ── + // When the TDLib event stream degrades, uploads complete (bytes sent) + // but confirmations never arrive. Retrying with the same broken client + // is futile. Recreate the client to get a fresh connection. + if (setErr instanceof UploadStallError) { + ctx.consecutiveStalls++; + accountLog.warn( + { consecutiveStalls: ctx.consecutiveStalls }, + "Upload stall detected — TDLib event stream may be degraded" + ); + + // After 1 stalled set (= 3 failed retry attempts already), recreate the client + if (ctx.consecutiveStalls >= 1) { + accountLog.info("Recreating TDLib client after consecutive upload stalls"); + try { + await closeTdlibClient(ctx.client); + } catch (closeErr) { + accountLog.warn({ err: closeErr }, "Error closing stale TDLib client"); + } + + try { + const { client: newClient } = await createTdlibClient({ + id: ctx.accountId, + phone: ctx.accountPhone, + }); + ctx.client = newClient; + + // Reload chats so the new client can access channels + try { + for (let page = 0; page < 500; page++) { + await newClient.invoke({ + _: "loadChats", + chat_list: { _: "chatListMain" }, + limit: 100, + }); + } + } catch { + // 404 = all loaded (expected) + } + + ctx.consecutiveStalls = 0; + accountLog.info("TDLib client recreated successfully — continuing ingestion"); + } catch (recreateErr) { + accountLog.error( + { err: recreateErr }, + "Failed to recreate TDLib client — aborting remaining uploads" + ); + break; + } + } + } + // Record the failure for visibility in the UI try { const archiveSet = archiveSets[setIdx];