feat: parallel account ingestion via per-key TDLib mutex

This commit is contained in:
2026-05-02 23:31:02 +02:00
parent 4f59d19ac2
commit e9017fc518
4 changed files with 53 additions and 51 deletions

View File

@@ -103,7 +103,7 @@ export async function processExtractRequest(requestId: string): Promise<void> {
// Wrap the entire TDLib session in the mutex so no other TDLib // Wrap the entire TDLib session in the mutex so no other TDLib
// operation can run concurrently (TDLib is single-session). // operation can run concurrently (TDLib is single-session).
await withTdlibMutex("extract", async () => { await withTdlibMutex("global", "extract", async () => {
const accounts = await getActiveAccounts(); const accounts = await getActiveAccounts();
if (accounts.length === 0) { if (accounts.length === 0) {
throw new Error("No authenticated Telegram accounts available"); throw new Error("No authenticated Telegram accounts available");

View File

@@ -133,7 +133,7 @@ let fetchQueue: Promise<void> = Promise.resolve();
function handleChannelFetch(requestId: string): void { function handleChannelFetch(requestId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("fetch-channels", () => await withTdlibMutex("global", "fetch-channels", () =>
processFetchRequest(requestId) processFetchRequest(requestId)
); );
} catch (err) { } catch (err) {
@@ -147,7 +147,7 @@ function handleChannelFetch(requestId: string): void {
function handleGenerateInvite(channelId: string): void { function handleGenerateInvite(channelId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("generate-invite", async () => { await withTdlibMutex("global", "generate-invite", async () => {
const destChannel = await getGlobalDestinationChannel(); const destChannel = await getGlobalDestinationChannel();
if (!destChannel || destChannel.id !== channelId) { if (!destChannel || destChannel.id !== channelId) {
log.warn({ channelId }, "Destination channel mismatch, skipping invite generation"); log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
@@ -187,7 +187,7 @@ function handleCreateDestination(payload: string): void {
const parsed = JSON.parse(payload) as { requestId: string; title: string }; const parsed = JSON.parse(payload) as { requestId: string; title: string };
requestId = parsed.requestId; requestId = parsed.requestId;
await withTdlibMutex("create-destination", async () => { await withTdlibMutex("global", "create-destination", async () => {
const { db } = await import("./db/client.js"); const { db } = await import("./db/client.js");
// Mark the request as in-progress // Mark the request as in-progress
@@ -328,7 +328,7 @@ function handleJoinChannel(payload: string): void {
const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string }; const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
requestId = parsed.requestId; requestId = parsed.requestId;
await withTdlibMutex("join-channel", async () => { await withTdlibMutex("global", "join-channel", async () => {
await updateFetchRequestStatus(requestId!, "IN_PROGRESS"); await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
const accounts = await getActiveAccounts(); const accounts = await getActiveAccounts();
@@ -507,7 +507,7 @@ function handleIngestionTrigger(): void {
function handleRebuildPackages(requestId: string): void { function handleRebuildPackages(requestId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("rebuild-packages", () => await withTdlibMutex("global", "rebuild-packages", () =>
rebuildPackageDatabase(requestId) rebuildPackageDatabase(requestId)
); );
} catch (err) { } catch (err) {

View File

@@ -24,8 +24,8 @@ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
* 2. Process all active AUTHENTICATED accounts for ingestion * 2. Process all active AUTHENTICATED accounts for ingestion
* *
* All TDLib operations are wrapped in the mutex to ensure only one client * Each account's TDLib operations are wrapped in a per-key mutex so different
* runs at a time (also shared with the fetch listener for on-demand requests). * accounts run concurrently while the same account is still serialized.
* *
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h). * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
* Once the timeout elapses, no new accounts will be started but any in-progress * Once the timeout elapses, no new accounts will be started but any in-progress
@@ -55,7 +55,7 @@ async function runCycle(): Promise<void> {
log.warn("Cycle timeout reached during authentication phase, stopping"); log.warn("Cycle timeout reached during authentication phase, stopping");
break; break;
} }
await withTdlibMutex(`auth:${account.phone}`, () => await withTdlibMutex(account.phone, `auth:${account.phone}`, () =>
authenticateAccount(account) authenticateAccount(account)
); );
} }
@@ -71,18 +71,13 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts"); log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) { await Promise.allSettled(
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) { accounts.map((account) =>
log.warn( withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, runWorkerForAccount(account)
"Cycle timeout reached, skipping remaining accounts" )
); )
break; );
}
await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account)
);
}
log.info( log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) }, { elapsed: Math.round((Date.now() - cycleStart) / 1000) },

View File

@@ -2,39 +2,43 @@ import { childLogger } from "./logger.js";
const log = childLogger("mutex"); const log = childLogger("mutex");
let locked = false;
let holder = "";
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000; const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
const locks = new Map<string, boolean>();
const holders = new Map<string, string>();
const queues = new Map<
string,
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
>();
/** /**
* Ensures only one TDLib client runs at a time across the entire worker process. * Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this * Different keys run concurrently — this allows two accounts to ingest in parallel
* before creating any TDLib client. * while still preventing concurrent use of the same account's TDLib state dir.
* *
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs. * key: the account phone number for account-specific ops (auth, ingest),
* or 'global' for ops that don't belong to a specific account.
* label: human-readable name for logging.
*/ */
export async function withTdlibMutex<T>( export async function withTdlibMutex<T>(
key: string,
label: string, label: string,
fn: () => Promise<T> fn: () => Promise<T>
): Promise<T> { ): Promise<T> {
if (locked) { if (locks.get(key)) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex");
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => { const timer = setTimeout(() => {
const idx = queue.indexOf(entry); const q = queues.get(key) ?? [];
const idx = q.indexOf(entry);
if (idx !== -1) { if (idx !== -1) {
queue.splice(idx, 1); q.splice(idx, 1);
reject(new Error( reject(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` + new Error(
`(waiting: ${label}, holder: ${holder})` `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
)); `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})`
)
);
} }
}, MUTEX_WAIT_TIMEOUT_MS); }, MUTEX_WAIT_TIMEOUT_MS);
@@ -46,25 +50,28 @@ export async function withTdlibMutex<T>(
reject, reject,
label, label,
}; };
queue.push(entry);
if (!queues.has(key)) queues.set(key, []);
queues.get(key)!.push(entry);
}); });
} }
locked = true; locks.set(key, true);
holder = label; holders.set(key, label);
log.debug({ label }, "TDLib mutex acquired"); log.debug({ key, label }, "TDLib mutex acquired");
try { try {
return await fn(); return await fn();
} finally { } finally {
locked = false; locks.delete(key);
holder = ""; holders.delete(key);
const next = queue.shift(); const next = queues.get(key)?.shift();
if (next) { if (next) {
log.debug({ next: next.label }, "TDLib mutex releasing to next waiter"); log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter");
next.resolve(); next.resolve();
} else { } else {
log.debug({ label }, "TDLib mutex released"); queues.delete(key);
log.debug({ key, label }, "TDLib mutex released");
} }
} }
} }