fix: skip download when the same file was already uploaded from this channel

Diagnosed from production: in 8 hours of main's current run, zero
uploads happened despite the worker being busy 100% of the time. Logs
showed continuous "Downloading archive part" entries with no
corresponding upload activity.

Root cause: the source channel ("Model Printing Emporium") frequently
reposts the same file at new Telegram message IDs. Concrete example
from the DB:
  - "(EN) PaintGuides All.zip"  → present 6 times, msgIds 44B → 92B
  - "00 Welcome Pack.7z"        → present 2 times, msgIds 91B and 177B
  - "FanteZi April 2022-...zip" → uploaded May 8 at msgId 24,697,110,528;
                                  current run re-downloading at 87,488,987,136

packageExistsBySourceMessage(channelId, msgId) correctly misses because
the msgId is different. We download the (potentially gigabyte-sized)
file, hash it, then packageExistsByHash hits and we discard the
download. ~30 seconds wasted per repost x thousands of reposts = whole
runs spent uploading nothing.

Fix: add findRepostedPackage(sourceChannelId, fileName, fileSize) — a
pre-download check that catches reposts by the strong (channel + name
+ total size) signal. On hit, skip the set entirely. Watermark
advances normally (no minFailedId tracking) so the next cycle sees
the channel as caught up.

False-positive risk: two unrelated files in the same channel with
identical name AND identical total fileSize. Extremely rare in
practice; if it ever happens, the new file is silently treated as a
duplicate. Logged at info level with the existing Package ID and dest
message ID so the user can audit if a file is mysteriously missing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-23 08:54:20 +02:00
parent 77aeb4cc00
commit ff4e150544
2 changed files with 75 additions and 1 deletions

View File

@@ -189,6 +189,36 @@ export async function packageExistsBySourceMessage(
return pkg !== null;
}
/**
* Detect a likely repost: same source channel + same fileName + same total
* fileSize already exists with destMessageId set. Used to skip downloads
* when the channel admin re-posts the same file under a new message ID
* (which `packageExistsBySourceMessage` cannot catch because the message ID
* is different).
*
* Returns the existing package's destMessageId for logging/observability,
* or null if no match. Approximate: same name + same total size is an
* extremely strong signal that it's the same content, but theoretically
* two unrelated files could collide. If that ever happens, the new file
* gets treated as a duplicate and is lost; the user can manually re-link
* via the UI by removing the existing Package.
*/
export async function findRepostedPackage(
sourceChannelId: string,
fileName: string,
fileSize: bigint
): Promise<{ id: string; destMessageId: bigint | null } | null> {
return db.package.findFirst({
where: {
sourceChannelId,
fileName,
fileSize,
destMessageId: { not: null },
},
select: { id: true, destMessageId: true },
});
}
/**
* Delete orphaned Package rows that have the same content hash but never
* completed the upload (destMessageId is null). Called before creating a

View File

@@ -31,6 +31,7 @@ import {
upsertSkippedPackage,
deleteSkippedPackage,
getCappedSkippedMessageIds,
findRepostedPackage,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
@@ -1160,8 +1161,51 @@ async function processOneArchiveSet(
return null;
}
// ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
// Compute the total size across all parts (used by the repost check below
// AND by the size guard further down).
const totalArchiveSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
// ── Pre-download repost detection ──
// The source channel admin frequently reposts the same file at new message
// IDs. packageExistsBySourceMessage misses these (different msgId), so we
// historically downloaded the file just to discover via hash that it's a
// duplicate — wasting hours of bandwidth per run.
//
// Match by (sourceChannelId, fileName, totalSize). The totalSize comparison
// makes this very strong — name-and-size collision between unrelated files
// is rare in practice. If it ever happens, the new file is treated as a
// duplicate; the user can remove the existing Package via the UI to force
// a re-ingestion.
const reposted = await findRepostedPackage(
channel.id,
archiveName,
totalArchiveSize
);
if (reposted) {
counters.zipsDuplicate++;
accountLog.info(
{
fileName: archiveName,
sourceMessageId: Number(archiveSet.parts[0].id),
existingPackageId: reposted.id,
existingDestMessageId: reposted.destMessageId ? Number(reposted.destMessageId) : null,
totalSize: Number(totalArchiveSize),
},
"Skipping repost — same fileName + size already uploaded in this channel"
);
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (repost of already-uploaded file)`,
currentStep: "deduplicating",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return null;
}
// ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
const maxSizeBytes = BigInt(config.maxZipSizeMB) * 1024n * 1024n;
if (totalArchiveSize > maxSizeBytes) {
accountLog.warn(