diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 4411ef3..1070109 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -625,6 +625,7 @@ export async function upsertSkippedPackage(data: { errorMessage: data.errorMessage ?? null, fileName: data.fileName, fileSize: data.fileSize, + attemptCount: { increment: 1 }, createdAt: new Date(), }, create: { @@ -642,6 +643,26 @@ export async function upsertSkippedPackage(data: { }); } +/** + * Return source-message IDs in a channel whose SkippedPackage attemptCount has + * reached or exceeded the cap — these are treated as "permanently failed for + * now" so the watermark can advance past them. The user can manually retry via + * the UI to reset the SkippedPackage record. + */ +export async function getCappedSkippedMessageIds( + sourceChannelId: string, + cap: number +): Promise<Set<bigint>> { + const rows = await db.skippedPackage.findMany({ + where: { + sourceChannelId, + attemptCount: { gte: cap }, + }, + select: { sourceMessageId: true }, + }); + return new Set(rows.map((r) => r.sourceMessageId)); +} + export async function deleteSkippedPackage( sourceChannelId: string, sourceMessageId: bigint diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts index f684cde..f2807d3 100644 --- a/worker/src/upload/channel.ts +++ b/worker/src/upload/channel.ts @@ -136,6 +136,28 @@ async function sendWithRetry( ); } + // Transient Telegram server-side error (HTTP 5xx returned via + // updateMessageSendFailed). These are NOT FLOOD_WAIT, NOT stalls — just + // TG having a bad moment. They typically resolve on a short backoff, so + // retry up to MAX_UPLOAD_RETRIES with linear backoff before giving up. 
+ const lowerMsg = errMsg.toLowerCase(); + const isTransientServerError = + lowerMsg.includes("internal server error") || + lowerMsg.includes("internal error") || + lowerMsg.includes("server error") || + lowerMsg.includes("bad gateway") || + lowerMsg.includes("service unavailable") || + lowerMsg.includes("gateway timeout"); + if (isTransientServerError && !isLastAttempt) { + const backoffMs = 15_000 * (attempt + 1) + Math.random() * 5_000; + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES, backoffMs: Math.round(backoffMs) }, + `Transient Telegram server error — retrying after backoff` + ); + await sleep(backoffMs); + continue; + } + throw err; } } diff --git a/worker/src/util/config.ts b/worker/src/util/config.ts index fb7ebb9..6f177a6 100644 --- a/worker/src/util/config.ts +++ b/worker/src/util/config.ts @@ -20,4 +20,8 @@ export const config = { apiDelayMs: 1000, /** Max retries for rate-limited requests */ maxRetries: 5, + /** After this many failed attempts on the same source message, the worker + * stops auto-retrying and lets the watermark advance past it. The user can + * manually retry via the UI to reset and try again. */ + maxSkipAttempts: parseInt(process.env.WORKER_MAX_SKIP_ATTEMPTS ?? 
"5", 10), } as const; diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 53e77b2..ca4bc1b 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -30,6 +30,7 @@ import { getUploadedPackageByHash, upsertSkippedPackage, deleteSkippedPackage, + getCappedSkippedMessageIds, } from "./db/queries.js"; import type { ActivityUpdate } from "./db/queries.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; @@ -408,6 +409,47 @@ export async function runWorkerForAccount( throw new Error("No global destination channel configured — set one in the admin UI"); } + // ── Ensure TDLib knows about the destination chat ── + // Source channels get an explicit getChat below, but the destination was + // previously only loaded via loadChats — which can miss it if the account + // archived/moved it. Failing here surfaces the problem clearly instead of + // letting every upload fail with a cryptic "Chat not found". + try { + await client.invoke({ + _: "getChat", + chat_id: Number(destChannel.telegramId), + }); + } catch (destErr) { + accountLog.error( + { err: destErr, destChannel: destChannel.title, telegramId: destChannel.telegramId.toString() }, + "Destination chat is not accessible to this account — uploads will fail. Re-join via invite link or remove this account." + ); + // Surface as a persistent notification so the admin sees it in the UI + try { + await db.systemNotification.create({ + data: { + type: "UPLOAD_FAILED", + severity: "ERROR", + title: `Destination chat unreachable for ${account.phone}`, + message: `Account ${account.phone} cannot access the destination chat "${destChannel.title}". Uploads for this account will fail until access is restored. 
Re-join via the invite link in admin settings.`, + context: { + accountId: account.id, + accountPhone: account.phone, + destChannelId: destChannel.id, + destChannelTitle: destChannel.title, + }, + }, + }); + } catch { + // Best-effort notification + } + // Skip this account's ingestion cycle entirely — there's no point + // scanning + downloading if we can't upload. + throw new Error( + `Destination chat "${destChannel.title}" is not accessible to account ${account.phone}` + ); + } + const totalChannels = channelMappings.length; if (totalChannels === 0) { @@ -674,6 +716,50 @@ export async function runWorkerForAccount( { err: channelErr, channelId: channel.id, title: channel.title }, "Failed to process channel, skipping to next" ); + + // If the channel is no longer accessible (account got removed, + // channel deleted, etc.), surface a persistent notification so the + // admin can decide what to do. Dedupe by type + channelId (NOTE: the + // lookup does not filter on accountId) within the last 24h so we don't + // flood the notifications list every cycle. + const errMsg = channelErr instanceof Error ? channelErr.message : String(channelErr); + const isAccessError = + errMsg.includes("Can't access the chat") || + errMsg.includes("CHAT_FORBIDDEN") || + errMsg.includes("CHANNEL_PRIVATE") || + errMsg.includes("Chat not found"); + if (isAccessError) { + try { + const recent = await db.systemNotification.findFirst({ + where: { + type: "CHANNEL_ACCESS_LOST", + context: { path: ["channelId"], equals: channel.id }, + createdAt: { gte: new Date(Date.now() - 24 * 60 * 60 * 1000) }, + }, + select: { id: true }, + }); + if (!recent) { + await db.systemNotification.create({ + data: { + type: "CHANNEL_ACCESS_LOST", + severity: "WARNING", + title: `Lost access to "${channel.title}" for ${account.phone}`, + message: `Account ${account.phone} can no longer access source channel "${channel.title}". The worker is skipping this channel every cycle. 
Re-join the channel or unlink it from this account in admin settings.`, + context: { + channelId: channel.id, + channelTitle: channel.title, + telegramId: channel.telegramId.toString(), + accountId: account.id, + accountPhone: account.phone, + errorMessage: errMsg.slice(0, 200), + }, + }, + }); + } + } catch { + // Best-effort notification + } + } } } @@ -746,6 +832,26 @@ async function processArchiveSets( } } + // Filter out sets whose source message has hit the auto-retry cap — these are + // treated as "give up for now" so the watermark can advance past them. + // Removing them from archiveSets means they are NOT tracked in minFailedId, + // so the caller's watermark cap won't pin progress below them. The + // SkippedPackage record stays so the user can manually retry via the UI. + const cappedIds = await getCappedSkippedMessageIds(channel.id, config.maxSkipAttempts); + if (cappedIds.size > 0) { + const beforeCap = archiveSets.length; + archiveSets = archiveSets.filter( + (set) => !cappedIds.has(set.parts[0].id) + ); + const cappedSkipped = beforeCap - archiveSets.length; + if (cappedSkipped > 0) { + accountLog.warn( + { cappedSkipped, cap: config.maxSkipAttempts, remaining: archiveSets.length }, + "Skipping archive sets that hit the auto-retry attempt cap — watermark will advance past them" + ); + } + } + counters.zipsFound += archiveSets.length; // Match preview photos to archive sets @@ -885,11 +991,15 @@ async function processArchiveSets( try { const archiveSet = archiveSets[setIdx]; const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n); - const errMsg = setErr instanceof Error ? setErr.message : String(setErr); + const rawErrMsg = setErr instanceof Error ? setErr.message : String(setErr); + // Prefix with [<accountPhone>] so the SkippedPackage / notification UI shows + // which account hit the error — important when two accounts share a + // source channel and only one is failing. 
+ const errMsg = `[${ctx.accountPhone}] ${rawErrMsg}`; await upsertSkippedPackage({ fileName: archiveSet.parts[0].fileName, fileSize: totalSize, - reason: inferSkipReason(errMsg), + reason: inferSkipReason(rawErrMsg), errorMessage: errMsg, sourceChannelId: ctx.channel.id, sourceMessageId: archiveSet.parts[0].id, @@ -901,7 +1011,7 @@ async function processArchiveSets( // Also create a persistent notification await db.systemNotification.create({ data: { - type: inferSkipReason(errMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED", + type: inferSkipReason(rawErrMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED", severity: "WARNING", title: `Failed to process ${archiveSet.parts[0].fileName}`, message: errMsg, @@ -910,7 +1020,8 @@ async function processArchiveSets( sourceChannelId: ctx.channel.id, sourceMessageId: Number(archiveSet.parts[0].id), channelTitle: ctx.channelTitle, - reason: inferSkipReason(errMsg), + accountPhone: ctx.accountPhone, + reason: inferSkipReason(rawErrMsg), }, }, });