diff --git a/docs/superpowers/plans/2026-03-30-grouping-phase1-implementation.md b/docs/superpowers/plans/2026-03-30-grouping-phase1-implementation.md new file mode 100644 index 0000000..e0bf89c --- /dev/null +++ b/docs/superpowers/plans/2026-03-30-grouping-phase1-implementation.md @@ -0,0 +1,67 @@ +# Grouping Phase 1: Foundation + Time-Window Grouping + +> **For agentic workers:** Use superpowers:subagent-driven-development to implement this plan. + +**Goal:** Add grouping infrastructure (schema, enums, notifications model), an ungrouped staging queue in the UI, and time-window auto-grouping as the first automatic signal beyond album grouping. + +**Architecture:** Schema changes lay the foundation. Ungrouped tab is a query filter. Time-window grouping runs as a post-processing pass after album grouping in the worker pipeline. + +**Tech Stack:** Prisma schema + migration, worker TypeScript, Next.js App Router. + +--- + +## Task 1: Schema Migration + +**Files:** +- Modify: `prisma/schema.prisma` +- Create: migration SQL + +Add: +1. `GroupingSource` enum: `ALBUM`, `MANUAL`, `AUTO_TIME`, `AUTO_PATTERN`, `AUTO_REPLY`, `AUTO_ZIP`, `AUTO_CAPTION` +2. `groupingSource GroupingSource @default(MANUAL)` on `PackageGroup` +3. `SystemNotification` model with `type`, `severity`, `title`, `message`, `context` (Json), `isRead` +4. `NotificationType` enum: `HASH_MISMATCH`, `MISSING_PART`, `UPLOAD_FAILED`, `DOWNLOAD_FAILED`, `GROUPING_CONFLICT`, `INTEGRITY_AUDIT` +5. 
`NotificationSeverity` enum: `INFO`, `WARNING`, `ERROR` + +Backfill: `UPDATE package_groups SET "groupingSource" = 'ALBUM' WHERE "mediaAlbumId" IS NOT NULL` + +--- + +## Task 2: Ungrouped Staging Tab in STL Page + +**Files:** +- Modify: `src/lib/telegram/queries.ts` — add `listUngroupedPackages()` query +- Modify: `src/app/(app)/stls/page.tsx` — add tab parameter support +- Modify: `src/app/(app)/stls/_components/stl-table.tsx` — add "Ungrouped" tab + +Add a tab next to the existing "Skipped" tab that shows packages where `packageGroupId IS NULL`. Uses the existing `PackageListItem` type and table rendering. This gives users a clear view of files that need manual grouping. + +--- + +## Task 3: Time-Window Auto-Grouping in Worker + +**Files:** +- Modify: `worker/src/grouping.ts` — add `processTimeWindowGroups()` after existing `processAlbumGroups()` +- Modify: `worker/src/worker.ts` — call time-window grouping after album grouping +- Modify: `worker/src/util/config.ts` — add `autoGroupTimeWindowMinutes` config + +After album grouping completes, find remaining ungrouped packages from the same channel scan. Cluster packages (ordered by `sourceMessageId`) whose `indexedAt` timestamps are within the configured window (default 5 minutes). Create groups for clusters of 2+ with `groupingSource = AUTO_TIME` and name derived from the common filename prefix or first file's base name. + +--- + +## Task 4: Hash Verification After Split + +**Files:** +- Modify: `worker/src/worker.ts` — add hash re-check after concat+split +- Reference: `worker/src/archive/hash.ts` — no changes needed; reuse the existing `hashParts` + +After `concatenateFiles()` + `byteLevelSplit()`, re-hash the split parts and compare to the original `contentHash`. If mismatch, log error and create a `SystemNotification` (once that table exists). This closes the integrity gap identified in the audit. + +--- + +## Task 5: Build & Deploy + +Rebuild worker and app images. Deploy.
Verify: +- Worker logs show `maxPartSizeMB` and new `autoGroupTimeWindowMinutes` in config +- Ungrouped tab visible in STL page +- Previously-skipped large archives begin processing diff --git a/prisma/migrations/20260330120000_grouping_and_notifications/migration.sql b/prisma/migrations/20260330120000_grouping_and_notifications/migration.sql new file mode 100644 index 0000000..4e868b8 --- /dev/null +++ b/prisma/migrations/20260330120000_grouping_and_notifications/migration.sql @@ -0,0 +1,32 @@ +-- CreateEnum GroupingSource +CREATE TYPE "GroupingSource" AS ENUM ('ALBUM', 'MANUAL', 'AUTO_TIME', 'AUTO_PATTERN', 'AUTO_REPLY', 'AUTO_ZIP', 'AUTO_CAPTION'); + +-- CreateEnum NotificationType +CREATE TYPE "NotificationType" AS ENUM ('HASH_MISMATCH', 'MISSING_PART', 'UPLOAD_FAILED', 'DOWNLOAD_FAILED', 'GROUPING_CONFLICT', 'INTEGRITY_AUDIT'); + +-- CreateEnum NotificationSeverity +CREATE TYPE "NotificationSeverity" AS ENUM ('INFO', 'WARNING', 'ERROR'); + +-- AlterTable: add groupingSource to package_groups +ALTER TABLE "package_groups" ADD COLUMN "groupingSource" "GroupingSource" NOT NULL DEFAULT 'MANUAL'; + +-- Backfill: mark album-based groups +UPDATE "package_groups" SET "groupingSource" = 'ALBUM' WHERE "mediaAlbumId" IS NOT NULL; + +-- CreateTable: system_notifications +CREATE TABLE "system_notifications" ( + "id" TEXT NOT NULL, + "type" "NotificationType" NOT NULL, + "severity" "NotificationSeverity" NOT NULL DEFAULT 'INFO', + "title" TEXT NOT NULL, + "message" TEXT NOT NULL, + "context" JSONB, + "isRead" BOOLEAN NOT NULL DEFAULT false, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "system_notifications_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "system_notifications_isRead_createdAt_idx" ON "system_notifications"("isRead", "createdAt"); +CREATE INDEX "system_notifications_type_idx" ON "system_notifications"("type"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 3e2e0e9..ea0a150 100644 --- 
a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -522,6 +522,7 @@ model PackageGroup { name String mediaAlbumId String? sourceChannelId String + groupingSource GroupingSource @default(MANUAL) previewData Bytes? createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -802,3 +803,45 @@ model KickstarterPackage { @@id([kickstarterId, packageId]) @@map("kickstarter_packages") } + +// ── Grouping & Notifications ── + +enum GroupingSource { + ALBUM + MANUAL + AUTO_TIME + AUTO_PATTERN + AUTO_REPLY + AUTO_ZIP + AUTO_CAPTION +} + +enum NotificationType { + HASH_MISMATCH + MISSING_PART + UPLOAD_FAILED + DOWNLOAD_FAILED + GROUPING_CONFLICT + INTEGRITY_AUDIT +} + +enum NotificationSeverity { + INFO + WARNING + ERROR +} + +model SystemNotification { + id String @id @default(cuid()) + type NotificationType + severity NotificationSeverity @default(INFO) + title String + message String + context Json? + isRead Boolean @default(false) + createdAt DateTime @default(now()) + + @@index([isRead, createdAt]) + @@index([type]) + @@map("system_notifications") +} diff --git a/src/app/(app)/stls/_components/stl-table.tsx b/src/app/(app)/stls/_components/stl-table.tsx index cc54cbf..468b6c0 100644 --- a/src/app/(app)/stls/_components/stl-table.tsx +++ b/src/app/(app)/stls/_components/stl-table.tsx @@ -38,7 +38,7 @@ import { } from "@/components/ui/dialog"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; import { Badge } from "@/components/ui/badge"; -import type { DisplayItem, IngestionAccountStatus } from "@/lib/telegram/types"; +import type { DisplayItem, IngestionAccountStatus, PackageListItem } from "@/lib/telegram/types"; import type { SkippedRow } from "./skipped-columns"; import { updatePackageCreator, @@ -61,6 +61,9 @@ interface StlTableProps { skippedData: SkippedRow[]; skippedPageCount: number; skippedTotalCount: number; + ungroupedData: PackageListItem[]; + ungroupedPageCount: number; + ungroupedTotalCount: number; } export function 
StlTable({ @@ -73,6 +76,9 @@ export function StlTable({ skippedData, skippedPageCount, skippedTotalCount, + ungroupedData, + ungroupedPageCount, + ungroupedTotalCount, }: StlTableProps) { const router = useRouter(); const pathname = usePathname(); @@ -379,6 +385,23 @@ export function StlTable({ const { table } = useDataTable({ data: tableRows, columns, pageCount }); + const ungroupedRows: StlTableRow[] = useMemo( + () => + ungroupedData.map((pkg) => ({ + ...pkg, + _rowType: "package" as const, + _groupId: null, + _isGroupMember: false, + })), + [ungroupedData] + ); + + const { table: ungroupedTable } = useDataTable({ + data: ungroupedRows, + columns, + pageCount: ungroupedPageCount, + }); + const activeTag = searchParams.get("tag") ?? ""; return ( @@ -401,6 +424,14 @@ export function StlTable({ )} + + Ungrouped + {ungroupedTotalCount > 0 && ( + + {ungroupedTotalCount} + + )} + @@ -472,6 +503,11 @@ export function StlTable({ totalCount={skippedTotalCount} /> + + + + + ); } diff --git a/src/lib/telegram/queries.ts b/src/lib/telegram/queries.ts index 80745c9..a5c72fb 100644 --- a/src/lib/telegram/queries.ts +++ b/src/lib/telegram/queries.ts @@ -571,6 +571,72 @@ export async function countSkippedPackages(): Promise { return prisma.skippedPackage.count(); } +export async function listUngroupedPackages(options: { + page: number; + limit: number; +}) { + const { page, limit } = options; + const skip = (page - 1) * limit; + + const where = { packageGroupId: null, destMessageId: { not: null } }; + + const [items, total] = await Promise.all([ + prisma.package.findMany({ + where, + orderBy: { indexedAt: "desc" }, + skip, + take: limit, + select: { + id: true, + fileName: true, + fileSize: true, + archiveType: true, + creator: true, + fileCount: true, + isMultipart: true, + partCount: true, + tags: true, + indexedAt: true, + previewData: true, + sourceChannel: { select: { id: true, title: true } }, + }, + }), + prisma.package.count({ where }), + ]); + + return { + items: 
items.map((p) => ({ + id: p.id, + fileName: p.fileName, + fileSize: p.fileSize.toString(), + contentHash: "", + archiveType: p.archiveType, + creator: p.creator, + fileCount: p.fileCount, + isMultipart: p.isMultipart, + partCount: p.partCount, + tags: p.tags, + indexedAt: p.indexedAt.toISOString(), + hasPreview: !!p.previewData, + sourceChannel: p.sourceChannel, + matchedFileCount: 0, + matchedByContent: false, + })), + pagination: { + total, + totalPages: Math.ceil(total / limit), + page, + limit, + }, + }; +} + +export async function countUngroupedPackages(): Promise { + return prisma.package.count({ + where: { packageGroupId: null, destMessageId: { not: null } }, + }); +} + export async function getPackageGroup(groupId: string) { return prisma.packageGroup.findUnique({ where: { id: groupId }, diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index edc330f..8b6e8aa 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -587,3 +587,24 @@ export async function linkPackagesToGroup( data: { packageGroupId: groupId }, }); } + +export async function createTimeWindowGroup(input: { + sourceChannelId: string; + name: string; + packageIds: string[]; +}): Promise { + const group = await db.packageGroup.create({ + data: { + sourceChannelId: input.sourceChannelId, + name: input.name, + groupingSource: "AUTO_TIME", + }, + }); + + await db.package.updateMany({ + where: { id: { in: input.packageIds } }, + data: { packageGroupId: group.id }, + }); + + return group.id; +} diff --git a/worker/src/grouping.ts b/worker/src/grouping.ts index 35c91b0..28ed191 100644 --- a/worker/src/grouping.ts +++ b/worker/src/grouping.ts @@ -1,7 +1,8 @@ import type { Client } from "tdl"; import type { TelegramPhoto } from "./preview/match.js"; import { downloadPhotoThumbnail } from "./tdlib/download.js"; -import { createOrFindPackageGroup, linkPackagesToGroup } from "./db/queries.js"; +import { createOrFindPackageGroup, linkPackagesToGroup, createTimeWindowGroup } from 
"./db/queries.js"; +import { config } from "./util/config.js"; import { childLogger } from "./util/logger.js"; import { db } from "./db/client.js"; @@ -77,3 +78,95 @@ export async function processAlbumGroups( } } } + +/** + * After album grouping, cluster remaining ungrouped packages from the same channel + * that were posted within a configurable time window. + * Only groups packages that were just indexed in this scan cycle (the `indexedPackages` list). + */ +export async function processTimeWindowGroups( + sourceChannelId: string, + indexedPackages: IndexedPackageRef[] +): Promise { + if (config.autoGroupTimeWindowMinutes <= 0) return; + + // Find which of the just-indexed packages are still ungrouped + const ungrouped = await db.package.findMany({ + where: { + id: { in: indexedPackages.map((p) => p.packageId) }, + packageGroupId: null, + }, + orderBy: { sourceMessageId: "asc" }, + select: { + id: true, + fileName: true, + sourceMessageId: true, + indexedAt: true, + }, + }); + + if (ungrouped.length < 2) return; + + const windowMs = config.autoGroupTimeWindowMinutes * 60 * 1000; + + // Cluster by time proximity: walk through sorted list, start new cluster when gap > window + const clusters: typeof ungrouped[] = []; + let current: typeof ungrouped = [ungrouped[0]]; + + for (let i = 1; i < ungrouped.length; i++) { + const prev = current[current.length - 1]; + const gap = Math.abs(ungrouped[i].indexedAt.getTime() - prev.indexedAt.getTime()); + + if (gap <= windowMs) { + current.push(ungrouped[i]); + } else { + clusters.push(current); + current = [ungrouped[i]]; + } + } + clusters.push(current); + + // Create groups for clusters with 2+ packages + for (const cluster of clusters) { + if (cluster.length < 2) continue; + + // Derive group name from common filename prefix + const name = findCommonPrefix(cluster.map((p) => p.fileName)) || cluster[0].fileName; + + try { + const groupId = await createTimeWindowGroup({ + sourceChannelId, + name, + packageIds: 
cluster.map((p) => p.id), + }); + + log.info( + { groupId, name, memberCount: cluster.length }, + "Created time-window group" + ); + } catch (err) { + log.warn({ err, clusterSize: cluster.length }, "Failed to create time-window group"); + } + } +} + +/** + * Find the longest common prefix among a list of filenames, + * trimming trailing separators and partial words. + */ +function findCommonPrefix(names: string[]): string { + if (names.length === 0) return ""; + if (names.length === 1) return names[0]; + + let prefix = names[0]; + for (let i = 1; i < names.length; i++) { + while (!names[i].startsWith(prefix)) { + prefix = prefix.slice(0, -1); + if (prefix.length === 0) return ""; + } + } + + // Trim trailing separators and partial words + const trimmed = prefix.replace(/[\s\-_.(]+$/, ""); + return trimmed.length >= 3 ? trimmed : ""; +} diff --git a/worker/src/util/config.ts b/worker/src/util/config.ts index 8a8c011..fb7ebb9 100644 --- a/worker/src/util/config.ts +++ b/worker/src/util/config.ts @@ -10,6 +10,8 @@ export const config = { /** Maximum file part size for Telegram upload (in MiB). Default 1950 (under 2GB non-Premium limit). * Set to 3900 for Premium accounts (under 4GB limit). */ maxPartSizeMB: parseInt(process.env.MAX_PART_SIZE_MB ?? "1950", 10), + /** Time window for auto-grouping ungrouped packages from the same channel (minutes). 0 = disabled. */ + autoGroupTimeWindowMinutes: parseInt(process.env.AUTO_GROUP_TIME_WINDOW_MINUTES ?? "5", 10), /** Maximum jitter added to scheduler interval (in minutes) */ jitterMinutes: 5, /** Maximum time span for multipart archive parts (in hours). 0 = no limit. 
*/ diff --git a/worker/src/worker.ts b/worker/src/worker.ts index abd0dea..ebd9852 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -47,7 +47,8 @@ import { readRarContents } from "./archive/rar-reader.js"; import { read7zContents } from "./archive/sevenz-reader.js"; import { byteLevelSplit, concatenateFiles } from "./archive/split.js"; import { uploadToChannel } from "./upload/channel.js"; -import { processAlbumGroups, type IndexedPackageRef } from "./grouping.js"; +import { processAlbumGroups, processTimeWindowGroups, type IndexedPackageRef } from "./grouping.js"; +import { db } from "./db/client.js"; import type { TelegramAccount, TelegramChannel } from "@prisma/client"; import type { Client } from "tdl"; @@ -790,6 +791,9 @@ async function processArchiveSets( indexedPackageRefs, scanResult.photos ); + + // Time-window grouping for remaining ungrouped packages + await processTimeWindowGroups(channel.id, indexedPackageRefs); } return maxProcessedId; @@ -1053,6 +1057,43 @@ async function processOneArchiveSet( uploadPaths = splitPaths; } + // ── Hash verification after split ── + // If we split/repacked, verify the split parts hash matches the original + if (splitPaths.length > 0) { + const splitHash = await hashParts(splitPaths); + if (splitHash !== contentHash) { + accountLog.error( + { fileName: archiveName, originalHash: contentHash, splitHash, parts: splitPaths.length }, + "Hash mismatch after split — file may be corrupted" + ); + // Record notification for visibility + try { + await db.systemNotification.create({ + data: { + type: "HASH_MISMATCH", + severity: "ERROR", + title: `Hash mismatch after splitting ${archiveName}`, + message: `Expected ${contentHash.slice(0, 16)}… but got ${splitHash.slice(0, 16)}… after splitting into ${splitPaths.length} parts`, + context: { + fileName: archiveName, + originalHash: contentHash, + splitHash, + partCount: splitPaths.length, + sourceChannelId: channel.id, + }, + }, + }); + } catch { + // Best-effort 
notification + } + throw new Error(`Hash mismatch after split for ${archiveName}: expected ${contentHash}, got ${splitHash}`); + } + accountLog.debug( + { fileName: archiveName, hash: contentHash.slice(0, 16), parts: splitPaths.length }, + "Split hash verified — matches original" + ); + } + // ── Uploading ── // Check if a prior run already uploaded this file (orphaned upload scenario: // file reached Telegram but DB write failed or worker crashed before indexing)