diff --git a/worker/src/extract-listener.ts b/worker/src/extract-listener.ts index 35ca107..30a2c97 100644 --- a/worker/src/extract-listener.ts +++ b/worker/src/extract-listener.ts @@ -101,15 +101,13 @@ export async function processExtractRequest(requestId: string): Promise { try { await mkdir(tempDir, { recursive: true }); - // Wrap the entire TDLib session in the mutex so no other TDLib - // operation can run concurrently (TDLib is single-session). - await withTdlibMutex("global", "extract", async () => { - const accounts = await getActiveAccounts(); - if (accounts.length === 0) { - throw new Error("No authenticated Telegram accounts available"); - } + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + throw new Error("No authenticated Telegram accounts available"); + } + const account = accounts[0]; - const account = accounts[0]; + await withTdlibMutex(account.phone, "extract", async () => { const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); try { diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index 075bcaf..3e14743 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -14,6 +14,7 @@ import { getGlobalSetting, setGlobalSetting, getActiveAccounts, + getChannelFetchRequest, upsertChannel, ensureAccountChannelLink, updateFetchRequestStatus, @@ -133,7 +134,9 @@ let fetchQueue: Promise = Promise.resolve(); function handleChannelFetch(requestId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("global", "fetch-channels", () => + const request = await getChannelFetchRequest(requestId); + const key = request?.account?.phone ?? "global"; + await withTdlibMutex(key, "fetch-channels", () => processFetchRequest(requestId) ); } catch (err) { @@ -147,21 +150,19 @@ function handleChannelFetch(requestId: string): void { function handleGenerateInvite(channelId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("global", "generate-invite", async () => { + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + log.warn("No authenticated accounts to generate invite link"); + return; + } + const account = accounts[0]; + await withTdlibMutex(account.phone, "generate-invite", async () => { const destChannel = await getGlobalDestinationChannel(); if (!destChannel || destChannel.id !== channelId) { log.warn({ channelId }, "Destination channel mismatch, skipping invite generation"); return; } - // Use the first available authenticated account to generate the link - const accounts = await getActiveAccounts(); - if (accounts.length === 0) { - log.warn("No authenticated accounts to generate invite link"); - return; - } - - const account = accounts[0]; const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); try { @@ -187,7 +188,13 @@ function handleCreateDestination(payload: string): void { const parsed = JSON.parse(payload) as { requestId: string; title: string }; requestId = parsed.requestId; - await withTdlibMutex("global", "create-destination", async () => { + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + throw new Error("No authenticated accounts available to create the group"); + } + const account = accounts[0]; + + await withTdlibMutex(account.phone, "create-destination", async () => { const { db } = await import("./db/client.js"); // Mark the request as in-progress @@ -196,13 +203,6 @@ function handleCreateDestination(payload: string): void { data: { status: "IN_PROGRESS" }, }); - // Use the first available authenticated account - const accounts = await getActiveAccounts(); - if (accounts.length === 0) { - throw new Error("No authenticated accounts available to create the group"); - } - - const account = accounts[0]; const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); try { @@ -328,14 +328,14 @@ function handleJoinChannel(payload: string): void { const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string }; requestId = parsed.requestId; - await withTdlibMutex("global", "join-channel", async () => { - await updateFetchRequestStatus(requestId!, "IN_PROGRESS"); + const accounts = await getActiveAccounts(); + const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0]; + if (!account) { + throw new Error("No authenticated accounts available"); + } - const accounts = await getActiveAccounts(); - const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0]; - if (!account) { - throw new Error("No authenticated accounts available"); - } + await withTdlibMutex(account.phone, "join-channel", async () => { + await updateFetchRequestStatus(requestId!, "IN_PROGRESS"); const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); @@ -507,7 +507,12 @@ function handleIngestionTrigger(): void { function handleRebuildPackages(requestId: string): void { fetchQueue = fetchQueue.then(async () => { try { - await withTdlibMutex("global", "rebuild-packages", () => + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + log.warn("No authenticated accounts to rebuild packages"); + return; + } + await withTdlibMutex(accounts[0].phone, "rebuild-packages", () => rebuildPackageDatabase(requestId) ); } catch (err) { diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index 58f7347..8786363 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -71,14 +71,31 @@ async function runCycle(): Promise { log.info({ accountCount: accounts.length }, "Processing accounts"); - await Promise.allSettled( + const results = await Promise.allSettled( accounts.map((account) => - withTdlibMutex(account.phone, `ingest:${account.phone}`, () => - runWorkerForAccount(account) - ) + Promise.race([ + withTdlibMutex(account.phone, `ingest:${account.phone}`, () => + runWorkerForAccount(account) + ), + new Promise((_, reject) => + setTimeout( + () => reject(new Error(`Account ${account.phone} ingestion timed out after ${CYCLE_TIMEOUT_MS / 60_000}min`)), + CYCLE_TIMEOUT_MS + ) + ), + ]) ) ); + for (let i = 0; i < results.length; i++) { + if (results[i].status === "rejected") { + log.error( + { phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason }, + "Account ingestion failed" + ); + } + } + log.info( { elapsed: Math.round((Date.now() - cycleStart) / 1000) }, "Ingestion cycle complete"