diff --git a/prisma/migrations/20260524000000_package_remote_unique_id/migration.sql b/prisma/migrations/20260524000000_package_remote_unique_id/migration.sql new file mode 100644 index 0000000..ed2b456 --- /dev/null +++ b/prisma/migrations/20260524000000_package_remote_unique_id/migration.sql @@ -0,0 +1,10 @@ +-- AlterTable: capture TDLib's stable per-content identifier for new packages. +-- Existing rows are NULL; they fall through to the other dedup checks until +-- they're re-encountered organically. +ALTER TABLE "packages" ADD COLUMN "remoteUniqueId" TEXT; + +-- CreateIndex: scoped to source channel because we want to dedup +-- per-channel (the same file appearing in two different channels is still +-- worth indexing twice — they're different ingestion sources). +CREATE INDEX "packages_sourceChannelId_remoteUniqueId_idx" + ON "packages"("sourceChannelId", "remoteUniqueId"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 9fd2eca..a3b9d94 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -472,6 +472,11 @@ model Package { sourceChannelId String sourceMessageId BigInt sourceTopicId BigInt? + /// TDLib's `remote.unique_id` for the FIRST part's file. Stable across + /// reposts of identical content in the same channel — used as the + /// strongest pre-download dedup signal (no false positives unlike + /// fileName + size matching). + remoteUniqueId String? destChannelId String? destMessageId BigInt? 
destMessageIds BigInt[] @default([]) @@ -503,6 +508,7 @@ model Package { @@index([archiveType]) @@index([creator]) @@index([packageGroupId]) + @@index([sourceChannelId, remoteUniqueId]) @@map("packages") } diff --git a/worker/src/archive/multipart.ts b/worker/src/archive/multipart.ts index d37d72d..cf5dca7 100644 --- a/worker/src/archive/multipart.ts +++ b/worker/src/archive/multipart.ts @@ -11,8 +11,11 @@ export interface TelegramMessage { fileSize: bigint; date: Date; mediaAlbumId?: string; - replyToMessageId?: bigint; // NEW - caption?: string; // NEW + replyToMessageId?: bigint; + caption?: string; + /** TDLib's `remote.unique_id` for the file — stable across reposts of + * the exact same content. Left undefined if the message didn't expose it. */ + remoteUniqueId?: string; } export interface ArchiveSet { diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index e2592ca..455c0a3 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -82,6 +82,8 @@ export interface CreatePackageStubInput { sourceChannelId: string; sourceMessageId: bigint; sourceTopicId?: bigint | null; + /** TDLib remote.unique_id of the first part — for future dedup. */ + remoteUniqueId?: string | null; destChannelId: string; destMessageId: bigint; destMessageIds: bigint[]; @@ -111,6 +113,7 @@ export async function createPackageStub( sourceChannelId: input.sourceChannelId, sourceMessageId: input.sourceMessageId, sourceTopicId: input.sourceTopicId ?? undefined, + remoteUniqueId: input.remoteUniqueId ?? undefined, destChannelId: input.destChannelId, destMessageId: input.destMessageId, destMessageIds: input.destMessageIds, @@ -189,6 +192,34 @@ export async function packageExistsBySourceMessage( return pkg !== null; } +/** + * Strongest pre-download dedup signal: a Package in this channel already + * has a matching TDLib remote.unique_id. The unique_id is stable across + * reposts of the exact same file content, so a hit is a guaranteed + * (lossless) duplicate. 
No false positives. + * + * Performs no fallback itself; the older findRepostedPackage (name + size) + * check is meant for packages ingested before remote.unique_id was captured. + */ +export async function findPackageByRemoteUniqueId( + sourceChannelId: string, + remoteUniqueId: string +): Promise<{ + id: string; + destMessageId: bigint | null; + sourceTopicId: bigint | null; +} | null> { + return db.package.findFirst({ + where: { + sourceChannelId, + remoteUniqueId, + destMessageId: { not: null }, + }, + orderBy: { sourceTopicId: { sort: "desc", nulls: "last" } }, + select: { id: true, destMessageId: true, sourceTopicId: true }, + }); +} + /** * Detect a likely repost: same source channel + same fileName + same total * fileSize already exists with destMessageId set. Used to skip downloads diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 2e0f611..8751154 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -51,6 +51,10 @@ interface TdMessage { path?: string; is_downloading_completed?: boolean; }; + remote?: { + /** Stable identifier across reposts of the same file content. */ + unique_id?: string; + }; }; }; photo?: { @@ -231,6 +235,7 @@ export async function getChannelMessages( mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined, replyToMessageId: msg.reply_to_message_id ? 
BigInt(msg.reply_to_message_id) : undefined, caption: msg.content?.caption?.text || undefined, + remoteUniqueId: doc.document.remote?.unique_id || undefined, }); continue; } diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 58b1e8c..b6d0f41 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -210,6 +210,7 @@ export async function getTopicMessages( document?: { id: number; size: number; + remote?: { unique_id?: string }; }; }; photo?: { @@ -257,6 +258,7 @@ export async function getTopicMessages( fileSize: BigInt(doc.document.size), date: new Date(msg.date * 1000), mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined, + remoteUniqueId: doc.document.remote?.unique_id || undefined, }); continue; } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 444cd9e..c3c5490 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -32,6 +32,7 @@ import { deleteSkippedPackage, getCappedSkippedMessageIds, findRepostedPackage, + findPackageByRemoteUniqueId, getRetryableSkippedMessageIds, updatePackageTopicContext, } from "./db/queries.js"; @@ -1240,6 +1241,38 @@ async function processOneArchiveSet( const archiveName = archiveSet.parts[0].fileName; + // ── Earliest skip: remote.unique_id match ── + // TDLib reports a stable unique_id per file content. Only the FIRST part's + // unique_id is compared. A match means this channel already has a Package + // whose first part is the same file content (repost or same message again). + const firstRemoteUniqueId = archiveSet.parts[0].remoteUniqueId; + if (firstRemoteUniqueId) { + const match = await findPackageByRemoteUniqueId(channel.id, firstRemoteUniqueId); + if (match) { + counters.zipsDuplicate++; + accountLog.info( + { + fileName: archiveSet.parts[0].fileName, + sourceMessageId: Number(archiveSet.parts[0].id), + remoteUniqueId: firstRemoteUniqueId, + existingPackageId: match.id, + existingDestMessageId: match.destMessageId ? 
Number(match.destMessageId) : null, + }, + "Skipping — remote.unique_id matches an existing Package in this channel" + ); + await updateRunActivity(runId, { + currentActivity: `Skipped ${archiveSet.parts[0].fileName} (already ingested by unique_id)`, + currentStep: "deduplicating", + currentChannel: channelTitle, + currentFile: archiveSet.parts[0].fileName, + currentFileNum: setIdx + 1, + totalFiles: totalSets, + zipsDuplicate: counters.zipsDuplicate, + }); + return null; + } + } + // ── Early skip: check if this archive set was already ingested ── // This avoids re-downloading large archives that were processed in a prior run. const alreadyIngested = await packageExistsBySourceMessage( @@ -1705,6 +1738,7 @@ async function processOneArchiveSet( sourceChannelId: channel.id, sourceMessageId: archiveSet.parts[0].id, sourceTopicId, + remoteUniqueId: archiveSet.parts[0].remoteUniqueId ?? null, destChannelId, destMessageId: destResult.messageId, destMessageIds: destResult.messageIds,