diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts
index 7624086..fae3bb3 100644
--- a/worker/src/db/queries.ts
+++ b/worker/src/db/queries.ts
@@ -62,6 +62,18 @@ export async function packageExistsByHash(contentHash: string) {
   return pkg !== null;
 }
 
+/**
+ * Find an already-uploaded package by content hash.
+ * Selects only destChannelId/destMessageId of the first package with this hash
+ * whose destination IDs are both set, so a caller can reuse that earlier upload.
+ */
+export async function getUploadedPackageByHash(contentHash: string) {
+  return db.package.findFirst({
+    where: { contentHash, destMessageId: { not: null }, destChannelId: { not: null } },
+    select: { destChannelId: true, destMessageId: true },
+  });
+}
+
 /**
  * Check if a package already exists for a given source message ID
  * AND was successfully uploaded to the destination (destMessageId is set).
diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts
index b470386..336a83b 100644
--- a/worker/src/tdlib/download.ts
+++ b/worker/src/tdlib/download.ts
@@ -354,10 +354,10 @@ export async function downloadFile(
     let lastLoggedPercent = 0;
     let settled = false;
 
-    // Timeout: 10 minutes per GB, minimum 5 minutes
+    // Timeout: 15 minutes per GB, minimum 10 minutes
     const timeoutMs = Math.max(
-      5 * 60_000,
-      (totalBytes / (1024 * 1024 * 1024)) * 10 * 60_000
+      10 * 60_000,
+      (totalBytes / (1024 * 1024 * 1024)) * 15 * 60_000
     );
     const timer = setTimeout(() => {
       if (!settled) {
diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts
index cd67719..e019d33 100644
--- a/worker/src/upload/channel.ts
+++ b/worker/src/upload/channel.ts
@@ -94,10 +94,10 @@ async function sendAndWaitForUpload(
     let lastLoggedPercent = 0;
     let tempMsgId: number | null = null;
 
-    // Timeout: 10 minutes per GB, minimum 10 minutes
+    // Timeout: 15 minutes per GB, minimum 10 minutes
     const timeoutMs = Math.max(
       10 * 60_000,
-      (fileSizeMB / 1024) * 10 * 60_000
+      (fileSizeMB / 1024) * 15 * 60_000
     );
 
     const timer = setTimeout(() => {
diff --git a/worker/src/worker.ts b/worker/src/worker.ts
index 473b4ac..f028132 100644
--- a/worker/src/worker.ts
+++ b/worker/src/worker.ts
@@ -26,6 +26,7 @@ import {
   getExistingChannelsByTelegramId,
   getAccountById,
   deleteOrphanedPackageByHash,
+  getUploadedPackageByHash,
   upsertSkippedPackage,
   deleteSkippedPackage,
 } from "./db/queries.js";
@@ -643,6 +644,20 @@ export async function runWorkerForAccount(
   }
 }
 
+/**
+ * Infer the SkipReason for the UI badge from an error message: upload keywords are checked first, then extraction keywords; anything else is DOWNLOAD_FAILED.
+ */
+function inferSkipReason(errMsg: string): "DOWNLOAD_FAILED" | "UPLOAD_FAILED" | "EXTRACT_FAILED" {
+  const lower = errMsg.toLowerCase();
+  if (lower.includes("upload") || lower.includes("too many requests") || lower.includes("retry after") || lower.includes("send")) {
+    return "UPLOAD_FAILED";
+  }
+  if (lower.includes("extract") || lower.includes("metadata") || lower.includes("central directory") || lower.includes("archive")) {
+    return "EXTRACT_FAILED";
+  }
+  return "DOWNLOAD_FAILED";
+}
+
 /**
  * Process a scan result through the archive pipeline:
  * group → download → hash → dedup → metadata → split → upload → preview → index.
@@ -737,11 +752,12 @@ async function processArchiveSets(
       try {
         const archiveSet = archiveSets[setIdx];
         const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
+        const errMsg = setErr instanceof Error ? setErr.message : String(setErr);
         await upsertSkippedPackage({
           fileName: archiveSet.parts[0].fileName,
           fileSize: totalSize,
-          reason: "DOWNLOAD_FAILED",
-          errorMessage: setErr instanceof Error ? setErr.message : String(setErr),
+          reason: inferSkipReason(errMsg),
+          errorMessage: errMsg,
           sourceChannelId: ctx.channel.id,
           sourceMessageId: archiveSet.parts[0].id,
           sourceTopicId: ctx.sourceTopicId,
@@ -1017,23 +1033,36 @@ async function processOneArchiveSet(
     }
 
     // ── Uploading ──
-    const uploadLabel = uploadPaths.length > 1
-      ? ` (${uploadPaths.length} parts)`
-      : "";
-    await updateRunActivity(runId, {
-      currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
-      currentStep: "uploading",
-      currentChannel: channelTitle,
-      currentFile: archiveName,
-      currentFileNum: setIdx + 1,
-      totalFiles: totalSets,
-    });
+    // Reuse a prior upload when a package with this content hash is already recorded with destination IDs.
+    // NOTE(review): destChannelId is selected but not compared with the current destination here — confirm that is intended.
+    const existingUpload = await getUploadedPackageByHash(contentHash);
+    let destResult: { messageId: bigint };
 
-    const destResult = await uploadToChannel(
-      client,
-      destChannelTelegramId,
-      uploadPaths
-    );
+    if (existingUpload && existingUpload.destMessageId) {
+      accountLog.info(
+        { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
+        "Reusing existing upload (file already on destination channel)"
+      );
+      destResult = { messageId: existingUpload.destMessageId };
+    } else {
+      const uploadLabel = uploadPaths.length > 1
+        ? ` (${uploadPaths.length} parts)`
+        : "";
+      await updateRunActivity(runId, {
+        currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
+        currentStep: "uploading",
+        currentChannel: channelTitle,
+        currentFile: archiveName,
+        currentFileNum: setIdx + 1,
+        totalFiles: totalSets,
+      });
+
+      destResult = await uploadToChannel(
+        client,
+        destChannelTelegramId,
+        uploadPaths
+      );
+    }
 
     // ── Preview thumbnail ──
     let previewData: Buffer | null = null;