fix: use per-account mutex keys in fetch/extract listeners, add cycle timeout and error logging

This commit is contained in:
2026-05-02 23:40:37 +02:00
parent e9017fc518
commit a79cb4749b
3 changed files with 58 additions and 38 deletions

View File

@@ -101,15 +101,13 @@ export async function processExtractRequest(requestId: string): Promise<void> {
try { try {
await mkdir(tempDir, { recursive: true }); await mkdir(tempDir, { recursive: true });
// Wrap the entire TDLib session in the mutex so no other TDLib
// operation can run concurrently (TDLib is single-session).
await withTdlibMutex("global", "extract", async () => {
const accounts = await getActiveAccounts(); const accounts = await getActiveAccounts();
if (accounts.length === 0) { if (accounts.length === 0) {
throw new Error("No authenticated Telegram accounts available"); throw new Error("No authenticated Telegram accounts available");
} }
const account = accounts[0]; const account = accounts[0];
await withTdlibMutex(account.phone, "extract", async () => {
const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
try { try {

View File

@@ -14,6 +14,7 @@ import {
getGlobalSetting, getGlobalSetting,
setGlobalSetting, setGlobalSetting,
getActiveAccounts, getActiveAccounts,
getChannelFetchRequest,
upsertChannel, upsertChannel,
ensureAccountChannelLink, ensureAccountChannelLink,
updateFetchRequestStatus, updateFetchRequestStatus,
@@ -133,7 +134,9 @@ let fetchQueue: Promise<void> = Promise.resolve();
function handleChannelFetch(requestId: string): void { function handleChannelFetch(requestId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("global", "fetch-channels", () => const request = await getChannelFetchRequest(requestId);
const key = request?.account?.phone ?? "global";
await withTdlibMutex(key, "fetch-channels", () =>
processFetchRequest(requestId) processFetchRequest(requestId)
); );
} catch (err) { } catch (err) {
@@ -147,21 +150,19 @@ function handleChannelFetch(requestId: string): void {
function handleGenerateInvite(channelId: string): void { function handleGenerateInvite(channelId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("global", "generate-invite", async () => { const accounts = await getActiveAccounts();
if (accounts.length === 0) {
log.warn("No authenticated accounts to generate invite link");
return;
}
const account = accounts[0];
await withTdlibMutex(account.phone, "generate-invite", async () => {
const destChannel = await getGlobalDestinationChannel(); const destChannel = await getGlobalDestinationChannel();
if (!destChannel || destChannel.id !== channelId) { if (!destChannel || destChannel.id !== channelId) {
log.warn({ channelId }, "Destination channel mismatch, skipping invite generation"); log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
return; return;
} }
// Use the first available authenticated account to generate the link
const accounts = await getActiveAccounts();
if (accounts.length === 0) {
log.warn("No authenticated accounts to generate invite link");
return;
}
const account = accounts[0];
const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
try { try {
@@ -187,7 +188,13 @@ function handleCreateDestination(payload: string): void {
const parsed = JSON.parse(payload) as { requestId: string; title: string }; const parsed = JSON.parse(payload) as { requestId: string; title: string };
requestId = parsed.requestId; requestId = parsed.requestId;
await withTdlibMutex("global", "create-destination", async () => { const accounts = await getActiveAccounts();
if (accounts.length === 0) {
throw new Error("No authenticated accounts available to create the group");
}
const account = accounts[0];
await withTdlibMutex(account.phone, "create-destination", async () => {
const { db } = await import("./db/client.js"); const { db } = await import("./db/client.js");
// Mark the request as in-progress // Mark the request as in-progress
@@ -196,13 +203,6 @@ function handleCreateDestination(payload: string): void {
data: { status: "IN_PROGRESS" }, data: { status: "IN_PROGRESS" },
}); });
// Use the first available authenticated account
const accounts = await getActiveAccounts();
if (accounts.length === 0) {
throw new Error("No authenticated accounts available to create the group");
}
const account = accounts[0];
const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
try { try {
@@ -328,15 +328,15 @@ function handleJoinChannel(payload: string): void {
const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string }; const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
requestId = parsed.requestId; requestId = parsed.requestId;
await withTdlibMutex("global", "join-channel", async () => {
await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
const accounts = await getActiveAccounts(); const accounts = await getActiveAccounts();
const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0]; const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
if (!account) { if (!account) {
throw new Error("No authenticated accounts available"); throw new Error("No authenticated accounts available");
} }
await withTdlibMutex(account.phone, "join-channel", async () => {
await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
const { client } = await createTdlibClient({ id: account.id, phone: account.phone }); const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
try { try {
@@ -507,7 +507,12 @@ function handleIngestionTrigger(): void {
function handleRebuildPackages(requestId: string): void { function handleRebuildPackages(requestId: string): void {
fetchQueue = fetchQueue.then(async () => { fetchQueue = fetchQueue.then(async () => {
try { try {
await withTdlibMutex("global", "rebuild-packages", () => const accounts = await getActiveAccounts();
if (accounts.length === 0) {
log.warn("No authenticated accounts to rebuild packages");
return;
}
await withTdlibMutex(accounts[0].phone, "rebuild-packages", () =>
rebuildPackageDatabase(requestId) rebuildPackageDatabase(requestId)
); );
} catch (err) { } catch (err) {

View File

@@ -71,14 +71,31 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts"); log.info({ accountCount: accounts.length }, "Processing accounts");
await Promise.allSettled( const results = await Promise.allSettled(
accounts.map((account) => accounts.map((account) =>
Promise.race([
withTdlibMutex(account.phone, `ingest:${account.phone}`, () => withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
runWorkerForAccount(account) runWorkerForAccount(account)
),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error(`Account ${account.phone} ingestion timed out after ${CYCLE_TIMEOUT_MS / 60_000}min`)),
CYCLE_TIMEOUT_MS
) )
),
])
) )
); );
for (let i = 0; i < results.length; i++) {
if (results[i].status === "rejected") {
log.error(
{ phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason },
"Account ingestion failed"
);
}
}
log.info( log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) }, { elapsed: Math.round((Date.now() - cycleStart) / 1000) },
"Ingestion cycle complete" "Ingestion cycle complete"