feat: record skipped/failed archives in database for UI visibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 16:16:12 +01:00
parent 780e6200d8
commit d53e581623
2 changed files with 86 additions and 0 deletions

View File

@@ -473,3 +473,53 @@ export async function resetPackageDestination(packageId: string) {
data: { destChannelId: null, destMessageId: null },
});
}
export async function upsertSkippedPackage(data: {
fileName: string;
fileSize: bigint;
reason: "SIZE_LIMIT" | "DOWNLOAD_FAILED" | "EXTRACT_FAILED" | "UPLOAD_FAILED";
errorMessage?: string;
sourceChannelId: string;
sourceMessageId: bigint;
sourceTopicId?: bigint | null;
isMultipart: boolean;
partCount: number;
accountId: string;
}) {
return db.skippedPackage.upsert({
where: {
sourceChannelId_sourceMessageId: {
sourceChannelId: data.sourceChannelId,
sourceMessageId: data.sourceMessageId,
},
},
update: {
reason: data.reason,
errorMessage: data.errorMessage ?? null,
fileName: data.fileName,
fileSize: data.fileSize,
createdAt: new Date(),
},
create: {
fileName: data.fileName,
fileSize: data.fileSize,
reason: data.reason,
errorMessage: data.errorMessage ?? null,
sourceChannelId: data.sourceChannelId,
sourceMessageId: data.sourceMessageId,
sourceTopicId: data.sourceTopicId ?? null,
isMultipart: data.isMultipart,
partCount: data.partCount,
accountId: data.accountId,
},
});
}
export async function deleteSkippedPackage(
sourceChannelId: string,
sourceMessageId: bigint
) {
return db.skippedPackage.deleteMany({
where: { sourceChannelId, sourceMessageId },
});
}

View File

@@ -26,6 +26,8 @@ import {
getExistingChannelsByTelegramId,
getAccountById,
deleteOrphanedPackageByHash,
upsertSkippedPackage,
deleteSkippedPackage,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
@@ -279,6 +281,7 @@ function createThrottledActivityUpdater(runId: string, minIntervalMs = 2000) {
interface PipelineContext {
client: Client;
runId: string;
accountId: string;
channelTitle: string;
channel: TelegramChannel;
destChannelTelegramId: bigint;
@@ -436,6 +439,7 @@ export async function runWorkerForAccount(
const pipelineCtx: PipelineContext = {
client,
runId: activeRunId,
accountId: account.id,
channelTitle: channel.title,
channel,
destChannelTelegramId: destChannel.telegramId,
@@ -729,6 +733,25 @@ async function processArchiveSets(
{ err: setErr, baseName: archiveSets[setIdx].baseName },
"Archive set failed, watermark will not advance past this set"
);
// Record the failure for visibility in the UI
try {
const archiveSet = archiveSets[setIdx];
const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
await upsertSkippedPackage({
fileName: archiveSet.parts[0].fileName,
fileSize: totalSize,
reason: "DOWNLOAD_FAILED",
errorMessage: setErr instanceof Error ? setErr.message : String(setErr),
sourceChannelId: ctx.channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId: ctx.sourceTopicId,
isMultipart: archiveSet.isMultipart,
partCount: archiveSet.parts.length,
accountId: ctx.accountId,
});
} catch {
// Best-effort — don't fail the run if skip recording fails
}
}
}
@@ -798,6 +821,17 @@ async function processOneArchiveSet(
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
await upsertSkippedPackage({
fileName: archiveName,
fileSize: totalArchiveSize,
reason: "SIZE_LIMIT",
sourceChannelId: channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId: ctx.sourceTopicId,
isMultipart: archiveSet.isMultipart,
partCount: archiveSet.parts.length,
accountId: ctx.accountId,
});
return;
}
@@ -1086,6 +1120,8 @@ async function processOneArchiveSet(
});
counters.zipsIngested++;
// Clean up any prior skip record for this archive
await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
await updateRunActivity(runId, {
currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,