diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts
index 177003b..7624086 100644
--- a/worker/src/db/queries.ts
+++ b/worker/src/db/queries.ts
@@ -473,3 +473,53 @@ export async function resetPackageDestination(packageId: string) {
     data: { destChannelId: null, destMessageId: null },
   });
 }
+
+export async function upsertSkippedPackage(data: {
+  fileName: string;
+  fileSize: bigint;
+  reason: "SIZE_LIMIT" | "DOWNLOAD_FAILED" | "EXTRACT_FAILED" | "UPLOAD_FAILED";
+  errorMessage?: string;
+  sourceChannelId: string;
+  sourceMessageId: bigint;
+  sourceTopicId?: bigint | null;
+  isMultipart: boolean;
+  partCount: number;
+  accountId: string;
+}) {
+  return db.skippedPackage.upsert({
+    where: {
+      sourceChannelId_sourceMessageId: {
+        sourceChannelId: data.sourceChannelId,
+        sourceMessageId: data.sourceMessageId,
+      },
+    },
+    update: {
+      reason: data.reason,
+      errorMessage: data.errorMessage ?? null,
+      fileName: data.fileName,
+      fileSize: data.fileSize,
+      createdAt: new Date(),
+    },
+    create: {
+      fileName: data.fileName,
+      fileSize: data.fileSize,
+      reason: data.reason,
+      errorMessage: data.errorMessage ?? null,
+      sourceChannelId: data.sourceChannelId,
+      sourceMessageId: data.sourceMessageId,
+      sourceTopicId: data.sourceTopicId ?? null,
+      isMultipart: data.isMultipart,
+      partCount: data.partCount,
+      accountId: data.accountId,
+    },
+  });
+}
+
+export async function deleteSkippedPackage(
+  sourceChannelId: string,
+  sourceMessageId: bigint
+) {
+  return db.skippedPackage.deleteMany({
+    where: { sourceChannelId, sourceMessageId },
+  });
+}
diff --git a/worker/src/worker.ts b/worker/src/worker.ts
index 56d17d0..473b4ac 100644
--- a/worker/src/worker.ts
+++ b/worker/src/worker.ts
@@ -26,6 +26,8 @@
   getExistingChannelsByTelegramId,
   getAccountById,
   deleteOrphanedPackageByHash,
+  upsertSkippedPackage,
+  deleteSkippedPackage,
 } from "./db/queries.js";
 import type { ActivityUpdate } from "./db/queries.js";
 import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
@@ -279,6 +281,7 @@ function createThrottledActivityUpdater(runId: string, minIntervalMs = 2000) {
 interface PipelineContext {
   client: Client;
   runId: string;
+  accountId: string;
   channelTitle: string;
   channel: TelegramChannel;
   destChannelTelegramId: bigint;
@@ -436,6 +439,7 @@ export async function runWorkerForAccount(
   const pipelineCtx: PipelineContext = {
     client,
     runId: activeRunId,
+    accountId: account.id,
     channelTitle: channel.title,
     channel,
     destChannelTelegramId: destChannel.telegramId,
@@ -729,6 +733,25 @@
         { err: setErr, baseName: archiveSets[setIdx].baseName },
         "Archive set failed, watermark will not advance past this set"
       );
+      // Record the failure for visibility in the UI
+      try {
+        const archiveSet = archiveSets[setIdx];
+        const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
+        await upsertSkippedPackage({
+          fileName: archiveSet.parts[0].fileName,
+          fileSize: totalSize,
+          reason: "DOWNLOAD_FAILED",
+          errorMessage: setErr instanceof Error ? setErr.message : String(setErr),
+          sourceChannelId: ctx.channel.id,
+          sourceMessageId: archiveSet.parts[0].id,
+          sourceTopicId: ctx.sourceTopicId,
+          isMultipart: archiveSet.isMultipart,
+          partCount: archiveSet.parts.length,
+          accountId: ctx.accountId,
+        });
+      } catch {
+        // Best-effort — don't fail the run if skip recording fails
+      }
     }
   }
 
@@ -798,6 +821,17 @@
       currentFileNum: setIdx + 1,
       totalFiles: totalSets,
     });
+    await upsertSkippedPackage({
+      fileName: archiveName,
+      fileSize: totalArchiveSize,
+      reason: "SIZE_LIMIT",
+      sourceChannelId: channel.id,
+      sourceMessageId: archiveSet.parts[0].id,
+      sourceTopicId: ctx.sourceTopicId,
+      isMultipart: archiveSet.isMultipart,
+      partCount: archiveSet.parts.length,
+      accountId: ctx.accountId,
+    });
     return;
   }
 
@@ -1086,6 +1120,8 @@
     });
 
     counters.zipsIngested++;
+    // Clean up any prior skip record for this archive
+    await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
 
     await updateRunActivity(runId, {
       currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,