mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-10 22:01:16 +00:00
feat: group merge, ZIP/reply/caption grouping, integrity audit
Group merge UI:
- Add mergeGroups query and mergeGroupsAction server action
- Add "Start Merge" / "Merge Here" buttons to group row actions
- Two-step UX: click Start on source, click Merge Here on target

ZIP path prefix grouping (Signal 7):
- Compare PackageFile.path root folders across ungrouped packages
- Auto-group if 2+ packages share the same dominant root folder

Reply chain grouping (Signal 6):
- Capture reply_to_message_id during channel scanning
- Group archives that reply to the same root message
- Add replyToMessageId field to Package schema

Caption fuzzy match grouping (Signal 8):
- Capture source caption during channel scanning
- Normalize captions (strip extensions, extract significant words)
- Group packages with matching normalized caption keys
- Add sourceCaption field to Package schema

Periodic integrity audit:
- Check multipart packages for completeness (parts vs destMessageIds)
- Detect orphaned indexes (destChannelId set but no destMessageId)
- Runs after each ingestion cycle, deduplicates notifications

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,8 @@ export interface TelegramMessage {
|
||||
fileSize: bigint;
|
||||
date: Date;
|
||||
mediaAlbumId?: string;
|
||||
replyToMessageId?: bigint; // NEW
|
||||
caption?: string; // NEW
|
||||
}
|
||||
|
||||
export interface ArchiveSet {
|
||||
|
||||
117
worker/src/audit.ts
Normal file
117
worker/src/audit.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { db } from "./db/client.js";
|
||||
import { childLogger } from "./util/logger.js";
|
||||
|
||||
const log = childLogger("audit");
|
||||
|
||||
/**
|
||||
* Periodic integrity audit: checks all packages for consistency.
|
||||
* Creates SystemNotification records for any issues found.
|
||||
*
|
||||
* Checks performed:
|
||||
* 1. Multipart completeness: destMessageIds.length should match partCount
|
||||
* 2. Missing destination: packages with destChannelId but no destMessageId
|
||||
*/
|
||||
export async function runIntegrityAudit(): Promise<{ checked: number; issues: number }> {
|
||||
log.info("Starting integrity audit");
|
||||
|
||||
let checked = 0;
|
||||
let issues = 0;
|
||||
|
||||
// Check 1: Multipart packages with wrong number of destination message IDs
|
||||
const multipartPackages = await db.package.findMany({
|
||||
where: {
|
||||
isMultipart: true,
|
||||
partCount: { gt: 1 },
|
||||
destMessageId: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
partCount: true,
|
||||
destMessageIds: true,
|
||||
sourceChannelId: true,
|
||||
sourceChannel: { select: { title: true } },
|
||||
},
|
||||
});
|
||||
|
||||
checked += multipartPackages.length;
|
||||
|
||||
for (const pkg of multipartPackages) {
|
||||
const actualParts = pkg.destMessageIds.length;
|
||||
if (actualParts > 0 && actualParts !== pkg.partCount) {
|
||||
issues++;
|
||||
|
||||
// Check if we already have a notification for this
|
||||
const existing = await db.systemNotification.findFirst({
|
||||
where: {
|
||||
type: "MISSING_PART",
|
||||
context: { path: ["packageId"], equals: pkg.id },
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (!existing) {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "MISSING_PART",
|
||||
severity: "WARNING",
|
||||
title: `Incomplete multipart: ${pkg.fileName}`,
|
||||
message: `Expected ${pkg.partCount} parts but only ${actualParts} destination message IDs stored`,
|
||||
context: {
|
||||
packageId: pkg.id,
|
||||
fileName: pkg.fileName,
|
||||
expectedParts: pkg.partCount,
|
||||
actualParts,
|
||||
sourceChannelId: pkg.sourceChannelId,
|
||||
channelTitle: pkg.sourceChannel.title,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
log.warn(
|
||||
{ packageId: pkg.id, fileName: pkg.fileName, expected: pkg.partCount, actual: actualParts },
|
||||
"Multipart package has mismatched part count"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check 2: Packages with dest channel but no dest message (orphaned index)
|
||||
const orphanedCount = await db.package.count({
|
||||
where: {
|
||||
destChannelId: { not: null },
|
||||
destMessageId: null,
|
||||
},
|
||||
});
|
||||
|
||||
if (orphanedCount > 0) {
|
||||
issues++;
|
||||
|
||||
const existing = await db.systemNotification.findFirst({
|
||||
where: {
|
||||
type: "INTEGRITY_AUDIT",
|
||||
context: { path: ["check"], equals: "orphaned_index" },
|
||||
createdAt: { gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (!existing) {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "INTEGRITY_AUDIT",
|
||||
severity: "INFO",
|
||||
title: `${orphanedCount} packages with missing destination message`,
|
||||
message: `Found ${orphanedCount} packages that have a destination channel set but no destination message ID. These may be from interrupted uploads.`,
|
||||
context: {
|
||||
check: "orphaned_index",
|
||||
count: orphanedCount,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
log.info({ checked, issues }, "Integrity audit complete");
|
||||
return { checked, issues };
|
||||
}
|
||||
@@ -119,6 +119,8 @@ export interface CreatePackageInput {
|
||||
tags?: string[];
|
||||
previewData?: Buffer | null;
|
||||
previewMsgId?: bigint | null;
|
||||
sourceCaption?: string | null;
|
||||
replyToMessageId?: bigint | null;
|
||||
files: {
|
||||
path: string;
|
||||
fileName: string;
|
||||
@@ -150,6 +152,8 @@ export async function createPackageWithFiles(input: CreatePackageInput) {
|
||||
tags: input.tags && input.tags.length > 0 ? input.tags : undefined,
|
||||
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
|
||||
previewMsgId: input.previewMsgId ?? undefined,
|
||||
sourceCaption: input.sourceCaption ?? undefined,
|
||||
replyToMessageId: input.replyToMessageId ?? undefined,
|
||||
files: {
|
||||
create: input.files,
|
||||
},
|
||||
@@ -613,7 +617,7 @@ export async function createAutoGroup(input: {
|
||||
sourceChannelId: string;
|
||||
name: string;
|
||||
packageIds: string[];
|
||||
groupingSource: "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION";
|
||||
groupingSource: "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY";
|
||||
}): Promise<string> {
|
||||
const group = await db.packageGroup.create({
|
||||
data: {
|
||||
|
||||
@@ -288,6 +288,243 @@ export async function processCreatorGroups(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that share the same root folder inside their archives.
|
||||
* E.g., if two packages both contain files under "ProjectX/", they're likely related.
|
||||
* Only considers packages with 3+ files (to avoid false positives from flat archives).
|
||||
*/
|
||||
export async function processZipPathGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
// Find ungrouped packages that have indexed files
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
fileCount: { gte: 3 },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
files: {
|
||||
select: { path: true },
|
||||
take: 50,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Extract the dominant root folder for each package
|
||||
const packageRoots = new Map<string, { id: string; fileName: string }[]>();
|
||||
|
||||
for (const pkg of ungrouped) {
|
||||
const root = extractRootFolder(pkg.files.map((f) => f.path));
|
||||
if (!root) continue;
|
||||
|
||||
const key = root.toLowerCase();
|
||||
const group = packageRoots.get(key) ?? [];
|
||||
group.push({ id: pkg.id, fileName: pkg.fileName });
|
||||
packageRoots.set(key, group);
|
||||
}
|
||||
|
||||
// Create groups for roots shared by 2+ packages
|
||||
for (const [root, members] of packageRoots) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name: root,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_ZIP",
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, rootFolder: root, memberCount: members.length },
|
||||
"Created ZIP path prefix group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, rootFolder: root }, "Failed to create ZIP path group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that reply to the same root message.
|
||||
* If message B and C both reply to message A, they're grouped together.
|
||||
*/
|
||||
export async function processReplyChainGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
replyToMessageId: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
replyToMessageId: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Group by replyToMessageId
|
||||
const replyMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
if (!pkg.replyToMessageId) continue;
|
||||
const key = pkg.replyToMessageId.toString();
|
||||
const group = replyMap.get(key) ?? [];
|
||||
group.push(pkg);
|
||||
replyMap.set(key, group);
|
||||
}
|
||||
|
||||
for (const [replyId, members] of replyMap) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
const name = findCommonPrefix(members.map((m) => m.fileName)) || members[0].fileName;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_REPLY" as const,
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, replyToMessageId: replyId, memberCount: members.length },
|
||||
"Created reply-chain group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, replyToMessageId: replyId }, "Failed to create reply-chain group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages with similar captions from the same channel.
|
||||
* Uses normalized caption comparison — two captions match if they share
|
||||
* the same significant words (ignoring common words and file extensions).
|
||||
*/
|
||||
export async function processCaptionGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
sourceCaption: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
sourceCaption: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Group by normalized caption key
|
||||
const captionMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
if (!pkg.sourceCaption) continue;
|
||||
const key = normalizeCaptionKey(pkg.sourceCaption);
|
||||
if (!key) continue;
|
||||
const group = captionMap.get(key) ?? [];
|
||||
group.push(pkg);
|
||||
captionMap.set(key, group);
|
||||
}
|
||||
|
||||
for (const [, members] of captionMap) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
const name = members[0].sourceCaption!.slice(0, 80);
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_CAPTION" as const,
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, memberCount: members.length },
|
||||
"Created caption-match group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err }, "Failed to create caption group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize a caption for grouping: lowercase, strip extensions and numbers,
|
||||
* extract significant words (3+ chars), sort, and join.
|
||||
* Two captions with the same key are considered a match.
|
||||
*/
|
||||
function normalizeCaptionKey(caption: string): string | null {
|
||||
const stripped = caption
|
||||
.toLowerCase()
|
||||
.replace(/\.(zip|rar|7z|stl|pdf|obj|gcode)(\.\d+)?/gi, "")
|
||||
.replace(/[^a-z0-9\s]/g, " ");
|
||||
|
||||
const words = stripped
|
||||
.split(/\s+/)
|
||||
.filter((w) => w.length >= 3)
|
||||
.filter((w) => !["the", "and", "for", "with", "from", "part", "file", "files"].includes(w));
|
||||
|
||||
if (words.length < 2) return null;
|
||||
|
||||
return words.sort().join(" ");
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the dominant root folder from a list of archive file paths.
|
||||
* Returns the first path segment that appears in >50% of files.
|
||||
* Returns null for flat archives or archives with no common root.
|
||||
*/
|
||||
function extractRootFolder(paths: string[]): string | null {
|
||||
if (paths.length === 0) return null;
|
||||
|
||||
// Count first path segments
|
||||
const segmentCounts = new Map<string, number>();
|
||||
for (const p of paths) {
|
||||
// Normalize separators and get first segment
|
||||
const normalized = p.replace(/\\/g, "/");
|
||||
const firstSlash = normalized.indexOf("/");
|
||||
if (firstSlash <= 0) continue; // Skip root-level files
|
||||
const segment = normalized.slice(0, firstSlash);
|
||||
// Skip common noise folders
|
||||
if (segment === "__MACOSX" || segment === ".DS_Store" || segment === "Thumbs.db") continue;
|
||||
segmentCounts.set(segment, (segmentCounts.get(segment) ?? 0) + 1);
|
||||
}
|
||||
|
||||
if (segmentCounts.size === 0) return null;
|
||||
|
||||
// Find the most common segment
|
||||
let maxSegment = "";
|
||||
let maxCount = 0;
|
||||
for (const [seg, count] of segmentCounts) {
|
||||
if (count > maxCount) {
|
||||
maxSegment = seg;
|
||||
maxCount = count;
|
||||
}
|
||||
}
|
||||
|
||||
// Must appear in >50% of files and be at least 3 chars
|
||||
if (maxCount < paths.length * 0.5 || maxSegment.length < 3) return null;
|
||||
|
||||
return maxSegment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the longest common prefix among a list of filenames,
|
||||
* trimming trailing separators and partial words.
|
||||
|
||||
@@ -3,6 +3,7 @@ import { childLogger } from "./util/logger.js";
|
||||
import { withTdlibMutex } from "./util/mutex.js";
|
||||
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
|
||||
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
|
||||
import { runIntegrityAudit } from "./audit.js";
|
||||
|
||||
const log = childLogger("scheduler");
|
||||
|
||||
@@ -87,6 +88,16 @@ async function runCycle(): Promise<void> {
|
||||
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
|
||||
"Ingestion cycle complete"
|
||||
);
|
||||
|
||||
// Run integrity audit after all accounts are processed
|
||||
try {
|
||||
const auditResult = await runIntegrityAudit();
|
||||
if (auditResult.issues > 0) {
|
||||
log.info({ ...auditResult }, "Integrity audit found issues");
|
||||
}
|
||||
} catch (auditErr) {
|
||||
log.warn({ err: auditErr }, "Integrity audit failed");
|
||||
}
|
||||
} catch (err) {
|
||||
log.error({ err }, "Ingestion cycle failed");
|
||||
} finally {
|
||||
|
||||
@@ -39,6 +39,7 @@ interface TdMessage {
|
||||
id: number;
|
||||
date: number;
|
||||
media_album_id?: string;
|
||||
reply_to_message_id?: number;
|
||||
content: {
|
||||
_: string;
|
||||
document?: {
|
||||
@@ -216,6 +217,8 @@ export async function getChannelMessages(
|
||||
fileSize: BigInt(doc.document.size),
|
||||
date: new Date(msg.date * 1000),
|
||||
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
||||
replyToMessageId: msg.reply_to_message_id ? BigInt(msg.reply_to_message_id) : undefined,
|
||||
caption: msg.content?.caption?.text || undefined,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ import { readRarContents } from "./archive/rar-reader.js";
|
||||
import { read7zContents } from "./archive/sevenz-reader.js";
|
||||
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
|
||||
import { uploadToChannel } from "./upload/channel.js";
|
||||
import { processAlbumGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, type IndexedPackageRef } from "./grouping.js";
|
||||
import { processAlbumGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, type IndexedPackageRef } from "./grouping.js";
|
||||
import { db } from "./db/client.js";
|
||||
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
|
||||
import type { Client } from "tdl";
|
||||
@@ -816,6 +816,15 @@ async function processArchiveSets(
|
||||
|
||||
// Creator-based grouping (3+ files from same creator)
|
||||
await processCreatorGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// ZIP path prefix grouping (shared root folder inside archives)
|
||||
await processZipPathGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Reply chain grouping (messages replying to same root)
|
||||
await processReplyChainGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Caption fuzzy match grouping
|
||||
await processCaptionGroups(channel.id, indexedPackageRefs);
|
||||
}
|
||||
|
||||
return maxProcessedId;
|
||||
@@ -1235,6 +1244,8 @@ async function processOneArchiveSet(
|
||||
tags,
|
||||
previewData,
|
||||
previewMsgId,
|
||||
sourceCaption: archiveSet.parts[0].caption ?? null,
|
||||
replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
|
||||
files: entries,
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user