feat: add two-phase DB write and hash advisory lock to prevent double-uploads

This commit is contained in:
2026-05-02 23:13:55 +02:00
parent 614c8e5b74
commit b48cc510a4

View File

@@ -2,13 +2,14 @@ import path from "path";
import { unlink, readdir, mkdir, rm } from "fs/promises"; import { unlink, readdir, mkdir, rm } from "fs/promises";
import { config } from "./util/config.js"; import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js"; import { childLogger } from "./util/logger.js";
import { tryAcquireLock, releaseLock } from "./db/locks.js"; import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
import { import {
getSourceChannelMappings, getSourceChannelMappings,
getGlobalDestinationChannel, getGlobalDestinationChannel,
packageExistsByHash, packageExistsByHash,
packageExistsBySourceMessage, packageExistsBySourceMessage,
createPackageWithFiles, createPackageStub,
updatePackageWithMetadata,
createIngestionRun, createIngestionRun,
completeIngestionRun, completeIngestionRun,
failIngestionRun, failIngestionRun,
@@ -301,6 +302,7 @@ interface PipelineContext {
/** Forum topic ID (null for non-forum). */ /** Forum topic ID (null for non-forum). */
sourceTopicId: bigint | null; sourceTopicId: bigint | null;
accountLog: ReturnType<typeof childLogger>; accountLog: ReturnType<typeof childLogger>;
maxUploadSize: bigint;
} }
/** /**
@@ -453,6 +455,7 @@ export async function runWorkerForAccount(
topicCreator: null, topicCreator: null,
sourceTopicId: null, sourceTopicId: null,
accountLog, accountLog,
maxUploadSize: BigInt(config.maxPartSizeMB) * 1024n * 1024n,
}; };
if (forum) { if (forum) {
@@ -1028,6 +1031,30 @@ async function processOneArchiveSet(
return null; return null;
} }
// ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
const hashLockAcquired = await tryAcquireHashLock(contentHash);
if (!hashLockAcquired) {
counters.zipsDuplicate++;
accountLog.info(
{ fileName: archiveName, hash: contentHash.slice(0, 16) },
"Hash lock held by another worker — skipping concurrent duplicate"
);
return null;
}
// Re-check after acquiring lock: another worker may have finished between
// the first check above and this point.
const existsAfterLock = await packageExistsByHash(contentHash);
if (existsAfterLock) {
await releaseHashLock(contentHash);
counters.zipsDuplicate++;
accountLog.debug(
{ fileName: archiveName, hash: contentHash.slice(0, 16) },
"Duplicate detected after acquiring hash lock — skipping"
);
return null;
}
// ── Reading metadata ── // ── Reading metadata ──
await updateRunActivity(runId, { await updateRunActivity(runId, {
currentActivity: `Reading file list from ${archiveName}`, currentActivity: `Reading file list from ${archiveName}`,
@@ -1070,7 +1097,7 @@ async function processOneArchiveSet(
(sum, p) => sum + p.fileSize, (sum, p) => sum + p.fileSize,
0n 0n
); );
const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n; const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE); const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
if (hasOversizedPart) { if (hasOversizedPart) {
@@ -1140,6 +1167,12 @@ async function processOneArchiveSet(
); );
} }
// Hoist creator/tags so they're visible after try block for logging
let creator: string | null = null;
const tags: string[] = [];
let stub: { id: string } | null = null;
try {
// ── Uploading ── // ── Uploading ──
// Check if a prior run already uploaded this file (orphaned upload scenario: // Check if a prior run already uploaded this file (orphaned upload scenario:
// file reached Telegram but DB write failed or worker crashed before indexing) // file reached Telegram but DB write failed or worker crashed before indexing)
@@ -1205,7 +1238,47 @@ async function processOneArchiveSet(
} }
} }
// ── Phase 1: Stub record — persisted immediately after upload ──
await deleteOrphanedPackageByHash(contentHash);
creator =
topicCreator ??
extractCreatorFromFileName(archiveName) ??
extractCreatorFromChannelTitle(channelTitle) ??
null;
if (channel.category) {
tags.push(channel.category);
}
stub = await createPackageStub({
contentHash,
fileName: archiveName,
fileSize: totalSize,
archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
sourceChannelId: channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId,
destChannelId,
destMessageId: destResult.messageId,
destMessageIds: destResult.messageIds,
isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
partCount: uploadPaths.length,
ingestionRunId,
creator,
tags,
});
counters.zipsIngested++;
await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
} finally {
await releaseHashLock(contentHash);
}
if (!stub) return null;
// ── Preview thumbnail ── // ── Preview thumbnail ──
// (moved here from before stub creation — lock is released, preview doesn't need it)
let previewData: Buffer | null = null; let previewData: Buffer | null = null;
let previewMsgId: bigint | null = null; let previewMsgId: bigint | null = null;
const matchedPhoto = previewMatches.get(archiveSet.baseName); const matchedPhoto = previewMatches.get(archiveSet.baseName);
@@ -1219,8 +1292,6 @@ async function processOneArchiveSet(
totalFiles: totalSets, totalFiles: totalSets,
}); });
previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId); previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
// Only set previewMsgId if we actually got the image data —
// otherwise the UI thinks there's a preview but the API returns 404
if (previewData) { if (previewData) {
previewMsgId = matchedPhoto.id; previewMsgId = matchedPhoto.id;
} }
@@ -1243,13 +1314,7 @@ async function processOneArchiveSet(
} }
} }
// ── Resolve creator: topic name > filename extraction > channel title > null ── // ── Phase 2: Update stub with file entries and preview ──
const creator = topicCreator
?? extractCreatorFromFileName(archiveName)
?? extractCreatorFromChannelTitle(channelTitle)
?? null;
// ── Indexing ──
await updateRunActivity(runId, { await updateRunActivity(runId, {
currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`, currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
currentStep: "indexing", currentStep: "indexing",
@@ -1259,43 +1324,12 @@ async function processOneArchiveSet(
totalFiles: totalSets, totalFiles: totalSets,
}); });
// Clean up any orphaned record (same hash but no dest upload) before creating await updatePackageWithMetadata(stub.id, {
await deleteOrphanedPackageByHash(contentHash); files: entries,
// Auto-inherit source channel category as initial tag
const tags: string[] = [];
if (channel.category) {
tags.push(channel.category);
}
const pkg = await createPackageWithFiles({
contentHash,
fileName: archiveName,
fileSize: totalSize,
archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
sourceChannelId: channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId,
destChannelId,
destMessageId: destResult.messageId,
destMessageIds: destResult.messageIds,
isMultipart:
archiveSet.parts.length > 1 || uploadPaths.length > 1,
partCount: uploadPaths.length,
ingestionRunId,
creator,
tags,
previewData, previewData,
previewMsgId, previewMsgId,
sourceCaption: archiveSet.parts[0].caption ?? null,
replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
files: entries,
}); });
counters.zipsIngested++;
// Clean up any prior skip record for this archive
await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
await updateRunActivity(runId, { await updateRunActivity(runId, {
currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`, currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
currentStep: "complete", currentStep: "complete",
@@ -1311,7 +1345,7 @@ async function processOneArchiveSet(
"Archive ingested" "Archive ingested"
); );
return pkg.id; return stub.id;
} finally { } finally {
// ALWAYS delete temp files and the set directory // ALWAYS delete temp files and the set directory
await deleteFiles([...tempPaths, ...splitPaths]); await deleteFiles([...tempPaths, ...splitPaths]);