From 218ccb92829600ea8e0e8a57aa72b8e293315ef9 Mon Sep 17 00:00:00 2001
From: xCyanGrizzly
Date: Wed, 25 Mar 2026 22:28:19 +0100
Subject: [PATCH] feat: add album grouping post-processing to worker pipeline

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 worker/src/grouping.ts | 79 ++++++++++++++++++++++++++++++++++++++++++
 worker/src/worker.ts   | 35 +++++++++++++++----
 2 files changed, 108 insertions(+), 6 deletions(-)
 create mode 100644 worker/src/grouping.ts

diff --git a/worker/src/grouping.ts b/worker/src/grouping.ts
new file mode 100644
index 0000000..35c91b0
--- /dev/null
+++ b/worker/src/grouping.ts
@@ -0,0 +1,79 @@
+import type { Client } from "tdl";
+import type { TelegramPhoto } from "./preview/match.js";
+import { downloadPhotoThumbnail } from "./tdlib/download.js";
+import { createOrFindPackageGroup, linkPackagesToGroup } from "./db/queries.js";
+import { childLogger } from "./util/logger.js";
+import { db } from "./db/client.js";
+
+const log = childLogger("grouping");
+
+export interface IndexedPackageRef {
+  packageId: string;
+  sourceMessageId: bigint;
+  mediaAlbumId?: string;
+}
+
+/**
+ * After a scan cycle's packages are individually indexed, detect album groups
+ * and create PackageGroup records linking the members.
+ */
+export async function processAlbumGroups(
+  client: Client,
+  sourceChannelId: string,
+  indexedPackages: IndexedPackageRef[],
+  photos: TelegramPhoto[]
+): Promise<void> {
+  // Group indexed packages by mediaAlbumId
+  const albumMap = new Map<string, IndexedPackageRef[]>();
+  for (const pkg of indexedPackages) {
+    if (!pkg.mediaAlbumId || pkg.mediaAlbumId === "0") continue;
+    const group = albumMap.get(pkg.mediaAlbumId) ?? [];
+    group.push(pkg);
+    albumMap.set(pkg.mediaAlbumId, group);
+  }
+
+  if (albumMap.size === 0) return;
+
+  log.info({ albumCount: albumMap.size }, "Detected album groups to process");
+
+  for (const [albumId, members] of albumMap) {
+    if (members.length < 2) continue;
+
+    try {
+      // Find the first package's fileName for the group name fallback
+      const firstPkg = await db.package.findFirst({
+        where: { id: { in: members.map((m) => m.packageId) } },
+        orderBy: { sourceMessageId: "asc" },
+        select: { id: true, fileName: true },
+      });
+
+      // Try to find a caption from the album's photo message
+      const albumPhoto = photos.find((p) => p.mediaAlbumId === albumId);
+      const groupName = albumPhoto?.caption || firstPkg?.fileName || "Unnamed Group";
+
+      // Download preview from album photo if available
+      let previewData: Buffer | null = null;
+      if (albumPhoto) {
+        previewData = await downloadPhotoThumbnail(client, albumPhoto.fileId);
+      }
+
+      const groupId = await createOrFindPackageGroup({
+        mediaAlbumId: albumId,
+        sourceChannelId,
+        name: groupName,
+        previewData,
+      });
+
+      // Idempotent link — safe to re-run if some packages were indexed in prior scans
+      const packageIds = members.map((m) => m.packageId);
+      await linkPackagesToGroup(packageIds, groupId);
+
+      log.info(
+        { albumId, groupId, groupName, memberCount: packageIds.length },
+        "Linked packages to album group"
+      );
+    } catch (err) {
+      log.warn({ albumId, err }, "Failed to create album group — packages still indexed individually");
+    }
+  }
+}
diff --git a/worker/src/worker.ts b/worker/src/worker.ts
index 37d134e..d86f068 100644
--- a/worker/src/worker.ts
+++ b/worker/src/worker.ts
@@ -47,6 +47,7 @@
 import { readRarContents } from "./archive/rar-reader.js";
 import { read7zContents } from "./archive/sevenz-reader.js";
 import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
 import { uploadToChannel } from "./upload/channel.js";
+import { processAlbumGroups, type IndexedPackageRef } from "./grouping.js";
 import type { TelegramAccount, TelegramChannel } from "@prisma/client";
 import type { Client } from "tdl";
@@ -722,10 +723,11 @@
 
   // Track the highest message ID that was successfully processed
   let maxProcessedId: bigint | null = null;
+  const indexedPackageRefs: IndexedPackageRef[] = [];
 
   for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
     try {
-      await processOneArchiveSet(
+      const packageId = await processOneArchiveSet(
         ctx,
         archiveSets[setIdx],
         setIdx,
@@ -734,6 +736,15 @@
         ingestionRunId
       );
 
+      if (packageId) {
+        const firstPart = archiveSets[setIdx].parts[0];
+        indexedPackageRefs.push({
+          packageId,
+          sourceMessageId: firstPart.id,
+          mediaAlbumId: firstPart.mediaAlbumId,
+        });
+      }
+
       // Set completed (ingested or confirmed duplicate) — advance watermark
       const setMaxId = archiveSets[setIdx].parts.reduce(
         (max, p) => (p.id > max ? p.id : max),
@@ -771,6 +782,16 @@
     }
   }
 
+  // Post-processing: group packages by Telegram album ID
+  if (indexedPackageRefs.length > 0) {
+    await processAlbumGroups(
+      ctx.client,
+      channel.id,
+      indexedPackageRefs,
+      scanResult.photos
+    );
+  }
+
   return maxProcessedId;
 }
 
@@ -784,7 +805,7 @@ async function processOneArchiveSet(
   totalSets: number,
   previewMatches: Map<bigint, TelegramPhoto>,
   ingestionRunId: string
-): Promise<void> {
+): Promise<string | null> {
   const {
     client, runId, channelTitle, channel,
     destChannelTelegramId, destChannelId,
@@ -814,7 +835,7 @@
       totalFiles: totalSets,
       zipsDuplicate: counters.zipsDuplicate,
     });
-    return;
+    return null;
   }
 
   // ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
@@ -848,7 +869,7 @@
       partCount: archiveSet.parts.length,
       accountId: ctx.accountId,
     });
-    return;
+    return null;
  }
 
   const tempPaths: string[] = [];
@@ -954,7 +975,7 @@
         totalFiles: totalSets,
         zipsDuplicate: counters.zipsDuplicate,
       });
-      return;
+      return null;
     }
 
     // ── Reading metadata ──
@@ -1127,7 +1148,7 @@
       tags.push(channel.category);
     }
 
-    await createPackageWithFiles({
+    const pkg = await createPackageWithFiles({
       contentHash,
       fileName: archiveName,
       fileSize: totalSize,
@@ -1166,6 +1187,8 @@
       { fileName: archiveName, contentHash, fileCount: entries.length, creator },
       "Archive ingested"
     );
+
+    return pkg.id;
   } finally {
     // ALWAYS delete temp files and the set directory
     await deleteFiles([...tempPaths, ...splitPaths]);