From e9017fc518c60f36c8447adb8e213a82764b1d97 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Sat, 2 May 2026 23:31:02 +0200 Subject: [PATCH] feat: parallel account ingestion via per-key TDLib mutex --- worker/src/extract-listener.ts | 2 +- worker/src/fetch-listener.ts | 10 ++--- worker/src/scheduler.ts | 25 +++++-------- worker/src/util/mutex.ts | 67 +++++++++++++++++++--------------- 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/worker/src/extract-listener.ts b/worker/src/extract-listener.ts index 0f77ded..35ca107 100644 --- a/worker/src/extract-listener.ts +++ b/worker/src/extract-listener.ts @@ -103,7 +103,7 @@ export async function processExtractRequest(requestId: string): Promise { // Wrap the entire TDLib session in the mutex so no other TDLib // operation can run concurrently (TDLib is single-session). - await withTdlibMutex("extract", async () => { + await withTdlibMutex("global", "extract", async () => { const accounts = await getActiveAccounts(); if (accounts.length === 0) { throw new Error("No authenticated Telegram accounts available"); diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index fcd89f1..075bcaf 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -133,7 +133,7 @@ let fetchQueue: Promise = Promise.resolve(); function handleChannelFetch(requestId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("fetch-channels", () => + await withTdlibMutex("global", "fetch-channels", () => processFetchRequest(requestId) ); } catch (err) { @@ -147,7 +147,7 @@ function handleChannelFetch(requestId: string): void { function handleGenerateInvite(channelId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("generate-invite", async () => { + await withTdlibMutex("global", "generate-invite", async () => { const destChannel = await getGlobalDestinationChannel(); if (!destChannel || destChannel.id !== channelId) { log.warn({ channelId }, "Destination channel mismatch, skipping invite generation"); @@ -187,7 +187,7 @@ function handleCreateDestination(payload: string): void { const parsed = JSON.parse(payload) as { requestId: string; title: string }; requestId = parsed.requestId; - await withTdlibMutex("create-destination", async () => { + await withTdlibMutex("global", "create-destination", async () => { const { db } = await import("./db/client.js"); // Mark the request as in-progress @@ -328,7 +328,7 @@ function handleJoinChannel(payload: string): void { const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string }; requestId = parsed.requestId; - await withTdlibMutex("join-channel", async () => { + await withTdlibMutex("global", "join-channel", async () => { await updateFetchRequestStatus(requestId!, "IN_PROGRESS"); const accounts = await getActiveAccounts(); @@ -507,7 +507,7 @@ function handleIngestionTrigger(): void { function handleRebuildPackages(requestId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("rebuild-packages", () => + await withTdlibMutex("global", "rebuild-packages", () => rebuildPackageDatabase(requestId) ); } catch (err) { diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index e4923a0..58f7347 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -24,8 +24,8 @@ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? " * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) * 2. Process all active AUTHENTICATED accounts for ingestion * - * All TDLib operations are wrapped in the mutex to ensure only one client - * runs at a time (also shared with the fetch listener for on-demand requests). + * Each account's TDLib operations are wrapped in a per-key mutex so different + * accounts run concurrently while the same account is still serialized. * * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h). * Once the timeout elapses, no new accounts will be started but any in-progress @@ -55,7 +55,7 @@ async function runCycle(): Promise { log.warn("Cycle timeout reached during authentication phase, stopping"); break; } - await withTdlibMutex(`auth:${account.phone}`, () => + await withTdlibMutex(account.phone, `auth:${account.phone}`, () => authenticateAccount(account) ); } @@ -71,18 +71,13 @@ async function runCycle(): Promise { log.info({ accountCount: accounts.length }, "Processing accounts"); - for (const account of accounts) { - if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) { - log.warn( - { elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, - "Cycle timeout reached, skipping remaining accounts" - ); - break; - } - await withTdlibMutex(`ingest:${account.phone}`, () => - runWorkerForAccount(account) - ); - } + await Promise.allSettled( + accounts.map((account) => + withTdlibMutex(account.phone, `ingest:${account.phone}`, () => + runWorkerForAccount(account) + ) + ) + ); log.info( { elapsed: Math.round((Date.now() - cycleStart) / 1000) }, diff --git a/worker/src/util/mutex.ts b/worker/src/util/mutex.ts index e559318..34e1576 100644 --- a/worker/src/util/mutex.ts +++ b/worker/src/util/mutex.ts @@ -2,39 +2,43 @@ import { childLogger } from "./logger.js"; const log = childLogger("mutex"); -let locked = false; -let holder = ""; -const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = []; - -/** - * Maximum time to wait for the TDLib mutex (ms). - * If the mutex is not available within this time, the operation is rejected. - * Default: 30 minutes (long enough for large downloads, short enough to detect hangs). - */ const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000; +const locks = new Map(); +const holders = new Map(); +const queues = new Map< + string, + Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> +>(); + /** - * Ensures only one TDLib client runs at a time across the entire worker process. - * Both the scheduler (auth, ingestion) and the fetch listener acquire this - * before creating any TDLib client. + * Ensures only one TDLib operation runs at a time FOR THE SAME KEY. + * Different keys run concurrently — this allows two accounts to ingest in parallel + * while still preventing concurrent use of the same account's TDLib state dir. * - * Includes a wait timeout to prevent indefinite blocking if the current holder hangs. + * key: the account phone number for account-specific ops (auth, ingest), + * or 'global' for ops that don't belong to a specific account. + * label: human-readable name for logging. */ export async function withTdlibMutex( + key: string, label: string, fn: () => Promise ): Promise { - if (locked) { - log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); + if (locks.get(key)) { + log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex"); await new Promise((resolve, reject) => { const timer = setTimeout(() => { - const idx = queue.indexOf(entry); + const q = queues.get(key) ?? []; + const idx = q.indexOf(entry); if (idx !== -1) { - queue.splice(idx, 1); - reject(new Error( - `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` + - `(waiting: ${label}, holder: ${holder})` - )); + q.splice(idx, 1); + reject( + new Error( + `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` + + `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})` + ) + ); } }, MUTEX_WAIT_TIMEOUT_MS); @@ -46,25 +50,28 @@ export async function withTdlibMutex( reject, label, }; - queue.push(entry); + + if (!queues.has(key)) queues.set(key, []); + queues.get(key)!.push(entry); }); } - locked = true; - holder = label; - log.debug({ label }, "TDLib mutex acquired"); + locks.set(key, true); + holders.set(key, label); + log.debug({ key, label }, "TDLib mutex acquired"); try { return await fn(); } finally { - locked = false; - holder = ""; - const next = queue.shift(); + locks.delete(key); + holders.delete(key); + const next = queues.get(key)?.shift(); if (next) { - log.debug({ next: next.label }, "TDLib mutex releasing to next waiter"); + log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter"); next.resolve(); } else { - log.debug({ label }, "TDLib mutex released"); + queues.delete(key); + log.debug({ key, label }, "TDLib mutex released"); } } }