From b48cc510a4f31910b9b9d0f294a3ec32171741fe Mon Sep 17 00:00:00 2001
From: xCyanGrizzly
Date: Sat, 2 May 2026 23:13:55 +0200
Subject: [PATCH] feat: add two-phase DB write and hash advisory lock to prevent double-uploads

---
 worker/src/worker.ts | 246 ++++++++++++++++++++++++-------------------
 1 file changed, 141 insertions(+), 105 deletions(-)

diff --git a/worker/src/worker.ts b/worker/src/worker.ts
index be76d7d..bc1e742 100644
--- a/worker/src/worker.ts
+++ b/worker/src/worker.ts
@@ -2,13 +2,14 @@ import path from "path";
 import { unlink, readdir, mkdir, rm } from "fs/promises";
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { tryAcquireLock, releaseLock } from "./db/locks.js";
+import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
 import {
   getSourceChannelMappings,
   getGlobalDestinationChannel,
   packageExistsByHash,
   packageExistsBySourceMessage,
-  createPackageWithFiles,
+  createPackageStub,
+  updatePackageWithMetadata,
   createIngestionRun,
   completeIngestionRun,
   failIngestionRun,
@@ -301,6 +302,7 @@ interface PipelineContext {
   /** Forum topic ID (null for non-forum). */
   sourceTopicId: bigint | null;
   accountLog: ReturnType<typeof childLogger>;
+  maxUploadSize: bigint;
 }
 
 /**
@@ -453,6 +455,7 @@ export async function runWorkerForAccount(
     topicCreator: null,
     sourceTopicId: null,
     accountLog,
+    maxUploadSize: BigInt(config.maxPartSizeMB) * 1024n * 1024n,
   };
 
   if (forum) {
@@ -1028,6 +1031,30 @@
     return null;
   }
 
+  // ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
+  const hashLockAcquired = await tryAcquireHashLock(contentHash);
+  if (!hashLockAcquired) {
+    counters.zipsDuplicate++;
+    accountLog.info(
+      { fileName: archiveName, hash: contentHash.slice(0, 16) },
+      "Hash lock held by another worker — skipping concurrent duplicate"
+    );
+    return null;
+  }
+
+  // Re-check after acquiring the lock: another worker may have finished between
+  // the first check above and this point.
+  const existsAfterLock = await packageExistsByHash(contentHash);
+  if (existsAfterLock) {
+    await releaseHashLock(contentHash);
+    counters.zipsDuplicate++;
+    accountLog.debug(
+      { fileName: archiveName, hash: contentHash.slice(0, 16) },
+      "Duplicate detected after acquiring hash lock — skipping"
+    );
+    return null;
+  }
+
   // ── Reading metadata ──
   await updateRunActivity(runId, {
     currentActivity: `Reading file list from ${archiveName}`,
@@ -1070,7 +1097,7 @@
     (sum, p) => sum + p.fileSize,
     0n
   );
-  const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
+  const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
   const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
 
   if (hasOversizedPart) {
@@ -1140,72 +1167,120 @@
     );
   }
 
-  // ── Uploading ──
-  // Check if a prior run already uploaded this file (orphaned upload scenario:
-  // file reached Telegram but DB write failed or worker crashed before indexing)
-  const existingUpload = await getUploadedPackageByHash(contentHash);
-  let destResult: { messageId: bigint; messageIds: bigint[] };
+  // Hoisted above the try block so creator/tags stay visible for logging after it
+  let creator: string | null = null;
+  const tags: string[] = [];
 
-  if (existingUpload && existingUpload.destMessageId) {
-    accountLog.info(
-      { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
-      "Reusing existing upload (file already on destination channel)"
-    );
-    destResult = {
-      messageId: existingUpload.destMessageId,
-      messageIds: existingUpload.destMessageIds?.length
-        ? (existingUpload.destMessageIds as bigint[])
-        : [existingUpload.destMessageId],
-    };
-  } else {
-    const uploadLabel = uploadPaths.length > 1
-      ? ` (${uploadPaths.length} parts)`
-      : "";
-    await updateRunActivity(runId, {
-      currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
-      currentStep: "uploading",
-      currentChannel: channelTitle,
-      currentFile: archiveName,
-      currentFileNum: setIdx + 1,
-      totalFiles: totalSets,
+  let stub: { id: string } | null = null;
+  try {
+    // ── Uploading ──
+    // Check if a prior run already uploaded this file (orphaned upload scenario:
+    // file reached Telegram but DB write failed or worker crashed before indexing)
+    const existingUpload = await getUploadedPackageByHash(contentHash);
+    let destResult: { messageId: bigint; messageIds: bigint[] };
+
+    if (existingUpload && existingUpload.destMessageId) {
+      accountLog.info(
+        { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
+        "Reusing existing upload (file already on destination channel)"
+      );
+      destResult = {
+        messageId: existingUpload.destMessageId,
+        messageIds: existingUpload.destMessageIds?.length
+          ? (existingUpload.destMessageIds as bigint[])
+          : [existingUpload.destMessageId],
+      };
+    } else {
+      const uploadLabel = uploadPaths.length > 1
+        ? ` (${uploadPaths.length} parts)`
+        : "";
+      await updateRunActivity(runId, {
+        currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
+        currentStep: "uploading",
+        currentChannel: channelTitle,
+        currentFile: archiveName,
+        currentFileNum: setIdx + 1,
+        totalFiles: totalSets,
+      });
+
+      destResult = await uploadToChannel(
+        client,
+        destChannelTelegramId,
+        uploadPaths
+      );
+    }
+
+    // ── Post-upload integrity check ──
+    // Verify the files on disk still match before we index
+    if (uploadPaths.length > 0 && !existingUpload) {
+      try {
+        const postUploadHash = await hashParts(uploadPaths);
+        if (splitPaths.length > 0) {
+          // Split files — hash should match the split hash (already verified above)
+          // No additional check needed since we verified split hash = original hash
+        } else if (postUploadHash !== contentHash) {
+          accountLog.error(
+            { fileName: archiveName, originalHash: contentHash, postUploadHash },
+            "Hash changed between hashing and upload — possible disk corruption"
+          );
+          await db.systemNotification.create({
+            data: {
+              type: "HASH_MISMATCH",
+              severity: "ERROR",
+              title: `Post-upload hash mismatch: ${archiveName}`,
+              message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`,
+              context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
+            },
+          });
+        }
+      } catch {
+        // Best-effort — don't fail the ingestion
+      }
+    }
+
+    // ── Phase 1: Stub record — persisted immediately after upload ──
+    await deleteOrphanedPackageByHash(contentHash);
+
+    creator =
+      topicCreator ??
+      extractCreatorFromFileName(archiveName) ??
+      extractCreatorFromChannelTitle(channelTitle) ??
+      null;
+
+    if (channel.category) {
+      tags.push(channel.category);
+    }
+
+    stub = await createPackageStub({
+      contentHash,
+      fileName: archiveName,
+      fileSize: totalSize,
+      archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
+      sourceChannelId: channel.id,
+      sourceMessageId: archiveSet.parts[0].id,
+      sourceTopicId,
+      destChannelId,
+      destMessageId: destResult.messageId,
+      destMessageIds: destResult.messageIds,
+      isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
+      partCount: uploadPaths.length,
+      ingestionRunId,
+      creator,
+      tags,
+      sourceCaption: archiveSet.parts[0].caption ?? null,
+      replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
     });
 
-    destResult = await uploadToChannel(
-      client,
-      destChannelTelegramId,
-      uploadPaths
-    );
+    counters.zipsIngested++;
+    await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
+  } finally {
+    await releaseHashLock(contentHash);
   }
 
-  // ── Post-upload integrity check ──
-  // Verify the files on disk still match before we index
-  if (uploadPaths.length > 0 && !existingUpload) {
-    try {
-      const postUploadHash = await hashParts(uploadPaths);
-      if (splitPaths.length > 0) {
-        // Split files — hash should match the split hash (already verified above)
-        // No additional check needed since we verified split hash = original hash
-      } else if (postUploadHash !== contentHash) {
-        accountLog.error(
-          { fileName: archiveName, originalHash: contentHash, postUploadHash },
-          "Hash changed between hashing and upload — possible disk corruption"
-        );
-        await db.systemNotification.create({
-          data: {
-            type: "HASH_MISMATCH",
-            severity: "ERROR",
-            title: `Post-upload hash mismatch: ${archiveName}`,
-            message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`,
-            context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
-          },
-        });
-      }
-    } catch {
-      // Best-effort — don't fail the ingestion
-    }
-  }
+  if (!stub) return null;
 
   // ── Preview thumbnail ──
+  // Runs after stub creation, once the hash lock is released; the preview download doesn't need the lock.
   let previewData: Buffer | null = null;
   let previewMsgId: bigint | null = null;
   const matchedPhoto = previewMatches.get(archiveSet.baseName);
@@ -1219,8 +1294,6 @@
       totalFiles: totalSets,
     });
     previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
-    // Only set previewMsgId if we actually got the image data —
-    // otherwise the UI thinks there's a preview but the API returns 404
    if (previewData) {
      previewMsgId = matchedPhoto.id;
    }
@@ -1243,59 +1316,22 @@
     }
   }
 
-  // ── Resolve creator: topic name > filename extraction > channel title > null ──
-  const creator = topicCreator
-    ?? extractCreatorFromFileName(archiveName)
-    ?? extractCreatorFromChannelTitle(channelTitle)
-    ?? null;
-
-  // ── Indexing ──
+  // ── Phase 2: Update stub with file entries and preview ──
   await updateRunActivity(runId, {
     currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
     currentStep: "indexing",
     currentChannel: channelTitle,
     currentFile: archiveName,
     currentFileNum: setIdx + 1,
     totalFiles: totalSets,
   });
 
-  // Clean up any orphaned record (same hash but no dest upload) before creating
-  await deleteOrphanedPackageByHash(contentHash);
-
-  // Auto-inherit source channel category as initial tag
-  const tags: string[] = [];
-  if (channel.category) {
-    tags.push(channel.category);
-  }
-
-  const pkg = await createPackageWithFiles({
-    contentHash,
-    fileName: archiveName,
-    fileSize: totalSize,
-    archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
-    sourceChannelId: channel.id,
-    sourceMessageId: archiveSet.parts[0].id,
-    sourceTopicId,
-    destChannelId,
-    destMessageId: destResult.messageId,
-    destMessageIds: destResult.messageIds,
-    isMultipart:
-      archiveSet.parts.length > 1 || uploadPaths.length > 1,
-    partCount: uploadPaths.length,
-    ingestionRunId,
-    creator,
-    tags,
+  await updatePackageWithMetadata(stub.id, {
+    files: entries,
     previewData,
     previewMsgId,
-    sourceCaption: archiveSet.parts[0].caption ?? null,
-    replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
-    files: entries,
   });
 
-  counters.zipsIngested++;
-  // Clean up any prior skip record for this archive
-  await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
-
   await updateRunActivity(runId, {
     currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
     currentStep: "complete",
@@ -1311,7 +1347,7 @@
     "Archive ingested"
   );
 
-  return pkg.id;
+  return stub.id;
 } finally {
   // ALWAYS delete temp files and the set directory
   await deleteFiles([...tempPaths, ...splitPaths]);
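
Reviewer note: worker/src/db/locks.ts is not included in this diff, so the new
tryAcquireHashLock/releaseHashLock helpers are only visible as imports. Below is
a minimal sketch of what they could look like, assuming a Prisma client exported
as `db` and Postgres advisory locks; the module path, `db` export, and function
bodies are illustrative guesses, not code from this repo (only
pg_try_advisory_lock/pg_advisory_unlock and Prisma's $queryRaw are real APIs):

// worker/src/db/locks.ts (hypothetical sketch, not the actual file)
import { db } from "./client.js"; // assumed Prisma client export

// Postgres advisory locks key on a signed 64-bit integer; fold the content
// hash down to one by taking its first 15 hex digits (60 bits).
function hashLockKey(contentHash: string): bigint {
  return BigInt(`0x${contentHash.slice(0, 15)}`);
}

export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
  // pg_try_advisory_lock returns immediately: true if acquired, false if
  // another session already holds the key.
  const rows = await db.$queryRaw<{ locked: boolean }[]>`
    SELECT pg_try_advisory_lock(${hashLockKey(contentHash)}) AS locked`;
  return rows[0]?.locked ?? false;
}

export async function releaseHashLock(contentHash: string): Promise<void> {
  await db.$queryRaw`SELECT pg_advisory_unlock(${hashLockKey(contentHash)})`;
}

One caveat if the real implementation looks like this: Postgres advisory locks
are session-scoped, so with a pooled client the acquire and the release can land
on different connections and the unlock silently fails. Pinning lock queries to
a dedicated connection, or using a lock-row table written with INSERT ... ON
CONFLICT DO NOTHING, sidesteps that.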
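Likewise, createPackageStub and updatePackageWithMetadata are imported but not
shown. The two-phase shape implied by the call sites, sketched against an
assumed Prisma `package` model; the model name, `files` relation, entry shape,
and id scalar types are all inferred, not confirmed:

// Hypothetical sketch of the two-phase write; names mirror the call sites above.
import { db } from "./client.js"; // assumed Prisma client export

interface PackageStubInput {
  contentHash: string;
  fileName: string;
  fileSize: bigint;
  archiveType: string;
  sourceChannelId: bigint; // id scalar types are guesses
  sourceMessageId: bigint;
  sourceTopicId: bigint | null;
  destChannelId: bigint;
  destMessageId: bigint;
  destMessageIds: bigint[];
  isMultipart: boolean;
  partCount: number;
  ingestionRunId: string;
  creator: string | null;
  tags: string[];
  sourceCaption: string | null;
  replyToMessageId: bigint | null;
}

// Phase 1: persist the Telegram upload pointer immediately after upload, so a
// crash before indexing leaves a row that getUploadedPackageByHash can find
// rather than an orphaned upload on the destination channel.
export async function createPackageStub(data: PackageStubInput): Promise<{ id: string }> {
  return db.package.create({ data, select: { id: true } });
}

// Phase 2: attach the archive listing and preview once extraction finishes.
export async function updatePackageWithMetadata(
  id: string,
  meta: {
    files: { path: string; size: bigint }[]; // assumed entry shape
    previewData: Buffer | null;
    previewMsgId: bigint | null;
  }
): Promise<void> {
  await db.package.update({
    where: { id },
    data: {
      previewData: meta.previewData,
      previewMsgId: meta.previewMsgId,
      files: { createMany: { data: meta.files } }, // assumed `files` relation
    },
  });
}

Splitting the write this way shrinks the window where an upload exists with no
DB record to the single createPackageStub call; everything after that point is
recoverable by re-running phase 2.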