fix: improve worker error handling and reliability
All checks were successful
continuous-integration/drone/push Build is passing

1. Distinguish failure reasons: inspect error messages to label skipped
   packages as DOWNLOAD_FAILED, UPLOAD_FAILED, or EXTRACT_FAILED
   instead of catch-all DOWNLOAD_FAILED.

2. Detect orphaned uploads: before uploading, check if the same content
   hash already has a successful upload on the destination channel. Reuse
   the existing message ID instead of re-uploading (prevents duplicates
   when worker crashed between upload and DB write).

3. Increase timeouts: download from max(5min, GB*10min) to
   max(10min, GB*15min), upload from max(10min, GB*10min) to max(10min, GB*15min).
   Prevents premature timeouts on slow connections.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 02:37:23 +01:00
parent 55bdf3c890
commit fe28c31b9e
4 changed files with 64 additions and 23 deletions

View File

@@ -62,6 +62,18 @@ export async function packageExistsByHash(contentHash: string) {
return pkg !== null;
}
/**
 * Look up a package row whose content hash already has a completed upload,
 * i.e. both destChannelId and destMessageId are recorded.
 *
 * Used to detect orphaned uploads: the file already reached the destination
 * channel on an earlier run, so the caller can reuse the recorded destination
 * message instead of re-uploading (prevents duplicate uploads after a crash
 * between upload and indexing — see caller; verify against worker flow).
 */
export async function getUploadedPackageByHash(contentHash: string) {
  // Only rows with a fully recorded destination count as "uploaded".
  const uploadedFilter = {
    contentHash,
    destMessageId: { not: null },
    destChannelId: { not: null },
  };
  return db.package.findFirst({
    where: uploadedFilter,
    select: { destChannelId: true, destMessageId: true },
  });
}
/**
* Check if a package already exists for a given source message ID
* AND was successfully uploaded to the destination (destMessageId is set).

View File

@@ -354,10 +354,10 @@ export async function downloadFile(
let lastLoggedPercent = 0;
let settled = false;
// Timeout: 10 minutes per GB, minimum 5 minutes
// Timeout: 15 minutes per GB, minimum 10 minutes
const timeoutMs = Math.max(
5 * 60_000,
(totalBytes / (1024 * 1024 * 1024)) * 10 * 60_000
10 * 60_000,
(totalBytes / (1024 * 1024 * 1024)) * 15 * 60_000
);
const timer = setTimeout(() => {
if (!settled) {

View File

@@ -94,10 +94,10 @@ async function sendAndWaitForUpload(
let lastLoggedPercent = 0;
let tempMsgId: number | null = null;
// Timeout: 10 minutes per GB, minimum 10 minutes
// Timeout: 15 minutes per GB, minimum 10 minutes
const timeoutMs = Math.max(
10 * 60_000,
(fileSizeMB / 1024) * 10 * 60_000
(fileSizeMB / 1024) * 15 * 60_000
);
const timer = setTimeout(() => {

View File

@@ -26,6 +26,7 @@ import {
getExistingChannelsByTelegramId,
getAccountById,
deleteOrphanedPackageByHash,
getUploadedPackageByHash,
upsertSkippedPackage,
deleteSkippedPackage,
} from "./db/queries.js";
@@ -643,6 +644,20 @@ export async function runWorkerForAccount(
}
}
/**
 * Map an error message onto the SkipReason label shown in the UI.
 *
 * Matching is keyword-based on the lowercased message and ordered:
 * upload-related hints are checked first, extract-related hints second,
 * and anything unrecognized falls back to DOWNLOAD_FAILED.
 */
function inferSkipReason(errMsg: string): "DOWNLOAD_FAILED" | "UPLOAD_FAILED" | "EXTRACT_FAILED" {
  const haystack = errMsg.toLowerCase();

  // Keyword groups; order of the checks below encodes their priority.
  const uploadHints = ["upload", "too many requests", "retry after", "send"];
  const extractHints = ["extract", "metadata", "central directory", "archive"];

  if (uploadHints.some((hint) => haystack.includes(hint))) {
    return "UPLOAD_FAILED";
  }
  if (extractHints.some((hint) => haystack.includes(hint))) {
    return "EXTRACT_FAILED";
  }
  return "DOWNLOAD_FAILED";
}
/**
* Process a scan result through the archive pipeline:
* group → download → hash → dedup → metadata → split → upload → preview → index.
@@ -737,11 +752,12 @@ async function processArchiveSets(
try {
const archiveSet = archiveSets[setIdx];
const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
const errMsg = setErr instanceof Error ? setErr.message : String(setErr);
await upsertSkippedPackage({
fileName: archiveSet.parts[0].fileName,
fileSize: totalSize,
reason: "DOWNLOAD_FAILED",
errorMessage: setErr instanceof Error ? setErr.message : String(setErr),
reason: inferSkipReason(errMsg),
errorMessage: errMsg,
sourceChannelId: ctx.channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId: ctx.sourceTopicId,
@@ -1017,6 +1033,18 @@ async function processOneArchiveSet(
}
// ── Uploading ──
// Check if a prior run already uploaded this file (orphaned upload scenario:
// file reached Telegram but DB write failed or worker crashed before indexing)
const existingUpload = await getUploadedPackageByHash(contentHash);
let destResult: { messageId: bigint };
if (existingUpload && existingUpload.destMessageId) {
accountLog.info(
{ fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
"Reusing existing upload (file already on destination channel)"
);
destResult = { messageId: existingUpload.destMessageId };
} else {
const uploadLabel = uploadPaths.length > 1
? ` (${uploadPaths.length} parts)`
: "";
@@ -1029,11 +1057,12 @@ async function processOneArchiveSet(
totalFiles: totalSets,
});
const destResult = await uploadToChannel(
destResult = await uploadToChannel(
client,
destChannelTelegramId,
uploadPaths
);
}
// ── Preview thumbnail ──
let previewData: Buffer | null = null;