mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-13 04:31:16 +00:00
feat(worker): use TDLib remote.unique_id as zero-false-positive dedup signal
The fileName + size repost detection from ff4e150 works but has a
theoretical false-positive: two unrelated files in the same channel
with identical names and identical total sizes get treated as duplicates.
TDLib's document.remote.unique_id is a stable identifier per file
content — every repost of the exact same file across messages keeps
the same unique_id. Using it as the first dedup check eliminates the
false-positive risk entirely.
Schema:
- Package.remoteUniqueId (nullable, since existing rows lack it)
- Index on (sourceChannelId, remoteUniqueId)
Pipeline:
1. Capture remoteUniqueId in getChannelMessages + getTopicMessages
2. Pass through TelegramMessage type
3. processOneArchiveSet checks findPackageByRemoteUniqueId FIRST
(before packageExistsBySourceMessage / findRepostedPackage)
4. createPackageStub stores it on the new Package row
Existing 19,952 Packages have remoteUniqueId = NULL — they fall through
to the existing checks (source-msg-id, name+size, content-hash). New
ingestions populate it and benefit from the strong signal immediately.
Old Packages get backfilled organically when their content is
re-encountered and a new Package would otherwise be created.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,10 @@
|
|||||||
|
-- AlterTable: capture TDLib's stable per-content identifier for new packages.
|
||||||
|
-- Existing rows are NULL; they fall through to the other dedup checks until
|
||||||
|
-- they're re-encountered organically.
|
||||||
|
ALTER TABLE "packages" ADD COLUMN "remoteUniqueId" TEXT;
|
||||||
|
|
||||||
|
-- CreateIndex: scoped to source channel because we want to dedup
|
||||||
|
-- per-channel (the same file appearing in two different channels is still
|
||||||
|
-- worth indexing twice — they're different ingestion sources).
|
||||||
|
CREATE INDEX "packages_sourceChannelId_remoteUniqueId_idx"
|
||||||
|
ON "packages"("sourceChannelId", "remoteUniqueId");
|
||||||
@@ -472,6 +472,11 @@ model Package {
|
|||||||
sourceChannelId String
|
sourceChannelId String
|
||||||
sourceMessageId BigInt
|
sourceMessageId BigInt
|
||||||
sourceTopicId BigInt?
|
sourceTopicId BigInt?
|
||||||
|
/// TDLib's `remote.unique_id` for the FIRST part's file. Stable across
|
||||||
|
/// reposts of identical content in the same channel — used as the
|
||||||
|
/// strongest pre-download dedup signal (no false positives unlike
|
||||||
|
/// fileName + size matching).
|
||||||
|
remoteUniqueId String?
|
||||||
destChannelId String?
|
destChannelId String?
|
||||||
destMessageId BigInt?
|
destMessageId BigInt?
|
||||||
destMessageIds BigInt[] @default([])
|
destMessageIds BigInt[] @default([])
|
||||||
@@ -503,6 +508,7 @@ model Package {
|
|||||||
@@index([archiveType])
|
@@index([archiveType])
|
||||||
@@index([creator])
|
@@index([creator])
|
||||||
@@index([packageGroupId])
|
@@index([packageGroupId])
|
||||||
|
@@index([sourceChannelId, remoteUniqueId])
|
||||||
@@map("packages")
|
@@map("packages")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,8 +11,11 @@ export interface TelegramMessage {
|
|||||||
fileSize: bigint;
|
fileSize: bigint;
|
||||||
date: Date;
|
date: Date;
|
||||||
mediaAlbumId?: string;
|
mediaAlbumId?: string;
|
||||||
replyToMessageId?: bigint; // NEW
|
replyToMessageId?: bigint;
|
||||||
caption?: string; // NEW
|
caption?: string;
|
||||||
|
/** TDLib's `remote.unique_id` for the file — stable across reposts of
|
||||||
|
* the exact same content. Empty string if the message didn't expose it. */
|
||||||
|
remoteUniqueId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ArchiveSet {
|
export interface ArchiveSet {
|
||||||
|
|||||||
@@ -82,6 +82,8 @@ export interface CreatePackageStubInput {
|
|||||||
sourceChannelId: string;
|
sourceChannelId: string;
|
||||||
sourceMessageId: bigint;
|
sourceMessageId: bigint;
|
||||||
sourceTopicId?: bigint | null;
|
sourceTopicId?: bigint | null;
|
||||||
|
/** TDLib remote.unique_id of the first part — for future dedup. */
|
||||||
|
remoteUniqueId?: string | null;
|
||||||
destChannelId: string;
|
destChannelId: string;
|
||||||
destMessageId: bigint;
|
destMessageId: bigint;
|
||||||
destMessageIds: bigint[];
|
destMessageIds: bigint[];
|
||||||
@@ -111,6 +113,7 @@ export async function createPackageStub(
|
|||||||
sourceChannelId: input.sourceChannelId,
|
sourceChannelId: input.sourceChannelId,
|
||||||
sourceMessageId: input.sourceMessageId,
|
sourceMessageId: input.sourceMessageId,
|
||||||
sourceTopicId: input.sourceTopicId ?? undefined,
|
sourceTopicId: input.sourceTopicId ?? undefined,
|
||||||
|
remoteUniqueId: input.remoteUniqueId ?? undefined,
|
||||||
destChannelId: input.destChannelId,
|
destChannelId: input.destChannelId,
|
||||||
destMessageId: input.destMessageId,
|
destMessageId: input.destMessageId,
|
||||||
destMessageIds: input.destMessageIds,
|
destMessageIds: input.destMessageIds,
|
||||||
@@ -189,6 +192,34 @@ export async function packageExistsBySourceMessage(
|
|||||||
return pkg !== null;
|
return pkg !== null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Strongest pre-download dedup signal: a Package in this channel already
|
||||||
|
* has a matching TDLib remote.unique_id. The unique_id is stable across
|
||||||
|
* reposts of the exact same file content, so a hit is a guaranteed
|
||||||
|
* (lossless) duplicate. No false positives.
|
||||||
|
*
|
||||||
|
* Falls back to the older findRepostedPackage (name + size) for packages
|
||||||
|
* that were ingested before we started capturing remote.unique_id.
|
||||||
|
*/
|
||||||
|
export async function findPackageByRemoteUniqueId(
|
||||||
|
sourceChannelId: string,
|
||||||
|
remoteUniqueId: string
|
||||||
|
): Promise<{
|
||||||
|
id: string;
|
||||||
|
destMessageId: bigint | null;
|
||||||
|
sourceTopicId: bigint | null;
|
||||||
|
} | null> {
|
||||||
|
return db.package.findFirst({
|
||||||
|
where: {
|
||||||
|
sourceChannelId,
|
||||||
|
remoteUniqueId,
|
||||||
|
destMessageId: { not: null },
|
||||||
|
},
|
||||||
|
orderBy: { sourceTopicId: { sort: "desc", nulls: "last" } },
|
||||||
|
select: { id: true, destMessageId: true, sourceTopicId: true },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Detect a likely repost: same source channel + same fileName + same total
|
* Detect a likely repost: same source channel + same fileName + same total
|
||||||
* fileSize already exists with destMessageId set. Used to skip downloads
|
* fileSize already exists with destMessageId set. Used to skip downloads
|
||||||
|
|||||||
@@ -51,6 +51,10 @@ interface TdMessage {
|
|||||||
path?: string;
|
path?: string;
|
||||||
is_downloading_completed?: boolean;
|
is_downloading_completed?: boolean;
|
||||||
};
|
};
|
||||||
|
remote?: {
|
||||||
|
/** Stable identifier across reposts of the same file content. */
|
||||||
|
unique_id?: string;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
photo?: {
|
photo?: {
|
||||||
@@ -231,6 +235,7 @@ export async function getChannelMessages(
|
|||||||
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
||||||
replyToMessageId: msg.reply_to_message_id ? BigInt(msg.reply_to_message_id) : undefined,
|
replyToMessageId: msg.reply_to_message_id ? BigInt(msg.reply_to_message_id) : undefined,
|
||||||
caption: msg.content?.caption?.text || undefined,
|
caption: msg.content?.caption?.text || undefined,
|
||||||
|
remoteUniqueId: doc.document.remote?.unique_id || undefined,
|
||||||
});
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -210,6 +210,7 @@ export async function getTopicMessages(
|
|||||||
document?: {
|
document?: {
|
||||||
id: number;
|
id: number;
|
||||||
size: number;
|
size: number;
|
||||||
|
remote?: { unique_id?: string };
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
photo?: {
|
photo?: {
|
||||||
@@ -257,6 +258,7 @@ export async function getTopicMessages(
|
|||||||
fileSize: BigInt(doc.document.size),
|
fileSize: BigInt(doc.document.size),
|
||||||
date: new Date(msg.date * 1000),
|
date: new Date(msg.date * 1000),
|
||||||
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
||||||
|
remoteUniqueId: doc.document.remote?.unique_id || undefined,
|
||||||
});
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import {
|
|||||||
deleteSkippedPackage,
|
deleteSkippedPackage,
|
||||||
getCappedSkippedMessageIds,
|
getCappedSkippedMessageIds,
|
||||||
findRepostedPackage,
|
findRepostedPackage,
|
||||||
|
findPackageByRemoteUniqueId,
|
||||||
getRetryableSkippedMessageIds,
|
getRetryableSkippedMessageIds,
|
||||||
updatePackageTopicContext,
|
updatePackageTopicContext,
|
||||||
} from "./db/queries.js";
|
} from "./db/queries.js";
|
||||||
@@ -1240,6 +1241,38 @@ async function processOneArchiveSet(
|
|||||||
|
|
||||||
const archiveName = archiveSet.parts[0].fileName;
|
const archiveName = archiveSet.parts[0].fileName;
|
||||||
|
|
||||||
|
// ── Earliest skip: remote.unique_id match ──
|
||||||
|
// TDLib reports a stable unique_id per file content. If we already have a
|
||||||
|
// Package in this channel with the same unique_id, it's the exact same
|
||||||
|
// file content reposted at a new message ID — zero false positives.
|
||||||
|
const firstRemoteUniqueId = archiveSet.parts[0].remoteUniqueId;
|
||||||
|
if (firstRemoteUniqueId) {
|
||||||
|
const match = await findPackageByRemoteUniqueId(channel.id, firstRemoteUniqueId);
|
||||||
|
if (match) {
|
||||||
|
counters.zipsDuplicate++;
|
||||||
|
accountLog.info(
|
||||||
|
{
|
||||||
|
fileName: archiveSet.parts[0].fileName,
|
||||||
|
sourceMessageId: Number(archiveSet.parts[0].id),
|
||||||
|
remoteUniqueId: firstRemoteUniqueId,
|
||||||
|
existingPackageId: match.id,
|
||||||
|
existingDestMessageId: match.destMessageId ? Number(match.destMessageId) : null,
|
||||||
|
},
|
||||||
|
"Skipping — remote.unique_id matches an existing Package in this channel"
|
||||||
|
);
|
||||||
|
await updateRunActivity(runId, {
|
||||||
|
currentActivity: `Skipped ${archiveSet.parts[0].fileName} (already ingested by unique_id)`,
|
||||||
|
currentStep: "deduplicating",
|
||||||
|
currentChannel: channelTitle,
|
||||||
|
currentFile: archiveSet.parts[0].fileName,
|
||||||
|
currentFileNum: setIdx + 1,
|
||||||
|
totalFiles: totalSets,
|
||||||
|
zipsDuplicate: counters.zipsDuplicate,
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Early skip: check if this archive set was already ingested ──
|
// ── Early skip: check if this archive set was already ingested ──
|
||||||
// This avoids re-downloading large archives that were processed in a prior run.
|
// This avoids re-downloading large archives that were processed in a prior run.
|
||||||
const alreadyIngested = await packageExistsBySourceMessage(
|
const alreadyIngested = await packageExistsBySourceMessage(
|
||||||
@@ -1705,6 +1738,7 @@ async function processOneArchiveSet(
|
|||||||
sourceChannelId: channel.id,
|
sourceChannelId: channel.id,
|
||||||
sourceMessageId: archiveSet.parts[0].id,
|
sourceMessageId: archiveSet.parts[0].id,
|
||||||
sourceTopicId,
|
sourceTopicId,
|
||||||
|
remoteUniqueId: archiveSet.parts[0].remoteUniqueId ?? null,
|
||||||
destChannelId,
|
destChannelId,
|
||||||
destMessageId: destResult.messageId,
|
destMessageId: destResult.messageId,
|
||||||
destMessageIds: destResult.messageIds,
|
destMessageIds: destResult.messageIds,
|
||||||
|
|||||||
Reference in New Issue
Block a user