feat: add album grouping post-processing to worker pipeline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 22:28:19 +01:00
parent b632533f54
commit 218ccb9282
2 changed files with 108 additions and 6 deletions

79
worker/src/grouping.ts Normal file
View File

@@ -0,0 +1,79 @@
import type { Client } from "tdl";
import type { TelegramPhoto } from "./preview/match.js";
import { downloadPhotoThumbnail } from "./tdlib/download.js";
import { createOrFindPackageGroup, linkPackagesToGroup } from "./db/queries.js";
import { childLogger } from "./util/logger.js";
import { db } from "./db/client.js";
const log = childLogger("grouping");
export interface IndexedPackageRef {
packageId: string;
sourceMessageId: bigint;
mediaAlbumId?: string;
}
/**
* After a scan cycle's packages are individually indexed, detect album groups
* and create PackageGroup records linking the members.
*/
export async function processAlbumGroups(
client: Client,
sourceChannelId: string,
indexedPackages: IndexedPackageRef[],
photos: TelegramPhoto[]
): Promise<void> {
// Group indexed packages by mediaAlbumId
const albumMap = new Map<string, IndexedPackageRef[]>();
for (const pkg of indexedPackages) {
if (!pkg.mediaAlbumId || pkg.mediaAlbumId === "0") continue;
const group = albumMap.get(pkg.mediaAlbumId) ?? [];
group.push(pkg);
albumMap.set(pkg.mediaAlbumId, group);
}
if (albumMap.size === 0) return;
log.info({ albumCount: albumMap.size }, "Detected album groups to process");
for (const [albumId, members] of albumMap) {
if (members.length < 2) continue;
try {
// Find the first package's fileName for the group name fallback
const firstPkg = await db.package.findFirst({
where: { id: { in: members.map((m) => m.packageId) } },
orderBy: { sourceMessageId: "asc" },
select: { id: true, fileName: true },
});
// Try to find a caption from the album's photo message
const albumPhoto = photos.find((p) => p.mediaAlbumId === albumId);
const groupName = albumPhoto?.caption || firstPkg?.fileName || "Unnamed Group";
// Download preview from album photo if available
let previewData: Buffer | null = null;
if (albumPhoto) {
previewData = await downloadPhotoThumbnail(client, albumPhoto.fileId);
}
const groupId = await createOrFindPackageGroup({
mediaAlbumId: albumId,
sourceChannelId,
name: groupName,
previewData,
});
// Idempotent link — safe to re-run if some packages were indexed in prior scans
const packageIds = members.map((m) => m.packageId);
await linkPackagesToGroup(packageIds, groupId);
log.info(
{ albumId, groupId, groupName, memberCount: packageIds.length },
"Linked packages to album group"
);
} catch (err) {
log.warn({ albumId, err }, "Failed to create album group — packages still indexed individually");
}
}
}

View File

@@ -47,6 +47,7 @@ import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel } from "./upload/channel.js";
import { processAlbumGroups, type IndexedPackageRef } from "./grouping.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
import type { Client } from "tdl";
@@ -722,10 +723,11 @@ async function processArchiveSets(
// Track the highest message ID that was successfully processed
let maxProcessedId: bigint | null = null;
const indexedPackageRefs: IndexedPackageRef[] = [];
for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
try {
await processOneArchiveSet(
const packageId = await processOneArchiveSet(
ctx,
archiveSets[setIdx],
setIdx,
@@ -734,6 +736,15 @@ async function processArchiveSets(
ingestionRunId
);
if (packageId) {
const firstPart = archiveSets[setIdx].parts[0];
indexedPackageRefs.push({
packageId,
sourceMessageId: firstPart.id,
mediaAlbumId: firstPart.mediaAlbumId,
});
}
// Set completed (ingested or confirmed duplicate) — advance watermark
const setMaxId = archiveSets[setIdx].parts.reduce(
(max, p) => (p.id > max ? p.id : max),
@@ -771,6 +782,16 @@ async function processArchiveSets(
}
}
// Post-processing: group packages by Telegram album ID
if (indexedPackageRefs.length > 0) {
await processAlbumGroups(
ctx.client,
channel.id,
indexedPackageRefs,
scanResult.photos
);
}
return maxProcessedId;
}
@@ -784,7 +805,7 @@ async function processOneArchiveSet(
totalSets: number,
previewMatches: Map<string, { id: bigint; fileId: string }>,
ingestionRunId: string
): Promise<void> {
): Promise<string | null> {
const {
client, runId, channelTitle, channel,
destChannelTelegramId, destChannelId,
@@ -814,7 +835,7 @@ async function processOneArchiveSet(
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return;
return null;
}
// ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
@@ -848,7 +869,7 @@ async function processOneArchiveSet(
partCount: archiveSet.parts.length,
accountId: ctx.accountId,
});
return;
return null;
}
const tempPaths: string[] = [];
@@ -954,7 +975,7 @@ async function processOneArchiveSet(
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return;
return null;
}
// ── Reading metadata ──
@@ -1127,7 +1148,7 @@ async function processOneArchiveSet(
tags.push(channel.category);
}
await createPackageWithFiles({
const pkg = await createPackageWithFiles({
contentHash,
fileName: archiveName,
fileSize: totalSize,
@@ -1166,6 +1187,8 @@ async function processOneArchiveSet(
{ fileName: archiveName, contentHash, fileCount: entries.length, creator },
"Archive ingested"
);
return pkg.id;
} finally {
// ALWAYS delete temp files and the set directory
await deleteFiles([...tempPaths, ...splitPaths]);