fix: auto-recover from TDLib upload stalls by recreating client

When TDLib's event stream degrades, uploads complete (bytes sent) but
confirmations never arrive. Previously the worker retried 3x with the
same broken client, wasting 60+ min per archive and holding the mutex.

- Add UploadStallError class to distinguish stalls from other failures
- Reduce stall detection timeout from 5min to 3min (faster detection)
- Recreate TDLib client after consecutive upload stalls instead of
  retrying on the same degraded connection
- Add forceReleaseMutex() to prevent cascade failures when one account
  blocks others via stuck mutex after cycle timeout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-04 18:02:42 +02:00
parent 7e48131f67
commit d6c82ede1e
4 changed files with 131 additions and 12 deletions

View File

@@ -1,6 +1,6 @@
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { withTdlibMutex } from "./util/mutex.js";
import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
import { runIntegrityAudit } from "./audit.js";
@@ -90,10 +90,18 @@ async function runCycle(): Promise<void> {
for (let i = 0; i < results.length; i++) {
if (results[i].status === "rejected") {
const reason = (results[i] as PromiseRejectedResult).reason;
log.error(
{ phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason },
{ phone: accounts[i].phone, err: reason },
"Account ingestion failed"
);
// If the cycle timed out, force-release the mutex so the next cycle
// (or other operations like fetch-channels) can proceed immediately
// instead of waiting 30 minutes for the mutex timeout.
const errMsg = reason instanceof Error ? reason.message : String(reason);
if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
forceReleaseMutex(accounts[i].phone);
}
}
}

View File

@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
const log = childLogger("upload");
/**
 * Error raised when an upload stalls past all retry attempts.
 *
 * Distinguishing stalls from ordinary upload failures lets the caller
 * react specifically: repeated stalls suggest the TDLib client's event
 * stream has degraded, so the caller can tear the client down and
 * recreate it rather than retry on a dead connection.
 */
export class UploadStallError extends Error {
  constructor(message: string) {
    super(message);
    // Tag the error so `err.name` checks and logs identify stalls.
    this.name = UploadStallError.name;
  }
}
export interface UploadResult {
messageId: bigint;
messageIds: bigint[];
@@ -109,13 +121,21 @@ async function sendWithRetry(
// Stall or timeout — retry with a cooldown
const errMsg = err instanceof Error ? err.message : "";
if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) {
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
"Upload stalled/timed out — retrying"
if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
if (!isLastAttempt) {
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
"Upload stalled/timed out — retrying"
);
await sleep(10_000);
continue;
}
// All stall retries exhausted — throw UploadStallError so the caller
// knows the TDLib client's event stream is likely degraded and can
// recreate the client before continuing.
throw new UploadStallError(
`Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
);
await sleep(10_000);
continue;
}
throw err;
@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
}
}, timeoutMs);
// Stall detection: no progress for 5 minutes after upload started → reject
const STALL_TIMEOUT_MS = 5 * 60_000;
// Stall detection: no progress for 3 minutes after upload started → reject
// (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
// a 3min silence strongly indicates a degraded TDLib event stream)
const STALL_TIMEOUT_MS = 3 * 60_000;
const stallChecker = setInterval(() => {
if (settled || !uploadStarted) return;
const stallMs = Date.now() - lastProgressTime;

View File

@@ -11,6 +11,29 @@ const queues = new Map<
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
>();
/**
* Force-release a stuck mutex.
* This should only be called when the holder is known to be stuck (e.g. after
* a cycle timeout). It releases the lock and lets the next queued waiter proceed.
*/
export function forceReleaseMutex(key: string): void {
if (!locks.has(key)) return;
const holder = holders.get(key);
log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
locks.delete(key);
holders.delete(key);
const next = queues.get(key)?.shift();
if (next) {
log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
next.resolve();
} else {
queues.delete(key);
log.info({ key }, "TDLib mutex force-released (no waiters)");
}
}
/**
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
* Different keys run concurrently — this allows two accounts to ingest in parallel

View File

@@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel } from "./upload/channel.js";
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
import { db } from "./db/client.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
@@ -286,6 +286,7 @@ interface PipelineContext {
client: Client;
runId: string;
accountId: string;
accountPhone: string;
channelTitle: string;
channel: TelegramChannel;
destChannelTelegramId: bigint;
@@ -303,6 +304,8 @@ interface PipelineContext {
sourceTopicId: bigint | null;
accountLog: ReturnType<typeof childLogger>;
maxUploadSize: bigint;
/** How many consecutive upload stalls have occurred (resets on success). */
consecutiveStalls: number;
}
/**
@@ -338,7 +341,8 @@ export async function runWorkerForAccount(
currentStep: "connecting",
});
const { client, isPremium } = await createTdlibClient({
// Use let so the client can be replaced on TDLib recreation after stalls
let { client, isPremium } = await createTdlibClient({
id: account.id,
phone: account.phone,
});
@@ -448,6 +452,7 @@ export async function runWorkerForAccount(
client,
runId: activeRunId,
accountId: account.id,
accountPhone: account.phone,
channelTitle: channel.title,
channel,
destChannelTelegramId: destChannel.telegramId,
@@ -458,6 +463,7 @@ export async function runWorkerForAccount(
sourceTopicId: null,
accountLog,
maxUploadSize,
consecutiveStalls: 0,
};
if (forum) {
@@ -546,6 +552,8 @@ export async function runWorkerForAccount(
pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
// Only advance progress to the highest successfully processed message
if (maxProcessedId) {
@@ -617,6 +625,8 @@ export async function runWorkerForAccount(
pipelineCtx.channelTitle = channel.title;
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
// Only advance progress to the highest successfully processed message
if (maxProcessedId) {
@@ -760,12 +770,68 @@ async function processArchiveSets(
if (setMaxId > (maxProcessedId ?? 0n)) {
maxProcessedId = setMaxId;
}
// Reset stall counter on any successful upload
ctx.consecutiveStalls = 0;
} catch (setErr) {
// If a set fails, do NOT advance the watermark past it
accountLog.warn(
{ err: setErr, baseName: archiveSets[setIdx].baseName },
"Archive set failed, watermark will not advance past this set"
);
// ── TDLib client recreation on repeated upload stalls ──
// When the TDLib event stream degrades, uploads complete (bytes sent)
// but confirmations never arrive. Retrying with the same broken client
// is futile. Recreate the client to get a fresh connection.
if (setErr instanceof UploadStallError) {
ctx.consecutiveStalls++;
accountLog.warn(
{ consecutiveStalls: ctx.consecutiveStalls },
"Upload stall detected — TDLib event stream may be degraded"
);
// After 1 stalled set (= 3 failed retry attempts already), recreate the client
if (ctx.consecutiveStalls >= 1) {
accountLog.info("Recreating TDLib client after consecutive upload stalls");
try {
await closeTdlibClient(ctx.client);
} catch (closeErr) {
accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
}
try {
const { client: newClient } = await createTdlibClient({
id: ctx.accountId,
phone: ctx.accountPhone,
});
ctx.client = newClient;
// Reload chats so the new client can access channels
try {
for (let page = 0; page < 500; page++) {
await newClient.invoke({
_: "loadChats",
chat_list: { _: "chatListMain" },
limit: 100,
});
}
} catch {
// 404 = all loaded (expected)
}
ctx.consecutiveStalls = 0;
accountLog.info("TDLib client recreated successfully — continuing ingestion");
} catch (recreateErr) {
accountLog.error(
{ err: recreateErr },
"Failed to recreate TDLib client — aborting remaining uploads"
);
break;
}
}
}
// Record the failure for visibility in the UI
try {
const archiveSet = archiveSets[setIdx];