4 Commits

Author SHA1 Message Date
59038889ae fix: prevent pool exhaustion that caused 4-hour duplicate check stall
All checks were successful
continuous-integration/drone/push Build is passing
The pg pool had max=5 connections shared between Prisma operations and
advisory locks. With 2 account locks held permanently and hash locks
from timed-out (but still running) background work, pool.connect()
would block forever — causing the Turnbase.7z stall.

- Increase pool max from 5 to 15 for headroom
- Add 30s connectionTimeoutMillis so pool.connect() throws instead of
  hanging forever when the pool is exhausted
- On startup, terminate zombie PostgreSQL sessions from previous worker
  instances that hold stale advisory locks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:39:00 +02:00
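The failure mode this commit fixes can be sketched with a toy pool. `TinyPool` below is a hypothetical stand-in for pg.Pool, not its real implementation: with every slot pinned by a long-lived advisory-lock holder, a plain pool queues `acquire()` forever, while an acquire timeout surfaces the exhaustion as a retryable error.

```typescript
// Hypothetical stand-in for pg.Pool: `max` slots plus an acquire timeout.
// Without the timeout, an exhausted pool leaves callers queued indefinitely —
// the shape of the 4-hour stall described above.
class TinyPool {
  private inUse = 0;
  private waiters: Array<{
    grant: () => void;
    timer: ReturnType<typeof setTimeout>;
  }> = [];

  constructor(private max: number, private connectionTimeoutMillis: number) {}

  acquire(): Promise<() => void> {
    if (this.inUse < this.max) {
      this.inUse++;
      return Promise.resolve(() => this.release());
    }
    // Pool exhausted: queue the request, but reject after the timeout
    // instead of hanging indefinitely.
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.waiters = this.waiters.filter((w) => w.timer !== timer);
        reject(new Error("timeout exceeded when trying to connect"));
      }, this.connectionTimeoutMillis);
      this.waiters.push({
        grant: () => {
          clearTimeout(timer);
          this.inUse++;
          resolve(() => this.release());
        },
        timer,
      });
    });
  }

  private release(): void {
    this.inUse--;
    this.waiters.shift()?.grant();
  }
}
```

With `max = 2` and two account locks held for the whole cycle, a third `acquire()` now fails fast instead of blocking the worker until something else releases a slot.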
77c26adb31 perf: set watermarks even when no archives found to prevent re-scanning
All checks were successful
continuous-integration/drone/push Build is passing
Previously, channels/topics with no new archives never had their
watermark updated. This meant every cycle re-scanned all messages from
scratch just to discover nothing new — especially costly for the 1079-
topic Model Printing Emporium forum.

- Add maxScannedMessageId to ChannelScanResult (highest msg ID seen)
- Set channel watermark to scan boundary when no archives are found
- Set topic watermark to scan boundary when no archives are found
- Fall back to scan watermark when archive processing doesn't advance it

After one full cycle, subsequent cycles will skip already-scanned
messages via the early-exit boundary check, dramatically reducing
TDLib API calls on channels with mostly non-archive content.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 20:37:42 +02:00
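The watermark-advance rule those bullets describe can be sketched as a pure function. `nextWatermark` is an illustrative name, not a helper from the codebase:

```typescript
// Illustrative helper (not from the codebase): pick the next watermark.
// Prefer the highest successfully processed archive message; fall back to
// the highest message ID merely seen during the scan; never move backwards.
function nextWatermark(
  current: bigint | null,
  maxProcessedId: bigint | null,
  maxScannedMessageId: bigint | null,
): bigint | null {
  const candidate = maxProcessedId ?? maxScannedMessageId;
  if (candidate === null) return current; // scanned nothing: keep the old mark
  if (current !== null && candidate < current) return current;
  return candidate;
}
```

Before this change the fallback arm was effectively missing: with no archives found, the processed watermark stayed null, the stored watermark never moved, and every cycle re-scanned the same messages.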
35cce3151c perf: early-exit channel scan when all messages are below watermark
searchChatMessages returns newest-first. Once the oldest message on a
page is at or below the lastProcessedMessageId boundary, all remaining
pages are even older. Stop scanning immediately instead of reading every
message in the channel.

This was already implemented for topic scans but missing from channel
scans. On a test run, total messages scanned dropped from 3805 to 1615
(57% reduction) for an account with no new archives.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 19:58:30 +02:00
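A toy model of the early-exit rule, as a simplified stand-in for the searchChatMessages paging loop (not the real TDLib API):

```typescript
// Pages arrive newest-first. Once the oldest ID on a page is at or below the
// boundary (lastProcessedMessageId), every remaining page is older still, so
// the scan can stop. Returns how many messages were touched.
function countScanned(pages: number[][], boundary: number | null): number {
  let scanned = 0;
  for (const page of pages) {
    if (page.length === 0) break;
    scanned += page.length;
    const oldestOnPage = page[page.length - 1]; // newest-first: last is oldest
    if (boundary !== null && oldestOnPage <= boundary) break; // early exit
  }
  return scanned;
}
```

With pages `[[10, 9, 8], [7, 6, 5], [4, 3, 2]]` and a boundary of 6, the scan stops after the second page (6 messages touched) instead of reading all 9 — the same shape as the 3805-to-1615 drop reported above.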
d6c82ede1e fix: auto-recover from TDLib upload stalls by recreating client
When TDLib's event stream degrades, uploads complete (bytes sent) but
confirmations never arrive. Previously the worker retried 3x with the
same broken client, wasting 60+ min per archive and holding the mutex.

- Add UploadStallError class to distinguish stalls from other failures
- Reduce stall detection timeout from 5min to 3min (faster detection)
- Recreate TDLib client after consecutive upload stalls instead of
  retrying on the same degraded connection
- Add forceReleaseMutex() to prevent cascade failures when one account
  blocks others via stuck mutex after cycle timeout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 18:02:42 +02:00
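The recovery strategy can be sketched generically; all names below are illustrative stand-ins for the worker's actual client-recreation path, under the assumption that stalls are signalled by a dedicated error class:

```typescript
// Illustrative sketch: retry an operation, but when the failure is a stall
// (degraded event stream) rather than an ordinary error, rebuild the client
// before the next attempt instead of reusing the broken one.
class StallError extends Error {}

async function withStallRecovery<C>(
  makeClient: () => Promise<C>,
  closeClient: (client: C) => Promise<void>,
  op: (client: C) => Promise<void>,
  maxAttempts = 3,
): Promise<void> {
  let client = await makeClient();
  try {
    for (let attempt = 1; ; attempt++) {
      try {
        await op(client);
        return; // success
      } catch (err) {
        if (attempt >= maxAttempts) throw err;
        if (err instanceof StallError) {
          // A stall means the client itself is suspect: replace it.
          await closeClient(client);
          client = await makeClient();
        }
        // Ordinary errors retry on the same client.
      }
    }
  } finally {
    await closeClient(client);
  }
}
```

The key design choice is the same as in the commit: an ordinary failure retries on the existing connection, while a stall discards it, since retrying on a degraded event stream only burns the full timeout again.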
8 changed files with 211 additions and 19 deletions

View File

@@ -5,7 +5,14 @@ import { config } from "../util/config.js";
const pool = new pg.Pool({
connectionString: config.databaseUrl,
// Pool needs headroom for: 2 account advisory locks (held for entire cycle),
// up to 2 concurrent hash locks, plus Prisma operations from both accounts.
// Previously max=5 caused pool exhaustion and indefinite hangs.
max: 15,
// Prevent pool.connect() from blocking forever when pool is exhausted.
// Throws an error after 30s so the operation can fail and retry instead of
// silently hanging for hours (as happened with the Turnbase.7z stall).
connectionTimeoutMillis: 30_000,
});
const adapter = new PrismaPg(pool);

View File

@@ -27,6 +27,33 @@ async function main(): Promise<void> {
await cleanupTempDir();
await markStaleRunsAsFailed();
// Release any advisory locks orphaned by a previous worker instance.
// When Docker kills a container, PostgreSQL may keep the session alive
// (zombie connections), holding advisory locks that block the new worker.
try {
const result = await pool.query(`
SELECT pid, state, left(query, 80) as query, age(clock_timestamp(), state_change) as idle_time
FROM pg_stat_activity
WHERE datname = current_database()
AND pid != pg_backend_pid()
AND state = 'idle'
AND query LIKE '%pg_try_advisory_lock%'
AND state_change < clock_timestamp() - interval '5 minutes'
`);
for (const row of result.rows) {
log.warn(
{ pid: row.pid, idleTime: row.idle_time, query: row.query },
"Terminating stale advisory lock session from previous worker"
);
await pool.query("SELECT pg_terminate_backend($1)", [row.pid]);
}
if (result.rows.length > 0) {
log.info({ terminated: result.rows.length }, "Cleaned up stale advisory lock sessions");
}
} catch (err) {
log.warn({ err }, "Failed to clean up stale advisory locks (non-fatal)");
}
// Verify destination messages exist for all "uploaded" packages.
// Resets any packages whose dest message is missing so they get re-processed.
await recoverIncompleteUploads();

View File

@@ -1,6 +1,6 @@
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
import { runIntegrityAudit } from "./audit.js";
@@ -90,10 +90,18 @@ async function runCycle(): Promise<void> {
for (let i = 0; i < results.length; i++) {
if (results[i].status === "rejected") {
const reason = (results[i] as PromiseRejectedResult).reason;
log.error(
{ phone: accounts[i].phone, err: reason },
"Account ingestion failed"
);
// If the cycle timed out, force-release the mutex so the next cycle
// (or other operations like fetch-channels) can proceed immediately
// instead of waiting 30 minutes for the mutex timeout.
const errMsg = reason instanceof Error ? reason.message : String(reason);
if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
forceReleaseMutex(accounts[i].phone);
}
}
}

View File

@@ -79,6 +79,8 @@ export interface ChannelScanResult {
archives: TelegramMessage[];
photos: TelegramPhoto[];
totalScanned: number;
/** Highest message ID seen during scan (for watermark, even when no archives found). */
maxScannedMessageId: bigint | null;
}
export type ScanProgressCallback = (messagesScanned: number) => void;
@@ -158,6 +160,7 @@ export async function getChannelMessages(
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let maxScannedMessageId: bigint | null = null;
// Open the chat so TDLib can access it
try {
@@ -204,6 +207,12 @@ export async function getChannelMessages(
totalScanned += result.messages.length;
// Track highest message ID (first message in batch = newest, since results are newest-first)
const batchMaxId = BigInt(result.messages[0].id);
if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
maxScannedMessageId = batchMaxId;
}
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -246,6 +255,11 @@ export async function getChannelMessages(
fromMessageId = result.messages[result.messages.length - 1].id;
if (result.messages.length < Math.min(limit, 100)) break;
// Early exit: searchChatMessages returns newest-first. Once the oldest
// message on this page is at or below the boundary, all remaining pages
// are even older — no new messages exist, stop scanning immediately.
if (boundary && fromMessageId <= boundary) break;
await sleep(config.apiDelayMs);
}
}
@@ -266,6 +280,7 @@ export async function getChannelMessages(
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
maxScannedMessageId,
};
}

View File

@@ -178,6 +178,7 @@ export async function getTopicMessages(
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let maxScannedMessageId: bigint | null = null;
let currentFromId = 0;
let totalScanned = 0;
@@ -239,6 +240,12 @@ export async function getTopicMessages(
totalScanned += result.messages.length;
// Track highest message ID (first message = newest, since results are newest-first)
const batchMaxId = BigInt(result.messages[0].id);
if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
maxScannedMessageId = batchMaxId;
}
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -302,6 +309,7 @@ export async function getTopicMessages(
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
maxScannedMessageId,
};
}

View File

@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
const log = childLogger("upload");
/**
* Custom error class to distinguish upload stalls from other errors.
* When consecutive stalls occur, the caller can use this signal to
* recreate the TDLib client (whose event stream may have degraded).
*/
export class UploadStallError extends Error {
constructor(message: string) {
super(message);
this.name = "UploadStallError";
}
}
export interface UploadResult {
messageId: bigint;
messageIds: bigint[];
@@ -109,7 +121,8 @@ async function sendWithRetry(
// Stall or timeout — retry with a cooldown
const errMsg = err instanceof Error ? err.message : "";
if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
if (!isLastAttempt) {
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
"Upload stalled/timed out — retrying"
@@ -117,6 +130,13 @@ async function sendWithRetry(
await sleep(10_000);
continue;
}
// All stall retries exhausted — throw UploadStallError so the caller
// knows the TDLib client's event stream is likely degraded and can
// recreate the client before continuing.
throw new UploadStallError(
`Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
);
}
throw err;
}
@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
}
}, timeoutMs);
// Stall detection: no progress for 3 minutes after upload started → reject
// (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
// a 3min silence strongly indicates a degraded TDLib event stream)
const STALL_TIMEOUT_MS = 3 * 60_000;
const stallChecker = setInterval(() => {
if (settled || !uploadStarted) return;
const stallMs = Date.now() - lastProgressTime;

View File

@@ -11,6 +11,29 @@ const queues = new Map<
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
>();
/**
* Force-release a stuck mutex.
* This should only be called when the holder is known to be stuck (e.g. after
* a cycle timeout). It releases the lock and lets the next queued waiter proceed.
*/
export function forceReleaseMutex(key: string): void {
if (!locks.has(key)) return;
const holder = holders.get(key);
log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
locks.delete(key);
holders.delete(key);
const next = queues.get(key)?.shift();
if (next) {
log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
next.resolve();
} else {
queues.delete(key);
log.info({ key }, "TDLib mutex force-released (no waiters)");
}
}
/**
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
* Different keys run concurrently — this allows two accounts to ingest in parallel

View File

@@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
import { db } from "./db/client.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
@@ -286,6 +286,7 @@ interface PipelineContext {
client: Client;
runId: string;
accountId: string;
accountPhone: string;
channelTitle: string;
channel: TelegramChannel;
destChannelTelegramId: bigint;
@@ -303,6 +304,8 @@ interface PipelineContext {
sourceTopicId: bigint | null;
accountLog: ReturnType<typeof childLogger>;
maxUploadSize: bigint;
/** How many consecutive upload stalls have occurred (resets on success). */
consecutiveStalls: number;
}
/**
@@ -338,7 +341,8 @@ export async function runWorkerForAccount(
currentStep: "connecting",
});
// Use let so the client can be replaced on TDLib recreation after stalls
let { client, isPremium } = await createTdlibClient({
id: account.id,
phone: account.phone,
});
@@ -448,6 +452,7 @@ export async function runWorkerForAccount(
client,
runId: activeRunId,
accountId: account.id,
accountPhone: account.phone,
channelTitle: channel.title,
channel,
destChannelTelegramId: destChannel.telegramId,
@@ -458,6 +463,7 @@ export async function runWorkerForAccount(
sourceTopicId: null,
accountLog,
maxUploadSize,
consecutiveStalls: 0,
};
if (forum) {
@@ -532,6 +538,15 @@ export async function runWorkerForAccount(
{ channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
"No new archives in topic"
);
// Still advance topic watermark so we don't re-scan these messages next cycle
if (scanResult.maxScannedMessageId) {
await upsertTopicProgress(
mapping.id,
topic.topicId,
topic.name,
scanResult.maxScannedMessageId
);
}
continue;
}
@@ -546,14 +561,17 @@ export async function runWorkerForAccount(
pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
// Advance progress: use archive watermark if available, fall back to scan watermark
const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
if (topicWatermark) {
await upsertTopicProgress(
mapping.id,
topic.topicId,
topic.name,
topicWatermark
);
}
} catch (topicErr) {
@@ -603,6 +621,11 @@ export async function runWorkerForAccount(
if (scanResult.archives.length === 0) {
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
// Still advance watermark to highest scanned message so we don't
// re-scan these messages next cycle
if (scanResult.maxScannedMessageId) {
await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
}
continue;
}
@@ -617,10 +640,13 @@ export async function runWorkerForAccount(
pipelineCtx.channelTitle = channel.title;
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
// Advance progress: use archive watermark if available, fall back to scan watermark
const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
if (channelWatermark) {
await updateLastProcessedMessage(mapping.id, channelWatermark);
}
}
} catch (channelErr) {
@@ -760,12 +786,68 @@ async function processArchiveSets(
if (setMaxId > (maxProcessedId ?? 0n)) {
maxProcessedId = setMaxId;
}
// Reset stall counter on any successful upload
ctx.consecutiveStalls = 0;
} catch (setErr) {
// If a set fails, do NOT advance the watermark past it
accountLog.warn(
{ err: setErr, baseName: archiveSets[setIdx].baseName },
"Archive set failed, watermark will not advance past this set"
);
// ── TDLib client recreation on repeated upload stalls ──
// When the TDLib event stream degrades, uploads complete (bytes sent)
// but confirmations never arrive. Retrying with the same broken client
// is futile. Recreate the client to get a fresh connection.
if (setErr instanceof UploadStallError) {
ctx.consecutiveStalls++;
accountLog.warn(
{ consecutiveStalls: ctx.consecutiveStalls },
"Upload stall detected — TDLib event stream may be degraded"
);
// After 1 stalled set (= 3 failed retry attempts already), recreate the client
if (ctx.consecutiveStalls >= 1) {
accountLog.info("Recreating TDLib client after consecutive upload stalls");
try {
await closeTdlibClient(ctx.client);
} catch (closeErr) {
accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
}
try {
const { client: newClient } = await createTdlibClient({
id: ctx.accountId,
phone: ctx.accountPhone,
});
ctx.client = newClient;
// Reload chats so the new client can access channels
try {
for (let page = 0; page < 500; page++) {
await newClient.invoke({
_: "loadChats",
chat_list: { _: "chatListMain" },
limit: 100,
});
}
} catch {
// 404 = all loaded (expected)
}
ctx.consecutiveStalls = 0;
accountLog.info("TDLib client recreated successfully — continuing ingestion");
} catch (recreateErr) {
accountLog.error(
{ err: recreateErr },
"Failed to recreate TDLib client — aborting remaining uploads"
);
break;
}
}
}
// Record the failure for visibility in the UI
try {
const archiveSet = archiveSets[setIdx];