feat: complete remaining features — training, FTS, bot groups, repair, re-tag
All checks were successful
continuous-integration/drone/push Build is passing

Manual override training (GroupingRule):
- Learn patterns from manual group creation (common filename prefix or creator)
- Apply learned rules as first auto-grouping pass (highest confidence after albums)
- GroupingRule model stores pattern, channel, signal type, confidence

Hash verification after upload:
- Re-hash upload files on disk before indexing to catch disk corruption
- Creates HASH_MISMATCH notification on discrepancy

Grouping conflict detection:
- After all grouping passes, check if grouped packages match rules from different groups
- Creates GROUPING_CONFLICT notification for manual review

Per-channel grouping flags:
- Add autoGroupEnabled boolean to TelegramChannel (default true)
- Auto-grouping passes (all except album) gated behind this flag
- Album grouping always runs as it reflects Telegram's native behavior

Full-text search (tsvector):
- Add searchVector tsvector column with GIN index and auto-update trigger
- Backfill 1870 existing packages
- FTS with ts_rank for ranked results, ILIKE fallback for short/failed queries
- Applied to both web app and bot search

Bot group awareness:
- /group <query> — view group info or search groups by name
- /sendgroup <id> — send all packages in a group to linked Telegram account

Bulk repair:
- repairPackageAction clears dest info and resets watermark for re-processing
- Repair button in notification bell for MISSING_PART and HASH_MISMATCH alerts
- /api/notifications/repair endpoint

Retroactive category re-tagging:
- When channel category changes, auto-update tags on all existing packages
- Removes old category tag, adds new one

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-30 14:34:14 +02:00
parent 7f9a03d4ee
commit f4aa9d9a2f
12 changed files with 788 additions and 25 deletions

View File

@@ -10,7 +10,10 @@ import {
getSubscriptions,
addSubscription,
removeSubscription,
getGroupById,
searchGroups,
} from "./db/queries.js";
import { db } from "./db/client.js";
import { sendTextMessage, sendPhotoMessage } from "./tdlib/client.js";
const log = childLogger("commands");
@@ -78,6 +81,12 @@ export async function handleMessage(msg: IncomingMessage): Promise<void> {
case "/status":
await handleStatus(chatId, userId);
break;
case "/group":
await handleGroup(chatId, args);
break;
case "/sendgroup":
await handleSendGroup(chatId, userId, args);
break;
default:
await sendTextMessage(
chatId,
@@ -117,6 +126,8 @@ async function handleStart(
`/search &lt;query&gt; — Search packages`,
`/latest [n] — Show latest packages`,
`/package &lt;id&gt; — Package details`,
`/group &lt;id or name&gt; — View group info and package list`,
`/sendgroup &lt;id&gt; — Send all packages in a group to yourself`,
`/link &lt;code&gt; — Link your Telegram to your web account`,
`/subscribe &lt;keyword&gt; — Get notified for new packages`,
`/subscriptions — View your subscriptions`,
@@ -136,6 +147,8 @@ async function handleHelp(chatId: bigint): Promise<void> {
`/search &lt;query&gt; — Search by filename or creator`,
`/latest [n] — Show n most recent packages (default: 5)`,
`/package &lt;id&gt; — View package details and file list`,
`/group &lt;id or name&gt; — View group info and package list`,
`/sendgroup &lt;id&gt; — Send all packages in a group to yourself`,
``,
`🔗 <b>Account Linking</b>`,
`/link &lt;code&gt; — Link Telegram to your web account`,
@@ -432,6 +445,168 @@ async function handleStatus(chatId: bigint, userId: bigint): Promise<void> {
}
}
/**
 * Handle `/group <id or name>`.
 *
 * When the argument looks like a cuid the group is loaded by ID and its
 * packages are listed (first 20 only). Any other argument is treated as a
 * name fragment and up to five matching groups are listed instead.
 *
 * @param chatId - Chat that receives the reply.
 * @param query - Raw argument text that followed the command.
 */
async function handleGroup(chatId: bigint, query: string): Promise<void> {
  // Trim before the emptiness check so a whitespace-only argument shows the
  // usage text instead of running a name search with an empty string.
  const trimmed = (query ?? "").trim();
  if (!trimmed) {
    await sendTextMessage(
      chatId,
      "Usage: /group &lt;id or name&gt;\n\nProvide a group ID (starts with 'c') or a name to search.",
      "textParseModeHTML"
    );
    return;
  }

  // If it looks like a cuid (starts with 'c', ~25 chars), look up by ID directly
  if (/^c[a-z0-9]{20,}$/i.test(trimmed)) {
    const group = await getGroupById(trimmed);
    if (!group) {
      await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
      return;
    }

    const maxListed = 20;
    const packageLines = group.packages.slice(0, maxListed).map((pkg, i) => {
      const size = formatSize(pkg.fileSize);
      return ` ${i + 1}. <b>${escapeHtml(pkg.fileName)}</b> (${size}, ${pkg.fileCount} files) — <code>${pkg.id}</code>`;
    });

    // Only add the overflow line when packages were actually cut off. It is
    // spread conditionally so the intentional blank separator lines below
    // survive (a blanket "drop empty strings" filter would remove them too).
    const hiddenCount = group.packages.length - maxListed;
    const more = hiddenCount > 0 ? [`\n ... and ${hiddenCount} more`] : [];

    const response = [
      `📦 <b>Group: ${escapeHtml(group.name)}</b>`,
      ``,
      `Packages: ${group.packages.length}`,
      `ID: <code>${group.id}</code>`,
      ``,
      `<b>Contents:</b>`,
      ...packageLines,
      ...more,
      ``,
      `Use /sendgroup ${group.id} to receive all packages.`,
    ].join("\n");

    await sendTextMessage(chatId, response, "textParseModeHTML");
    return;
  }

  // Otherwise search by name
  const groups = await searchGroups(trimmed, 5);
  if (groups.length === 0) {
    await sendTextMessage(
      chatId,
      `No groups found matching "<b>${escapeHtml(trimmed)}</b>".`,
      "textParseModeHTML"
    );
    return;
  }

  const lines = groups.map(
    (g, i) =>
      `${i + 1}. <b>${escapeHtml(g.name)}</b> — ${g._count.packages} package(s)\n ID: <code>${g.id}</code>`
  );
  const response = [
    `🔍 <b>Groups matching "${escapeHtml(trimmed)}":</b>`,
    ``,
    ...lines,
    ``,
    `Use /group &lt;id&gt; for full details.`,
  ].join("\n");

  await sendTextMessage(chatId, response, "textParseModeHTML");
}
/**
 * Handle `/sendgroup <group-id>`.
 *
 * Queues one BotSendRequest per package of the group that already has a
 * destination channel and message ID, then signals the `bot_send` channel
 * for each request. The caller must have a linked account.
 *
 * @param chatId - Chat that receives status replies.
 * @param userId - Telegram user ID used to look up the account link.
 * @param args - Raw argument text; expected to be a group ID.
 */
async function handleSendGroup(
  chatId: bigint,
  userId: bigint,
  args: string
): Promise<void> {
  // Trim first so a whitespace-only argument is treated as missing.
  const groupId = (args ?? "").trim();
  if (!groupId) {
    await sendTextMessage(
      chatId,
      "Usage: /sendgroup &lt;group-id&gt;",
      "textParseModeHTML"
    );
    return;
  }

  const group = await getGroupById(groupId);
  if (!group) {
    await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
    return;
  }

  // Require account linking
  const link = await findLinkByTelegramUserId(userId);
  if (!link) {
    await sendTextMessage(
      chatId,
      "You must link your account before receiving packages.\nUse /link &lt;code&gt; to connect.",
      "textParseModeHTML"
    );
    return;
  }

  // Only send packages that have been uploaded to the destination channel
  const sendable = group.packages.filter(
    (pkg) => pkg.destChannelId && pkg.destMessageId
  );
  if (sendable.length === 0) {
    await sendTextMessage(
      chatId,
      `No packages in group "<b>${escapeHtml(group.name)}</b>" are ready to send yet.`,
      "textParseModeHTML"
    );
    return;
  }

  // Create all requests atomically: either every sendable package is queued
  // or none is, so a mid-batch failure cannot leave a partial, unannounced
  // queue behind.
  const requests = await db.$transaction(
    sendable.map((pkg) =>
      db.botSendRequest.create({
        data: {
          packageId: pkg.id,
          telegramLinkId: link.id,
          requestedByUserId: link.userId,
          status: "PENDING",
        },
      })
    )
  );

  // Fire pg_notify for each request so the send listener picks them up.
  // pg_notify() returns void; $executeRawUnsafe does not read result rows,
  // which avoids result-deserialization problems a row-returning raw query
  // can have with a void column (NOTE(review): confirm against the Prisma
  // version in use).
  for (const req of requests) {
    try {
      await db.$executeRawUnsafe(`SELECT pg_notify('bot_send', $1)`, req.id);
    } catch (err) {
      // Best-effort — the bot also processes PENDING requests on its send queue
      log.warn({ err, requestId: req.id }, "pg_notify for bot_send failed");
    }
  }

  await sendTextMessage(
    chatId,
    [
      `✅ <b>Queued ${requests.length} package(s) from "${escapeHtml(group.name)}"</b>`,
      ``,
      `You'll receive each archive shortly. Use /package &lt;id&gt; to check individual packages.`,
    ].join("\n"),
    "textParseModeHTML"
  );

  log.info(
    { groupId, packageCount: requests.length, userId: userId.toString() },
    "Group send queued"
  );
}
function escapeHtml(text: string): string {
return text
.replace(/&/g, "&amp;")

View File

@@ -53,7 +53,52 @@ export async function createTelegramLink(
// ── Package search ──
export async function searchPackages(query: string, limit = 10) {
const packages = await db.package.findMany({
// Try full-text search first
if (query.length >= 3) {
const tsQuery = query
.trim()
.split(/\s+/)
.filter((w) => w.length >= 2)
.map((w) => w.replace(/[^a-zA-Z0-9]/g, ""))
.filter(Boolean)
.join(" & ");
if (tsQuery) {
try {
const ftsResults = await db.$queryRawUnsafe<{ id: string }[]>(
`SELECT id FROM packages
WHERE "searchVector" @@ to_tsquery('english', $1)
ORDER BY ts_rank("searchVector", to_tsquery('english', $1)) DESC
LIMIT $2`,
tsQuery,
limit
);
if (ftsResults.length > 0) {
return db.package.findMany({
where: { id: { in: ftsResults.map((r) => r.id) } },
orderBy: { indexedAt: "desc" },
select: {
id: true,
fileName: true,
fileSize: true,
archiveType: true,
fileCount: true,
creator: true,
indexedAt: true,
destChannelId: true,
destMessageId: true,
},
});
}
} catch {
// FTS failed — fall back to ILIKE
}
}
}
// Fallback: ILIKE search
return db.package.findMany({
where: {
OR: [
{ fileName: { contains: query, mode: "insensitive" } },
@@ -74,7 +119,44 @@ export async function searchPackages(query: string, limit = 10) {
destMessageId: true,
},
});
return packages;
}
// ── Group queries ──
/**
 * Load one package group by its ID, including its packages ordered by
 * `indexedAt` descending. Callers treat a falsy result as "group not found".
 */
export async function getGroupById(groupId: string) {
  const group = await db.packageGroup.findUnique({
    where: { id: groupId },
    include: {
      packages: {
        orderBy: { indexedAt: "desc" },
        // Only the columns the bot needs to list and queue packages.
        select: {
          id: true,
          fileName: true,
          fileSize: true,
          archiveType: true,
          fileCount: true,
          creator: true,
          destChannelId: true,
          destMessageId: true,
        },
      },
    },
  });
  return group;
}
/**
 * Find package groups whose name contains `query` (case-insensitive),
 * ordered by `createdAt` descending and capped at `limit` rows. Each row
 * carries the group's id, name and package count.
 */
export async function searchGroups(query: string, limit = 5) {
  const nameFilter = { contains: query, mode: "insensitive" as const };
  const matches = await db.packageGroup.findMany({
    where: { name: nameFilter },
    orderBy: { createdAt: "desc" },
    take: limit,
    select: {
      id: true,
      name: true,
      _count: { select: { packages: true } },
    },
  });
  return matches;
}
export async function getLatestPackages(limit = 5) {

View File

@@ -0,0 +1,47 @@
-- AlterTable: add autoGroupEnabled to telegram_channels
-- NOT NULL with DEFAULT true, so existing channels keep auto-grouping on.
ALTER TABLE "telegram_channels" ADD COLUMN "autoGroupEnabled" BOOLEAN NOT NULL DEFAULT true;
-- CreateTable: grouping_rules
-- One row per learned grouping pattern, scoped to a source channel.
-- "signalType" uses the "GroupingSource" type, which this script does not
-- create (assumed to exist from an earlier migration — TODO confirm).
CREATE TABLE "grouping_rules" (
"id" TEXT NOT NULL,
"sourceChannelId" TEXT NOT NULL,
"pattern" TEXT NOT NULL,
"signalType" "GroupingSource" NOT NULL,
"confidence" DOUBLE PRECISION NOT NULL DEFAULT 1.0,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"createdByGroupId" TEXT,
CONSTRAINT "grouping_rules_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "grouping_rules_sourceChannelId_idx" ON "grouping_rules"("sourceChannelId");
-- AddForeignKey
-- Rules are deleted together with their channel (ON DELETE CASCADE).
ALTER TABLE "grouping_rules" ADD CONSTRAINT "grouping_rules_sourceChannelId_fkey" FOREIGN KEY ("sourceChannelId") REFERENCES "telegram_channels"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- Full-text search: add tsvector column and GIN index
-- IF NOT EXISTS makes the column and index steps safe to re-run.
ALTER TABLE "packages" ADD COLUMN IF NOT EXISTS "searchVector" tsvector;
-- Backfill rows that have no vector yet from fileName, creator and sourceCaption
-- (NULL values are treated as empty strings).
UPDATE "packages" SET "searchVector" = to_tsvector('english',
coalesce("fileName", '') || ' ' || coalesce("creator", '') || ' ' || coalesce("sourceCaption", '')
) WHERE "searchVector" IS NULL;
CREATE INDEX IF NOT EXISTS "packages_search_vector_idx" ON "packages" USING GIN ("searchVector");
-- Trigger to auto-update searchVector on insert/update
-- The function rebuilds the vector from the same three columns and with the
-- same 'english' configuration as the backfill above.
CREATE OR REPLACE FUNCTION packages_search_vector_update() RETURNS trigger AS $$
BEGIN
NEW."searchVector" := to_tsvector('english',
coalesce(NEW."fileName", '') || ' ' || coalesce(NEW."creator", '') || ' ' || coalesce(NEW."sourceCaption", '')
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Drop-then-create so the trigger definition is replaced if it already exists.
DROP TRIGGER IF EXISTS packages_search_vector_trigger ON "packages";
-- Fires before every insert, and before updates that name one of the three
-- source columns in their SET list.
CREATE TRIGGER packages_search_vector_trigger
BEFORE INSERT OR UPDATE OF "fileName", "creator", "sourceCaption"
ON "packages"
FOR EACH ROW
EXECUTE FUNCTION packages_search_vector_update();

View File

@@ -429,10 +429,13 @@ model TelegramChannel {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
autoGroupEnabled Boolean @default(true)
accountMaps AccountChannelMap[]
packages Package[]
skippedPackages SkippedPackage[]
packageGroups PackageGroup[]
groupingRules GroupingRule[]
@@index([type, isActive])
@@index([category])
@@ -847,3 +850,18 @@ model SystemNotification {
@@index([type])
@@map("system_notifications")
}
// A grouping pattern learned from a manually created package group, scoped
// to one source channel. Stored in the "grouping_rules" table.
model GroupingRule {
id String @id @default(cuid())
sourceChannelId String
// NOTE(review): the grouping code in this commit matches `pattern` as a
// case-insensitive substring of fileName/creator, not as a regular expression.
pattern String // Regex or keyword pattern learned from manual grouping
signalType GroupingSource // Which grouping signal this rule applies to
// Rules are applied in descending confidence order by the rule-based pass.
confidence Float @default(1.0)
createdAt DateTime @default(now())
// Plain string, not a relation: no foreign key ties it to the group row.
createdByGroupId String? // The manual group that spawned this rule
// Deleting the channel deletes its rules.
sourceChannel TelegramChannel @relation(fields: [sourceChannelId], references: [id], onDelete: Cascade)
@@index([sourceChannelId])
@@map("grouping_rules")
}

View File

@@ -186,6 +186,62 @@ export async function setPreviewFromExtract(
}
}
/**
 * Schedule a package for re-processing by the worker.
 *
 * Clears the package's destination fields, rewinds every channel watermark
 * that is at or past the package's source message to just before it, and
 * marks unread notifications that reference the package as read. The three
 * writes are applied in a single transaction so a failure cannot leave a
 * half-repaired state (e.g. destination cleared but watermark untouched).
 *
 * @param packageId - ID of the package to repair.
 * @returns `success: true` when the repair was scheduled; otherwise an error
 *   for unauthenticated callers, unknown packages, or a failed write.
 */
export async function repairPackageAction(
  packageId: string
): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };

  try {
    const pkg = await prisma.package.findUnique({
      where: { id: packageId },
      // Only the source coordinates are needed to rewind the watermark.
      select: { sourceChannelId: true, sourceMessageId: true },
    });
    if (!pkg) return { success: false, error: "Package not found" };

    await prisma.$transaction([
      // Clear the destination info so the worker re-processes it
      prisma.package.update({
        where: { id: packageId },
        data: {
          destMessageId: null,
          destMessageIds: [],
          destChannelId: null,
        },
      }),
      // Reset the channel watermark to before this message so worker picks it up
      prisma.accountChannelMap.updateMany({
        where: {
          channelId: pkg.sourceChannelId,
          lastProcessedMessageId: { gte: pkg.sourceMessageId },
        },
        data: { lastProcessedMessageId: pkg.sourceMessageId - BigInt(1) },
      }),
      // Mark related notifications as read
      prisma.systemNotification.updateMany({
        where: {
          context: { path: ["packageId"], equals: packageId },
          isRead: false,
        },
        data: { isRead: true },
      }),
    ]);

    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch (err) {
    // Keep the generic message for the client, but leave a trace server-side.
    console.error("repairPackageAction failed", { packageId, err });
    return { success: false, error: "Failed to schedule repair" };
  }
}
export async function retrySkippedPackageAction(
id: string
): Promise<ActionResult> {

View File

@@ -291,10 +291,25 @@ export async function setChannelCategory(
if (!admin.success) return admin;
try {
const existing = await prisma.telegramChannel.findUnique({
where: { id },
select: { category: true },
});
if (!existing) return { success: false, error: "Channel not found" };
const oldCategory = existing.category;
const newCategory = category?.trim() || null;
await prisma.telegramChannel.update({
where: { id },
data: { category: category?.trim() || null },
data: { category: newCategory },
});
// Retroactively re-tag packages from this channel when category changes
if (oldCategory !== newCategory && newCategory) {
await retagChannelPackages(id, oldCategory, newCategory);
}
revalidatePath("/telegram");
return { success: true, data: undefined };
} catch {
@@ -302,6 +317,50 @@ export async function setChannelCategory(
}
}
/**
 * Swap the category tag on every package of a channel.
 *
 * For each package sourced from `channelId`, the first occurrence of
 * `oldCategory` (if one is given and present) is removed from its tags and
 * `newCategory` is appended unless already there. Packages whose tag list
 * ends up unchanged are not written.
 *
 * @returns The number of packages updated, or an error result when the
 *   caller has no session or a database call throws.
 */
export async function retagChannelPackages(
  channelId: string,
  oldCategory: string | null,
  newCategory: string
): Promise<ActionResult<{ updated: number }>> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };

  try {
    const channelPackages = await prisma.package.findMany({
      where: { sourceChannelId: channelId },
      select: { id: true, tags: true },
    });

    let updated = 0;
    for (const { id: packageId, tags: currentTags } of channelPackages) {
      const nextTags = currentTags.slice();

      // Drop the first occurrence of the previous category tag, if any.
      const staleIndex = oldCategory ? nextTags.indexOf(oldCategory) : -1;
      if (staleIndex !== -1) nextTags.splice(staleIndex, 1);

      // Append the new category tag when it is missing.
      if (nextTags.indexOf(newCategory) === -1) nextTags.push(newCategory);

      // Skip the write when nothing changed.
      const unchanged = JSON.stringify(nextTags) === JSON.stringify(currentTags);
      if (unchanged) continue;

      await prisma.package.update({
        where: { id: packageId },
        data: { tags: nextTags },
      });
      updated += 1;
    }

    revalidatePath("/stls");
    return { success: true, data: { updated } };
  } catch {
    return { success: false, error: "Failed to re-tag packages" };
  }
}
export async function setChannelType(
id: string,
type: "SOURCE" | "DESTINATION"

View File

@@ -0,0 +1,43 @@
import { NextResponse } from "next/server";
import { auth } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
export const dynamic = "force-dynamic";
/**
 * POST /api/notifications/repair
 *
 * Body: `{ "notificationId": string }`. Looks up the notification, reads
 * `packageId` from its context and runs the package repair action for it.
 *
 * Responses: 401 without a session, 400 for a missing/invalid
 * `notificationId` or a notification without a package, 404 for an unknown
 * notification, 500 when the repair action reports failure.
 */
export async function POST(request: Request) {
  const session = await auth();
  if (!session?.user?.id) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  // The body is untrusted: it may be invalid JSON, `null`, an array or a
  // primitive. Narrow it before reading a property so those cases end in a
  // 400 rather than an unhandled TypeError.
  const body: unknown = await request.json().catch(() => null);
  const notificationId =
    typeof body === "object" && body !== null
      ? (body as Record<string, unknown>).notificationId
      : undefined;
  if (typeof notificationId !== "string" || notificationId === "") {
    return NextResponse.json({ error: "notificationId required" }, { status: 400 });
  }

  const notification = await prisma.systemNotification.findUnique({
    where: { id: notificationId },
  });
  if (!notification) {
    return NextResponse.json({ error: "Notification not found" }, { status: 404 });
  }

  // `context` is free-form JSON; only accept a plain object holding a
  // non-empty string packageId.
  const context: unknown = notification.context;
  const packageId =
    typeof context === "object" && context !== null && !Array.isArray(context)
      ? (context as Record<string, unknown>).packageId
      : undefined;
  if (typeof packageId !== "string" || packageId === "") {
    return NextResponse.json({ error: "Notification has no associated package" }, { status: 400 });
  }

  // Import and call the repair action
  const { repairPackageAction } = await import("@/app/(app)/stls/actions");
  const result = await repairPackageAction(packageId);
  if (!result.success) {
    return NextResponse.json({ error: result.error }, { status: 500 });
  }

  return NextResponse.json({ success: true });
}

View File

@@ -10,6 +10,7 @@ import {
PopoverTrigger,
} from "@/components/ui/popover";
import { ScrollArea } from "@/components/ui/scroll-area";
import { toast } from "sonner";
interface Notification {
id: string;
@@ -93,6 +94,22 @@ export function NotificationBell() {
}
}
/**
 * Ask the server to schedule a repair for the package behind a notification.
 * Shows a success toast and refreshes the list when the request succeeds, and
 * an error toast (using the server's message when it sends one) otherwise.
 */
async function handleRepair(notificationId: string) {
  const fallbackMessage = "Failed to schedule repair";
  try {
    const res = await fetch("/api/notifications/repair", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ notificationId }),
    });

    if (!res.ok) {
      // Surface the failure instead of leaving the button looking inert.
      const payload: unknown = await res.json().catch(() => null);
      const serverMessage =
        typeof payload === "object" && payload !== null
          ? (payload as Record<string, unknown>).error
          : undefined;
      toast.error(
        typeof serverMessage === "string" && serverMessage
          ? serverMessage
          : fallbackMessage
      );
      return;
    }

    toast.success("Repair scheduled — package will be re-processed on next cycle");
    fetchNotifications();
  } catch {
    // fetch itself rejected (e.g. network failure).
    toast.error(fallbackMessage);
  }
}
function formatTime(iso: string): string {
const d = new Date(iso);
const now = new Date();
@@ -147,12 +164,19 @@ export function NotificationBell() {
const Icon = severityIcon[n.severity] ?? Info;
const color = severityColor[n.severity] ?? "text-muted-foreground";
return (
<button
<div
key={n.id}
className={`flex w-full gap-3 px-4 py-3 text-left hover:bg-muted/50 transition-colors ${
!n.isRead ? "bg-muted/20" : ""
}`}
role="button"
tabIndex={0}
onClick={() => !n.isRead && handleMarkRead(n.id)}
onKeyDown={(e) => {
if (e.key === "Enter" || e.key === " ") {
if (!n.isRead) handleMarkRead(n.id);
}
}}
>
<Icon className={`h-4 w-4 mt-0.5 shrink-0 ${color}`} />
<div className="flex-1 min-w-0">
@@ -170,8 +194,21 @@ export function NotificationBell() {
<p className="text-[10px] text-muted-foreground mt-1">
{formatTime(n.createdAt)}
</p>
{(n.type === "MISSING_PART" || n.type === "HASH_MISMATCH") && (
<Button
variant="outline"
size="sm"
className="h-6 px-2 text-xs mt-1"
onClick={(e) => {
e.stopPropagation();
handleRepair(n.id);
}}
>
Repair
</Button>
)}
</div>
</button>
</div>
);
})}
</div>

View File

@@ -340,6 +340,30 @@ export async function listPackageFiles(options: {
};
}
/**
 * Run a ranked full-text search over `packages."searchVector"`.
 *
 * The user query is reduced to plain terms joined with `&` (all terms must
 * match) before it is handed to `to_tsquery`, so tsquery operators typed by
 * the user never reach the parser.
 *
 * @param query - Raw user search text.
 * @param limit - Maximum number of package IDs to return.
 * @returns Package IDs ordered by `ts_rank` descending; empty when the query
 *   yields no usable term. Database errors propagate to the caller.
 */
async function fullTextSearchPackageIds(query: string, limit: number): Promise<string[]> {
  // Split on every run of non-letter/non-digit characters instead of deleting
  // them: "foo-bar" must become the two terms "foo" and "bar", not the single
  // term "foobar". Unicode classes keep accented and non-Latin letters intact.
  // The minimum length is applied to the cleaned terms.
  const terms = query
    .split(/[^\p{L}\p{N}]+/u)
    .filter((term) => term.length >= 2);
  if (terms.length === 0) return [];

  const tsQuery = terms.join(" & ");
  const results = await prisma.$queryRawUnsafe<{ id: string }[]>(
    `SELECT id FROM packages
WHERE "searchVector" @@ to_tsquery('english', $1)
ORDER BY ts_rank("searchVector", to_tsquery('english', $1)) DESC
LIMIT $2`,
    tsQuery,
    limit
  );
  return results.map((r) => r.id);
}
export async function searchPackages(options: {
query: string;
page: number;
@@ -366,14 +390,26 @@ export async function searchPackages(options: {
);
const fileMatchedIds = fileMatches.map((f) => f.packageId);
// Try full-text search first (better ranking, handles word stemming)
let ftsPackageNameIds: string[] = [];
if (options.searchIn === "both" && q.length >= 3) {
try {
ftsPackageNameIds = await fullTextSearchPackageIds(q, 200);
} catch {
// FTS failed — fall back to ILIKE below
}
}
const packageNameIds =
options.searchIn === "both"
? (
await prisma.package.findMany({
where: { fileName: { contains: q, mode: "insensitive" } },
select: { id: true },
})
).map((p) => p.id)
? ftsPackageNameIds.length > 0
? ftsPackageNameIds
: (
await prisma.package.findMany({
where: { fileName: { contains: q, mode: "insensitive" } },
select: { id: true },
})
).map((p) => p.id)
: [];
// Also match by group name
@@ -696,6 +732,53 @@ export async function createManualGroup(name: string, packageIds: string[]) {
data: { packageGroupId: group.id },
});
// Learn a grouping rule from the manual override
try {
const linkedPkgs = await prisma.package.findMany({
where: { id: { in: packageIds } },
select: { fileName: true, creator: true },
});
// Extract the common filename pattern
const fileNames = linkedPkgs.map((p) => p.fileName);
let pattern = "";
if (fileNames.length > 1) {
// Find longest common prefix
let prefix = fileNames[0];
for (let i = 1; i < fileNames.length; i++) {
while (!fileNames[i].startsWith(prefix)) {
prefix = prefix.slice(0, -1);
if (!prefix) break;
}
}
const trimmed = prefix.replace(/[\s\-_.(]+$/, "");
if (trimmed.length >= 4) {
pattern = trimmed;
}
}
// Fall back to shared creator
if (!pattern) {
const creators = [...new Set(linkedPkgs.map((p) => p.creator).filter(Boolean))];
if (creators.length === 1 && creators[0]) {
pattern = creators[0];
}
}
if (pattern) {
await prisma.groupingRule.create({
data: {
sourceChannelId: firstPkg.sourceChannelId,
pattern,
signalType: "MANUAL",
createdByGroupId: group.id,
},
});
}
} catch {
// Best-effort — don't fail the group creation if rule learning fails
}
// Clean up empty groups left behind
await prisma.packageGroup.deleteMany({
where: { packages: { none: {} }, id: { not: group.id } },

View File

@@ -617,7 +617,7 @@ export async function createAutoGroup(input: {
sourceChannelId: string;
name: string;
packageIds: string[];
groupingSource: "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY";
groupingSource: "ALBUM" | "MANUAL" | "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY";
}): Promise<string> {
const group = await db.packageGroup.create({
data: {

View File

@@ -79,6 +79,69 @@ export async function processAlbumGroups(
}
}
/**
 * Apply learned GroupingRules from manual overrides.
 *
 * Rules of the channel are tried in descending confidence order. For each
 * rule, the still-ungrouped packages of this batch whose fileName or creator
 * contains the pattern (case-insensitive substring) are put into a new group
 * named after the pattern, provided at least two of them match. Failures to
 * create a group are logged and do not stop the remaining rules.
 *
 * @param sourceChannelId - Channel whose rules and packages are considered.
 * @param indexedPackages - Packages indexed in the current batch.
 */
export async function processRuleBasedGroups(
  sourceChannelId: string,
  indexedPackages: IndexedPackageRef[]
): Promise<void> {
  const rules = await db.groupingRule.findMany({
    where: { sourceChannelId },
    orderBy: { confidence: "desc" },
  });
  if (rules.length === 0) return;

  const ungrouped = await db.package.findMany({
    where: {
      id: { in: indexedPackages.map((p) => p.packageId) },
      packageGroupId: null,
    },
    select: { id: true, fileName: true, creator: true },
  });
  if (ungrouped.length < 2) return;

  for (const rule of rules) {
    // A blank pattern is a substring of every string and would sweep all
    // ungrouped packages into one group — ignore such rules.
    if (rule.pattern.trim() === "") continue;

    // Lowercase once per rule rather than once per package.
    const needle = rule.pattern.toLowerCase();
    const matches = ungrouped.filter(
      (pkg) =>
        pkg.fileName.toLowerCase().includes(needle) ||
        Boolean(pkg.creator && pkg.creator.toLowerCase().includes(needle))
    );
    if (matches.length < 2) continue;

    // Check if any are already grouped (by a previous rule in this loop)
    const stillUngrouped = await db.package.findMany({
      where: {
        id: { in: matches.map((m) => m.id) },
        packageGroupId: null,
      },
      select: { id: true },
    });
    if (stillUngrouped.length < 2) continue;

    try {
      const groupId = await createAutoGroup({
        sourceChannelId,
        name: rule.pattern,
        packageIds: stillUngrouped.map((m) => m.id),
        groupingSource: "MANUAL",
      });
      log.info(
        { groupId, ruleId: rule.id, pattern: rule.pattern, memberCount: stillUngrouped.length },
        "Applied learned grouping rule"
      );
    } catch (err) {
      log.warn({ err, ruleId: rule.id }, "Failed to apply grouping rule");
    }
  }
}
/**
* After album grouping, cluster remaining ungrouped packages from the same channel
* that were posted within a configurable time window.
@@ -525,6 +588,64 @@ function extractRootFolder(paths: string[]): string | null {
return maxSegment;
}
/**
 * Detect packages that could have been grouped differently.
 *
 * For every grouped package of this batch, looks for a GroupingRule of the
 * channel whose pattern occurs in the filename (case-insensitive) and whose
 * originating manual group differs from the package's current group. The
 * first such rule produces one GROUPING_CONFLICT notification for the package.
 *
 * @param sourceChannelId - Channel whose rules are checked.
 * @param indexedPackages - Packages indexed in the current batch.
 */
export async function detectGroupingConflicts(
  sourceChannelId: string,
  indexedPackages: IndexedPackageRef[]
): Promise<void> {
  const rules = await db.groupingRule.findMany({
    where: { sourceChannelId },
  });

  // Only rules that know their originating group can conflict with anything.
  // Blank patterns are dropped because they are a substring of every filename.
  // Patterns are lowercased once here instead of once per package.
  const candidates = rules
    .filter((rule) => rule.createdByGroupId && rule.pattern.trim() !== "")
    .map((rule) => ({ rule, needle: rule.pattern.toLowerCase() }));
  if (candidates.length === 0) return;

  const grouped = await db.package.findMany({
    where: {
      id: { in: indexedPackages.map((p) => p.packageId) },
      packageGroupId: { not: null },
    },
    select: {
      id: true,
      fileName: true,
      packageGroupId: true,
      packageGroup: { select: { name: true, groupingSource: true } },
    },
  });

  for (const pkg of grouped) {
    const fileNameLower = pkg.fileName.toLowerCase();

    const conflict = candidates.find(({ rule, needle }) => {
      if (!fileNameLower.includes(needle)) return false;
      // Check if the rule's source group is different from current group
      if (rule.createdByGroupId === pkg.packageGroupId) return false;
      // The rule-based pass creates a new group named after the pattern with
      // source "MANUAL". Such a group has a different id than the rule's
      // originating group but was produced by this very rule, so it is not a
      // conflict with it.
      const createdByThisRule =
        pkg.packageGroup?.groupingSource === "MANUAL" &&
        pkg.packageGroup?.name === rule.pattern;
      return !createdByThisRule;
    });
    if (!conflict) continue;

    const { rule } = conflict;
    try {
      // One notification per package
      await db.systemNotification.create({
        data: {
          type: "GROUPING_CONFLICT",
          severity: "INFO",
          title: `Potential grouping conflict: ${pkg.fileName}`,
          message: `Grouped by ${pkg.packageGroup?.groupingSource ?? "unknown"} into "${pkg.packageGroup?.name}", but also matches rule "${rule.pattern}" from a different manual group`,
          context: {
            packageId: pkg.id,
            fileName: pkg.fileName,
            currentGroupId: pkg.packageGroupId,
            matchedRuleId: rule.id,
            matchedPattern: rule.pattern,
          },
        },
      });
    } catch (err) {
      // Best-effort: never fail ingestion over a notification, but leave a trace.
      log.warn(
        { err, packageId: pkg.id, ruleId: rule.id },
        "Failed to record grouping conflict notification"
      );
    }
  }
}
/**
* Find the longest common prefix among a list of filenames,
* trimming trailing separators and partial words.

View File

@@ -47,7 +47,7 @@ import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel } from "./upload/channel.js";
import { processAlbumGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, type IndexedPackageRef } from "./grouping.js";
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
import { db } from "./db/client.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
import type { Client } from "tdl";
@@ -808,23 +808,37 @@ async function processArchiveSets(
scanResult.photos
);
// Time-window grouping for remaining ungrouped packages
await processTimeWindowGroups(channel.id, indexedPackageRefs);
// Auto-grouping passes (gated by per-channel flag)
const channelRecord = await db.telegramChannel.findUnique({
where: { id: channel.id },
select: { autoGroupEnabled: true },
});
// Pattern-based grouping (date patterns, project slugs)
await processPatternGroups(channel.id, indexedPackageRefs);
if (channelRecord?.autoGroupEnabled !== false) {
// Learned rule-based grouping (from manual overrides)
await processRuleBasedGroups(channel.id, indexedPackageRefs);
// Creator-based grouping (3+ files from same creator)
await processCreatorGroups(channel.id, indexedPackageRefs);
// Time-window grouping for remaining ungrouped packages
await processTimeWindowGroups(channel.id, indexedPackageRefs);
// ZIP path prefix grouping (shared root folder inside archives)
await processZipPathGroups(channel.id, indexedPackageRefs);
// Pattern-based grouping (date patterns, project slugs)
await processPatternGroups(channel.id, indexedPackageRefs);
// Reply chain grouping (messages replying to same root)
await processReplyChainGroups(channel.id, indexedPackageRefs);
// Creator-based grouping (3+ files from same creator)
await processCreatorGroups(channel.id, indexedPackageRefs);
// Caption fuzzy match grouping
await processCaptionGroups(channel.id, indexedPackageRefs);
// ZIP path prefix grouping (shared root folder inside archives)
await processZipPathGroups(channel.id, indexedPackageRefs);
// Reply chain grouping (messages replying to same root)
await processReplyChainGroups(channel.id, indexedPackageRefs);
// Caption fuzzy match grouping
await processCaptionGroups(channel.id, indexedPackageRefs);
}
// Check for potential grouping conflicts
await detectGroupingConflicts(channel.id, indexedPackageRefs);
}
return maxProcessedId;
@@ -1162,6 +1176,34 @@ async function processOneArchiveSet(
);
}
// ── Post-upload integrity check ──
// Verify the files on disk still match before we index
if (uploadPaths.length > 0 && !existingUpload) {
try {
const postUploadHash = await hashParts(uploadPaths);
if (splitPaths.length > 0) {
// Split files — hash should match the split hash (already verified above)
// No additional check needed since we verified split hash = original hash
} else if (postUploadHash !== contentHash) {
accountLog.error(
{ fileName: archiveName, originalHash: contentHash, postUploadHash },
"Hash changed between hashing and upload — possible disk corruption"
);
await db.systemNotification.create({
data: {
type: "HASH_MISMATCH",
severity: "ERROR",
title: `Post-upload hash mismatch: ${archiveName}`,
message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}`,
context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
},
});
}
} catch {
// Best-effort — don't fail the ingestion
}
}
// ── Preview thumbnail ──
let previewData: Buffer | null = null;
let previewMsgId: bigint | null = null;