feat(worker): per-account safeguards for second-account upload failures

Driven by a real production case: secondary account was attached to 17
source channels but ingesting only ~2-3 archives per cycle. Log analysis
showed three distinct issues that this commit addresses.

1. Auto-retry cap (WORKER_MAX_SKIP_ATTEMPTS, default 5)
   processArchiveSets now filters out SkippedPackage rows whose
   attemptCount has reached the cap. Removing them from the working
   list means they are not tracked in minFailedId, so the watermark
   cap from d99a506 does not pin progress below them anymore. A bad
   file no longer blocks the rest of the channel forever; the user
   can manually retry via the UI to reset the count.

2. Account phone in error messages
   Every SkippedPackage row and SystemNotification produced from a
   failure is now prefixed with [<phone>] in errorMessage / message,
   and the JSON context includes accountPhone. When two accounts
   share a source channel and only one is failing, the UI tells you
   which one.

3. Explicit getChat for destination at run start
   loadChats only loads main/archive/folder chat lists. If an account
   archived or moved the destination chat, sendMessage failed silently
   per-archive. Now we getChat the destination once per cycle; on
   failure we record a SystemNotification and skip the account's
   entire ingestion cycle (no point downloading what we can't upload).

4. Retry on transient Telegram server errors
   The "Turnbase Delivery Folder.7z" failure on the secondary and
   "10. Kingdom of the Depth.part1.rar" on the main were both
   "Internal Server Error during file upload" — a TG-side hiccup, not
   a stall or FLOOD_WAIT. These now retry up to MAX_UPLOAD_RETRIES
   with linear backoff (15s, 30s, 45s + jitter) before giving up.

5. Channel-access-lost notification
   "Iridium 2 w/ Add-ons [Completed]" has been throwing
   "Can't access the chat" every cycle for the secondary. The worker
   now surfaces a CHANNEL_ACCESS_LOST notification (deduped to once per
   24h per channel/account) so the admin sees it and can re-join or
   unlink the channel instead of just losing visibility into the loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-22 23:07:57 +02:00
parent 7a79b52baf
commit 379bf246cd
4 changed files with 162 additions and 4 deletions

View File

@@ -625,6 +625,7 @@ export async function upsertSkippedPackage(data: {
errorMessage: data.errorMessage ?? null,
fileName: data.fileName,
fileSize: data.fileSize,
attemptCount: { increment: 1 },
createdAt: new Date(),
},
create: {
@@ -642,6 +643,26 @@ export async function upsertSkippedPackage(data: {
});
}
/**
* Return source-message IDs in a channel whose SkippedPackage attemptCount has
* reached or exceeded the cap — these are treated as "permanently failed for
* now" so the watermark can advance past them. The user can manually retry via
* the UI to reset the SkippedPackage record.
*/
export async function getCappedSkippedMessageIds(
sourceChannelId: string,
cap: number
): Promise<Set<bigint>> {
const rows = await db.skippedPackage.findMany({
where: {
sourceChannelId,
attemptCount: { gte: cap },
},
select: { sourceMessageId: true },
});
return new Set(rows.map((r) => r.sourceMessageId));
}
export async function deleteSkippedPackage(
sourceChannelId: string,
sourceMessageId: bigint

View File

@@ -136,6 +136,28 @@ async function sendWithRetry(
);
}
// Transient Telegram server-side error (HTTP 5xx returned via
// updateMessageSendFailed). These are NOT FLOOD_WAIT, NOT stalls — just
// TG having a bad moment. They typically resolve on a short backoff, so
// retry up to MAX_UPLOAD_RETRIES with linear backoff before giving up.
const lowerMsg = errMsg.toLowerCase();
const isTransientServerError =
lowerMsg.includes("internal server error") ||
lowerMsg.includes("internal error") ||
lowerMsg.includes("server error") ||
lowerMsg.includes("bad gateway") ||
lowerMsg.includes("service unavailable") ||
lowerMsg.includes("gateway timeout");
if (isTransientServerError && !isLastAttempt) {
const backoffMs = 15_000 * (attempt + 1) + Math.random() * 5_000;
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES, backoffMs: Math.round(backoffMs) },
`Transient Telegram server error — retrying after backoff`
);
await sleep(backoffMs);
continue;
}
throw err;
}
}

View File

@@ -20,4 +20,8 @@ export const config = {
apiDelayMs: 1000,
/** Max retries for rate-limited requests */
maxRetries: 5,
/** After this many failed attempts on the same source message, the worker
* stops auto-retrying and lets the watermark advance past it. The user can
* manually retry via the UI to reset and try again. */
maxSkipAttempts: parseInt(process.env.WORKER_MAX_SKIP_ATTEMPTS ?? "5", 10),
} as const;

View File

@@ -30,6 +30,7 @@ import {
getUploadedPackageByHash,
upsertSkippedPackage,
deleteSkippedPackage,
getCappedSkippedMessageIds,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
@@ -408,6 +409,47 @@ export async function runWorkerForAccount(
throw new Error("No global destination channel configured — set one in the admin UI");
}
// ── Ensure TDLib knows about the destination chat ──
// Source channels get an explicit getChat below, but the destination was
// previously only loaded via loadChats — which can miss it if the account
// archived/moved it. Failing here surfaces the problem clearly instead of
// letting every upload fail with a cryptic "Chat not found".
try {
await client.invoke({
_: "getChat",
chat_id: Number(destChannel.telegramId),
});
} catch (destErr) {
accountLog.error(
{ err: destErr, destChannel: destChannel.title, telegramId: destChannel.telegramId.toString() },
"Destination chat is not accessible to this account — uploads will fail. Re-join via invite link or remove this account."
);
// Surface as a persistent notification so the admin sees it in the UI
try {
await db.systemNotification.create({
data: {
type: "UPLOAD_FAILED",
severity: "ERROR",
title: `Destination chat unreachable for ${account.phone}`,
message: `Account ${account.phone} cannot access the destination chat "${destChannel.title}". Uploads for this account will fail until access is restored. Re-join via the invite link in admin settings.`,
context: {
accountId: account.id,
accountPhone: account.phone,
destChannelId: destChannel.id,
destChannelTitle: destChannel.title,
},
},
});
} catch {
// Best-effort notification
}
// Skip this account's ingestion cycle entirely — there's no point
// scanning + downloading if we can't upload.
throw new Error(
`Destination chat "${destChannel.title}" is not accessible to account ${account.phone}`
);
}
const totalChannels = channelMappings.length;
if (totalChannels === 0) {
@@ -674,6 +716,50 @@ export async function runWorkerForAccount(
{ err: channelErr, channelId: channel.id, title: channel.title },
"Failed to process channel, skipping to next"
);
// If the channel is no longer accessible (account got removed,
// channel deleted, etc.), surface a persistent notification so the
// admin can decide what to do. Dedupe by (channelId, accountId)
// within the last 24h so we don't flood the notifications list every
// cycle.
const errMsg = channelErr instanceof Error ? channelErr.message : String(channelErr);
const isAccessError =
errMsg.includes("Can't access the chat") ||
errMsg.includes("CHAT_FORBIDDEN") ||
errMsg.includes("CHANNEL_PRIVATE") ||
errMsg.includes("Chat not found");
if (isAccessError) {
try {
const recent = await db.systemNotification.findFirst({
where: {
type: "CHANNEL_ACCESS_LOST",
context: { path: ["channelId"], equals: channel.id },
createdAt: { gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
},
select: { id: true },
});
if (!recent) {
await db.systemNotification.create({
data: {
type: "CHANNEL_ACCESS_LOST",
severity: "WARNING",
title: `Lost access to "${channel.title}" for ${account.phone}`,
message: `Account ${account.phone} can no longer access source channel "${channel.title}". The worker is skipping this channel every cycle. Re-join the channel or unlink it from this account in admin settings.`,
context: {
channelId: channel.id,
channelTitle: channel.title,
telegramId: channel.telegramId.toString(),
accountId: account.id,
accountPhone: account.phone,
errorMessage: errMsg.slice(0, 200),
},
},
});
}
} catch {
// Best-effort notification
}
}
}
}
@@ -746,6 +832,26 @@ async function processArchiveSets(
}
}
// Filter out sets whose source message has hit the auto-retry cap — these are
// treated as "give up for now" so the watermark can advance past them.
// Removing them from archiveSets means they are NOT tracked in minFailedId,
// so the caller's watermark cap won't pin progress below them. The
// SkippedPackage record stays so the user can manually retry via the UI.
const cappedIds = await getCappedSkippedMessageIds(channel.id, config.maxSkipAttempts);
if (cappedIds.size > 0) {
const beforeCap = archiveSets.length;
archiveSets = archiveSets.filter(
(set) => !cappedIds.has(set.parts[0].id)
);
const cappedSkipped = beforeCap - archiveSets.length;
if (cappedSkipped > 0) {
accountLog.warn(
{ cappedSkipped, cap: config.maxSkipAttempts, remaining: archiveSets.length },
"Skipping archive sets that hit the auto-retry attempt cap — watermark will advance past them"
);
}
}
counters.zipsFound += archiveSets.length;
// Match preview photos to archive sets
@@ -885,11 +991,15 @@ async function processArchiveSets(
try {
const archiveSet = archiveSets[setIdx];
const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
const errMsg = setErr instanceof Error ? setErr.message : String(setErr);
const rawErrMsg = setErr instanceof Error ? setErr.message : String(setErr);
// Prefix with [<phone>] so the SkippedPackage / notification UI shows
// which account hit the error — important when two accounts share a
// source channel and only one is failing.
const errMsg = `[${ctx.accountPhone}] ${rawErrMsg}`;
await upsertSkippedPackage({
fileName: archiveSet.parts[0].fileName,
fileSize: totalSize,
reason: inferSkipReason(errMsg),
reason: inferSkipReason(rawErrMsg),
errorMessage: errMsg,
sourceChannelId: ctx.channel.id,
sourceMessageId: archiveSet.parts[0].id,
@@ -901,7 +1011,7 @@ async function processArchiveSets(
// Also create a persistent notification
await db.systemNotification.create({
data: {
type: inferSkipReason(errMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED",
type: inferSkipReason(rawErrMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED",
severity: "WARNING",
title: `Failed to process ${archiveSet.parts[0].fileName}`,
message: errMsg,
@@ -910,7 +1020,8 @@ async function processArchiveSets(
sourceChannelId: ctx.channel.id,
sourceMessageId: Number(archiveSet.parts[0].id),
channelTitle: ctx.channelTitle,
reason: inferSkipReason(errMsg),
accountPhone: ctx.accountPhone,
reason: inferSkipReason(rawErrMsg),
},
},
});