mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-10 22:01:16 +00:00
Compare commits
4 Commits
194c87a256
...
f4aa9d9a2f
| Author | SHA1 | Date | |
|---|---|---|---|
| f4aa9d9a2f | |||
| 7f9a03d4ee | |||
| 2c46ab0843 | |||
| 9e78cc5d19 |
@@ -10,7 +10,10 @@ import {
|
||||
getSubscriptions,
|
||||
addSubscription,
|
||||
removeSubscription,
|
||||
getGroupById,
|
||||
searchGroups,
|
||||
} from "./db/queries.js";
|
||||
import { db } from "./db/client.js";
|
||||
import { sendTextMessage, sendPhotoMessage } from "./tdlib/client.js";
|
||||
|
||||
const log = childLogger("commands");
|
||||
@@ -78,6 +81,12 @@ export async function handleMessage(msg: IncomingMessage): Promise<void> {
|
||||
case "/status":
|
||||
await handleStatus(chatId, userId);
|
||||
break;
|
||||
case "/group":
|
||||
await handleGroup(chatId, args);
|
||||
break;
|
||||
case "/sendgroup":
|
||||
await handleSendGroup(chatId, userId, args);
|
||||
break;
|
||||
default:
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
@@ -117,6 +126,8 @@ async function handleStart(
|
||||
`/search <query> — Search packages`,
|
||||
`/latest [n] — Show latest packages`,
|
||||
`/package <id> — Package details`,
|
||||
`/group <id or name> — View group info and package list`,
|
||||
`/sendgroup <id> — Send all packages in a group to yourself`,
|
||||
`/link <code> — Link your Telegram to your web account`,
|
||||
`/subscribe <keyword> — Get notified for new packages`,
|
||||
`/subscriptions — View your subscriptions`,
|
||||
@@ -136,6 +147,8 @@ async function handleHelp(chatId: bigint): Promise<void> {
|
||||
`/search <query> — Search by filename or creator`,
|
||||
`/latest [n] — Show n most recent packages (default: 5)`,
|
||||
`/package <id> — View package details and file list`,
|
||||
`/group <id or name> — View group info and package list`,
|
||||
`/sendgroup <id> — Send all packages in a group to yourself`,
|
||||
``,
|
||||
`🔗 <b>Account Linking</b>`,
|
||||
`/link <code> — Link Telegram to your web account`,
|
||||
@@ -432,6 +445,168 @@ async function handleStatus(chatId: bigint, userId: bigint): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
async function handleGroup(chatId: bigint, query: string): Promise<void> {
|
||||
if (!query) {
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
"Usage: /group <id or name>\n\nProvide a group ID (starts with 'c') or a name to search.",
|
||||
"textParseModeHTML"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const trimmed = query.trim();
|
||||
|
||||
// If it looks like a cuid (starts with 'c', ~25 chars), look up by ID directly
|
||||
if (/^c[a-z0-9]{20,}$/i.test(trimmed)) {
|
||||
const group = await getGroupById(trimmed);
|
||||
if (!group) {
|
||||
await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
|
||||
return;
|
||||
}
|
||||
|
||||
const packageLines = group.packages.slice(0, 20).map((pkg, i) => {
|
||||
const size = formatSize(pkg.fileSize);
|
||||
return ` ${i + 1}. <b>${escapeHtml(pkg.fileName)}</b> (${size}, ${pkg.fileCount} files) — <code>${pkg.id}</code>`;
|
||||
});
|
||||
const more = group.packages.length > 20
|
||||
? `\n ... and ${group.packages.length - 20} more`
|
||||
: "";
|
||||
|
||||
const response = [
|
||||
`📦 <b>Group: ${escapeHtml(group.name)}</b>`,
|
||||
``,
|
||||
`Packages: ${group.packages.length}`,
|
||||
`ID: <code>${group.id}</code>`,
|
||||
``,
|
||||
`<b>Contents:</b>`,
|
||||
...packageLines,
|
||||
more,
|
||||
``,
|
||||
`Use /sendgroup ${group.id} to receive all packages.`,
|
||||
]
|
||||
.filter((l) => l !== "")
|
||||
.join("\n");
|
||||
|
||||
await sendTextMessage(chatId, response, "textParseModeHTML");
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise search by name
|
||||
const groups = await searchGroups(trimmed, 5);
|
||||
|
||||
if (groups.length === 0) {
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
`No groups found matching "<b>${escapeHtml(trimmed)}</b>".`,
|
||||
"textParseModeHTML"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const lines = groups.map(
|
||||
(g, i) =>
|
||||
`${i + 1}. <b>${escapeHtml(g.name)}</b> — ${g._count.packages} package(s)\n ID: <code>${g.id}</code>`
|
||||
);
|
||||
|
||||
const response = [
|
||||
`🔍 <b>Groups matching "${escapeHtml(trimmed)}":</b>`,
|
||||
``,
|
||||
...lines,
|
||||
``,
|
||||
`Use /group <id> for full details.`,
|
||||
].join("\n");
|
||||
|
||||
await sendTextMessage(chatId, response, "textParseModeHTML");
|
||||
}
|
||||
|
||||
async function handleSendGroup(
|
||||
chatId: bigint,
|
||||
userId: bigint,
|
||||
args: string
|
||||
): Promise<void> {
|
||||
if (!args) {
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
"Usage: /sendgroup <group-id>",
|
||||
"textParseModeHTML"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const groupId = args.trim();
|
||||
const group = await getGroupById(groupId);
|
||||
|
||||
if (!group) {
|
||||
await sendTextMessage(chatId, "Group not found.", "textParseModeHTML");
|
||||
return;
|
||||
}
|
||||
|
||||
// Require account linking
|
||||
const link = await findLinkByTelegramUserId(userId);
|
||||
if (!link) {
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
"You must link your account before receiving packages.\nUse /link <code> to connect.",
|
||||
"textParseModeHTML"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Only send packages that have been uploaded to the destination channel
|
||||
const sendable = group.packages.filter(
|
||||
(pkg) => pkg.destChannelId && pkg.destMessageId
|
||||
);
|
||||
|
||||
if (sendable.length === 0) {
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
`No packages in group "<b>${escapeHtml(group.name)}</b>" are ready to send yet.`,
|
||||
"textParseModeHTML"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a BotSendRequest for each sendable package
|
||||
const requests = await Promise.all(
|
||||
sendable.map((pkg) =>
|
||||
db.botSendRequest.create({
|
||||
data: {
|
||||
packageId: pkg.id,
|
||||
telegramLinkId: link.id,
|
||||
requestedByUserId: link.userId,
|
||||
status: "PENDING",
|
||||
},
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
// Fire pg_notify for each request so the send listener picks them up
|
||||
for (const req of requests) {
|
||||
await db.$queryRawUnsafe(
|
||||
`SELECT pg_notify('bot_send', $1)`,
|
||||
req.id
|
||||
).catch(() => {
|
||||
// Best-effort — the bot also processes PENDING requests on its send queue
|
||||
});
|
||||
}
|
||||
|
||||
await sendTextMessage(
|
||||
chatId,
|
||||
[
|
||||
`✅ <b>Queued ${requests.length} package(s) from "${escapeHtml(group.name)}"</b>`,
|
||||
``,
|
||||
`You'll receive each archive shortly. Use /package <id> to check individual packages.`,
|
||||
].join("\n"),
|
||||
"textParseModeHTML"
|
||||
);
|
||||
|
||||
log.info(
|
||||
{ groupId, packageCount: requests.length, userId: userId.toString() },
|
||||
"Group send queued"
|
||||
);
|
||||
}
|
||||
|
||||
function escapeHtml(text: string): string {
|
||||
return text
|
||||
.replace(/&/g, "&")
|
||||
|
||||
@@ -53,7 +53,52 @@ export async function createTelegramLink(
|
||||
// ── Package search ──
|
||||
|
||||
export async function searchPackages(query: string, limit = 10) {
|
||||
const packages = await db.package.findMany({
|
||||
// Try full-text search first
|
||||
if (query.length >= 3) {
|
||||
const tsQuery = query
|
||||
.trim()
|
||||
.split(/\s+/)
|
||||
.filter((w) => w.length >= 2)
|
||||
.map((w) => w.replace(/[^a-zA-Z0-9]/g, ""))
|
||||
.filter(Boolean)
|
||||
.join(" & ");
|
||||
|
||||
if (tsQuery) {
|
||||
try {
|
||||
const ftsResults = await db.$queryRawUnsafe<{ id: string }[]>(
|
||||
`SELECT id FROM packages
|
||||
WHERE "searchVector" @@ to_tsquery('english', $1)
|
||||
ORDER BY ts_rank("searchVector", to_tsquery('english', $1)) DESC
|
||||
LIMIT $2`,
|
||||
tsQuery,
|
||||
limit
|
||||
);
|
||||
|
||||
if (ftsResults.length > 0) {
|
||||
return db.package.findMany({
|
||||
where: { id: { in: ftsResults.map((r) => r.id) } },
|
||||
orderBy: { indexedAt: "desc" },
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
fileSize: true,
|
||||
archiveType: true,
|
||||
fileCount: true,
|
||||
creator: true,
|
||||
indexedAt: true,
|
||||
destChannelId: true,
|
||||
destMessageId: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// FTS failed — fall back to ILIKE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: ILIKE search
|
||||
return db.package.findMany({
|
||||
where: {
|
||||
OR: [
|
||||
{ fileName: { contains: query, mode: "insensitive" } },
|
||||
@@ -74,7 +119,44 @@ export async function searchPackages(query: string, limit = 10) {
|
||||
destMessageId: true,
|
||||
},
|
||||
});
|
||||
return packages;
|
||||
}
|
||||
|
||||
// ── Group queries ──
|
||||
|
||||
export async function getGroupById(groupId: string) {
|
||||
return db.packageGroup.findUnique({
|
||||
where: { id: groupId },
|
||||
include: {
|
||||
packages: {
|
||||
orderBy: { indexedAt: "desc" },
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
fileSize: true,
|
||||
archiveType: true,
|
||||
fileCount: true,
|
||||
creator: true,
|
||||
destChannelId: true,
|
||||
destMessageId: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function searchGroups(query: string, limit = 5) {
|
||||
return db.packageGroup.findMany({
|
||||
where: {
|
||||
name: { contains: query, mode: "insensitive" },
|
||||
},
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: limit,
|
||||
select: {
|
||||
id: true,
|
||||
name: true,
|
||||
_count: { select: { packages: true } },
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function getLatestPackages(limit = 5) {
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
# Grouping Phase 1: Foundation + Time-Window Grouping
|
||||
|
||||
> **For agentic workers:** Use superpowers:subagent-driven-development to implement this plan.
|
||||
|
||||
**Goal:** Add grouping infrastructure (schema, enums, notifications model), an ungrouped staging queue in the UI, and time-window auto-grouping as the first automatic signal beyond album grouping.
|
||||
|
||||
**Architecture:** Schema changes lay the foundation. Ungrouped tab is a query filter. Time-window grouping runs as a post-processing pass after album grouping in the worker pipeline.
|
||||
|
||||
**Tech Stack:** Prisma schema + migration, worker TypeScript, Next.js App Router.
|
||||
|
||||
---
|
||||
|
||||
## Task 1: Schema Migration
|
||||
|
||||
**Files:**
|
||||
- Modify: `prisma/schema.prisma`
|
||||
- Create: migration SQL
|
||||
|
||||
Add:
|
||||
1. `GroupingSource` enum: `ALBUM`, `MANUAL`, `AUTO_TIME`, `AUTO_PATTERN`, `AUTO_REPLY`, `AUTO_ZIP`, `AUTO_CAPTION`
|
||||
2. `groupingSource GroupingSource @default(MANUAL)` on `PackageGroup`
|
||||
3. `SystemNotification` model with `type`, `severity`, `title`, `message`, `context` (Json), `isRead`
|
||||
4. `NotificationType` enum: `HASH_MISMATCH`, `MISSING_PART`, `UPLOAD_FAILED`, `DOWNLOAD_FAILED`, `GROUPING_CONFLICT`, `INTEGRITY_AUDIT`
|
||||
5. `NotificationSeverity` enum: `INFO`, `WARNING`, `ERROR`
|
||||
|
||||
Backfill: `UPDATE package_groups SET "groupingSource" = 'ALBUM' WHERE "mediaAlbumId" IS NOT NULL`
|
||||
|
||||
---
|
||||
|
||||
## Task 2: Ungrouped Staging Tab in STL Page
|
||||
|
||||
**Files:**
|
||||
- Modify: `src/lib/telegram/queries.ts` — add `listUngroupedPackages()` query
|
||||
- Modify: `src/app/(app)/stls/page.tsx` — add tab parameter support
|
||||
- Modify: `src/app/(app)/stls/_components/stl-table.tsx` — add "Ungrouped" tab
|
||||
|
||||
Add a tab next to the existing "Skipped" tab that shows packages where `packageGroupId IS NULL`. Uses the existing `PackageListItem` type and table rendering. This gives users a clear view of files that need manual grouping.
|
||||
|
||||
---
|
||||
|
||||
## Task 3: Time-Window Auto-Grouping in Worker
|
||||
|
||||
**Files:**
|
||||
- Create: `worker/src/grouping.ts` — add `processTimeWindowGroups()` after existing `processAlbumGroups()`
|
||||
- Modify: `worker/src/worker.ts` — call time-window grouping after album grouping
|
||||
- Modify: `worker/src/util/config.ts` — add `autoGroupTimeWindowMinutes` config
|
||||
|
||||
After album grouping completes, find remaining ungrouped packages from the same channel scan. Cluster packages whose `sourceMessageId` timestamps are within the configured window (default 5 minutes). Create groups for clusters of 2+ with `groupingSource = AUTO_TIME` and name derived from the common filename prefix or first file's base name.
|
||||
|
||||
---
|
||||
|
||||
## Task 4: Hash Verification After Split
|
||||
|
||||
**Files:**
|
||||
- Modify: `worker/src/worker.ts` — add hash re-check after concat+split
|
||||
- Modify: `worker/src/archive/hash.ts` — (no changes needed, reuse `hashParts`)
|
||||
|
||||
After `concatenateFiles()` + `byteLevelSplit()`, re-hash the split parts and compare to the original `contentHash`. If mismatch, log error and create a `SystemNotification` (once that table exists). This closes the integrity gap identified in the audit.
|
||||
|
||||
---
|
||||
|
||||
## Task 5: Build & Deploy
|
||||
|
||||
Rebuild worker and app images. Deploy. Verify:
|
||||
- Worker logs show `maxPartSizeMB` and new `autoGroupTimeWindowMinutes` in config
|
||||
- Ungrouped tab visible in STL page
|
||||
- Previously-skipped large archives begin processing
|
||||
@@ -0,0 +1,32 @@
|
||||
-- CreateEnum GroupingSource
|
||||
CREATE TYPE "GroupingSource" AS ENUM ('ALBUM', 'MANUAL', 'AUTO_TIME', 'AUTO_PATTERN', 'AUTO_REPLY', 'AUTO_ZIP', 'AUTO_CAPTION');
|
||||
|
||||
-- CreateEnum NotificationType
|
||||
CREATE TYPE "NotificationType" AS ENUM ('HASH_MISMATCH', 'MISSING_PART', 'UPLOAD_FAILED', 'DOWNLOAD_FAILED', 'GROUPING_CONFLICT', 'INTEGRITY_AUDIT');
|
||||
|
||||
-- CreateEnum NotificationSeverity
|
||||
CREATE TYPE "NotificationSeverity" AS ENUM ('INFO', 'WARNING', 'ERROR');
|
||||
|
||||
-- AlterTable: add groupingSource to package_groups
|
||||
ALTER TABLE "package_groups" ADD COLUMN "groupingSource" "GroupingSource" NOT NULL DEFAULT 'MANUAL';
|
||||
|
||||
-- Backfill: mark album-based groups
|
||||
UPDATE "package_groups" SET "groupingSource" = 'ALBUM' WHERE "mediaAlbumId" IS NOT NULL;
|
||||
|
||||
-- CreateTable: system_notifications
|
||||
CREATE TABLE "system_notifications" (
|
||||
"id" TEXT NOT NULL,
|
||||
"type" "NotificationType" NOT NULL,
|
||||
"severity" "NotificationSeverity" NOT NULL DEFAULT 'INFO',
|
||||
"title" TEXT NOT NULL,
|
||||
"message" TEXT NOT NULL,
|
||||
"context" JSONB,
|
||||
"isRead" BOOLEAN NOT NULL DEFAULT false,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
CONSTRAINT "system_notifications_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "system_notifications_isRead_createdAt_idx" ON "system_notifications"("isRead", "createdAt");
|
||||
CREATE INDEX "system_notifications_type_idx" ON "system_notifications"("type");
|
||||
@@ -0,0 +1,3 @@
|
||||
-- AlterTable: add sourceCaption and replyToMessageId to packages
|
||||
ALTER TABLE "packages" ADD COLUMN "sourceCaption" TEXT;
|
||||
ALTER TABLE "packages" ADD COLUMN "replyToMessageId" BIGINT;
|
||||
@@ -0,0 +1,47 @@
|
||||
-- AlterTable: add autoGroupEnabled to telegram_channels
|
||||
ALTER TABLE "telegram_channels" ADD COLUMN "autoGroupEnabled" BOOLEAN NOT NULL DEFAULT true;
|
||||
|
||||
-- CreateTable: grouping_rules
|
||||
CREATE TABLE "grouping_rules" (
|
||||
"id" TEXT NOT NULL,
|
||||
"sourceChannelId" TEXT NOT NULL,
|
||||
"pattern" TEXT NOT NULL,
|
||||
"signalType" "GroupingSource" NOT NULL,
|
||||
"confidence" DOUBLE PRECISION NOT NULL DEFAULT 1.0,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"createdByGroupId" TEXT,
|
||||
|
||||
CONSTRAINT "grouping_rules_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "grouping_rules_sourceChannelId_idx" ON "grouping_rules"("sourceChannelId");
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "grouping_rules" ADD CONSTRAINT "grouping_rules_sourceChannelId_fkey" FOREIGN KEY ("sourceChannelId") REFERENCES "telegram_channels"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
-- Full-text search: add tsvector column and GIN index
|
||||
ALTER TABLE "packages" ADD COLUMN IF NOT EXISTS "searchVector" tsvector;
|
||||
|
||||
UPDATE "packages" SET "searchVector" = to_tsvector('english',
|
||||
coalesce("fileName", '') || ' ' || coalesce("creator", '') || ' ' || coalesce("sourceCaption", '')
|
||||
) WHERE "searchVector" IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS "packages_search_vector_idx" ON "packages" USING GIN ("searchVector");
|
||||
|
||||
-- Trigger to auto-update searchVector on insert/update
|
||||
CREATE OR REPLACE FUNCTION packages_search_vector_update() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
NEW."searchVector" := to_tsvector('english',
|
||||
coalesce(NEW."fileName", '') || ' ' || coalesce(NEW."creator", '') || ' ' || coalesce(NEW."sourceCaption", '')
|
||||
);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
DROP TRIGGER IF EXISTS packages_search_vector_trigger ON "packages";
|
||||
CREATE TRIGGER packages_search_vector_trigger
|
||||
BEFORE INSERT OR UPDATE OF "fileName", "creator", "sourceCaption"
|
||||
ON "packages"
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION packages_search_vector_update();
|
||||
@@ -429,10 +429,13 @@ model TelegramChannel {
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
autoGroupEnabled Boolean @default(true)
|
||||
|
||||
accountMaps AccountChannelMap[]
|
||||
packages Package[]
|
||||
skippedPackages SkippedPackage[]
|
||||
packageGroups PackageGroup[]
|
||||
groupingRules GroupingRule[]
|
||||
|
||||
@@index([type, isActive])
|
||||
@@index([category])
|
||||
@@ -474,6 +477,8 @@ model Package {
|
||||
partCount Int @default(1)
|
||||
fileCount Int @default(0)
|
||||
tags String[] @default([])
|
||||
sourceCaption String? // Caption text from source Telegram message
|
||||
replyToMessageId BigInt? // reply_to_message_id from source message (for reply chain grouping)
|
||||
previewData Bytes? // JPEG thumbnail from nearby Telegram photo (stored as raw bytes)
|
||||
previewMsgId BigInt? // Telegram message ID of the matched photo
|
||||
packageGroupId String?
|
||||
@@ -522,6 +527,7 @@ model PackageGroup {
|
||||
name String
|
||||
mediaAlbumId String?
|
||||
sourceChannelId String
|
||||
groupingSource GroupingSource @default(MANUAL)
|
||||
previewData Bytes?
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
@@ -802,3 +808,60 @@ model KickstarterPackage {
|
||||
@@id([kickstarterId, packageId])
|
||||
@@map("kickstarter_packages")
|
||||
}
|
||||
|
||||
// ── Grouping & Notifications ──
|
||||
|
||||
enum GroupingSource {
|
||||
ALBUM
|
||||
MANUAL
|
||||
AUTO_TIME
|
||||
AUTO_PATTERN
|
||||
AUTO_REPLY
|
||||
AUTO_ZIP
|
||||
AUTO_CAPTION
|
||||
}
|
||||
|
||||
enum NotificationType {
|
||||
HASH_MISMATCH
|
||||
MISSING_PART
|
||||
UPLOAD_FAILED
|
||||
DOWNLOAD_FAILED
|
||||
GROUPING_CONFLICT
|
||||
INTEGRITY_AUDIT
|
||||
}
|
||||
|
||||
enum NotificationSeverity {
|
||||
INFO
|
||||
WARNING
|
||||
ERROR
|
||||
}
|
||||
|
||||
model SystemNotification {
|
||||
id String @id @default(cuid())
|
||||
type NotificationType
|
||||
severity NotificationSeverity @default(INFO)
|
||||
title String
|
||||
message String
|
||||
context Json?
|
||||
isRead Boolean @default(false)
|
||||
createdAt DateTime @default(now())
|
||||
|
||||
@@index([isRead, createdAt])
|
||||
@@index([type])
|
||||
@@map("system_notifications")
|
||||
}
|
||||
|
||||
model GroupingRule {
|
||||
id String @id @default(cuid())
|
||||
sourceChannelId String
|
||||
pattern String // Regex or keyword pattern learned from manual grouping
|
||||
signalType GroupingSource // Which grouping signal this rule applies to
|
||||
confidence Float @default(1.0)
|
||||
createdAt DateTime @default(now())
|
||||
createdByGroupId String? // The manual group that spawned this rule
|
||||
|
||||
sourceChannel TelegramChannel @relation(fields: [sourceChannelId], references: [id], onDelete: Cascade)
|
||||
|
||||
@@index([sourceChannelId])
|
||||
@@map("grouping_rules")
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"use client";
|
||||
|
||||
import { type ColumnDef } from "@tanstack/react-table";
|
||||
import { FileArchive, Eye, ChevronRight, Layers, Ungroup, Send, ImagePlus } from "lucide-react";
|
||||
import { FileArchive, Eye, ChevronRight, Layers, Ungroup, Send, ImagePlus, GitMerge } from "lucide-react";
|
||||
import { DataTableColumnHeader } from "@/components/shared/data-table-column-header";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import { Button } from "@/components/ui/button";
|
||||
@@ -69,6 +69,9 @@ interface PackageColumnsProps {
|
||||
onGroupPreviewUpload: (groupId: string) => void;
|
||||
selectedPackages: Set<string>;
|
||||
onToggleSelect: (packageId: string) => void;
|
||||
mergeSourceId: string | null;
|
||||
onStartMerge: (groupId: string) => void;
|
||||
onCompleteMerge: (targetGroupId: string) => void;
|
||||
}
|
||||
|
||||
export function formatBytes(bytesStr: string): string {
|
||||
@@ -148,6 +151,9 @@ export function getPackageColumns({
|
||||
onGroupPreviewUpload,
|
||||
selectedPackages,
|
||||
onToggleSelect,
|
||||
mergeSourceId,
|
||||
onStartMerge,
|
||||
onCompleteMerge,
|
||||
}: PackageColumnsProps): ColumnDef<StlTableRow, unknown>[] {
|
||||
return [
|
||||
{
|
||||
@@ -392,6 +398,8 @@ export function getPackageColumns({
|
||||
cell: ({ row }) => {
|
||||
const data = row.original;
|
||||
if (isGroupRow(data)) {
|
||||
const isMergeSource = mergeSourceId === data.id;
|
||||
const canMergeHere = mergeSourceId !== null && mergeSourceId !== data.id;
|
||||
return (
|
||||
<div className="flex items-center gap-0.5">
|
||||
<Button
|
||||
@@ -403,6 +411,26 @@ export function getPackageColumns({
|
||||
>
|
||||
<Send className="h-4 w-4" />
|
||||
</Button>
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="icon"
|
||||
className={`h-8 w-8 ${isMergeSource ? "text-amber-500 bg-amber-500/10 hover:bg-amber-500/20" : ""}`}
|
||||
onClick={() => onStartMerge(data.id)}
|
||||
title={isMergeSource ? "Cancel merge (this group is the merge source)" : "Start merge — mark this group as merge source"}
|
||||
>
|
||||
<GitMerge className="h-4 w-4" />
|
||||
</Button>
|
||||
{canMergeHere && (
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="icon"
|
||||
className="h-8 w-8 text-primary bg-primary/10 hover:bg-primary/20"
|
||||
onClick={() => onCompleteMerge(data.id)}
|
||||
title="Merge source group into this group"
|
||||
>
|
||||
<Layers className="h-4 w-4" />
|
||||
</Button>
|
||||
)}
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="icon"
|
||||
|
||||
@@ -38,7 +38,7 @@ import {
|
||||
} from "@/components/ui/dialog";
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import type { DisplayItem, IngestionAccountStatus } from "@/lib/telegram/types";
|
||||
import type { DisplayItem, IngestionAccountStatus, PackageListItem } from "@/lib/telegram/types";
|
||||
import type { SkippedRow } from "./skipped-columns";
|
||||
import {
|
||||
updatePackageCreator,
|
||||
@@ -49,6 +49,7 @@ import {
|
||||
removeFromGroupAction,
|
||||
sendAllInGroupAction,
|
||||
updateGroupPreviewAction,
|
||||
mergeGroupsAction,
|
||||
} from "../actions";
|
||||
|
||||
interface StlTableProps {
|
||||
@@ -61,6 +62,9 @@ interface StlTableProps {
|
||||
skippedData: SkippedRow[];
|
||||
skippedPageCount: number;
|
||||
skippedTotalCount: number;
|
||||
ungroupedData: PackageListItem[];
|
||||
ungroupedPageCount: number;
|
||||
ungroupedTotalCount: number;
|
||||
}
|
||||
|
||||
export function StlTable({
|
||||
@@ -73,6 +77,9 @@ export function StlTable({
|
||||
skippedData,
|
||||
skippedPageCount,
|
||||
skippedTotalCount,
|
||||
ungroupedData,
|
||||
ungroupedPageCount,
|
||||
ungroupedTotalCount,
|
||||
}: StlTableProps) {
|
||||
const router = useRouter();
|
||||
const pathname = usePathname();
|
||||
@@ -96,6 +103,9 @@ export function StlTable({
|
||||
const previewInputRef = useRef<HTMLInputElement>(null);
|
||||
const [uploadGroupId, setUploadGroupId] = useState<string | null>(null);
|
||||
|
||||
// Group merge state
|
||||
const [mergeSourceId, setMergeSourceId] = useState<string | null>(null);
|
||||
|
||||
const toggleGroup = useCallback((groupId: string) => {
|
||||
setExpandedGroups((prev) => {
|
||||
const next = new Set(prev);
|
||||
@@ -334,6 +344,35 @@ export function StlTable({
|
||||
[uploadGroupId, router]
|
||||
);
|
||||
|
||||
const handleStartMerge = useCallback((groupId: string) => {
|
||||
setMergeSourceId((prev) => {
|
||||
if (prev === groupId) {
|
||||
toast.info("Merge cancelled");
|
||||
return null;
|
||||
}
|
||||
toast.info("Merge source selected — click the merge-here button on the target group");
|
||||
return groupId;
|
||||
});
|
||||
}, []);
|
||||
|
||||
const handleMergeGroups = useCallback(
|
||||
(targetGroupId: string) => {
|
||||
if (!mergeSourceId) return;
|
||||
const sourceId = mergeSourceId;
|
||||
startTransition(async () => {
|
||||
const result = await mergeGroupsAction(targetGroupId, sourceId);
|
||||
if (result.success) {
|
||||
toast.success("Groups merged successfully");
|
||||
setMergeSourceId(null);
|
||||
router.refresh();
|
||||
} else {
|
||||
toast.error(result.error);
|
||||
}
|
||||
});
|
||||
},
|
||||
[mergeSourceId, router]
|
||||
);
|
||||
|
||||
const columns = getPackageColumns({
|
||||
onViewFiles: (pkg) => setViewPkg(pkg),
|
||||
searchTerm,
|
||||
@@ -375,10 +414,30 @@ export function StlTable({
|
||||
onGroupPreviewUpload: handleGroupPreviewUpload,
|
||||
selectedPackages,
|
||||
onToggleSelect: toggleSelect,
|
||||
mergeSourceId,
|
||||
onStartMerge: handleStartMerge,
|
||||
onCompleteMerge: handleMergeGroups,
|
||||
});
|
||||
|
||||
const { table } = useDataTable({ data: tableRows, columns, pageCount });
|
||||
|
||||
const ungroupedRows: StlTableRow[] = useMemo(
|
||||
() =>
|
||||
ungroupedData.map((pkg) => ({
|
||||
...pkg,
|
||||
_rowType: "package" as const,
|
||||
_groupId: null,
|
||||
_isGroupMember: false,
|
||||
})),
|
||||
[ungroupedData]
|
||||
);
|
||||
|
||||
const { table: ungroupedTable } = useDataTable({
|
||||
data: ungroupedRows,
|
||||
columns,
|
||||
pageCount: ungroupedPageCount,
|
||||
});
|
||||
|
||||
const activeTag = searchParams.get("tag") ?? "";
|
||||
|
||||
return (
|
||||
@@ -401,6 +460,14 @@ export function StlTable({
|
||||
</Badge>
|
||||
)}
|
||||
</TabsTrigger>
|
||||
<TabsTrigger value="ungrouped" className="gap-1.5">
|
||||
Ungrouped
|
||||
{ungroupedTotalCount > 0 && (
|
||||
<Badge variant="secondary" className="h-5 px-1.5 text-[10px]">
|
||||
{ungroupedTotalCount}
|
||||
</Badge>
|
||||
)}
|
||||
</TabsTrigger>
|
||||
</TabsList>
|
||||
|
||||
<TabsContent value="packages" className="space-y-4">
|
||||
@@ -472,6 +539,11 @@ export function StlTable({
|
||||
totalCount={skippedTotalCount}
|
||||
/>
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="ungrouped" className="space-y-4">
|
||||
<DataTable table={ungroupedTable} emptyMessage="All packages are grouped!" />
|
||||
<DataTablePagination table={ungroupedTable} totalCount={ungroupedTotalCount} />
|
||||
</TabsContent>
|
||||
</Tabs>
|
||||
|
||||
<PackageFilesDrawer
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
createManualGroup,
|
||||
removePackageFromGroup,
|
||||
dissolveGroup,
|
||||
mergeGroups,
|
||||
} from "@/lib/telegram/queries";
|
||||
|
||||
const ALLOWED_IMAGE_TYPES = [
|
||||
@@ -185,6 +186,62 @@ export async function setPreviewFromExtract(
|
||||
}
|
||||
}
|
||||
|
||||
export async function repairPackageAction(
|
||||
packageId: string
|
||||
): Promise<ActionResult> {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) return { success: false, error: "Unauthorized" };
|
||||
|
||||
try {
|
||||
const pkg = await prisma.package.findUnique({
|
||||
where: { id: packageId },
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
sourceChannelId: true,
|
||||
sourceMessageId: true,
|
||||
destChannelId: true,
|
||||
destMessageId: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!pkg) return { success: false, error: "Package not found" };
|
||||
|
||||
// Clear the destination info so the worker re-processes it
|
||||
await prisma.package.update({
|
||||
where: { id: packageId },
|
||||
data: {
|
||||
destMessageId: null,
|
||||
destMessageIds: [],
|
||||
destChannelId: null,
|
||||
},
|
||||
});
|
||||
|
||||
// Reset the channel watermark to before this message so worker picks it up
|
||||
await prisma.accountChannelMap.updateMany({
|
||||
where: {
|
||||
channelId: pkg.sourceChannelId,
|
||||
lastProcessedMessageId: { gte: pkg.sourceMessageId },
|
||||
},
|
||||
data: { lastProcessedMessageId: pkg.sourceMessageId - BigInt(1) },
|
||||
});
|
||||
|
||||
// Mark related notifications as read
|
||||
await prisma.systemNotification.updateMany({
|
||||
where: {
|
||||
context: { path: ["packageId"], equals: packageId },
|
||||
isRead: false,
|
||||
},
|
||||
data: { isRead: true },
|
||||
});
|
||||
|
||||
revalidatePath("/stls");
|
||||
return { success: true, data: undefined };
|
||||
} catch {
|
||||
return { success: false, error: "Failed to schedule repair" };
|
||||
}
|
||||
}
|
||||
|
||||
export async function retrySkippedPackageAction(
|
||||
id: string
|
||||
): Promise<ActionResult> {
|
||||
@@ -435,6 +492,26 @@ export async function updateGroupPreviewAction(
|
||||
}
|
||||
}
|
||||
|
||||
export async function mergeGroupsAction(
|
||||
targetGroupId: string,
|
||||
sourceGroupId: string
|
||||
): Promise<ActionResult> {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) return { success: false, error: "Unauthorized" };
|
||||
|
||||
if (targetGroupId === sourceGroupId) {
|
||||
return { success: false, error: "Cannot merge a group with itself" };
|
||||
}
|
||||
|
||||
try {
|
||||
await mergeGroups(targetGroupId, sourceGroupId);
|
||||
revalidatePath("/stls");
|
||||
return { success: true, data: undefined };
|
||||
} catch {
|
||||
return { success: false, error: "Failed to merge groups" };
|
||||
}
|
||||
}
|
||||
|
||||
export async function sendAllInGroupAction(
|
||||
groupId: string
|
||||
): Promise<ActionResult> {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { auth } from "@/lib/auth";
|
||||
import { redirect } from "next/navigation";
|
||||
import { listDisplayItems, searchPackages, getIngestionStatus, getAllPackageTags, listSkippedPackages, countSkippedPackages } from "@/lib/telegram/queries";
|
||||
import { listDisplayItems, searchPackages, getIngestionStatus, getAllPackageTags, listSkippedPackages, countSkippedPackages, listUngroupedPackages, countUngroupedPackages } from "@/lib/telegram/queries";
|
||||
import { StlTable } from "./_components/stl-table";
|
||||
import type { DisplayItem, PackageListItem } from "@/lib/telegram/types";
|
||||
|
||||
@@ -24,7 +24,7 @@ export default async function StlFilesPage({ searchParams }: Props) {
|
||||
const tab = (params.tab as string) ?? "packages";
|
||||
|
||||
// Fetch packages, ingestion status, tags, and skipped count in parallel
|
||||
const [result, ingestionStatus, availableTags, skippedCount] = await Promise.all([
|
||||
const [result, ingestionStatus, availableTags, skippedCount, ungroupedCount] = await Promise.all([
|
||||
search
|
||||
? searchPackages({
|
||||
query: search,
|
||||
@@ -43,6 +43,7 @@ export default async function StlFilesPage({ searchParams }: Props) {
|
||||
getIngestionStatus(),
|
||||
getAllPackageTags(),
|
||||
countSkippedPackages(),
|
||||
countUngroupedPackages(),
|
||||
]);
|
||||
|
||||
// For search results, wrap as DisplayItem[]; for non-search, already DisplayItem[]
|
||||
@@ -55,6 +56,11 @@ export default async function StlFilesPage({ searchParams }: Props) {
|
||||
? await listSkippedPackages({ page, limit: perPage })
|
||||
: null;
|
||||
|
||||
// Fetch ungrouped packages only if on that tab
|
||||
const ungroupedResult = tab === "ungrouped"
|
||||
? await listUngroupedPackages({ page, limit: perPage })
|
||||
: null;
|
||||
|
||||
return (
|
||||
<StlTable
|
||||
data={displayItems}
|
||||
@@ -66,6 +72,9 @@ export default async function StlFilesPage({ searchParams }: Props) {
|
||||
skippedData={skippedResult?.items ?? []}
|
||||
skippedPageCount={skippedResult?.pagination.totalPages ?? 0}
|
||||
skippedTotalCount={skippedCount}
|
||||
ungroupedData={ungroupedResult?.items ?? []}
|
||||
ungroupedPageCount={ungroupedResult?.pagination.totalPages ?? 0}
|
||||
ungroupedTotalCount={ungroupedCount}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -291,10 +291,25 @@ export async function setChannelCategory(
|
||||
if (!admin.success) return admin;
|
||||
|
||||
try {
|
||||
const existing = await prisma.telegramChannel.findUnique({
|
||||
where: { id },
|
||||
select: { category: true },
|
||||
});
|
||||
if (!existing) return { success: false, error: "Channel not found" };
|
||||
|
||||
const oldCategory = existing.category;
|
||||
const newCategory = category?.trim() || null;
|
||||
|
||||
await prisma.telegramChannel.update({
|
||||
where: { id },
|
||||
data: { category: category?.trim() || null },
|
||||
data: { category: newCategory },
|
||||
});
|
||||
|
||||
// Retroactively re-tag packages from this channel when category changes
|
||||
if (oldCategory !== newCategory && newCategory) {
|
||||
await retagChannelPackages(id, oldCategory, newCategory);
|
||||
}
|
||||
|
||||
revalidatePath("/telegram");
|
||||
return { success: true, data: undefined };
|
||||
} catch {
|
||||
@@ -302,6 +317,50 @@ export async function setChannelCategory(
|
||||
}
|
||||
}
|
||||
|
||||
export async function retagChannelPackages(
|
||||
channelId: string,
|
||||
oldCategory: string | null,
|
||||
newCategory: string
|
||||
): Promise<ActionResult<{ updated: number }>> {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) return { success: false, error: "Unauthorized" };
|
||||
|
||||
try {
|
||||
// Find packages from this channel that have the old category tag (or no category tag)
|
||||
const packages = await prisma.package.findMany({
|
||||
where: { sourceChannelId: channelId },
|
||||
select: { id: true, tags: true },
|
||||
});
|
||||
|
||||
let updated = 0;
|
||||
for (const pkg of packages) {
|
||||
const tags = [...pkg.tags];
|
||||
// Remove old category tag if present
|
||||
if (oldCategory) {
|
||||
const idx = tags.indexOf(oldCategory);
|
||||
if (idx !== -1) tags.splice(idx, 1);
|
||||
}
|
||||
// Add new category tag if not already present
|
||||
if (!tags.includes(newCategory)) {
|
||||
tags.push(newCategory);
|
||||
}
|
||||
// Only update if tags actually changed
|
||||
if (JSON.stringify(tags) !== JSON.stringify(pkg.tags)) {
|
||||
await prisma.package.update({
|
||||
where: { id: pkg.id },
|
||||
data: { tags },
|
||||
});
|
||||
updated++;
|
||||
}
|
||||
}
|
||||
|
||||
revalidatePath("/stls");
|
||||
return { success: true, data: { updated } };
|
||||
} catch {
|
||||
return { success: false, error: "Failed to re-tag packages" };
|
||||
}
|
||||
}
|
||||
|
||||
export async function setChannelType(
|
||||
id: string,
|
||||
type: "SOURCE" | "DESTINATION"
|
||||
|
||||
26
src/app/api/notifications/read/route.ts
Normal file
26
src/app/api/notifications/read/route.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { NextResponse } from "next/server";
|
||||
import { auth } from "@/lib/auth";
|
||||
import {
|
||||
markNotificationRead,
|
||||
markAllNotificationsRead,
|
||||
} from "@/data/notification.queries";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
|
||||
export async function POST(request: Request) {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) {
|
||||
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
|
||||
}
|
||||
|
||||
const body = await request.json().catch(() => ({}));
|
||||
const id = body.id as string | undefined;
|
||||
|
||||
if (id) {
|
||||
await markNotificationRead(id);
|
||||
} else {
|
||||
await markAllNotificationsRead();
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true });
|
||||
}
|
||||
43
src/app/api/notifications/repair/route.ts
Normal file
43
src/app/api/notifications/repair/route.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { NextResponse } from "next/server";
|
||||
import { auth } from "@/lib/auth";
|
||||
import { prisma } from "@/lib/prisma";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
|
||||
export async function POST(request: Request) {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) {
|
||||
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
|
||||
}
|
||||
|
||||
const body = await request.json().catch(() => ({}));
|
||||
const notificationId = body.notificationId as string;
|
||||
if (!notificationId) {
|
||||
return NextResponse.json({ error: "notificationId required" }, { status: 400 });
|
||||
}
|
||||
|
||||
const notification = await prisma.systemNotification.findUnique({
|
||||
where: { id: notificationId },
|
||||
});
|
||||
|
||||
if (!notification) {
|
||||
return NextResponse.json({ error: "Notification not found" }, { status: 404 });
|
||||
}
|
||||
|
||||
const context = notification.context as Record<string, unknown> | null;
|
||||
const packageId = context?.packageId as string | undefined;
|
||||
|
||||
if (!packageId) {
|
||||
return NextResponse.json({ error: "Notification has no associated package" }, { status: 400 });
|
||||
}
|
||||
|
||||
// Import and call the repair action
|
||||
const { repairPackageAction } = await import("@/app/(app)/stls/actions");
|
||||
const result = await repairPackageAction(packageId);
|
||||
|
||||
if (!result.success) {
|
||||
return NextResponse.json({ error: result.error }, { status: 500 });
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true });
|
||||
}
|
||||
27
src/app/api/notifications/route.ts
Normal file
27
src/app/api/notifications/route.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { NextResponse } from "next/server";
|
||||
import { auth } from "@/lib/auth";
|
||||
import {
|
||||
getRecentNotifications,
|
||||
getUnreadNotificationCount,
|
||||
} from "@/data/notification.queries";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
|
||||
export async function GET() {
|
||||
const session = await auth();
|
||||
if (!session?.user?.id) {
|
||||
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
|
||||
}
|
||||
|
||||
const [notifications, unreadCount] = await Promise.all([
|
||||
getRecentNotifications(30),
|
||||
getUnreadNotificationCount(),
|
||||
]);
|
||||
|
||||
const serialized = notifications.map((n) => ({
|
||||
...n,
|
||||
createdAt: n.createdAt.toISOString(),
|
||||
}));
|
||||
|
||||
return NextResponse.json({ notifications: serialized, unreadCount });
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import { Button } from "@/components/ui/button";
|
||||
import { Sheet, SheetContent, SheetTrigger } from "@/components/ui/sheet";
|
||||
import { UserMenu } from "./user-menu";
|
||||
import { MobileSidebar } from "./mobile-sidebar";
|
||||
import { NotificationBell } from "./notification-bell";
|
||||
|
||||
const routeTitles: Record<string, string> = {
|
||||
"/dashboard": "Dashboard",
|
||||
@@ -38,7 +39,8 @@ export function Header() {
|
||||
|
||||
<h1 className="text-lg font-semibold">{title}</h1>
|
||||
|
||||
<div className="ml-auto">
|
||||
<div className="ml-auto flex items-center gap-1">
|
||||
<NotificationBell />
|
||||
<UserMenu />
|
||||
</div>
|
||||
</header>
|
||||
|
||||
220
src/components/layout/notification-bell.tsx
Normal file
220
src/components/layout/notification-bell.tsx
Normal file
@@ -0,0 +1,220 @@
|
||||
"use client";
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { Bell, AlertTriangle, AlertCircle, Info, CheckCircle2 } from "lucide-react";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import {
|
||||
Popover,
|
||||
PopoverContent,
|
||||
PopoverTrigger,
|
||||
} from "@/components/ui/popover";
|
||||
import { ScrollArea } from "@/components/ui/scroll-area";
|
||||
import { toast } from "sonner";
|
||||
|
||||
interface Notification {
|
||||
id: string;
|
||||
type: string;
|
||||
severity: "INFO" | "WARNING" | "ERROR";
|
||||
title: string;
|
||||
message: string;
|
||||
isRead: boolean;
|
||||
createdAt: string;
|
||||
}
|
||||
|
||||
const severityIcon = {
|
||||
INFO: Info,
|
||||
WARNING: AlertTriangle,
|
||||
ERROR: AlertCircle,
|
||||
};
|
||||
|
||||
const severityColor = {
|
||||
INFO: "text-blue-400",
|
||||
WARNING: "text-orange-400",
|
||||
ERROR: "text-red-400",
|
||||
};
|
||||
|
||||
export function NotificationBell() {
|
||||
const [notifications, setNotifications] = useState<Notification[]>([]);
|
||||
const [unreadCount, setUnreadCount] = useState(0);
|
||||
const [open, setOpen] = useState(false);
|
||||
|
||||
const fetchNotifications = useCallback(async () => {
|
||||
try {
|
||||
const res = await fetch("/api/notifications");
|
||||
if (res.ok) {
|
||||
const data = await res.json();
|
||||
setNotifications(data.notifications ?? []);
|
||||
setUnreadCount(data.unreadCount ?? 0);
|
||||
}
|
||||
} catch {
|
||||
// Ignore fetch errors
|
||||
}
|
||||
}, []);
|
||||
|
||||
// Poll every 30 seconds + on mount
|
||||
useEffect(() => {
|
||||
fetchNotifications();
|
||||
const interval = setInterval(fetchNotifications, 30_000);
|
||||
return () => clearInterval(interval);
|
||||
}, [fetchNotifications]);
|
||||
|
||||
// Refresh when popover opens
|
||||
useEffect(() => {
|
||||
if (open) fetchNotifications();
|
||||
}, [open, fetchNotifications]);
|
||||
|
||||
async function handleMarkAllRead() {
|
||||
try {
|
||||
await fetch("/api/notifications/read", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({}),
|
||||
});
|
||||
setNotifications((prev) => prev.map((n) => ({ ...n, isRead: true })));
|
||||
setUnreadCount(0);
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
async function handleMarkRead(id: string) {
|
||||
try {
|
||||
await fetch("/api/notifications/read", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ id }),
|
||||
});
|
||||
setNotifications((prev) =>
|
||||
prev.map((n) => (n.id === id ? { ...n, isRead: true } : n))
|
||||
);
|
||||
setUnreadCount((c) => Math.max(0, c - 1));
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
async function handleRepair(notificationId: string) {
|
||||
try {
|
||||
const res = await fetch("/api/notifications/repair", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ notificationId }),
|
||||
});
|
||||
if (res.ok) {
|
||||
toast.success("Repair scheduled — package will be re-processed on next cycle");
|
||||
fetchNotifications();
|
||||
}
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
function formatTime(iso: string): string {
|
||||
const d = new Date(iso);
|
||||
const now = new Date();
|
||||
const diffMs = now.getTime() - d.getTime();
|
||||
const diffMin = Math.floor(diffMs / 60_000);
|
||||
if (diffMin < 1) return "just now";
|
||||
if (diffMin < 60) return `${diffMin}m ago`;
|
||||
const diffHr = Math.floor(diffMin / 60);
|
||||
if (diffHr < 24) return `${diffHr}h ago`;
|
||||
const diffDay = Math.floor(diffHr / 24);
|
||||
return `${diffDay}d ago`;
|
||||
}
|
||||
|
||||
return (
|
||||
<Popover open={open} onOpenChange={setOpen}>
|
||||
<PopoverTrigger asChild>
|
||||
<Button variant="ghost" size="icon" className="relative h-9 w-9">
|
||||
<Bell className="h-4 w-4" />
|
||||
{unreadCount > 0 && (
|
||||
<Badge
|
||||
variant="destructive"
|
||||
className="absolute -top-1 -right-1 h-4 min-w-4 px-1 text-[10px] leading-none"
|
||||
>
|
||||
{unreadCount > 99 ? "99+" : unreadCount}
|
||||
</Badge>
|
||||
)}
|
||||
</Button>
|
||||
</PopoverTrigger>
|
||||
<PopoverContent className="w-96 p-0" align="end">
|
||||
<div className="flex items-center justify-between border-b px-4 py-3">
|
||||
<h3 className="text-sm font-semibold">Notifications</h3>
|
||||
{unreadCount > 0 && (
|
||||
<Button
|
||||
variant="ghost"
|
||||
size="sm"
|
||||
className="h-7 text-xs"
|
||||
onClick={handleMarkAllRead}
|
||||
>
|
||||
Mark all read
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
<ScrollArea className="max-h-[400px]">
|
||||
{notifications.length === 0 ? (
|
||||
<div className="flex flex-col items-center justify-center py-8 text-muted-foreground">
|
||||
<CheckCircle2 className="h-8 w-8 mb-2 opacity-50" />
|
||||
<p className="text-sm">All clear!</p>
|
||||
</div>
|
||||
) : (
|
||||
<div className="divide-y">
|
||||
{notifications.map((n) => {
|
||||
const Icon = severityIcon[n.severity] ?? Info;
|
||||
const color = severityColor[n.severity] ?? "text-muted-foreground";
|
||||
return (
|
||||
<div
|
||||
key={n.id}
|
||||
className={`flex w-full gap-3 px-4 py-3 text-left hover:bg-muted/50 transition-colors ${
|
||||
!n.isRead ? "bg-muted/20" : ""
|
||||
}`}
|
||||
role="button"
|
||||
tabIndex={0}
|
||||
onClick={() => !n.isRead && handleMarkRead(n.id)}
|
||||
onKeyDown={(e) => {
|
||||
if (e.key === "Enter" || e.key === " ") {
|
||||
if (!n.isRead) handleMarkRead(n.id);
|
||||
}
|
||||
}}
|
||||
>
|
||||
<Icon className={`h-4 w-4 mt-0.5 shrink-0 ${color}`} />
|
||||
<div className="flex-1 min-w-0">
|
||||
<div className="flex items-center gap-2">
|
||||
<p className={`text-sm truncate ${!n.isRead ? "font-medium" : ""}`}>
|
||||
{n.title}
|
||||
</p>
|
||||
{!n.isRead && (
|
||||
<span className="h-2 w-2 rounded-full bg-primary shrink-0" />
|
||||
)}
|
||||
</div>
|
||||
<p className="text-xs text-muted-foreground line-clamp-2 mt-0.5">
|
||||
{n.message}
|
||||
</p>
|
||||
<p className="text-[10px] text-muted-foreground mt-1">
|
||||
{formatTime(n.createdAt)}
|
||||
</p>
|
||||
{(n.type === "MISSING_PART" || n.type === "HASH_MISMATCH") && (
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
className="h-6 px-2 text-xs mt-1"
|
||||
onClick={(e) => {
|
||||
e.stopPropagation();
|
||||
handleRepair(n.id);
|
||||
}}
|
||||
>
|
||||
Repair
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
</ScrollArea>
|
||||
</PopoverContent>
|
||||
</Popover>
|
||||
);
|
||||
}
|
||||
37
src/data/notification.queries.ts
Normal file
37
src/data/notification.queries.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { prisma } from "@/lib/prisma";
|
||||
|
||||
export async function getUnreadNotificationCount(): Promise<number> {
|
||||
return prisma.systemNotification.count({
|
||||
where: { isRead: false },
|
||||
});
|
||||
}
|
||||
|
||||
export async function getRecentNotifications(limit = 20) {
|
||||
return prisma.systemNotification.findMany({
|
||||
orderBy: { createdAt: "desc" },
|
||||
take: limit,
|
||||
select: {
|
||||
id: true,
|
||||
type: true,
|
||||
severity: true,
|
||||
title: true,
|
||||
message: true,
|
||||
isRead: true,
|
||||
createdAt: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function markNotificationRead(id: string) {
|
||||
return prisma.systemNotification.update({
|
||||
where: { id },
|
||||
data: { isRead: true },
|
||||
});
|
||||
}
|
||||
|
||||
export async function markAllNotificationsRead() {
|
||||
return prisma.systemNotification.updateMany({
|
||||
where: { isRead: false },
|
||||
data: { isRead: true },
|
||||
});
|
||||
}
|
||||
@@ -340,6 +340,30 @@ export async function listPackageFiles(options: {
|
||||
};
|
||||
}
|
||||
|
||||
async function fullTextSearchPackageIds(query: string, limit: number): Promise<string[]> {
|
||||
// Convert user query to tsquery — handle multi-word by joining with &
|
||||
const tsQuery = query
|
||||
.trim()
|
||||
.split(/\s+/)
|
||||
.filter((w) => w.length >= 2)
|
||||
.map((w) => w.replace(/[^a-zA-Z0-9]/g, ""))
|
||||
.filter(Boolean)
|
||||
.join(" & ");
|
||||
|
||||
if (!tsQuery) return [];
|
||||
|
||||
const results = await prisma.$queryRawUnsafe<{ id: string }[]>(
|
||||
`SELECT id FROM packages
|
||||
WHERE "searchVector" @@ to_tsquery('english', $1)
|
||||
ORDER BY ts_rank("searchVector", to_tsquery('english', $1)) DESC
|
||||
LIMIT $2`,
|
||||
tsQuery,
|
||||
limit
|
||||
);
|
||||
|
||||
return results.map((r) => r.id);
|
||||
}
|
||||
|
||||
export async function searchPackages(options: {
|
||||
query: string;
|
||||
page: number;
|
||||
@@ -366,14 +390,26 @@ export async function searchPackages(options: {
|
||||
);
|
||||
const fileMatchedIds = fileMatches.map((f) => f.packageId);
|
||||
|
||||
// Try full-text search first (better ranking, handles word stemming)
|
||||
let ftsPackageNameIds: string[] = [];
|
||||
if (options.searchIn === "both" && q.length >= 3) {
|
||||
try {
|
||||
ftsPackageNameIds = await fullTextSearchPackageIds(q, 200);
|
||||
} catch {
|
||||
// FTS failed — fall back to ILIKE below
|
||||
}
|
||||
}
|
||||
|
||||
const packageNameIds =
|
||||
options.searchIn === "both"
|
||||
? (
|
||||
await prisma.package.findMany({
|
||||
where: { fileName: { contains: q, mode: "insensitive" } },
|
||||
select: { id: true },
|
||||
})
|
||||
).map((p) => p.id)
|
||||
? ftsPackageNameIds.length > 0
|
||||
? ftsPackageNameIds
|
||||
: (
|
||||
await prisma.package.findMany({
|
||||
where: { fileName: { contains: q, mode: "insensitive" } },
|
||||
select: { id: true },
|
||||
})
|
||||
).map((p) => p.id)
|
||||
: [];
|
||||
|
||||
// Also match by group name
|
||||
@@ -571,6 +607,72 @@ export async function countSkippedPackages(): Promise<number> {
|
||||
return prisma.skippedPackage.count();
|
||||
}
|
||||
|
||||
export async function listUngroupedPackages(options: {
|
||||
page: number;
|
||||
limit: number;
|
||||
}) {
|
||||
const { page, limit } = options;
|
||||
const skip = (page - 1) * limit;
|
||||
|
||||
const where = { packageGroupId: null, destMessageId: { not: null } };
|
||||
|
||||
const [items, total] = await Promise.all([
|
||||
prisma.package.findMany({
|
||||
where,
|
||||
orderBy: { indexedAt: "desc" },
|
||||
skip,
|
||||
take: limit,
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
fileSize: true,
|
||||
archiveType: true,
|
||||
creator: true,
|
||||
fileCount: true,
|
||||
isMultipart: true,
|
||||
partCount: true,
|
||||
tags: true,
|
||||
indexedAt: true,
|
||||
previewData: true,
|
||||
sourceChannel: { select: { id: true, title: true } },
|
||||
},
|
||||
}),
|
||||
prisma.package.count({ where }),
|
||||
]);
|
||||
|
||||
return {
|
||||
items: items.map((p) => ({
|
||||
id: p.id,
|
||||
fileName: p.fileName,
|
||||
fileSize: p.fileSize.toString(),
|
||||
contentHash: "",
|
||||
archiveType: p.archiveType,
|
||||
creator: p.creator,
|
||||
fileCount: p.fileCount,
|
||||
isMultipart: p.isMultipart,
|
||||
partCount: p.partCount,
|
||||
tags: p.tags,
|
||||
indexedAt: p.indexedAt.toISOString(),
|
||||
hasPreview: !!p.previewData,
|
||||
sourceChannel: p.sourceChannel,
|
||||
matchedFileCount: 0,
|
||||
matchedByContent: false,
|
||||
})),
|
||||
pagination: {
|
||||
total,
|
||||
totalPages: Math.ceil(total / limit),
|
||||
page,
|
||||
limit,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export async function countUngroupedPackages(): Promise<number> {
|
||||
return prisma.package.count({
|
||||
where: { packageGroupId: null, destMessageId: { not: null } },
|
||||
});
|
||||
}
|
||||
|
||||
export async function getPackageGroup(groupId: string) {
|
||||
return prisma.packageGroup.findUnique({
|
||||
where: { id: groupId },
|
||||
@@ -630,6 +732,53 @@ export async function createManualGroup(name: string, packageIds: string[]) {
|
||||
data: { packageGroupId: group.id },
|
||||
});
|
||||
|
||||
// Learn a grouping rule from the manual override
|
||||
try {
|
||||
const linkedPkgs = await prisma.package.findMany({
|
||||
where: { id: { in: packageIds } },
|
||||
select: { fileName: true, creator: true },
|
||||
});
|
||||
|
||||
// Extract the common filename pattern
|
||||
const fileNames = linkedPkgs.map((p) => p.fileName);
|
||||
let pattern = "";
|
||||
if (fileNames.length > 1) {
|
||||
// Find longest common prefix
|
||||
let prefix = fileNames[0];
|
||||
for (let i = 1; i < fileNames.length; i++) {
|
||||
while (!fileNames[i].startsWith(prefix)) {
|
||||
prefix = prefix.slice(0, -1);
|
||||
if (!prefix) break;
|
||||
}
|
||||
}
|
||||
const trimmed = prefix.replace(/[\s\-_.(]+$/, "");
|
||||
if (trimmed.length >= 4) {
|
||||
pattern = trimmed;
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to shared creator
|
||||
if (!pattern) {
|
||||
const creators = [...new Set(linkedPkgs.map((p) => p.creator).filter(Boolean))];
|
||||
if (creators.length === 1 && creators[0]) {
|
||||
pattern = creators[0];
|
||||
}
|
||||
}
|
||||
|
||||
if (pattern) {
|
||||
await prisma.groupingRule.create({
|
||||
data: {
|
||||
sourceChannelId: firstPkg.sourceChannelId,
|
||||
pattern,
|
||||
signalType: "MANUAL",
|
||||
createdByGroupId: group.id,
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// Best-effort — don't fail the group creation if rule learning fails
|
||||
}
|
||||
|
||||
// Clean up empty groups left behind
|
||||
await prisma.packageGroup.deleteMany({
|
||||
where: { packages: { none: {} }, id: { not: group.id } },
|
||||
@@ -670,3 +819,13 @@ export async function dissolveGroup(groupId: string) {
|
||||
});
|
||||
await prisma.packageGroup.delete({ where: { id: groupId } });
|
||||
}
|
||||
|
||||
export async function mergeGroups(targetGroupId: string, sourceGroupId: string) {
|
||||
// Move all packages from source group to target group
|
||||
await prisma.package.updateMany({
|
||||
where: { packageGroupId: sourceGroupId },
|
||||
data: { packageGroupId: targetGroupId },
|
||||
});
|
||||
// Delete the now-empty source group
|
||||
await prisma.packageGroup.delete({ where: { id: sourceGroupId } });
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ export interface TelegramMessage {
|
||||
fileSize: bigint;
|
||||
date: Date;
|
||||
mediaAlbumId?: string;
|
||||
replyToMessageId?: bigint; // NEW
|
||||
caption?: string; // NEW
|
||||
}
|
||||
|
||||
export interface ArchiveSet {
|
||||
|
||||
117
worker/src/audit.ts
Normal file
117
worker/src/audit.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { db } from "./db/client.js";
|
||||
import { childLogger } from "./util/logger.js";
|
||||
|
||||
const log = childLogger("audit");
|
||||
|
||||
/**
|
||||
* Periodic integrity audit: checks all packages for consistency.
|
||||
* Creates SystemNotification records for any issues found.
|
||||
*
|
||||
* Checks performed:
|
||||
* 1. Multipart completeness: destMessageIds.length should match partCount
|
||||
* 2. Missing destination: packages with destChannelId but no destMessageId
|
||||
*/
|
||||
export async function runIntegrityAudit(): Promise<{ checked: number; issues: number }> {
|
||||
log.info("Starting integrity audit");
|
||||
|
||||
let checked = 0;
|
||||
let issues = 0;
|
||||
|
||||
// Check 1: Multipart packages with wrong number of destination message IDs
|
||||
const multipartPackages = await db.package.findMany({
|
||||
where: {
|
||||
isMultipart: true,
|
||||
partCount: { gt: 1 },
|
||||
destMessageId: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
partCount: true,
|
||||
destMessageIds: true,
|
||||
sourceChannelId: true,
|
||||
sourceChannel: { select: { title: true } },
|
||||
},
|
||||
});
|
||||
|
||||
checked += multipartPackages.length;
|
||||
|
||||
for (const pkg of multipartPackages) {
|
||||
const actualParts = pkg.destMessageIds.length;
|
||||
if (actualParts > 0 && actualParts !== pkg.partCount) {
|
||||
issues++;
|
||||
|
||||
// Check if we already have a notification for this
|
||||
const existing = await db.systemNotification.findFirst({
|
||||
where: {
|
||||
type: "MISSING_PART",
|
||||
context: { path: ["packageId"], equals: pkg.id },
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (!existing) {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "MISSING_PART",
|
||||
severity: "WARNING",
|
||||
title: `Incomplete multipart: ${pkg.fileName}`,
|
||||
message: `Expected ${pkg.partCount} parts but only ${actualParts} destination message IDs stored`,
|
||||
context: {
|
||||
packageId: pkg.id,
|
||||
fileName: pkg.fileName,
|
||||
expectedParts: pkg.partCount,
|
||||
actualParts,
|
||||
sourceChannelId: pkg.sourceChannelId,
|
||||
channelTitle: pkg.sourceChannel.title,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
log.warn(
|
||||
{ packageId: pkg.id, fileName: pkg.fileName, expected: pkg.partCount, actual: actualParts },
|
||||
"Multipart package has mismatched part count"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check 2: Packages with dest channel but no dest message (orphaned index)
|
||||
const orphanedCount = await db.package.count({
|
||||
where: {
|
||||
destChannelId: { not: null },
|
||||
destMessageId: null,
|
||||
},
|
||||
});
|
||||
|
||||
if (orphanedCount > 0) {
|
||||
issues++;
|
||||
|
||||
const existing = await db.systemNotification.findFirst({
|
||||
where: {
|
||||
type: "INTEGRITY_AUDIT",
|
||||
context: { path: ["check"], equals: "orphaned_index" },
|
||||
createdAt: { gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (!existing) {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "INTEGRITY_AUDIT",
|
||||
severity: "INFO",
|
||||
title: `${orphanedCount} packages with missing destination message`,
|
||||
message: `Found ${orphanedCount} packages that have a destination channel set but no destination message ID. These may be from interrupted uploads.`,
|
||||
context: {
|
||||
check: "orphaned_index",
|
||||
count: orphanedCount,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
log.info({ checked, issues }, "Integrity audit complete");
|
||||
return { checked, issues };
|
||||
}
|
||||
@@ -119,6 +119,8 @@ export interface CreatePackageInput {
|
||||
tags?: string[];
|
||||
previewData?: Buffer | null;
|
||||
previewMsgId?: bigint | null;
|
||||
sourceCaption?: string | null;
|
||||
replyToMessageId?: bigint | null;
|
||||
files: {
|
||||
path: string;
|
||||
fileName: string;
|
||||
@@ -150,6 +152,8 @@ export async function createPackageWithFiles(input: CreatePackageInput) {
|
||||
tags: input.tags && input.tags.length > 0 ? input.tags : undefined,
|
||||
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
|
||||
previewMsgId: input.previewMsgId ?? undefined,
|
||||
sourceCaption: input.sourceCaption ?? undefined,
|
||||
replyToMessageId: input.replyToMessageId ?? undefined,
|
||||
files: {
|
||||
create: input.files,
|
||||
},
|
||||
@@ -587,3 +591,46 @@ export async function linkPackagesToGroup(
|
||||
data: { packageGroupId: groupId },
|
||||
});
|
||||
}
|
||||
|
||||
export async function createTimeWindowGroup(input: {
|
||||
sourceChannelId: string;
|
||||
name: string;
|
||||
packageIds: string[];
|
||||
}): Promise<string> {
|
||||
const group = await db.packageGroup.create({
|
||||
data: {
|
||||
sourceChannelId: input.sourceChannelId,
|
||||
name: input.name,
|
||||
groupingSource: "AUTO_TIME",
|
||||
},
|
||||
});
|
||||
|
||||
await db.package.updateMany({
|
||||
where: { id: { in: input.packageIds } },
|
||||
data: { packageGroupId: group.id },
|
||||
});
|
||||
|
||||
return group.id;
|
||||
}
|
||||
|
||||
export async function createAutoGroup(input: {
|
||||
sourceChannelId: string;
|
||||
name: string;
|
||||
packageIds: string[];
|
||||
groupingSource: "ALBUM" | "MANUAL" | "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY";
|
||||
}): Promise<string> {
|
||||
const group = await db.packageGroup.create({
|
||||
data: {
|
||||
sourceChannelId: input.sourceChannelId,
|
||||
name: input.name,
|
||||
groupingSource: input.groupingSource,
|
||||
},
|
||||
});
|
||||
|
||||
await db.package.updateMany({
|
||||
where: { id: { in: input.packageIds } },
|
||||
data: { packageGroupId: group.id },
|
||||
});
|
||||
|
||||
return group.id;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import type { Client } from "tdl";
|
||||
import type { TelegramPhoto } from "./preview/match.js";
|
||||
import { downloadPhotoThumbnail } from "./tdlib/download.js";
|
||||
import { createOrFindPackageGroup, linkPackagesToGroup } from "./db/queries.js";
|
||||
import { createOrFindPackageGroup, linkPackagesToGroup, createTimeWindowGroup, createAutoGroup } from "./db/queries.js";
|
||||
import { config } from "./util/config.js";
|
||||
import { childLogger } from "./util/logger.js";
|
||||
import { db } from "./db/client.js";
|
||||
|
||||
@@ -77,3 +78,591 @@ export async function processAlbumGroups(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply learned GroupingRules from manual overrides.
|
||||
* For each rule, find ungrouped packages whose fileName contains the pattern.
|
||||
*/
|
||||
export async function processRuleBasedGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const rules = await db.groupingRule.findMany({
|
||||
where: { sourceChannelId },
|
||||
orderBy: { confidence: "desc" },
|
||||
});
|
||||
|
||||
if (rules.length === 0) return;
|
||||
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
},
|
||||
select: { id: true, fileName: true, creator: true },
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
for (const rule of rules) {
|
||||
const matches = ungrouped.filter((pkg) => {
|
||||
const lower = rule.pattern.toLowerCase();
|
||||
return pkg.fileName.toLowerCase().includes(lower) ||
|
||||
(pkg.creator && pkg.creator.toLowerCase().includes(lower));
|
||||
});
|
||||
|
||||
if (matches.length < 2) continue;
|
||||
|
||||
// Check if any are already grouped (by a previous rule in this loop)
|
||||
const stillUngrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: matches.map((m) => m.id) },
|
||||
packageGroupId: null,
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
if (stillUngrouped.length < 2) continue;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name: rule.pattern,
|
||||
packageIds: stillUngrouped.map((m) => m.id),
|
||||
groupingSource: "MANUAL",
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, ruleId: rule.id, pattern: rule.pattern, memberCount: stillUngrouped.length },
|
||||
"Applied learned grouping rule"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, ruleId: rule.id }, "Failed to apply grouping rule");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* After album grouping, cluster remaining ungrouped packages from the same channel
|
||||
* that were posted within a configurable time window.
|
||||
* Only groups packages that were just indexed in this scan cycle (the `indexedPackages` list).
|
||||
*/
|
||||
export async function processTimeWindowGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
if (config.autoGroupTimeWindowMinutes <= 0) return;
|
||||
|
||||
// Find which of the just-indexed packages are still ungrouped
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
},
|
||||
orderBy: { sourceMessageId: "asc" },
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
sourceMessageId: true,
|
||||
indexedAt: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
const windowMs = config.autoGroupTimeWindowMinutes * 60 * 1000;
|
||||
|
||||
// Cluster by time proximity: walk through sorted list, start new cluster when gap > window
|
||||
const clusters: typeof ungrouped[] = [];
|
||||
let current: typeof ungrouped = [ungrouped[0]];
|
||||
|
||||
for (let i = 1; i < ungrouped.length; i++) {
|
||||
const prev = current[current.length - 1];
|
||||
const gap = Math.abs(ungrouped[i].indexedAt.getTime() - prev.indexedAt.getTime());
|
||||
|
||||
if (gap <= windowMs) {
|
||||
current.push(ungrouped[i]);
|
||||
} else {
|
||||
clusters.push(current);
|
||||
current = [ungrouped[i]];
|
||||
}
|
||||
}
|
||||
clusters.push(current);
|
||||
|
||||
// Create groups for clusters with 2+ packages
|
||||
for (const cluster of clusters) {
|
||||
if (cluster.length < 2) continue;
|
||||
|
||||
// Derive group name from common filename prefix
|
||||
const name = findCommonPrefix(cluster.map((p) => p.fileName)) || cluster[0].fileName;
|
||||
|
||||
try {
|
||||
const groupId = await createTimeWindowGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: cluster.map((p) => p.id),
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, name, memberCount: cluster.length },
|
||||
"Created time-window group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, clusterSize: cluster.length }, "Failed to create time-window group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that share a date pattern (YYYY-MM, YYYY_MM, etc.)
|
||||
* or project slug extracted from their filenames.
|
||||
*/
|
||||
export async function processPatternGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
},
|
||||
select: { id: true, fileName: true },
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Group by extracted pattern
|
||||
const patternMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
const pattern = extractPattern(pkg.fileName);
|
||||
if (!pattern) continue;
|
||||
const group = patternMap.get(pattern) ?? [];
|
||||
group.push(pkg);
|
||||
patternMap.set(pattern, group);
|
||||
}
|
||||
|
||||
for (const [pattern, members] of patternMap) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name: pattern,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_PATTERN",
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, pattern, memberCount: members.length },
|
||||
"Created pattern-based group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, pattern }, "Failed to create pattern group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a grouping pattern from a filename.
|
||||
* Matches: YYYY-MM, YYYY_MM, "Month Year", or a project prefix before common separators.
|
||||
* Returns null if no usable pattern found.
|
||||
*/
|
||||
function extractPattern(fileName: string): string | null {
|
||||
// Strip extension for matching
|
||||
const name = fileName.replace(/\.(zip|rar|7z|pdf|stl)(\.\d+)?$/i, "");
|
||||
|
||||
// Match YYYY-MM or YYYY_MM patterns
|
||||
const dateMatch = name.match(/(\d{4})[\-_](\d{2})/);
|
||||
if (dateMatch) {
|
||||
return `${dateMatch[1]}-${dateMatch[2]}`;
|
||||
}
|
||||
|
||||
// Match "Month Year" patterns (e.g., "January 2025", "Jan 2025")
|
||||
const months = "(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)";
|
||||
const monthYearMatch = name.match(new RegExp(`(${months})\\s*(\\d{4})`, "i"));
|
||||
if (monthYearMatch) {
|
||||
const monthStr = monthYearMatch[1].toLowerCase().slice(0, 3);
|
||||
const monthNum = ["jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec"].indexOf(monthStr) + 1;
|
||||
if (monthNum > 0) {
|
||||
return `${monthYearMatch[2]}-${String(monthNum).padStart(2, "0")}`;
|
||||
}
|
||||
}
|
||||
|
||||
// Match project prefix: text before " - ", " – ", or "(". Must be at least 5 chars.
|
||||
const prefixMatch = name.match(/^(.{5,}?)(?:\s*[\-–]\s|\s*\()/);
|
||||
if (prefixMatch) {
|
||||
return prefixMatch[1].trim();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that share the same creator within a channel.
|
||||
* Only groups if there are 3+ packages from the same creator (to avoid
|
||||
* over-grouping when a creator only has a couple files).
|
||||
*/
|
||||
export async function processCreatorGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
creator: { not: null },
|
||||
},
|
||||
select: { id: true, fileName: true, creator: true },
|
||||
});
|
||||
|
||||
if (ungrouped.length < 3) return;
|
||||
|
||||
// Group by creator
|
||||
const creatorMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
if (!pkg.creator) continue;
|
||||
const key = pkg.creator.toLowerCase();
|
||||
const group = creatorMap.get(key) ?? [];
|
||||
group.push(pkg);
|
||||
creatorMap.set(key, group);
|
||||
}
|
||||
|
||||
for (const [, members] of creatorMap) {
|
||||
if (members.length < 3) continue;
|
||||
|
||||
const creatorName = members[0].creator!;
|
||||
const name = findCommonPrefix(members.map((m) => m.fileName)) || creatorName;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_PATTERN",
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, creator: creatorName, memberCount: members.length },
|
||||
"Created creator-based group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, creator: creatorName }, "Failed to create creator group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that share the same root folder inside their archives.
|
||||
* E.g., if two packages both contain files under "ProjectX/", they're likely related.
|
||||
* Only considers packages with 3+ files (to avoid false positives from flat archives).
|
||||
*/
|
||||
export async function processZipPathGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
// Find ungrouped packages that have indexed files
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
fileCount: { gte: 3 },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
files: {
|
||||
select: { path: true },
|
||||
take: 50,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Extract the dominant root folder for each package
|
||||
const packageRoots = new Map<string, { id: string; fileName: string }[]>();
|
||||
|
||||
for (const pkg of ungrouped) {
|
||||
const root = extractRootFolder(pkg.files.map((f) => f.path));
|
||||
if (!root) continue;
|
||||
|
||||
const key = root.toLowerCase();
|
||||
const group = packageRoots.get(key) ?? [];
|
||||
group.push({ id: pkg.id, fileName: pkg.fileName });
|
||||
packageRoots.set(key, group);
|
||||
}
|
||||
|
||||
// Create groups for roots shared by 2+ packages
|
||||
for (const [root, members] of packageRoots) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name: root,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_ZIP",
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, rootFolder: root, memberCount: members.length },
|
||||
"Created ZIP path prefix group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, rootFolder: root }, "Failed to create ZIP path group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages that reply to the same root message.
|
||||
* If message B and C both reply to message A, they're grouped together.
|
||||
*/
|
||||
export async function processReplyChainGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
replyToMessageId: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
replyToMessageId: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Group by replyToMessageId
|
||||
const replyMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
if (!pkg.replyToMessageId) continue;
|
||||
const key = pkg.replyToMessageId.toString();
|
||||
const group = replyMap.get(key) ?? [];
|
||||
group.push(pkg);
|
||||
replyMap.set(key, group);
|
||||
}
|
||||
|
||||
for (const [replyId, members] of replyMap) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
const name = findCommonPrefix(members.map((m) => m.fileName)) || members[0].fileName;
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_REPLY" as const,
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, replyToMessageId: replyId, memberCount: members.length },
|
||||
"Created reply-chain group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err, replyToMessageId: replyId }, "Failed to create reply-chain group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Group ungrouped packages with similar captions from the same channel.
|
||||
* Uses normalized caption comparison — two captions match if they share
|
||||
* the same significant words (ignoring common words and file extensions).
|
||||
*/
|
||||
export async function processCaptionGroups(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const ungrouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: null,
|
||||
sourceCaption: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
sourceCaption: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (ungrouped.length < 2) return;
|
||||
|
||||
// Group by normalized caption key
|
||||
const captionMap = new Map<string, typeof ungrouped>();
|
||||
for (const pkg of ungrouped) {
|
||||
if (!pkg.sourceCaption) continue;
|
||||
const key = normalizeCaptionKey(pkg.sourceCaption);
|
||||
if (!key) continue;
|
||||
const group = captionMap.get(key) ?? [];
|
||||
group.push(pkg);
|
||||
captionMap.set(key, group);
|
||||
}
|
||||
|
||||
for (const [, members] of captionMap) {
|
||||
if (members.length < 2) continue;
|
||||
|
||||
const name = members[0].sourceCaption!.slice(0, 80);
|
||||
|
||||
try {
|
||||
const groupId = await createAutoGroup({
|
||||
sourceChannelId,
|
||||
name,
|
||||
packageIds: members.map((m) => m.id),
|
||||
groupingSource: "AUTO_CAPTION" as const,
|
||||
});
|
||||
|
||||
log.info(
|
||||
{ groupId, memberCount: members.length },
|
||||
"Created caption-match group"
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn({ err }, "Failed to create caption group");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize a caption for grouping: lowercase, strip extensions and numbers,
|
||||
* extract significant words (3+ chars), sort, and join.
|
||||
* Two captions with the same key are considered a match.
|
||||
*/
|
||||
function normalizeCaptionKey(caption: string): string | null {
|
||||
const stripped = caption
|
||||
.toLowerCase()
|
||||
.replace(/\.(zip|rar|7z|stl|pdf|obj|gcode)(\.\d+)?/gi, "")
|
||||
.replace(/[^a-z0-9\s]/g, " ");
|
||||
|
||||
const words = stripped
|
||||
.split(/\s+/)
|
||||
.filter((w) => w.length >= 3)
|
||||
.filter((w) => !["the", "and", "for", "with", "from", "part", "file", "files"].includes(w));
|
||||
|
||||
if (words.length < 2) return null;
|
||||
|
||||
return words.sort().join(" ");
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the dominant root folder from a list of archive file paths.
|
||||
* Returns the first path segment that appears in >50% of files.
|
||||
* Returns null for flat archives or archives with no common root.
|
||||
*/
|
||||
function extractRootFolder(paths: string[]): string | null {
|
||||
if (paths.length === 0) return null;
|
||||
|
||||
// Count first path segments
|
||||
const segmentCounts = new Map<string, number>();
|
||||
for (const p of paths) {
|
||||
// Normalize separators and get first segment
|
||||
const normalized = p.replace(/\\/g, "/");
|
||||
const firstSlash = normalized.indexOf("/");
|
||||
if (firstSlash <= 0) continue; // Skip root-level files
|
||||
const segment = normalized.slice(0, firstSlash);
|
||||
// Skip common noise folders
|
||||
if (segment === "__MACOSX" || segment === ".DS_Store" || segment === "Thumbs.db") continue;
|
||||
segmentCounts.set(segment, (segmentCounts.get(segment) ?? 0) + 1);
|
||||
}
|
||||
|
||||
if (segmentCounts.size === 0) return null;
|
||||
|
||||
// Find the most common segment
|
||||
let maxSegment = "";
|
||||
let maxCount = 0;
|
||||
for (const [seg, count] of segmentCounts) {
|
||||
if (count > maxCount) {
|
||||
maxSegment = seg;
|
||||
maxCount = count;
|
||||
}
|
||||
}
|
||||
|
||||
// Must appear in >50% of files and be at least 3 chars
|
||||
if (maxCount < paths.length * 0.5 || maxSegment.length < 3) return null;
|
||||
|
||||
return maxSegment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect packages that could have been grouped differently.
|
||||
* Checks if any grouped package's filename matches a GroupingRule
|
||||
* that would place it in a different group.
|
||||
*/
|
||||
export async function detectGroupingConflicts(
|
||||
sourceChannelId: string,
|
||||
indexedPackages: IndexedPackageRef[]
|
||||
): Promise<void> {
|
||||
const rules = await db.groupingRule.findMany({
|
||||
where: { sourceChannelId },
|
||||
});
|
||||
if (rules.length === 0) return;
|
||||
|
||||
const grouped = await db.package.findMany({
|
||||
where: {
|
||||
id: { in: indexedPackages.map((p) => p.packageId) },
|
||||
packageGroupId: { not: null },
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
fileName: true,
|
||||
packageGroupId: true,
|
||||
packageGroup: { select: { name: true, groupingSource: true } },
|
||||
},
|
||||
});
|
||||
|
||||
for (const pkg of grouped) {
|
||||
for (const rule of rules) {
|
||||
if (pkg.fileName.toLowerCase().includes(rule.pattern.toLowerCase())) {
|
||||
// Check if the rule's source group is different from current group
|
||||
if (rule.createdByGroupId && rule.createdByGroupId !== pkg.packageGroupId) {
|
||||
try {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "GROUPING_CONFLICT",
|
||||
severity: "INFO",
|
||||
title: `Potential grouping conflict: ${pkg.fileName}`,
|
||||
message: `Grouped by ${pkg.packageGroup?.groupingSource ?? "unknown"} into "${pkg.packageGroup?.name}", but also matches rule "${rule.pattern}" from a different manual group`,
|
||||
context: {
|
||||
packageId: pkg.id,
|
||||
fileName: pkg.fileName,
|
||||
currentGroupId: pkg.packageGroupId,
|
||||
matchedRuleId: rule.id,
|
||||
matchedPattern: rule.pattern,
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch {
|
||||
// Best-effort
|
||||
}
|
||||
break; // One notification per package
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the longest common prefix among a list of filenames,
|
||||
* trimming trailing separators and partial words.
|
||||
*/
|
||||
function findCommonPrefix(names: string[]): string {
|
||||
if (names.length === 0) return "";
|
||||
if (names.length === 1) return names[0];
|
||||
|
||||
let prefix = names[0];
|
||||
for (let i = 1; i < names.length; i++) {
|
||||
while (!names[i].startsWith(prefix)) {
|
||||
prefix = prefix.slice(0, -1);
|
||||
if (prefix.length === 0) return "";
|
||||
}
|
||||
}
|
||||
|
||||
// Trim trailing separators and partial words
|
||||
const trimmed = prefix.replace(/[\s\-_.(]+$/, "");
|
||||
return trimmed.length >= 3 ? trimmed : "";
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { childLogger } from "./util/logger.js";
|
||||
import { withTdlibMutex } from "./util/mutex.js";
|
||||
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
|
||||
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
|
||||
import { runIntegrityAudit } from "./audit.js";
|
||||
|
||||
const log = childLogger("scheduler");
|
||||
|
||||
@@ -87,6 +88,16 @@ async function runCycle(): Promise<void> {
|
||||
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
|
||||
"Ingestion cycle complete"
|
||||
);
|
||||
|
||||
// Run integrity audit after all accounts are processed
|
||||
try {
|
||||
const auditResult = await runIntegrityAudit();
|
||||
if (auditResult.issues > 0) {
|
||||
log.info({ ...auditResult }, "Integrity audit found issues");
|
||||
}
|
||||
} catch (auditErr) {
|
||||
log.warn({ err: auditErr }, "Integrity audit failed");
|
||||
}
|
||||
} catch (err) {
|
||||
log.error({ err }, "Ingestion cycle failed");
|
||||
} finally {
|
||||
|
||||
@@ -39,6 +39,7 @@ interface TdMessage {
|
||||
id: number;
|
||||
date: number;
|
||||
media_album_id?: string;
|
||||
reply_to_message_id?: number;
|
||||
content: {
|
||||
_: string;
|
||||
document?: {
|
||||
@@ -216,6 +217,8 @@ export async function getChannelMessages(
|
||||
fileSize: BigInt(doc.document.size),
|
||||
date: new Date(msg.date * 1000),
|
||||
mediaAlbumId: msg.media_album_id && msg.media_album_id !== "0" ? msg.media_album_id : undefined,
|
||||
replyToMessageId: msg.reply_to_message_id ? BigInt(msg.reply_to_message_id) : undefined,
|
||||
caption: msg.content?.caption?.text || undefined,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ export const config = {
|
||||
/** Maximum file part size for Telegram upload (in MiB). Default 1950 (under 2GB non-Premium limit).
|
||||
* Set to 3900 for Premium accounts (under 4GB limit). */
|
||||
maxPartSizeMB: parseInt(process.env.MAX_PART_SIZE_MB ?? "1950", 10),
|
||||
/** Time window for auto-grouping ungrouped packages from the same channel (minutes). 0 = disabled. */
|
||||
autoGroupTimeWindowMinutes: parseInt(process.env.AUTO_GROUP_TIME_WINDOW_MINUTES ?? "5", 10),
|
||||
/** Maximum jitter added to scheduler interval (in minutes) */
|
||||
jitterMinutes: 5,
|
||||
/** Maximum time span for multipart archive parts (in hours). 0 = no limit. */
|
||||
|
||||
@@ -47,7 +47,8 @@ import { readRarContents } from "./archive/rar-reader.js";
|
||||
import { read7zContents } from "./archive/sevenz-reader.js";
|
||||
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
|
||||
import { uploadToChannel } from "./upload/channel.js";
|
||||
import { processAlbumGroups, type IndexedPackageRef } from "./grouping.js";
|
||||
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
|
||||
import { db } from "./db/client.js";
|
||||
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
|
||||
import type { Client } from "tdl";
|
||||
|
||||
@@ -776,6 +777,22 @@ async function processArchiveSets(
|
||||
partCount: archiveSet.parts.length,
|
||||
accountId: ctx.accountId,
|
||||
});
|
||||
// Also create a persistent notification
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: inferSkipReason(errMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED",
|
||||
severity: "WARNING",
|
||||
title: `Failed to process ${archiveSet.parts[0].fileName}`,
|
||||
message: errMsg,
|
||||
context: {
|
||||
fileName: archiveSet.parts[0].fileName,
|
||||
sourceChannelId: ctx.channel.id,
|
||||
sourceMessageId: Number(archiveSet.parts[0].id),
|
||||
channelTitle: ctx.channelTitle,
|
||||
reason: inferSkipReason(errMsg),
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch {
|
||||
// Best-effort — don't fail the run if skip recording fails
|
||||
}
|
||||
@@ -790,6 +807,38 @@ async function processArchiveSets(
|
||||
indexedPackageRefs,
|
||||
scanResult.photos
|
||||
);
|
||||
|
||||
// Auto-grouping passes (gated by per-channel flag)
|
||||
const channelRecord = await db.telegramChannel.findUnique({
|
||||
where: { id: channel.id },
|
||||
select: { autoGroupEnabled: true },
|
||||
});
|
||||
|
||||
if (channelRecord?.autoGroupEnabled !== false) {
|
||||
// Learned rule-based grouping (from manual overrides)
|
||||
await processRuleBasedGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Time-window grouping for remaining ungrouped packages
|
||||
await processTimeWindowGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Pattern-based grouping (date patterns, project slugs)
|
||||
await processPatternGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Creator-based grouping (3+ files from same creator)
|
||||
await processCreatorGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// ZIP path prefix grouping (shared root folder inside archives)
|
||||
await processZipPathGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Reply chain grouping (messages replying to same root)
|
||||
await processReplyChainGroups(channel.id, indexedPackageRefs);
|
||||
|
||||
// Caption fuzzy match grouping
|
||||
await processCaptionGroups(channel.id, indexedPackageRefs);
|
||||
}
|
||||
|
||||
// Check for potential grouping conflicts
|
||||
await detectGroupingConflicts(channel.id, indexedPackageRefs);
|
||||
}
|
||||
|
||||
return maxProcessedId;
|
||||
@@ -1053,6 +1102,43 @@ async function processOneArchiveSet(
|
||||
uploadPaths = splitPaths;
|
||||
}
|
||||
|
||||
// ── Hash verification after split ──
|
||||
// If we split/repacked, verify the split parts hash matches the original
|
||||
if (splitPaths.length > 0) {
|
||||
const splitHash = await hashParts(splitPaths);
|
||||
if (splitHash !== contentHash) {
|
||||
accountLog.error(
|
||||
{ fileName: archiveName, originalHash: contentHash, splitHash, parts: splitPaths.length },
|
||||
"Hash mismatch after split — file may be corrupted"
|
||||
);
|
||||
// Record notification for visibility
|
||||
try {
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "HASH_MISMATCH",
|
||||
severity: "ERROR",
|
||||
title: `Hash mismatch after splitting ${archiveName}`,
|
||||
message: `Expected ${contentHash.slice(0, 16)}… but got ${splitHash.slice(0, 16)}… after splitting into ${splitPaths.length} parts`,
|
||||
context: {
|
||||
fileName: archiveName,
|
||||
originalHash: contentHash,
|
||||
splitHash,
|
||||
partCount: splitPaths.length,
|
||||
sourceChannelId: channel.id,
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch {
|
||||
// Best-effort notification
|
||||
}
|
||||
throw new Error(`Hash mismatch after split for ${archiveName}: expected ${contentHash}, got ${splitHash}`);
|
||||
}
|
||||
accountLog.debug(
|
||||
{ fileName: archiveName, hash: contentHash.slice(0, 16), parts: splitPaths.length },
|
||||
"Split hash verified — matches original"
|
||||
);
|
||||
}
|
||||
|
||||
// ── Uploading ──
|
||||
// Check if a prior run already uploaded this file (orphaned upload scenario:
|
||||
// file reached Telegram but DB write failed or worker crashed before indexing)
|
||||
@@ -1090,6 +1176,34 @@ async function processOneArchiveSet(
|
||||
);
|
||||
}
|
||||
|
||||
// ── Post-upload integrity check ──
|
||||
// Verify the files on disk still match before we index
|
||||
if (uploadPaths.length > 0 && !existingUpload) {
|
||||
try {
|
||||
const postUploadHash = await hashParts(uploadPaths);
|
||||
if (splitPaths.length > 0) {
|
||||
// Split files — hash should match the split hash (already verified above)
|
||||
// No additional check needed since we verified split hash = original hash
|
||||
} else if (postUploadHash !== contentHash) {
|
||||
accountLog.error(
|
||||
{ fileName: archiveName, originalHash: contentHash, postUploadHash },
|
||||
"Hash changed between hashing and upload — possible disk corruption"
|
||||
);
|
||||
await db.systemNotification.create({
|
||||
data: {
|
||||
type: "HASH_MISMATCH",
|
||||
severity: "ERROR",
|
||||
title: `Post-upload hash mismatch: ${archiveName}`,
|
||||
message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`,
|
||||
context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
|
||||
},
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// Best-effort — don't fail the ingestion
|
||||
}
|
||||
}
|
||||
|
||||
// ── Preview thumbnail ──
|
||||
let previewData: Buffer | null = null;
|
||||
let previewMsgId: bigint | null = null;
|
||||
@@ -1172,6 +1286,8 @@ async function processOneArchiveSet(
|
||||
tags,
|
||||
previewData,
|
||||
previewMsgId,
|
||||
sourceCaption: archiveSet.parts[0].caption ?? null,
|
||||
replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
|
||||
files: entries,
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user