From d6c82ede1e567376cb2adcfc21a131f208c31d59 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Mon, 4 May 2026 18:02:42 +0200 Subject: [PATCH] fix: auto-recover from TDLib upload stalls by recreating client When TDLib's event stream degrades, uploads complete (bytes sent) but confirmations never arrive. Previously the worker retried 3x with the same broken client, wasting 60+ min per archive and holding the mutex. - Add UploadStallError class to distinguish stalls from other failures - Reduce stall detection timeout from 5min to 3min (faster detection) - Recreate TDLib client after consecutive upload stalls instead of retrying on the same degraded connection - Add forceReleaseMutex() to prevent cascade failures when one account blocks others via stuck mutex after cycle timeout Co-Authored-By: Claude Opus 4.6 (1M context) --- worker/src/scheduler.ts | 12 +++++-- worker/src/upload/channel.ts | 38 +++++++++++++++----- worker/src/util/mutex.ts | 23 ++++++++++++ worker/src/worker.ts | 70 ++++++++++++++++++++++++++++++++++-- 4 files changed, 131 insertions(+), 12 deletions(-) diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index 2de1f54..4162cf3 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -1,6 +1,6 @@ import { config } from "./util/config.js"; import { childLogger } from "./util/logger.js"; -import { withTdlibMutex } from "./util/mutex.js"; +import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js"; import { getActiveAccounts, getPendingAccounts } from "./db/queries.js"; import { runWorkerForAccount, authenticateAccount } from "./worker.js"; import { runIntegrityAudit } from "./audit.js"; @@ -90,10 +90,18 @@ async function runCycle(): Promise { for (let i = 0; i < results.length; i++) { if (results[i].status === "rejected") { + const reason = (results[i] as PromiseRejectedResult).reason; log.error( - { phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason }, + { phone: accounts[i].phone, err: reason }, "Account ingestion failed" ); + // If the cycle timed out, force-release the mutex so the next cycle + // (or other operations like fetch-channels) can proceed immediately + // instead of waiting 30 minutes for the mutex timeout. + const errMsg = reason instanceof Error ? reason.message : String(reason); + if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) { + forceReleaseMutex(accounts[i].phone); + } } } diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts index fd8c7fd..d22a0a0 100644 --- a/worker/src/upload/channel.ts +++ b/worker/src/upload/channel.ts @@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js"; const log = childLogger("upload"); +/** + * Custom error class to distinguish upload stalls from other errors. + * When consecutive stalls occur, the caller can use this signal to + * recreate the TDLib client (whose event stream may have degraded). + */ +export class UploadStallError extends Error { + constructor(message: string) { + super(message); + this.name = "UploadStallError"; + } +} + export interface UploadResult { messageId: bigint; messageIds: bigint[]; @@ -109,13 +121,21 @@ async function sendWithRetry( // Stall or timeout — retry with a cooldown const errMsg = err instanceof Error ? err.message : ""; - if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) { - log.warn( - { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES }, - "Upload stalled/timed out — retrying" + if (errMsg.includes("stalled") || errMsg.includes("timed out")) { + if (!isLastAttempt) { + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES }, + "Upload stalled/timed out — retrying" + ); + await sleep(10_000); + continue; + } + // All stall retries exhausted — throw UploadStallError so the caller + // knows the TDLib client's event stream is likely degraded and can + // recreate the client before continuing. + throw new UploadStallError( + `Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}` ); - await sleep(10_000); - continue; } throw err; @@ -166,8 +186,10 @@ async function sendAndWaitForUpload( } }, timeoutMs); - // Stall detection: no progress for 5 minutes after upload started → reject - const STALL_TIMEOUT_MS = 5 * 60_000; + // Stall detection: no progress for 3 minutes after upload started → reject + // (reduced from 5min — once data is fully sent, confirmation should arrive quickly; + // a 3min silence strongly indicates a degraded TDLib event stream) + const STALL_TIMEOUT_MS = 3 * 60_000; const stallChecker = setInterval(() => { if (settled || !uploadStarted) return; const stallMs = Date.now() - lastProgressTime; diff --git a/worker/src/util/mutex.ts b/worker/src/util/mutex.ts index 34e1576..92a494a 100644 --- a/worker/src/util/mutex.ts +++ b/worker/src/util/mutex.ts @@ -11,6 +11,29 @@ const queues = new Map< Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> >(); +/** + * Force-release a stuck mutex. + * This should only be called when the holder is known to be stuck (e.g. after + * a cycle timeout). It releases the lock and lets the next queued waiter proceed. + */ +export function forceReleaseMutex(key: string): void { + if (!locks.has(key)) return; + + const holder = holders.get(key); + log.warn({ key, holder }, "Force-releasing stuck TDLib mutex"); + + locks.delete(key); + holders.delete(key); + const next = queues.get(key)?.shift(); + if (next) { + log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter"); + next.resolve(); + } else { + queues.delete(key); + log.info({ key }, "TDLib mutex force-released (no waiters)"); + } +} + /** * Ensures only one TDLib operation runs at a time FOR THE SAME KEY. * Different keys run concurrently — this allows two accounts to ingest in parallel diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 3e87341..940d58d 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js"; import { readRarContents } from "./archive/rar-reader.js"; import { read7zContents } from "./archive/sevenz-reader.js"; import { byteLevelSplit, concatenateFiles } from "./archive/split.js"; -import { uploadToChannel } from "./upload/channel.js"; +import { uploadToChannel, UploadStallError } from "./upload/channel.js"; import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js"; import { db } from "./db/client.js"; import type { TelegramAccount, TelegramChannel } from "@prisma/client"; @@ -286,6 +286,7 @@ interface PipelineContext { client: Client; runId: string; accountId: string; + accountPhone: string; channelTitle: string; channel: TelegramChannel; destChannelTelegramId: bigint; @@ -303,6 +304,8 @@ interface PipelineContext { sourceTopicId: bigint | null; accountLog: ReturnType; maxUploadSize: bigint; + /** How many consecutive upload stalls have occurred (resets on success). */ + consecutiveStalls: number; } /** @@ -338,7 +341,8 @@ export async function runWorkerForAccount( currentStep: "connecting", }); - const { client, isPremium } = await createTdlibClient({ + // Use let so the client can be replaced on TDLib recreation after stalls + let { client, isPremium } = await createTdlibClient({ id: account.id, phone: account.phone, }); @@ -448,6 +452,7 @@ export async function runWorkerForAccount( client, runId: activeRunId, accountId: account.id, + accountPhone: account.phone, channelTitle: channel.title, channel, destChannelTelegramId: destChannel.telegramId, @@ -458,6 +463,7 @@ export async function runWorkerForAccount( sourceTopicId: null, accountLog, maxUploadSize, + consecutiveStalls: 0, }; if (forum) { @@ -546,6 +552,8 @@ export async function runWorkerForAccount( pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`; const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId); + // Sync client back in case it was recreated during upload stall recovery + client = pipelineCtx.client; // Only advance progress to the highest successfully processed message if (maxProcessedId) { @@ -617,6 +625,8 @@ export async function runWorkerForAccount( pipelineCtx.channelTitle = channel.title; const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId); + // Sync client back in case it was recreated during upload stall recovery + client = pipelineCtx.client; // Only advance progress to the highest successfully processed message if (maxProcessedId) { @@ -760,12 +770,68 @@ async function processArchiveSets( if (setMaxId > (maxProcessedId ?? 0n)) { maxProcessedId = setMaxId; } + + // Reset stall counter on any successful upload + ctx.consecutiveStalls = 0; } catch (setErr) { // If a set fails, do NOT advance the watermark past it accountLog.warn( { err: setErr, baseName: archiveSets[setIdx].baseName }, "Archive set failed, watermark will not advance past this set" ); + + // ── TDLib client recreation on repeated upload stalls ── + // When the TDLib event stream degrades, uploads complete (bytes sent) + // but confirmations never arrive. Retrying with the same broken client + // is futile. Recreate the client to get a fresh connection. + if (setErr instanceof UploadStallError) { + ctx.consecutiveStalls++; + accountLog.warn( + { consecutiveStalls: ctx.consecutiveStalls }, + "Upload stall detected — TDLib event stream may be degraded" + ); + + // After 1 stalled set (= 3 failed retry attempts already), recreate the client + if (ctx.consecutiveStalls >= 1) { + accountLog.info("Recreating TDLib client after consecutive upload stalls"); + try { + await closeTdlibClient(ctx.client); + } catch (closeErr) { + accountLog.warn({ err: closeErr }, "Error closing stale TDLib client"); + } + + try { + const { client: newClient } = await createTdlibClient({ + id: ctx.accountId, + phone: ctx.accountPhone, + }); + ctx.client = newClient; + + // Reload chats so the new client can access channels + try { + for (let page = 0; page < 500; page++) { + await newClient.invoke({ + _: "loadChats", + chat_list: { _: "chatListMain" }, + limit: 100, + }); + } + } catch { + // 404 = all loaded (expected) + } + + ctx.consecutiveStalls = 0; + accountLog.info("TDLib client recreated successfully — continuing ingestion"); + } catch (recreateErr) { + accountLog.error( + { err: recreateErr }, + "Failed to recreate TDLib client — aborting remaining uploads" + ); + break; + } + } + } + // Record the failure for visibility in the UI try { const archiveSet = archiveSets[setIdx];