/**
 * Handle `/group <id or name>`.
 *
 * When the argument looks like a cuid (a `c` followed by at least 20
 * alphanumeric characters) the group is looked up by ID and its first 20
 * packages are listed. Otherwise the argument is treated as a name fragment
 * and up to 5 matching groups are listed.
 *
 * @param chatId - Chat to reply to.
 * @param query - Raw argument text following the command.
 */
async function handleGroup(chatId: bigint, query: string): Promise<void> {
  // Cap on the number of packages printed in the detail view.
  const maxListedPackages = 20;

  // Trim before the emptiness check so a whitespace-only argument shows the
  // usage text instead of running a name search that matches every group.
  const trimmed = query.trim();
  if (!trimmed) {
    // Messages are sent with HTML parsing enabled, so literal angle brackets
    // in placeholders have to be written as entities.
    await sendTextMessage(
      chatId,
      "Usage: /group &lt;id or name&gt;\n\nProvide a group ID (starts with 'c') or a name to search.",
      "textParseModeHTML"
    );
    return;
  }

  // If it looks like a cuid, look the group up by ID directly.
  if (/^c[a-z0-9]{20,}$/i.test(trimmed)) {
    const group = await getGroupById(trimmed);
    if (!group) {
      await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
      return;
    }

    const listed = group.packages.slice(0, maxListedPackages);
    const packageLines = listed.map(
      (pkg, i) =>
        `  ${i + 1}. ${escapeHtml(pkg.fileName)} (${formatSize(pkg.fileSize)}, ${pkg.fileCount} files) — ${pkg.id}`
    );
    const hiddenCount = group.packages.length - listed.length;

    // The optional "... and N more" line is spread in conditionally rather
    // than filtering out empty strings, which would also delete the blank
    // separator lines below.
    const response = [
      `📦 Group: ${escapeHtml(group.name)}`,
      ``,
      `Packages: ${group.packages.length}`,
      `ID: ${group.id}`,
      ``,
      `Contents:`,
      ...packageLines,
      ...(hiddenCount > 0 ? [`  ... and ${hiddenCount} more`] : []),
      ``,
      `Use /sendgroup ${group.id} to receive all packages.`,
    ].join("\n");

    await sendTextMessage(chatId, response, "textParseModeHTML");
    return;
  }

  // Otherwise search by name.
  const groups = await searchGroups(trimmed, 5);

  if (groups.length === 0) {
    await sendTextMessage(
      chatId,
      `No groups found matching "${escapeHtml(trimmed)}".`,
      "textParseModeHTML"
    );
    return;
  }

  const lines = groups.map(
    (g, i) =>
      `${i + 1}. ${escapeHtml(g.name)} — ${g._count.packages} package(s)\n   ID: ${g.id}`
  );

  const response = [
    `🔍 Groups matching "${escapeHtml(trimmed)}":`,
    ``,
    ...lines,
    ``,
    `Use /group &lt;id&gt; for full details.`,
  ].join("\n");

  await sendTextMessage(chatId, response, "textParseModeHTML");
}

/**
 * Handle `/sendgroup <group-id>`.
 *
 * Creates one PENDING `BotSendRequest` per package in the group that already
 * has both a destination channel and a destination message, then emits a
 * `bot_send` notification for each request.
 *
 * @param chatId - Chat to reply to.
 * @param userId - Telegram user issuing the command; must have a linked account.
 * @param args - Raw argument text following the command (the group ID).
 */
async function handleSendGroup(
  chatId: bigint,
  userId: bigint,
  args: string
): Promise<void> {
  const groupId = args.trim();
  if (!groupId) {
    await sendTextMessage(
      chatId,
      "Usage: /sendgroup &lt;group-id&gt;",
      "textParseModeHTML"
    );
    return;
  }

  const group = await getGroupById(groupId);
  if (!group) {
    await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
    return;
  }

  // Require account linking.
  const link = await findLinkByTelegramUserId(userId);
  if (!link) {
    await sendTextMessage(
      chatId,
      "You must link your account before receiving packages.\nUse /link &lt;code&gt; to connect.",
      "textParseModeHTML"
    );
    return;
  }

  // Only send packages that have been uploaded to the destination channel.
  const sendable = group.packages.filter(
    (pkg) => pkg.destChannelId && pkg.destMessageId
  );

  if (sendable.length === 0) {
    await sendTextMessage(
      chatId,
      `No packages in group "${escapeHtml(group.name)}" are ready to send yet.`,
      "textParseModeHTML"
    );
    return;
  }

  // Create every request in one transaction so a failure part-way through
  // cannot leave a partially queued group with no confirmation message.
  const requests = await db.$transaction(
    sendable.map((pkg) =>
      db.botSendRequest.create({
        data: {
          packageId: pkg.id,
          telegramLinkId: link.id,
          requestedByUserId: link.userId,
          status: "PENDING",
        },
      })
    )
  );

  // Fire pg_notify for each request so the send listener picks them up.
  // $executeRawUnsafe is used because the statement's result is not needed.
  for (const req of requests) {
    try {
      await db.$executeRawUnsafe(`SELECT pg_notify('bot_send', $1)`, req.id);
    } catch (err) {
      // Best-effort: the request row is already stored as PENDING.
      log.warn({ err, requestId: req.id }, "pg_notify for group send failed");
    }
  }

  await sendTextMessage(
    chatId,
    [
      `✅ Queued ${requests.length} package(s) from "${escapeHtml(group.name)}"`,
      ``,
      `You'll receive each archive shortly. Use /package &lt;id&gt; to check individual packages.`,
    ].join("\n"),
    "textParseModeHTML"
  );

  log.info(
    { groupId, packageCount: requests.length, userId: userId.toString() },
    "Group send queued"
  );
}

// ── Group queries ──

/**
 * Load a package group by primary key together with its packages.
 *
 * Packages are ordered by `indexedAt` descending and limited to the fields
 * selected below.
 *
 * @param groupId - ID of the group to load.
 * @returns The result of `findUnique` for that ID.
 */
export async function getGroupById(groupId: string) {
  return db.packageGroup.findUnique({
    where: { id: groupId },
    include: {
      packages: {
        orderBy: { indexedAt: "desc" },
        select: {
          id: true,
          fileName: true,
          fileSize: true,
          archiveType: true,
          fileCount: true,
          creator: true,
          destChannelId: true,
          destMessageId: true,
        },
      },
    },
  });
}

/**
 * Find groups whose name contains `query` (case-insensitive), newest first.
 *
 * @param query - Substring to look for in the group name.
 * @param limit - Maximum number of groups to return (default 5).
 * @returns Matching groups with `id`, `name` and a package count.
 */
export async function searchGroups(query: string, limit = 5) {
  return db.packageGroup.findMany({
    where: {
      name: { contains: query, mode: "insensitive" },
    },
    orderBy: { createdAt: "desc" },
    take: limit,
    select: {
      id: true,
      name: true,
      _count: { select: { packages: true } },
    },
  });
}
SET "searchVector" = to_tsvector('english', + coalesce("fileName", '') || ' ' || coalesce("creator", '') || ' ' || coalesce("sourceCaption", '') +) WHERE "searchVector" IS NULL; + +CREATE INDEX IF NOT EXISTS "packages_search_vector_idx" ON "packages" USING GIN ("searchVector"); + +-- Trigger to auto-update searchVector on insert/update +CREATE OR REPLACE FUNCTION packages_search_vector_update() RETURNS trigger AS $$ +BEGIN + NEW."searchVector" := to_tsvector('english', + coalesce(NEW."fileName", '') || ' ' || coalesce(NEW."creator", '') || ' ' || coalesce(NEW."sourceCaption", '') + ); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS packages_search_vector_trigger ON "packages"; +CREATE TRIGGER packages_search_vector_trigger + BEFORE INSERT OR UPDATE OF "fileName", "creator", "sourceCaption" + ON "packages" + FOR EACH ROW + EXECUTE FUNCTION packages_search_vector_update(); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index aa3400a..f1c4ccc 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -429,10 +429,13 @@ model TelegramChannel { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt + autoGroupEnabled Boolean @default(true) + accountMaps AccountChannelMap[] packages Package[] skippedPackages SkippedPackage[] packageGroups PackageGroup[] + groupingRules GroupingRule[] @@index([type, isActive]) @@index([category]) @@ -847,3 +850,18 @@ model SystemNotification { @@index([type]) @@map("system_notifications") } + +model GroupingRule { + id String @id @default(cuid()) + sourceChannelId String + pattern String // Regex or keyword pattern learned from manual grouping + signalType GroupingSource // Which grouping signal this rule applies to + confidence Float @default(1.0) + createdAt DateTime @default(now()) + createdByGroupId String? 
/**
 * Schedule a package for re-processing.
 *
 * Clears the package's destination fields, moves the watermark of every
 * account/channel mapping for the package's source channel back to just
 * before the package's source message, and marks unread notifications that
 * reference the package as read.
 *
 * @param packageId - ID of the package to repair.
 * @returns `{ success: true }` when the repair was scheduled, otherwise
 *   `{ success: false, error }`.
 */
export async function repairPackageAction(
  packageId: string
): Promise<
  { success: true; data: undefined } | { success: false; error: string }
> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };

  try {
    // Only the source coordinates are needed to reset the watermark.
    const pkg = await prisma.package.findUnique({
      where: { id: packageId },
      select: { sourceChannelId: true, sourceMessageId: true },
    });

    if (!pkg) return { success: false, error: "Package not found" };

    // Run the three writes atomically: clearing the destination without also
    // resetting the watermark would leave the package without an upload and
    // without anything that re-queues it.
    await prisma.$transaction([
      // Clear the destination info so the worker re-processes it.
      prisma.package.update({
        where: { id: packageId },
        data: {
          destMessageId: null,
          destMessageIds: [],
          destChannelId: null,
        },
      }),
      // Reset the channel watermark to before this message.
      prisma.accountChannelMap.updateMany({
        where: {
          channelId: pkg.sourceChannelId,
          lastProcessedMessageId: { gte: pkg.sourceMessageId },
        },
        data: { lastProcessedMessageId: pkg.sourceMessageId - BigInt(1) },
      }),
      // Mark related notifications as read.
      prisma.systemNotification.updateMany({
        where: {
          context: { path: ["packageId"], equals: packageId },
          isRead: false,
        },
        data: { isRead: true },
      }),
    ]);

    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to schedule repair" };
  }
}
/**
 * Re-tag every package of a channel after the channel's category changed.
 *
 * For each package from `channelId`, the first occurrence of `oldCategory`
 * (if any) is removed from its tags and `newCategory` is appended when not
 * already present. Only packages whose tag list actually changes are written.
 *
 * @param channelId - Source channel whose packages are re-tagged.
 * @param oldCategory - Previous category tag, or `null` if there was none.
 * @param newCategory - Category tag to apply.
 * @returns The number of updated packages on success.
 */
export async function retagChannelPackages(
  channelId: string,
  oldCategory: string | null,
  newCategory: string
): Promise<
  | { success: true; data: { updated: number } }
  | { success: false; error: string }
> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };

  try {
    const packages = await prisma.package.findMany({
      where: { sourceChannelId: channelId },
      select: { id: true, tags: true },
    });

    // When old and new are the same tag, removing it and re-appending would
    // only reorder the list and rewrite every row, so nothing is removed.
    const staleTag = oldCategory !== newCategory ? oldCategory : null;

    const changes: { id: string; tags: string[] }[] = [];
    for (const pkg of packages) {
      const tags = [...pkg.tags];
      if (staleTag) {
        const idx = tags.indexOf(staleTag);
        if (idx !== -1) tags.splice(idx, 1);
      }
      if (!tags.includes(newCategory)) {
        tags.push(newCategory);
      }

      const changed =
        tags.length !== pkg.tags.length ||
        tags.some((tag, i) => tag !== pkg.tags[i]);
      if (changed) changes.push({ id: pkg.id, tags });
    }

    // Apply all updates together so a failure cannot leave the channel's
    // packages half re-tagged while the action reports an error.
    if (changes.length > 0) {
      await prisma.$transaction(
        changes.map((change) =>
          prisma.package.update({
            where: { id: change.id },
            data: { tags: change.tags },
          })
        )
      );
    }

    revalidatePath("/stls");
    return { success: true, data: { updated: changes.length } };
  } catch {
    return { success: false, error: "Failed to re-tag packages" };
  }
}

export const dynamic = "force-dynamic";

/**
 * POST /api/notifications/repair
 *
 * Expects a JSON body with a string `notificationId`. Looks up the
 * notification, reads `packageId` from its context and runs
 * `repairPackageAction` for that package.
 *
 * Responses: 401 without a session, 400 for a missing/invalid
 * `notificationId` or a notification without a package, 404 for an unknown
 * notification, 500 when the repair action fails, otherwise
 * `{ success: true }`.
 */
export async function POST(request: Request) {
  const session = await auth();
  if (!session?.user?.id) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  // The body is untrusted: it may be invalid JSON, `null`, or carry a
  // non-string id, so it is narrowed instead of cast.
  const body: unknown = await request.json().catch(() => null);
  const notificationId =
    typeof body === "object" && body !== null
      ? (body as { notificationId?: unknown }).notificationId
      : undefined;
  if (typeof notificationId !== "string" || !notificationId) {
    return NextResponse.json({ error: "notificationId required" }, { status: 400 });
  }

  const notification = await prisma.systemNotification.findUnique({
    where: { id: notificationId },
  });

  if (!notification) {
    return NextResponse.json({ error: "Notification not found" }, { status: 404 });
  }

  const context: unknown = notification.context;
  const rawPackageId =
    typeof context === "object" && context !== null && !Array.isArray(context)
      ? (context as { packageId?: unknown }).packageId
      : undefined;
  const packageId =
    typeof rawPackageId === "string" && rawPackageId ? rawPackageId : undefined;

  if (!packageId) {
    return NextResponse.json({ error: "Notification has no associated package" }, { status: 400 });
  }

  // Import and call the repair action.
  const { repairPackageAction } = await import("@/app/(app)/stls/actions");
  const result = await repairPackageAction(packageId);

  if (!result.success) {
    return NextResponse.json({ error: result.error }, { status: 500 });
  }

  return NextResponse.json({ success: true });
}
/**
 * Ask the server to schedule a repair for the package referenced by a
 * notification, then refresh the notification list.
 *
 * Shows a success toast when the request succeeds and an error toast when
 * the server rejects it or the request itself fails.
 *
 * @param notificationId - ID of the notification whose package should be repaired.
 */
async function handleRepair(notificationId: string): Promise<void> {
  try {
    const res = await fetch("/api/notifications/repair", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ notificationId }),
    });

    if (!res.ok) {
      // Surface the failure instead of leaving the click without feedback.
      toast.error("Failed to schedule repair");
      return;
    }

    toast.success("Repair scheduled — package will be re-processed on next cycle");
    fetchNotifications();
  } catch {
    // Network-level failure: the request never completed.
    toast.error("Failed to schedule repair");
  }
}
"text-muted-foreground"; return ( - + )} - + ); })} diff --git a/src/lib/telegram/queries.ts b/src/lib/telegram/queries.ts index 800f0b1..6ece96a 100644 --- a/src/lib/telegram/queries.ts +++ b/src/lib/telegram/queries.ts @@ -340,6 +340,30 @@ export async function listPackageFiles(options: { }; } +async function fullTextSearchPackageIds(query: string, limit: number): Promise { + // Convert user query to tsquery — handle multi-word by joining with & + const tsQuery = query + .trim() + .split(/\s+/) + .filter((w) => w.length >= 2) + .map((w) => w.replace(/[^a-zA-Z0-9]/g, "")) + .filter(Boolean) + .join(" & "); + + if (!tsQuery) return []; + + const results = await prisma.$queryRawUnsafe<{ id: string }[]>( + `SELECT id FROM packages + WHERE "searchVector" @@ to_tsquery('english', $1) + ORDER BY ts_rank("searchVector", to_tsquery('english', $1)) DESC + LIMIT $2`, + tsQuery, + limit + ); + + return results.map((r) => r.id); +} + export async function searchPackages(options: { query: string; page: number; @@ -366,14 +390,26 @@ export async function searchPackages(options: { ); const fileMatchedIds = fileMatches.map((f) => f.packageId); + // Try full-text search first (better ranking, handles word stemming) + let ftsPackageNameIds: string[] = []; + if (options.searchIn === "both" && q.length >= 3) { + try { + ftsPackageNameIds = await fullTextSearchPackageIds(q, 200); + } catch { + // FTS failed — fall back to ILIKE below + } + } + const packageNameIds = options.searchIn === "both" - ? ( - await prisma.package.findMany({ - where: { fileName: { contains: q, mode: "insensitive" } }, - select: { id: true }, - }) - ).map((p) => p.id) + ? ftsPackageNameIds.length > 0 + ? 
ftsPackageNameIds + : ( + await prisma.package.findMany({ + where: { fileName: { contains: q, mode: "insensitive" } }, + select: { id: true }, + }) + ).map((p) => p.id) : []; // Also match by group name @@ -696,6 +732,53 @@ export async function createManualGroup(name: string, packageIds: string[]) { data: { packageGroupId: group.id }, }); + // Learn a grouping rule from the manual override + try { + const linkedPkgs = await prisma.package.findMany({ + where: { id: { in: packageIds } }, + select: { fileName: true, creator: true }, + }); + + // Extract the common filename pattern + const fileNames = linkedPkgs.map((p) => p.fileName); + let pattern = ""; + if (fileNames.length > 1) { + // Find longest common prefix + let prefix = fileNames[0]; + for (let i = 1; i < fileNames.length; i++) { + while (!fileNames[i].startsWith(prefix)) { + prefix = prefix.slice(0, -1); + if (!prefix) break; + } + } + const trimmed = prefix.replace(/[\s\-_.(]+$/, ""); + if (trimmed.length >= 4) { + pattern = trimmed; + } + } + + // Fall back to shared creator + if (!pattern) { + const creators = [...new Set(linkedPkgs.map((p) => p.creator).filter(Boolean))]; + if (creators.length === 1 && creators[0]) { + pattern = creators[0]; + } + } + + if (pattern) { + await prisma.groupingRule.create({ + data: { + sourceChannelId: firstPkg.sourceChannelId, + pattern, + signalType: "MANUAL", + createdByGroupId: group.id, + }, + }); + } + } catch { + // Best-effort — don't fail the group creation if rule learning fails + } + // Clean up empty groups left behind await prisma.packageGroup.deleteMany({ where: { packages: { none: {} }, id: { not: group.id } }, diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 1943014..b463b5a 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -617,7 +617,7 @@ export async function createAutoGroup(input: { sourceChannelId: string; name: string; packageIds: string[]; - groupingSource: "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | 
"AUTO_CAPTION" | "AUTO_REPLY"; + groupingSource: "ALBUM" | "MANUAL" | "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY"; }): Promise { const group = await db.packageGroup.create({ data: { diff --git a/worker/src/grouping.ts b/worker/src/grouping.ts index ceec994..66b370d 100644 --- a/worker/src/grouping.ts +++ b/worker/src/grouping.ts @@ -79,6 +79,69 @@ export async function processAlbumGroups( } } +/** + * Apply learned GroupingRules from manual overrides. + * For each rule, find ungrouped packages whose fileName contains the pattern. + */ +export async function processRuleBasedGroups( + sourceChannelId: string, + indexedPackages: IndexedPackageRef[] +): Promise { + const rules = await db.groupingRule.findMany({ + where: { sourceChannelId }, + orderBy: { confidence: "desc" }, + }); + + if (rules.length === 0) return; + + const ungrouped = await db.package.findMany({ + where: { + id: { in: indexedPackages.map((p) => p.packageId) }, + packageGroupId: null, + }, + select: { id: true, fileName: true, creator: true }, + }); + + if (ungrouped.length < 2) return; + + for (const rule of rules) { + const matches = ungrouped.filter((pkg) => { + const lower = rule.pattern.toLowerCase(); + return pkg.fileName.toLowerCase().includes(lower) || + (pkg.creator && pkg.creator.toLowerCase().includes(lower)); + }); + + if (matches.length < 2) continue; + + // Check if any are already grouped (by a previous rule in this loop) + const stillUngrouped = await db.package.findMany({ + where: { + id: { in: matches.map((m) => m.id) }, + packageGroupId: null, + }, + select: { id: true }, + }); + + if (stillUngrouped.length < 2) continue; + + try { + const groupId = await createAutoGroup({ + sourceChannelId, + name: rule.pattern, + packageIds: stillUngrouped.map((m) => m.id), + groupingSource: "MANUAL", + }); + + log.info( + { groupId, ruleId: rule.id, pattern: rule.pattern, memberCount: stillUngrouped.length }, + "Applied learned grouping rule" + ); + } catch (err) { 
+ log.warn({ err, ruleId: rule.id }, "Failed to apply grouping rule"); + } + } +} + /** * After album grouping, cluster remaining ungrouped packages from the same channel * that were posted within a configurable time window. @@ -525,6 +588,64 @@ function extractRootFolder(paths: string[]): string | null { return maxSegment; } +/** + * Detect packages that could have been grouped differently. + * Checks if any grouped package's filename matches a GroupingRule + * that would place it in a different group. + */ +export async function detectGroupingConflicts( + sourceChannelId: string, + indexedPackages: IndexedPackageRef[] +): Promise { + const rules = await db.groupingRule.findMany({ + where: { sourceChannelId }, + }); + if (rules.length === 0) return; + + const grouped = await db.package.findMany({ + where: { + id: { in: indexedPackages.map((p) => p.packageId) }, + packageGroupId: { not: null }, + }, + select: { + id: true, + fileName: true, + packageGroupId: true, + packageGroup: { select: { name: true, groupingSource: true } }, + }, + }); + + for (const pkg of grouped) { + for (const rule of rules) { + if (pkg.fileName.toLowerCase().includes(rule.pattern.toLowerCase())) { + // Check if the rule's source group is different from current group + if (rule.createdByGroupId && rule.createdByGroupId !== pkg.packageGroupId) { + try { + await db.systemNotification.create({ + data: { + type: "GROUPING_CONFLICT", + severity: "INFO", + title: `Potential grouping conflict: ${pkg.fileName}`, + message: `Grouped by ${pkg.packageGroup?.groupingSource ?? 
"unknown"} into "${pkg.packageGroup?.name}", but also matches rule "${rule.pattern}" from a different manual group`, + context: { + packageId: pkg.id, + fileName: pkg.fileName, + currentGroupId: pkg.packageGroupId, + matchedRuleId: rule.id, + matchedPattern: rule.pattern, + }, + }, + }); + } catch { + // Best-effort + } + break; // One notification per package + } + } + } + } +} + /** * Find the longest common prefix among a list of filenames, * trimming trailing separators and partial words. diff --git a/worker/src/worker.ts b/worker/src/worker.ts index da652f4..f7365b3 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -47,7 +47,7 @@ import { readRarContents } from "./archive/rar-reader.js"; import { read7zContents } from "./archive/sevenz-reader.js"; import { byteLevelSplit, concatenateFiles } from "./archive/split.js"; import { uploadToChannel } from "./upload/channel.js"; -import { processAlbumGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, type IndexedPackageRef } from "./grouping.js"; +import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js"; import { db } from "./db/client.js"; import type { TelegramAccount, TelegramChannel } from "@prisma/client"; import type { Client } from "tdl"; @@ -808,23 +808,37 @@ async function processArchiveSets( scanResult.photos ); - // Time-window grouping for remaining ungrouped packages - await processTimeWindowGroups(channel.id, indexedPackageRefs); + // Auto-grouping passes (gated by per-channel flag) + const channelRecord = await db.telegramChannel.findUnique({ + where: { id: channel.id }, + select: { autoGroupEnabled: true }, + }); - // Pattern-based grouping (date patterns, project slugs) - await 
processPatternGroups(channel.id, indexedPackageRefs); + if (channelRecord?.autoGroupEnabled !== false) { + // Learned rule-based grouping (from manual overrides) + await processRuleBasedGroups(channel.id, indexedPackageRefs); - // Creator-based grouping (3+ files from same creator) - await processCreatorGroups(channel.id, indexedPackageRefs); + // Time-window grouping for remaining ungrouped packages + await processTimeWindowGroups(channel.id, indexedPackageRefs); - // ZIP path prefix grouping (shared root folder inside archives) - await processZipPathGroups(channel.id, indexedPackageRefs); + // Pattern-based grouping (date patterns, project slugs) + await processPatternGroups(channel.id, indexedPackageRefs); - // Reply chain grouping (messages replying to same root) - await processReplyChainGroups(channel.id, indexedPackageRefs); + // Creator-based grouping (3+ files from same creator) + await processCreatorGroups(channel.id, indexedPackageRefs); - // Caption fuzzy match grouping - await processCaptionGroups(channel.id, indexedPackageRefs); + // ZIP path prefix grouping (shared root folder inside archives) + await processZipPathGroups(channel.id, indexedPackageRefs); + + // Reply chain grouping (messages replying to same root) + await processReplyChainGroups(channel.id, indexedPackageRefs); + + // Caption fuzzy match grouping + await processCaptionGroups(channel.id, indexedPackageRefs); + } + + // Check for potential grouping conflicts + await detectGroupingConflicts(channel.id, indexedPackageRefs); } return maxProcessedId; @@ -1162,6 +1176,34 @@ async function processOneArchiveSet( ); } + // ── Post-upload integrity check ── + // Verify the files on disk still match before we index + if (uploadPaths.length > 0 && !existingUpload) { + try { + const postUploadHash = await hashParts(uploadPaths); + if (splitPaths.length > 0) { + // Split files — hash should match the split hash (already verified above) + // No additional check needed since we verified split hash = 
original hash + } else if (postUploadHash !== contentHash) { + accountLog.error( + { fileName: archiveName, originalHash: contentHash, postUploadHash }, + "Hash changed between hashing and upload — possible disk corruption" + ); + await db.systemNotification.create({ + data: { + type: "HASH_MISMATCH", + severity: "ERROR", + title: `Post-upload hash mismatch: ${archiveName}`, + message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`, + context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id }, + }, + }); + } + } catch { + // Best-effort — don't fail the ingestion + } + } + // ── Preview thumbnail ── let previewData: Buffer | null = null; let previewMsgId: bigint | null = null;