From ab558e00f57b20ca6a37826ed75778e69ca8c988 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 22 Mar 2026 00:09:59 +0100 Subject: [PATCH] feat: add preview management, channel controls, invite polish, and recovery - Auto-extract preview images from ZIP/RAR/7z archives during ingestion - Upload custom preview images via package drawer - Select preview from archive contents with on-demand extraction UI - Manually add Telegram channels by t.me link, username, or invite link - Invite code UX: bulk create, copy link, usage tracking, delete confirm - Incomplete upload recovery: verify dest messages on worker startup - Rebuild package DB by scanning destination channel with live progress Co-Authored-By: Claude Opus 4.6 (1M context) --- prisma/schema.prisma | 50 ++- .../invites/_components/invite-manager.tsx | 287 ++++++++++-- src/app/(app)/invites/actions.ts | 44 +- .../_components/archive-preview-picker.tsx | 399 +++++++++++++++++ .../stls/_components/package-files-drawer.tsx | 125 +++++- src/app/(app)/stls/actions.ts | 93 ++++ .../telegram/_components/channels-tab.tsx | 17 +- .../telegram/_components/destination-card.tsx | 247 +++++++++-- .../_components/join-channel-dialog.tsx | 179 ++++++++ src/app/(app)/telegram/actions.ts | 107 +++++ src/app/(auth)/register/actions.ts | 7 +- src/app/(auth)/register/page.tsx | 6 +- .../zips/[id]/extract/[requestId]/route.ts | 73 ++++ src/app/api/zips/[id]/extract/route.ts | 118 +++++ src/app/api/zips/[id]/images/route.ts | 56 +++ worker/Dockerfile | 4 +- worker/src/archive/extract-image.ts | 33 ++ worker/src/db/queries.ts | 32 ++ worker/src/extract-listener.ts | 217 +++++++++ worker/src/fetch-listener.ts | 243 ++++++++++- worker/src/index.ts | 5 + worker/src/preview/extract.ts | 111 +++++ worker/src/rebuild.ts | 411 ++++++++++++++++++ worker/src/recovery.ts | 187 ++++++++ worker/src/tdlib/chats.ts | 57 +++ worker/src/worker.ts | 18 + 26 files changed, 3028 insertions(+), 98 deletions(-) create mode 100644 src/app/(app)/stls/_components/archive-preview-picker.tsx create mode 100644 src/app/(app)/telegram/_components/join-channel-dialog.tsx create mode 100644 src/app/api/zips/[id]/extract/[requestId]/route.ts create mode 100644 src/app/api/zips/[id]/extract/route.ts create mode 100644 src/app/api/zips/[id]/images/route.ts create mode 100644 worker/src/archive/extract-image.ts create mode 100644 worker/src/extract-listener.ts create mode 100644 worker/src/preview/extract.ts create mode 100644 worker/src/rebuild.ts create mode 100644 worker/src/recovery.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 8461149..6b9a786 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -38,7 +38,9 @@ model User { tags Tag[] settings UserSettings? telegramLink TelegramLink? - inviteCodes InviteCode[] + inviteCodes InviteCode[] @relation("InviteCreator") + usedInvite InviteCode? @relation("InviteUser", fields: [usedInviteId], references: [id], onDelete: SetNull) + usedInviteId String? } model Account { @@ -471,11 +473,12 @@ model Package { indexedAt DateTime @default(now()) createdAt DateTime @default(now()) - sourceChannel TelegramChannel @relation(fields: [sourceChannelId], references: [id]) - files PackageFile[] - ingestionRun IngestionRun? @relation(fields: [ingestionRunId], references: [id]) - ingestionRunId String? - sendRequests BotSendRequest[] + sourceChannel TelegramChannel @relation(fields: [sourceChannelId], references: [id]) + files PackageFile[] + ingestionRun IngestionRun? @relation(fields: [ingestionRunId], references: [id]) + ingestionRunId String? + sendRequests BotSendRequest[] + extractRequests ArchiveExtractRequest[] @@index([sourceChannelId]) @@index([destChannelId]) @@ -568,7 +571,8 @@ model InviteCode { createdBy String createdAt DateTime @default(now()) - creator User @relation(fields: [createdBy], references: [id], onDelete: Cascade) + creator User @relation("InviteCreator", fields: [createdBy], references: [id], onDelete: Cascade) + usedBy User[] @relation("InviteUser") @@index([code]) @@map("invite_codes") @@ -646,3 +650,35 @@ model BotSubscription { @@index([telegramUserId]) @@map("bot_subscriptions") } + +// ─────────────────────────────────────── +// Archive image extraction (worker-mediated) +// ─────────────────────────────────────── + +enum ExtractStatus { + PENDING + IN_PROGRESS + COMPLETED + FAILED +} + +/// A request for the worker to extract an image from an archive. +/// The web app creates this, sends a pg_notify, and the worker +/// downloads the archive, extracts the file, and writes the result. +model ArchiveExtractRequest { + id String @id @default(cuid()) + packageId String + filePath String @db.VarChar(1024) // path within archive to extract + status ExtractStatus @default(PENDING) + imageData Bytes? // extracted image bytes (JPEG/PNG/WebP) + contentType String? @db.VarChar(64) // MIME type of extracted image + error String? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + package Package @relation(fields: [packageId], references: [id], onDelete: Cascade) + + @@index([packageId, filePath]) + @@index([status]) + @@map("archive_extract_requests") +} diff --git a/src/app/(app)/invites/_components/invite-manager.tsx b/src/app/(app)/invites/_components/invite-manager.tsx index d305d9a..4c868e1 100644 --- a/src/app/(app)/invites/_components/invite-manager.tsx +++ b/src/app/(app)/invites/_components/invite-manager.tsx @@ -1,11 +1,12 @@ "use client"; import { useState, useTransition } from "react"; -import { Copy, Plus, Trash2 } from "lucide-react"; +import { Copy, Link2, Plus, Trash2 } from "lucide-react"; import { Button } from "@/components/ui/button"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; +import { Switch } from "@/components/ui/switch"; import { Table, TableBody, @@ -15,7 +16,30 @@ import { TableRow, } from "@/components/ui/table"; import { Badge } from "@/components/ui/badge"; -import { createInviteCode, deleteInviteCode } from "../actions"; +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, + AlertDialogTrigger, +} from "@/components/ui/alert-dialog"; +import { + Tooltip, + TooltipContent, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import { createInviteCode, createBulkInviteCodes, deleteInviteCode } from "../actions"; + +type InviteUser = { + id: string; + name: string | null; + email: string | null; + createdAt: string; +}; type InviteCode = { id: string; @@ -25,6 +49,7 @@ type InviteCode = { expiresAt: string | null; createdAt: string; creator: { name: string | null }; + usedBy: InviteUser[]; }; export function InviteManager({ @@ -37,8 +62,10 @@ export function InviteManager({ const [maxUses, setMaxUses] = useState(1); const [expiresInDays, setExpiresInDays] = useState(7); const [noExpiry, setNoExpiry] = useState(false); + const [bulkCount, setBulkCount] = useState(5); const [isPending, startTransition] = useTransition(); const [copiedId, setCopiedId] = useState(null); + const [copiedType, setCopiedType] = useState<"code" | "link" | null>(null); function handleCreate() { startTransition(async () => { @@ -49,35 +76,64 @@ export function InviteManager({ }); } + function handleBulkCreate() { + startTransition(async () => { + await createBulkInviteCodes({ + count: bulkCount, + maxUses, + expiresInDays: noExpiry ? null : expiresInDays, + }); + }); + } + function handleDelete(id: string) { startTransition(async () => { await deleteInviteCode(id); }); } - function copyLink(code: string, id: string) { - const url = `${appUrl}/register?code=${code}`; - navigator.clipboard.writeText(url); + function copyToClipboard(text: string, id: string, type: "code" | "link") { + navigator.clipboard.writeText(text); setCopiedId(id); - setTimeout(() => setCopiedId(null), 2000); + setCopiedType(type); + setTimeout(() => { + setCopiedId(null); + setCopiedType(null); + }, 2000); } - function getStatus(invite: InviteCode) { + function getStatus(invite: InviteCode): "active" | "used" | "expired" { if (invite.uses >= invite.maxUses) return "used"; if (invite.expiresAt && new Date(invite.expiresAt) < new Date()) return "expired"; return "active"; } + function formatRelativeDate(dateStr: string) { + const date = new Date(dateStr); + const now = new Date(); + const diffMs = date.getTime() - now.getTime(); + const diffDays = Math.ceil(diffMs / (1000 * 60 * 60 * 24)); + + if (diffDays < 0) return "Expired"; + if (diffDays === 0) return "Today"; + if (diffDays === 1) return "Tomorrow"; + return `${diffDays} days`; + } + + const activeCount = inviteCodes.filter((i) => getStatus(i) === "active").length; + const usedCount = inviteCodes.filter((i) => getStatus(i) === "used").length; + return ( -
+
+ {/* Create Card */} - Create Invite Code + Generate Invite Codes - Generate a new invite code to share with someone + Create single or bulk invite codes to share with new users - +
@@ -92,9 +148,7 @@ export function InviteManager({ />
- +
- setNoExpiry(e.target.checked)} - className="h-4 w-4" + onCheckedChange={setNoExpiry} /> - +
+
+ +
+ +
+
+ + setBulkCount(Number(e.target.value))} + className="w-20" + /> +
+ +
+ {/* Codes Table */} Invite Codes - {inviteCodes.length} invite code{inviteCodes.length !== 1 ? "s" : ""} created + {inviteCodes.length} total · {activeCount} active · {usedCount} fully used @@ -143,6 +224,7 @@ export function InviteManager({ Code Status Uses + Redeemed By Expires Created Actions @@ -151,6 +233,11 @@ export function InviteManager({ {inviteCodes.map((invite) => { const status = getStatus(invite); + const isCopiedCode = + copiedId === invite.id && copiedType === "code"; + const isCopiedLink = + copiedId === invite.id && copiedType === "link"; + return ( @@ -173,32 +260,146 @@ export function InviteManager({ {invite.uses} / {invite.maxUses} - {invite.expiresAt - ? new Date(invite.expiresAt).toLocaleDateString() - : "Never"} + {invite.usedBy.length === 0 ? ( + -- + ) : ( +
+ {invite.usedBy.map((user) => ( + + +
+ {user.name ?? user.email ?? "Unknown"} +
+
+ +
+ {user.email &&
{user.email}
} +
+ Joined{" "} + {new Date(user.createdAt).toLocaleDateString()} +
+
+
+
+ ))} +
+ )}
- {new Date(invite.createdAt).toLocaleDateString()} + {invite.expiresAt ? ( + + + + {formatRelativeDate(invite.expiresAt)} + + + + {new Date(invite.expiresAt).toLocaleString()} + + + ) : ( + Never + )} + + + + + + {new Date(invite.createdAt).toLocaleDateString()} + + + + by {invite.creator.name ?? "Unknown"} + + -
- - +
+ + + + + Copy code + + + + + + + Copy registration link + + + + + + + + + + Delete code + + + + + Delete invite code? + + + This will permanently delete the invite code{" "} + + {invite.code} + + .{" "} + {status === "active" && + "Anyone with this code will no longer be able to register."} + + + + Cancel + handleDelete(invite.id)} + > + Delete + + + +
diff --git a/src/app/(app)/invites/actions.ts b/src/app/(app)/invites/actions.ts index 98f1cf8..35c76b5 100644 --- a/src/app/(app)/invites/actions.ts +++ b/src/app/(app)/invites/actions.ts @@ -33,6 +33,45 @@ export async function createInviteCode(input: { return { success: true, data: { code } }; } +export async function createBulkInviteCodes(input: { + count: number; + maxUses: number; + expiresInDays: number | null; +}): Promise> { + const session = await auth(); + if (!session?.user?.id || session.user.role !== "ADMIN") { + return { success: false, error: "Unauthorized" }; + } + + if (input.count < 1 || input.count > 25) { + return { success: false, error: "Can generate between 1 and 25 codes at a time" }; + } + + const expiresAt = input.expiresInDays + ? new Date(Date.now() + input.expiresInDays * 24 * 60 * 60 * 1000) + : null; + + const codes: string[] = []; + + await prisma.$transaction(async (tx) => { + for (let i = 0; i < input.count; i++) { + const code = crypto.randomBytes(6).toString("hex"); + codes.push(code); + await tx.inviteCode.create({ + data: { + code, + maxUses: input.maxUses, + expiresAt, + createdBy: session.user.id, + }, + }); + } + }); + + revalidatePath("/invites"); + return { success: true, data: { codes } }; +} + export async function deleteInviteCode(id: string): Promise { const session = await auth(); if (!session?.user?.id || session.user.role !== "ADMIN") { @@ -48,7 +87,10 @@ export async function deleteInviteCode(id: string): Promise { export async function getInviteCodes() { const codes = await prisma.inviteCode.findMany({ orderBy: { createdAt: "desc" }, - include: { creator: { select: { name: true } } }, + include: { + creator: { select: { name: true } }, + usedBy: { select: { id: true, name: true, email: true, createdAt: true } }, + }, }); return codes; } diff --git a/src/app/(app)/stls/_components/archive-preview-picker.tsx b/src/app/(app)/stls/_components/archive-preview-picker.tsx new file mode 100644 index 0000000..3babe7c --- /dev/null +++ b/src/app/(app)/stls/_components/archive-preview-picker.tsx @@ -0,0 +1,399 @@ +"use client"; + +import { useEffect, useState, useCallback, useRef, useTransition } from "react"; +import { + Image as ImageIcon, + Loader2, + Check, + AlertCircle, + ImageOff, +} from "lucide-react"; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, + DialogDescription, +} from "@/components/ui/dialog"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { Button } from "@/components/ui/button"; +import { cn } from "@/lib/utils"; +import { toast } from "sonner"; +import { setPreviewFromExtract } from "../actions"; + +interface ArchiveImage { + id: string; + path: string; + fileName: string; + extension: string | null; + size: string; +} + +interface ThumbnailState { + status: "idle" | "loading" | "loaded" | "failed"; + requestId?: string; + imageUrl?: string; + error?: string; +} + +interface ArchivePreviewPickerProps { + packageId: string; + packageName: string; + open: boolean; + onOpenChange: (open: boolean) => void; + onPreviewSet?: () => void; +} + +function formatBytes(bytesStr: string): string { + const bytes = Number(bytesStr); + if (bytes === 0) return "0 B"; + const k = 1024; + const sizes = ["B", "KB", "MB", "GB"]; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return `${parseFloat((bytes / Math.pow(k, i)).toFixed(1))} ${sizes[i]}`; +} + +export function ArchivePreviewPicker({ + packageId, + packageName, + open, + onOpenChange, + onPreviewSet, +}: ArchivePreviewPickerProps) { + const [images, setImages] = useState([]); + const [loading, setLoading] = useState(false); + const [thumbnails, setThumbnails] = useState>(new Map()); + const [selectedPath, setSelectedPath] = useState(null); + const [isPending, startTransition] = useTransition(); + const pollTimers = useRef>>(new Map()); + // Track which paths have already been requested to avoid re-requesting + const requestedPaths = useRef>(new Set()); + + // Cleanup poll timers on unmount + useEffect(() => { + return () => { + for (const timer of pollTimers.current.values()) { + clearInterval(timer); + } + }; + }, []); + + // Fetch image list when opened + useEffect(() => { + if (!open) return; + + setImages([]); + setThumbnails(new Map()); + setSelectedPath(null); + requestedPaths.current.clear(); + + // Clear any leftover poll timers + for (const timer of pollTimers.current.values()) { + clearInterval(timer); + } + pollTimers.current.clear(); + + const fetchImages = async () => { + setLoading(true); + try { + const res = await fetch(`/api/zips/${packageId}/images`); + if (!res.ok) throw new Error("Failed to fetch images"); + const data = await res.json(); + setImages(data.images); + } catch { + toast.error("Failed to load archive images"); + } finally { + setLoading(false); + } + }; + + fetchImages(); + }, [open, packageId]); + + // Poll callback for a specific request + const startPolling = useCallback( + (filePath: string, requestId: string) => { + // Clear any existing poll for this path + const existing = pollTimers.current.get(filePath); + if (existing) clearInterval(existing); + + const pollId = setInterval(async () => { + try { + const pollRes = await fetch( + `/api/zips/${packageId}/extract/${requestId}` + ); + if (!pollRes.ok) return; + const pollData = await pollRes.json(); + + if (pollData.status === "COMPLETED") { + clearInterval(pollId); + pollTimers.current.delete(filePath); + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { + status: "loaded", + requestId, + imageUrl: `/api/zips/${packageId}/extract/${requestId}?image=true`, + }); + return next; + }); + } else if (pollData.status === "FAILED") { + clearInterval(pollId); + pollTimers.current.delete(filePath); + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { + status: "failed", + error: pollData.error || "Extraction failed", + }); + return next; + }); + } + } catch { + // Silently retry on network error + } + }, 2000); + + pollTimers.current.set(filePath, pollId); + }, + [packageId] + ); + + // Request extraction for a specific image + const requestThumbnail = useCallback( + async (filePath: string) => { + // Don't re-request if already in progress + if (requestedPaths.current.has(filePath)) return; + requestedPaths.current.add(filePath); + + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { status: "loading" }); + return next; + }); + + try { + const res = await fetch(`/api/zips/${packageId}/extract`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ filePath }), + }); + + if (!res.ok) { + const err = await res.json(); + throw new Error(err.error || "Extract failed"); + } + + const data = await res.json(); + + if (data.status === "COMPLETED") { + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { + status: "loaded", + requestId: data.requestId, + imageUrl: `/api/zips/${packageId}/extract/${data.requestId}?image=true`, + }); + return next; + }); + return; + } + + // Pending or in-progress: start polling + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { status: "loading", requestId: data.requestId }); + return next; + }); + + startPolling(filePath, data.requestId); + } catch (err) { + requestedPaths.current.delete(filePath); + setThumbnails((prev) => { + const next = new Map(prev); + next.set(filePath, { + status: "failed", + error: err instanceof Error ? err.message : "Failed to extract", + }); + return next; + }); + } + }, + [packageId, startPolling] + ); + + // Auto-request thumbnails for the first batch of images + useEffect(() => { + if (!open || images.length === 0) return; + + // Request the first 12 images automatically + const toRequest = images.slice(0, 12); + for (const img of toRequest) { + requestThumbnail(img.path); + } + // Only trigger when images list changes, not on every requestThumbnail change + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [images, open]); + + // Handle selection confirmation + const handleConfirm = () => { + if (!selectedPath) return; + const thumbState = thumbnails.get(selectedPath); + if (!thumbState?.requestId) return; + + startTransition(async () => { + const result = await setPreviewFromExtract(packageId, thumbState.requestId!); + if (result.success) { + toast.success("Preview updated from archive image"); + onOpenChange(false); + onPreviewSet?.(); + } else { + toast.error(result.error); + } + }); + }; + + return ( + + + + Select Preview Image + + Choose an image from the archive to use as the preview for{" "} + {packageName} + + + + +
+ {loading ? ( +
+ + + Loading image list... + +
+ ) : images.length === 0 ? ( +
+ + + No images found in this archive + +
+ ) : ( +
+ {images.map((img) => { + const thumbState = thumbnails.get(img.path); + const isSelected = selectedPath === img.path; + const isLoaded = thumbState?.status === "loaded"; + const isLoading = thumbState?.status === "loading"; + const isFailed = thumbState?.status === "failed"; + + return ( + + ); + })} +
+ )} +
+
+ + {/* Footer */} + {images.length > 0 && ( +
+ + {images.length} image{images.length !== 1 ? "s" : ""} found + +
+ + +
+
+ )} +
+
+ ); +} diff --git a/src/app/(app)/stls/_components/package-files-drawer.tsx b/src/app/(app)/stls/_components/package-files-drawer.tsx index e2be963..bcc9057 100644 --- a/src/app/(app)/stls/_components/package-files-drawer.tsx +++ b/src/app/(app)/stls/_components/package-files-drawer.tsx @@ -1,6 +1,7 @@ "use client"; -import { useEffect, useState, useCallback, useMemo } from "react"; +import { useEffect, useState, useCallback, useMemo, useRef } from "react"; +import { toast } from "sonner"; import { FileText, Folder, @@ -9,6 +10,9 @@ import { Search, ChevronDown, ChevronRight, + Upload, + ImagePlus, + Images, } from "lucide-react"; import { Dialog, @@ -24,6 +28,8 @@ import { Button } from "@/components/ui/button"; import { cn } from "@/lib/utils"; import type { PackageRow } from "./package-columns"; import { SendToTelegramButton } from "./send-to-telegram-button"; +import { uploadPackagePreview } from "../actions"; +import { ArchivePreviewPicker } from "./archive-preview-picker"; interface FileItem { id: string; @@ -224,6 +230,46 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw const [loadingMore, setLoadingMore] = useState(false); const [search, setSearch] = useState(""); const [page, setPage] = useState(1); + const [uploading, setUploading] = useState(false); + const [localPreviewUrl, setLocalPreviewUrl] = useState(null); + const [showPreviewPicker, setShowPreviewPicker] = useState(false); + const fileInputRef = useRef(null); + + const handlePreviewUpload = useCallback( + async (e: React.ChangeEvent) => { + const file = e.target.files?.[0]; + if (!file || !pkg) return; + + // Reset file input so the same file can be re-selected + e.target.value = ""; + + setUploading(true); + try { + const formData = new FormData(); + formData.append("file", file); + const result = await uploadPackagePreview(pkg.id, formData); + if (result.success) { + toast.success("Preview image uploaded"); + // Show uploaded image immediately via local object URL + setLocalPreviewUrl(URL.createObjectURL(file)); + } else { + toast.error(result.error); + } + } catch { + toast.error("Failed to upload preview image"); + } finally { + setUploading(false); + } + }, + [pkg] + ); + + // Clean up local preview URL when drawer closes or package changes + useEffect(() => { + return () => { + if (localPreviewUrl) URL.revokeObjectURL(localPreviewUrl); + }; + }, [localPreviewUrl]); const fetchFiles = useCallback( async (pageNum: number, append: boolean) => { @@ -258,6 +304,7 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw setTotal(0); setSearch(""); setPage(1); + setLocalPreviewUrl(null); fetchFiles(1, false); } }, [open, pkg, fetchFiles]); @@ -293,12 +340,49 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw {/* Preview image + title row */}
- {pkg?.hasPreview && ( - + {/* Preview image area with upload capability */} + + {(pkg?.hasPreview || localPreviewUrl) ? ( + + ) : ( + )}
@@ -308,11 +392,22 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw {total.toLocaleString()} file{total !== 1 ? "s" : ""} in archive {pkg && ( -
+
+ {pkg.archiveType !== "DOCUMENT" && !pkg.isMultipart && ( + + )}
)}
@@ -416,6 +511,20 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
+ + {/* Archive preview picker modal */} + {pkg && pkg.archiveType !== "DOCUMENT" && !pkg.isMultipart && ( + { + // Refresh the preview by setting a cache-busting URL + setLocalPreviewUrl(`/api/zips/${pkg.id}/preview?t=${Date.now()}`); + }} + /> + )} ); } diff --git a/src/app/(app)/stls/actions.ts b/src/app/(app)/stls/actions.ts index 54c73c8..0e1835c 100644 --- a/src/app/(app)/stls/actions.ts +++ b/src/app/(app)/stls/actions.ts @@ -5,6 +5,13 @@ import { prisma } from "@/lib/prisma"; import type { ActionResult } from "@/types/api.types"; import { revalidatePath } from "next/cache"; +const ALLOWED_IMAGE_TYPES = [ + "image/jpeg", + "image/png", + "image/webp", +] as const; +const MAX_IMAGE_SIZE = 2 * 1024 * 1024; // 2 MB + export async function updatePackageCreator( packageId: string, creator: string | null @@ -24,6 +31,46 @@ export async function updatePackageCreator( } } +export async function uploadPackagePreview( + packageId: string, + formData: FormData +): Promise { + const session = await auth(); + if (!session?.user?.id) return { success: false, error: "Unauthorized" }; + + const file = formData.get("file"); + if (!(file instanceof File)) { + return { success: false, error: "No file provided" }; + } + + if (!ALLOWED_IMAGE_TYPES.includes(file.type as (typeof ALLOWED_IMAGE_TYPES)[number])) { + return { success: false, error: "Only JPG, PNG, and WebP images are accepted" }; + } + + if (file.size > MAX_IMAGE_SIZE) { + return { success: false, error: "Image must be smaller than 2 MB" }; + } + + try { + const arrayBuffer = await file.arrayBuffer(); + const buffer = Buffer.from(arrayBuffer); + + await prisma.package.update({ + where: { id: packageId }, + data: { + previewData: buffer, + // Set previewMsgId to 0 as sentinel so hasPreview checks work + previewMsgId: 0n, + }, + }); + + revalidatePath("/stls"); + return { success: true, data: undefined }; + } catch { + return { success: false, error: "Failed to upload preview image" }; + } +} + export async function bulkSetCreator( packageIds: string[], creator: string @@ -42,3 +89,49 @@ export async function bulkSetCreator( return { success: false, error: "Failed to update creators" }; } } + +/** + * Set a package's preview from an extracted archive image. + * Reads the image data from a completed ArchiveExtractRequest. + */ +export async function setPreviewFromExtract( + packageId: string, + extractRequestId: string +): Promise { + const session = await auth(); + if (!session?.user?.id) return { success: false, error: "Unauthorized" }; + + try { + const extractReq = await prisma.archiveExtractRequest.findUnique({ + where: { id: extractRequestId }, + select: { status: true, imageData: true, packageId: true }, + }); + + if (!extractReq) { + return { success: false, error: "Extract request not found" }; + } + + if (extractReq.packageId !== packageId) { + return { success: false, error: "Extract request does not belong to this package" }; + } + + if (extractReq.status !== "COMPLETED" || !extractReq.imageData) { + return { success: false, error: "Image extraction not yet completed" }; + } + + await prisma.package.update({ + where: { id: packageId }, + data: { + previewData: extractReq.imageData, + // Set previewMsgId to 0 as sentinel so hasPreview checks work + // (original Telegram-matched previews have the actual message ID) + previewMsgId: 0n, + }, + }); + + revalidatePath("/stls"); + return { success: true, data: undefined }; + } catch { + return { success: false, error: "Failed to set preview from archive image" }; + } +} diff --git a/src/app/(app)/telegram/_components/channels-tab.tsx b/src/app/(app)/telegram/_components/channels-tab.tsx index 4e988ea..a3391e7 100644 --- a/src/app/(app)/telegram/_components/channels-tab.tsx +++ b/src/app/(app)/telegram/_components/channels-tab.tsx @@ -2,10 +2,11 @@ import { useState, useTransition } from "react"; import { toast } from "sonner"; -import { Download } from "lucide-react"; +import { Download, Plus } from "lucide-react"; import { getChannelColumns } from "./channel-columns"; import { DestinationCard } from "./destination-card"; import { ChannelPickerDialog } from "./channel-picker-dialog"; +import { JoinChannelDialog } from "./join-channel-dialog"; import { deleteChannel, toggleChannelActive, @@ -30,6 +31,7 @@ export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsT const [deleteId, setDeleteId] = useState(null); const [rescanId, setRescanId] = useState(null); const [fetchChannelsAccountId, setFetchChannelsAccountId] = useState(null); + const [joinDialogOpen, setJoinDialogOpen] = useState(false); // Find the first authenticated account for "Fetch Channels" const authenticatedAccounts = accounts.filter((a) => a.authState === "AUTHENTICATED" && a.isActive); @@ -113,6 +115,14 @@ export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsT Fetch Channels +
{channels.length > 0 && ( @@ -152,6 +162,11 @@ export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsT if (!open) setFetchChannelsAccountId(null); }} /> + +
); } diff --git a/src/app/(app)/telegram/_components/destination-card.tsx b/src/app/(app)/telegram/_components/destination-card.tsx index 588b910..2c2b8ca 100644 --- a/src/app/(app)/telegram/_components/destination-card.tsx +++ b/src/app/(app)/telegram/_components/destination-card.tsx @@ -1,9 +1,21 @@ "use client"; import { useState, useEffect, useTransition } from "react"; -import { Database, AlertTriangle, Link2, Plus, Loader2, ArrowRight } from "lucide-react"; +import { + Database, + AlertTriangle, + Link2, + Plus, + Loader2, + ArrowRight, + RefreshCw, +} from "lucide-react"; import { toast } from "sonner"; -import { createDestinationViaWorker, setGlobalDestination } from "../actions"; +import { + createDestinationViaWorker, + setGlobalDestination, + rebuildPackageDatabase, +} from "../actions"; import { Card, CardContent } from "@/components/ui/card"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; @@ -38,12 +50,29 @@ type CreateState = | { phase: "done"; title: string; telegramId: string } | { phase: "error"; message: string }; +type RebuildState = + | { phase: "idle" } + | { phase: "running"; requestId: string } + | { phase: "done"; created: number; skipped: number; scanned: number } + | { phase: "error"; message: string }; + +interface RebuildProgress { + status: string; + messagesScanned: number; + documentsFound: number; + packagesCreated: number; + packagesSkipped: number; + error?: string; +} + export function DestinationCard({ destination, channels = [] }: DestinationCardProps) { const [isPending, startTransition] = useTransition(); const [createOpen, setCreateOpen] = useState(false); const [title, setTitle] = useState("dragonsstash db"); const [createState, setCreateState] = useState({ phase: "idle" }); const [selectedChannelId, setSelectedChannelId] = useState(""); + const [rebuildState, setRebuildState] = useState({ phase: "idle" }); + const [rebuildProgress, setRebuildProgress] = useState(null); // Channels that can be assigned as destination (SOURCE channels only, exclude current destination) const assignableChannels = channels.filter( @@ -105,6 +134,86 @@ export function DestinationCard({ destination, channels = [] }: DestinationCardP return () => { mounted = false; }; }, [createState]); + // Poll for rebuild progress + useEffect(() => { + if (rebuildState.phase !== "running") return; + + let mounted = true; + const requestId = rebuildState.requestId; + + const poll = async () => { + for (let i = 0; i < 300; i++) { + await new Promise((r) => setTimeout(r, 2000)); + if (!mounted) return; + + try { + const res = await fetch( + `/api/telegram/worker-request?requestId=${requestId}` + ); + if (!res.ok) continue; + + const data = await res.json(); + + // Update live progress from resultJson + if (data.result && typeof data.result === "object") { + if (mounted) setRebuildProgress(data.result as RebuildProgress); + } + + if (data.status === "COMPLETED" && data.result) { + const result = data.result as RebuildProgress; + if (mounted) { + setRebuildState({ + phase: "done", + created: result.packagesCreated, + skipped: result.packagesSkipped, + scanned: result.messagesScanned, + }); + setRebuildProgress(null); + toast.success( + `Rebuild complete: ${result.packagesCreated} packages restored, ${result.packagesSkipped} skipped` + ); + } + return; + } else if (data.status === "FAILED") { + if (mounted) { + setRebuildState({ + phase: "error", + message: data.error || "Rebuild failed", + }); + setRebuildProgress(null); + } + return; + } + } catch { + // Network blip — keep polling + } + } + + if (mounted) { + setRebuildState({ phase: "error", message: "Timed out waiting for rebuild" }); + setRebuildProgress(null); + } + }; + + poll(); + return () => { + mounted = false; + }; + }, [rebuildState]); + + const handleRebuild = () => { + startTransition(async () => { + const result = await rebuildPackageDatabase(); + if (result.success) { + setRebuildState({ phase: "running", requestId: result.data.requestId }); + setRebuildProgress(null); + toast.info("Rebuild started — scanning destination channel..."); + } else { + toast.error(result.error ?? "Failed to start rebuild"); + } + }); + }; + const handleCreate = () => { if (!title.trim()) return; @@ -188,37 +297,115 @@ export function DestinationCard({ destination, channels = [] }: DestinationCardP return ( <> - -
- -
-
-

{destination.title}

- - DESTINATION - -
-
- ID: {destination.telegramId} - {destination.inviteLink && ( - - - Invite link active - - )} + +
+
+ +
+
+

{destination.title}

+ + DESTINATION + +
+
+ ID: {destination.telegramId} + {destination.inviteLink && ( + + + Invite link active + + )} +
+
+ + +
- + + {/* Rebuild progress */} + {rebuildState.phase === "running" && rebuildProgress && ( +
+
+ + + Rebuilding package database... + +
+
+ + + {rebuildProgress.messagesScanned} + {" "} + messages scanned + + + + {rebuildProgress.documentsFound} + {" "} + archives found + + + + {rebuildProgress.packagesCreated} + {" "} + restored + + + + {rebuildProgress.packagesSkipped} + {" "} + skipped + +
+
+ )} + + {rebuildState.phase === "done" && ( +
+
+ + + Rebuild complete: {rebuildState.created} packages restored,{" "} + {rebuildState.skipped} skipped ({rebuildState.scanned} messages + scanned) + +
+
+ )} + + {rebuildState.phase === "error" && ( +
+
+ + Rebuild failed: {rebuildState.message} +
+
+ )}
diff --git a/src/app/(app)/telegram/_components/join-channel-dialog.tsx b/src/app/(app)/telegram/_components/join-channel-dialog.tsx new file mode 100644 index 0000000..7080507 --- /dev/null +++ b/src/app/(app)/telegram/_components/join-channel-dialog.tsx @@ -0,0 +1,179 @@ +"use client"; + +import { useState, useEffect, useCallback } from "react"; +import { Loader2, Link as LinkIcon } from "lucide-react"; +import { toast } from "sonner"; +import { joinChannelByLink } from "../actions"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, + DialogFooter, +} from "@/components/ui/dialog"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; + +interface JoinChannelDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; +} + +type JoinState = + | { phase: "idle" } + | { phase: "submitting"; requestId?: string } + | { phase: "success"; title: string } + | { phase: "error"; message: string }; + +export function JoinChannelDialog({ + open, + onOpenChange, +}: JoinChannelDialogProps) { + const [input, setInput] = useState(""); + const [joinState, setJoinState] = useState({ phase: "idle" }); + + // Reset on close + useEffect(() => { + if (!open) { + setInput(""); + setJoinState({ phase: "idle" }); + } + }, [open]); + + const pollForResult = useCallback(async (requestId: string) => { + for (let i = 0; i < 30; i++) { + await new Promise((r) => setTimeout(r, 2000)); + + try { + const res = await fetch( + `/api/telegram/worker-request?requestId=${requestId}` + ); + if (!res.ok) continue; + + const data = await res.json(); + if (data.status === "COMPLETED") { + const result = data.result; + setJoinState({ + phase: "success", + title: result?.title ?? "Unknown channel", + }); + toast.success(`Channel "${result?.title}" added as source`); + // Auto-close after short delay + setTimeout(() => onOpenChange(false), 1500); + return; + } else if (data.status === "FAILED") { + setJoinState({ + phase: "error", + message: data.error || "Failed to join channel", + }); + return; + } + } catch { + // Network error, keep polling + } + } + + setJoinState({ + phase: "error", + message: "Request timed out. The worker may be busy -- try again later.", + }); + }, [onOpenChange]); + + const handleSubmit = async () => { + if (!input.trim()) return; + + setJoinState({ phase: "submitting" }); + + try { + const result = await joinChannelByLink(input); + if (!result.success) { + setJoinState({ phase: "error", message: result.error ?? "Unknown error" }); + return; + } + + const requestId = result.data!.requestId; + setJoinState({ phase: "submitting", requestId }); + await pollForResult(requestId); + } catch (err) { + const message = err instanceof Error ? err.message : "Network error"; + setJoinState({ phase: "error", message }); + } + }; + + const isSubmitting = joinState.phase === "submitting"; + + return ( + + + + Add Channel + + Join a Telegram channel or group by link, username, or invite link. + The channel will be added as an active source. + + + +
+
+ + setInput(e.target.value)} + onKeyDown={(e) => { + if (e.key === "Enter" && !isSubmitting && input.trim()) { + handleSubmit(); + } + }} + disabled={isSubmitting} + /> +

+ Supported formats: @username, https://t.me/username, https://t.me/+invitecode +

+
+ + {joinState.phase === "submitting" && ( +
+ + {joinState.requestId + ? "Joining channel via worker..." + : "Sending request..."} +
+ )} + + {joinState.phase === "error" && ( +

{joinState.message}

+ )} + + {joinState.phase === "success" && ( +

+ Successfully added "{joinState.title}" +

+ )} +
+ + + + {joinState.phase !== "success" && ( + + )} + +
+
+ ); +} diff --git a/src/app/(app)/telegram/actions.ts b/src/app/(app)/telegram/actions.ts index 3e9d240..4dd8042 100644 --- a/src/app/(app)/telegram/actions.ts +++ b/src/app/(app)/telegram/actions.ts @@ -501,6 +501,56 @@ export async function saveChannelSelections( } } +// ── Join channel by link/username ── + +/** + * Request the worker to join a channel by t.me link, invite link, or @username. + * Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify. + * Returns the requestId so the UI can poll for completion. + */ +export async function joinChannelByLink( + input: string +): Promise> { + const admin = await requireAdmin(); + if (!admin.success) return admin; + + const trimmed = input.trim(); + if (!trimmed) return { success: false, error: "Input is required" }; + + try { + // Need at least one authenticated account for TDLib + const account = await prisma.telegramAccount.findFirst({ + where: { isActive: true, authState: "AUTHENTICATED" }, + select: { id: true }, + }); + if (!account) { + return { success: false, error: "At least one authenticated account is needed" }; + } + + // Create a fetch request to track progress + const fetchRequest = await prisma.channelFetchRequest.create({ + data: { + accountId: account.id, + status: "PENDING", + }, + }); + + // Signal worker via pg_notify + await prisma.$queryRawUnsafe( + `SELECT pg_notify('join_channel', $1)`, + JSON.stringify({ + requestId: fetchRequest.id, + input: trimmed, + accountId: account.id, + }) + ); + + return { success: true, data: { requestId: fetchRequest.id } }; + } catch { + return { success: false, error: "Failed to request channel join" }; + } +} + // ── Global destination channel ── export async function setGlobalDestination( @@ -631,6 +681,63 @@ export async function createDestinationChannel( } } +/** + * Request the worker to rebuild the package database by scanning the + * destination channel for uploaded archives and recreating Package records. + * Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify. + * Returns the requestId so the UI can poll for progress. + */ +export async function rebuildPackageDatabase(): Promise< + ActionResult<{ requestId: string }> +> { + const admin = await requireAdmin(); + if (!admin.success) return admin; + + try { + // Need at least one authenticated account for TDLib + const hasAccount = await prisma.telegramAccount.findFirst({ + where: { isActive: true, authState: "AUTHENTICATED" }, + select: { id: true }, + }); + if (!hasAccount) { + return { + success: false, + error: + "At least one authenticated account is needed to scan the destination channel", + }; + } + + // Need a destination channel + const destSetting = await prisma.globalSetting.findUnique({ + where: { key: "destination_channel_id" }, + }); + if (!destSetting) { + return { + success: false, + error: "No destination channel configured", + }; + } + + // Create a fetch request to track progress + const fetchRequest = await prisma.channelFetchRequest.create({ + data: { + accountId: hasAccount.id, + status: "PENDING", + }, + }); + + // Signal worker via pg_notify + await prisma.$queryRawUnsafe( + `SELECT pg_notify('rebuild_packages', $1)`, + fetchRequest.id + ); + + return { success: true, data: { requestId: fetchRequest.id } }; + } catch { + return { success: false, error: "Failed to request package database rebuild" }; + } +} + /** * Request the worker to create a new Telegram supergroup as the destination. * Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify. diff --git a/src/app/(auth)/register/actions.ts b/src/app/(auth)/register/actions.ts index 1b47315..1d0d24d 100644 --- a/src/app/(auth)/register/actions.ts +++ b/src/app/(auth)/register/actions.ts @@ -17,15 +17,15 @@ export async function registerUser(input: unknown): Promise= invite.maxUses) { - return { success: false, error: "This invite code has already been used" }; + return { success: false, error: "This invite code has reached its maximum number of uses" }; } if (invite.expiresAt && invite.expiresAt < new Date()) { - return { success: false, error: "This invite code has expired" }; + return { success: false, error: "This invite code has expired. Please request a new one." }; } const existing = await prisma.user.findUnique({ @@ -46,6 +46,7 @@ export async function registerUser(input: unknown): Promise Invite Code - + diff --git a/src/app/api/zips/[id]/extract/[requestId]/route.ts b/src/app/api/zips/[id]/extract/[requestId]/route.ts new file mode 100644 index 0000000..b54786f --- /dev/null +++ b/src/app/api/zips/[id]/extract/[requestId]/route.ts @@ -0,0 +1,73 @@ +import { NextResponse } from "next/server"; +import { authenticateApiRequest } from "@/lib/telegram/api-auth"; +import { prisma } from "@/lib/prisma"; + +export const dynamic = "force-dynamic"; + +/** + * GET /api/zips/:id/extract/:requestId + * Get the status and/or image data for an extraction request. + * Query param: ?image=true returns the raw image bytes if completed. + * Otherwise returns status JSON. + */ +export async function GET( + request: Request, + { params }: { params: Promise<{ id: string; requestId: string }> } +) { + const authResult = await authenticateApiRequest(request); + if ("error" in authResult) return authResult.error; + + const { requestId } = await params; + const url = new URL(request.url); + const wantImage = url.searchParams.get("image") === "true"; + + if (wantImage) { + // Return the raw image bytes + const req = await prisma.archiveExtractRequest.findUnique({ + where: { id: requestId }, + select: { status: true, imageData: true, contentType: true, error: true }, + }); + + if (!req) { + return new NextResponse(null, { status: 404 }); + } + + if (req.status !== "COMPLETED" || !req.imageData) { + return NextResponse.json( + { status: req.status, error: req.error }, + { status: req.status === "FAILED" ? 400 : 202 } + ); + } + + const buffer = + req.imageData instanceof Buffer + ? req.imageData + : Buffer.from(req.imageData); + + return new NextResponse(buffer, { + status: 200, + headers: { + "Content-Type": req.contentType || "image/jpeg", + "Content-Length": String(buffer.length), + "Cache-Control": "public, max-age=3600, immutable", + }, + }); + } + + // Return status JSON (without image data to avoid large payloads) + const req = await prisma.archiveExtractRequest.findUnique({ + where: { id: requestId }, + select: { id: true, status: true, error: true, contentType: true }, + }); + + if (!req) { + return NextResponse.json({ error: "Request not found" }, { status: 404 }); + } + + return NextResponse.json({ + requestId: req.id, + status: req.status, + error: req.error, + contentType: req.contentType, + }); +} diff --git a/src/app/api/zips/[id]/extract/route.ts b/src/app/api/zips/[id]/extract/route.ts new file mode 100644 index 0000000..cb66406 --- /dev/null +++ b/src/app/api/zips/[id]/extract/route.ts @@ -0,0 +1,118 @@ +import { NextResponse } from "next/server"; +import { authenticateApiRequest } from "@/lib/telegram/api-auth"; +import { prisma } from "@/lib/prisma"; + +export const dynamic = "force-dynamic"; + +/** + * POST /api/zips/:id/extract + * Request extraction of an image from a package archive. + * Body: { filePath: string } + * Returns: { requestId: string, status: string } + * + * If a completed extraction already exists for this package+filePath, + * returns it immediately. + */ +export async function POST( + request: Request, + { params }: { params: Promise<{ id: string }> } +) { + const authResult = await authenticateApiRequest(request); + if ("error" in authResult) return authResult.error; + + const { id } = await params; + const body = await request.json(); + const filePath = body?.filePath; + + if (!filePath || typeof filePath !== "string") { + return NextResponse.json( + { error: "filePath is required" }, + { status: 400 } + ); + } + + // Verify package exists + const pkg = await prisma.package.findUnique({ + where: { id }, + select: { id: true, destChannelId: true, destMessageId: true, archiveType: true, isMultipart: true, partCount: true }, + }); + + if (!pkg) { + return NextResponse.json({ error: "Package not found" }, { status: 404 }); + } + + if (!pkg.destChannelId || !pkg.destMessageId) { + return NextResponse.json( + { error: "Package has not been uploaded to destination channel" }, + { status: 400 } + ); + } + + if (pkg.archiveType === "DOCUMENT") { + return NextResponse.json( + { error: "Cannot extract images from standalone documents" }, + { status: 400 } + ); + } + + if (pkg.isMultipart && pkg.partCount > 1) { + return NextResponse.json( + { error: "Image extraction is not supported for multipart archives" }, + { status: 400 } + ); + } + + // Check for an existing completed extraction + const existing = await prisma.archiveExtractRequest.findFirst({ + where: { + packageId: id, + filePath, + status: "COMPLETED", + imageData: { not: null }, + }, + select: { id: true, status: true }, + }); + + if (existing) { + return NextResponse.json({ + requestId: existing.id, + status: "COMPLETED", + }); + } + + // Check for an in-progress request + const pending = await prisma.archiveExtractRequest.findFirst({ + where: { + packageId: id, + filePath, + status: { in: ["PENDING", "IN_PROGRESS"] }, + }, + select: { id: true, status: true }, + }); + + if (pending) { + return NextResponse.json({ + requestId: pending.id, + status: pending.status, + }); + } + + // Create a new extraction request + const extractRequest = await prisma.archiveExtractRequest.create({ + data: { + packageId: id, + filePath, + }, + }); + + // Notify the worker via pg_notify + await prisma.$queryRawUnsafe( + `SELECT pg_notify('archive_extract', $1)`, + extractRequest.id + ); + + return NextResponse.json({ + requestId: extractRequest.id, + status: "PENDING", + }); +} diff --git a/src/app/api/zips/[id]/images/route.ts b/src/app/api/zips/[id]/images/route.ts new file mode 100644 index 0000000..7c01b78 --- /dev/null +++ b/src/app/api/zips/[id]/images/route.ts @@ -0,0 +1,56 @@ +import { NextResponse } from "next/server"; +import { authenticateApiRequest } from "@/lib/telegram/api-auth"; +import { prisma } from "@/lib/prisma"; + +export const dynamic = "force-dynamic"; + +const IMAGE_EXTENSIONS = ["jpg", "jpeg", "png", "webp", "gif", "bmp"]; + +/** + * GET /api/zips/:id/images + * Lists image files inside a package's archive (from PackageFile records). + * Returns a list of image file paths that can be used as preview candidates. + */ +export async function GET( + request: Request, + { params }: { params: Promise<{ id: string }> } +) { + const authResult = await authenticateApiRequest(request); + if ("error" in authResult) return authResult.error; + + const { id } = await params; + + const pkg = await prisma.package.findUnique({ + where: { id }, + select: { id: true, archiveType: true }, + }); + + if (!pkg) { + return NextResponse.json({ error: "Package not found" }, { status: 404 }); + } + + const images = await prisma.packageFile.findMany({ + where: { + packageId: id, + extension: { in: IMAGE_EXTENSIONS }, + }, + orderBy: { path: "asc" }, + select: { + id: true, + path: true, + fileName: true, + extension: true, + uncompressedSize: true, + }, + }); + + const mapped = images.map((img) => ({ + id: img.id, + path: img.path, + fileName: img.fileName, + extension: img.extension, + size: img.uncompressedSize.toString(), + })); + + return NextResponse.json({ images: mapped }); +} diff --git a/worker/Dockerfile b/worker/Dockerfile index 5a53dc4..491a0a0 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -3,7 +3,7 @@ FROM node:20-bookworm-slim AS deps RUN sed -i 's/^Components: main$/Components: main non-free/' /etc/apt/sources.list.d/debian.sources && \ apt-get update && apt-get install -y \ - libssl-dev zlib1g-dev unrar p7zip-full \ + libssl-dev zlib1g-dev unzip unrar p7zip-full \ && rm -rf /var/lib/apt/lists/* WORKDIR /app @@ -26,7 +26,7 @@ FROM node:20-bookworm-slim AS runner RUN sed -i 's/^Components: main$/Components: main non-free/' /etc/apt/sources.list.d/debian.sources && \ apt-get update && apt-get install -y \ - libssl3 zlib1g unrar p7zip-full \ + libssl3 zlib1g unzip unrar p7zip-full \ && rm -rf /var/lib/apt/lists/* WORKDIR /app diff --git a/worker/src/archive/extract-image.ts b/worker/src/archive/extract-image.ts new file mode 100644 index 0000000..0ad3b52 --- /dev/null +++ b/worker/src/archive/extract-image.ts @@ -0,0 +1,33 @@ +import path from "path"; + +const IMAGE_EXTENSIONS = new Set(["jpg", "jpeg", "png", "webp", "gif", "bmp"]); + +/** + * Check if a file path within an archive is an image. + */ +export function isImageFile(filePath: string): boolean { + const ext = path.extname(filePath).toLowerCase().slice(1); + return IMAGE_EXTENSIONS.has(ext); +} + +/** + * Get the MIME type for an image file extension. + */ +export function getImageMimeType(filePath: string): string { + const ext = path.extname(filePath).toLowerCase().slice(1); + switch (ext) { + case "jpg": + case "jpeg": + return "image/jpeg"; + case "png": + return "image/png"; + case "webp": + return "image/webp"; + case "gif": + return "image/gif"; + case "bmp": + return "image/bmp"; + default: + return "application/octet-stream"; + } +} diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index e8058e7..c238507 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -438,3 +438,35 @@ export async function getExistingChannelsByTelegramId(): Promise { + const request = await db.archiveExtractRequest.findUnique({ + where: { id: requestId }, + include: { + package: { + select: { + id: true, + fileName: true, + fileSize: true, + archiveType: true, + destChannelId: true, + destMessageId: true, + isMultipart: true, + partCount: true, + }, + }, + }, + }); + + if (!request || request.status !== "PENDING") { + log.debug({ requestId }, "Extract request not found or not pending"); + return; + } + + const pkg = request.package; + if (!pkg.destChannelId || !pkg.destMessageId) { + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { status: "FAILED", error: "Package has no destination upload" }, + }); + return; + } + + // Multipart archives require downloading and reassembling all parts, + // which is too complex for on-demand extraction. Reject early. + if (pkg.isMultipart && pkg.partCount > 1) { + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { status: "FAILED", error: "Image extraction is not supported for multipart archives" }, + }); + return; + } + + // Check for a cached result first: if another request for the same + // package+filePath already completed, reuse its data. + const cached = await db.archiveExtractRequest.findFirst({ + where: { + packageId: pkg.id, + filePath: request.filePath, + status: "COMPLETED", + imageData: { not: null }, + id: { not: requestId }, + }, + select: { imageData: true, contentType: true }, + }); + + if (cached?.imageData) { + log.info({ requestId, filePath: request.filePath }, "Reusing cached extraction result"); + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { + status: "COMPLETED", + imageData: cached.imageData, + contentType: cached.contentType, + }, + }); + return; + } + + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { status: "IN_PROGRESS" }, + }); + + log.info( + { requestId, packageId: pkg.id, filePath: request.filePath, archiveType: pkg.archiveType }, + "Processing extract request" + ); + + const tempDir = path.join(config.tempDir, `extract_${requestId}`); + + try { + await mkdir(tempDir, { recursive: true }); + + // Wrap the entire TDLib session in the mutex so no other TDLib + // operation can run concurrently (TDLib is single-session). + await withTdlibMutex("extract", async () => { + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + throw new Error("No authenticated Telegram accounts available"); + } + + const account = accounts[0]; + const client = await createTdlibClient({ id: account.id, phone: account.phone }); + + try { + // Load chat list so TDLib can find the dest channel + try { + await client.invoke({ + _: "getChats", + chat_list: { _: "chatListMain" }, + limit: 1000, + }); + } catch { + // May already be loaded + } + + // Get the dest channel telegram ID + const destChannel = await db.telegramChannel.findUnique({ + where: { id: pkg.destChannelId! }, + select: { telegramId: true }, + }); + + if (!destChannel) { + throw new Error("Destination channel not found in DB"); + } + + const chatId = Number(destChannel.telegramId); + const messageId = Number(pkg.destMessageId); + + // Get the file_id from the destination message + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const message = await client.invoke({ + _: "getMessage", + chat_id: chatId, + message_id: messageId, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + }) as any; + + const doc = message?.content?.document; + if (!doc?.document?.id) { + throw new Error("Could not find document in destination message"); + } + + const fileId = String(doc.document.id); + const fileName = doc.file_name || pkg.fileName; + const archivePath = path.join(tempDir, fileName); + + log.info( + { requestId, fileName, fileId, chatId, messageId }, + "Downloading archive for extraction" + ); + + await downloadFile( + client, + fileId, + archivePath, + pkg.fileSize, + fileName + ); + + // Extract the requested image using the existing CLI-based extractor. + // This pipes the file to stdout (no temp files needed for the extracted image). + const imageData = await extractPreviewImage( + archivePath, + pkg.archiveType as "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT", + request.filePath + ); + + if (!imageData) { + throw new Error(`Could not extract "${request.filePath}" from archive`); + } + + // Cap at 5MB for safety + if (imageData.length > 5 * 1024 * 1024) { + throw new Error(`Extracted image is too large (${(imageData.length / 1024 / 1024).toFixed(1)}MB)`); + } + + const contentType = getImageMimeType(request.filePath); + + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { + status: "COMPLETED", + imageData: new Uint8Array(imageData), + contentType, + }, + }); + + log.info( + { requestId, filePath: request.filePath, bytes: imageData.length }, + "Image extracted successfully" + ); + } finally { + await closeTdlibClient(client).catch(() => {}); + } + }); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.error({ err, requestId }, "Extract request failed"); + await db.archiveExtractRequest.update({ + where: { id: requestId }, + data: { status: "FAILED", error: errMsg }, + }).catch(() => {}); + } finally { + await rm(tempDir, { recursive: true, force: true }).catch(() => {}); + } +} diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index 3d49e38..59340ce 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -3,7 +3,9 @@ import { pool } from "./db/client.js"; import { childLogger } from "./util/logger.js"; import { withTdlibMutex } from "./util/mutex.js"; import { processFetchRequest } from "./worker.js"; -import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; +import { processExtractRequest } from "./extract-listener.js"; +import { rebuildPackageDatabase } from "./rebuild.js"; +import { generateInviteLink, createSupergroup, searchPublicChat } from "./tdlib/chats.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; import { triggerImmediateCycle } from "./scheduler.js"; import { @@ -13,6 +15,7 @@ import { getActiveAccounts, upsertChannel, ensureAccountChannelLink, + updateFetchRequestStatus, } from "./db/queries.js"; const log = childLogger("fetch-listener"); @@ -31,6 +34,8 @@ const RECONNECT_DELAY_MS = 5_000; * - `generate_invite` — payload = channelId → generate invite link for destination * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `ingestion_trigger` — trigger an immediate ingestion cycle + * - `join_channel` — payload = JSON { requestId, input, accountId } → join/lookup channel by link/username + * - `rebuild_packages` — payload = requestId → rebuild package DB from destination channel * * If the underlying connection is lost, the listener automatically reconnects * so that pg_notify signals are never silently dropped. @@ -47,6 +52,9 @@ async function connectListener(): Promise { await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN create_destination"); await pgClient.query("LISTEN ingestion_trigger"); + await pgClient.query("LISTEN join_channel"); + await pgClient.query("LISTEN archive_extract"); + await pgClient.query("LISTEN rebuild_packages"); pgClient.on("notification", (msg) => { if (msg.channel === "channel_fetch" && msg.payload) { @@ -57,6 +65,12 @@ async function connectListener(): Promise { handleCreateDestination(msg.payload); } else if (msg.channel === "ingestion_trigger") { handleIngestionTrigger(); + } else if (msg.channel === "join_channel" && msg.payload) { + handleJoinChannel(msg.payload); + } else if (msg.channel === "archive_extract" && msg.payload) { + handleArchiveExtract(msg.payload); + } else if (msg.channel === "rebuild_packages" && msg.payload) { + handleRebuildPackages(msg.payload); } }); @@ -82,7 +96,7 @@ async function connectListener(): Promise { } }); - log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); + log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger, join_channel, archive_extract, rebuild_packages)"); } catch (err) { log.error({ err }, "Failed to start fetch listener — retrying"); scheduleReconnect(); @@ -260,6 +274,217 @@ function handleCreateDestination(payload: string): void { }); } +// ── Join channel handler ── + +/** + * Parse a Telegram link/username into its type and identifier. + * + * Supported formats: + * - @username or username → public chat search + * - https://t.me/username → public chat search + * - https://t.me/+INVITE_HASH → join by invite link + * - https://t.me/joinchat/INVITE_HASH → join by invite link (legacy) + */ +function parseTelegramInput(input: string): { type: "username"; username: string } | { type: "invite"; link: string } | null { + const trimmed = input.trim(); + + // Invite link patterns + const invitePatterns = [ + /^https?:\/\/t\.me\/\+([a-zA-Z0-9_-]+)$/, + /^https?:\/\/t\.me\/joinchat\/([a-zA-Z0-9_-]+)$/, + /^https?:\/\/telegram\.me\/\+([a-zA-Z0-9_-]+)$/, + /^https?:\/\/telegram\.me\/joinchat\/([a-zA-Z0-9_-]+)$/, + ]; + + for (const pattern of invitePatterns) { + if (pattern.test(trimmed)) { + return { type: "invite", link: trimmed }; + } + } + + // Public link: https://t.me/username + const publicLinkMatch = trimmed.match(/^https?:\/\/(?:t\.me|telegram\.me)\/([a-zA-Z][a-zA-Z0-9_]{3,31})$/); + if (publicLinkMatch) { + return { type: "username", username: publicLinkMatch[1] }; + } + + // @username or bare username + const usernameMatch = trimmed.match(/^@?([a-zA-Z][a-zA-Z0-9_]{3,31})$/); + if (usernameMatch) { + return { type: "username", username: usernameMatch[1] }; + } + + return null; +} + +function handleJoinChannel(payload: string): void { + fetchQueue = fetchQueue.then(async () => { + let requestId: string | undefined; + try { + const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string }; + requestId = parsed.requestId; + + await withTdlibMutex("join-channel", async () => { + await updateFetchRequestStatus(requestId!, "IN_PROGRESS"); + + const accounts = await getActiveAccounts(); + const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0]; + if (!account) { + throw new Error("No authenticated accounts available"); + } + + const client = await createTdlibClient({ id: account.id, phone: account.phone }); + + try { + const linkInfo = parseTelegramInput(parsed.input); + if (!linkInfo) { + throw new Error( + "Invalid input. Use a t.me link (e.g. https://t.me/channel_name), " + + "an invite link (e.g. https://t.me/+abc123), or a @username." + ); + } + + let chatInfo: { chatId: bigint; title: string; type: string; isForum: boolean }; + + if (linkInfo.type === "username") { + // Public chat: search by username + const result = await searchPublicChat(client, linkInfo.username); + if (!result) { + throw new Error(`Public channel "@${linkInfo.username}" not found. Check the username and try again.`); + } + if (result.type !== "channel" && result.type !== "supergroup") { + throw new Error(`"@${linkInfo.username}" is a ${result.type}, not a channel or group. Only channels and supergroups are supported.`); + } + chatInfo = { chatId: result.chatId, title: result.title, type: result.type, isForum: result.isForum }; + } else { + // Private/invite link: join first, then get chat info + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let joinResult: any; + try { + joinResult = await client.invoke({ + _: "joinChatByInviteLink", + invite_link: linkInfo.link, + }); + } catch (joinErr: unknown) { + const msg = joinErr instanceof Error ? joinErr.message : String(joinErr); + // "INVITE_REQUEST_SENT" means the chat requires admin approval + if (msg.includes("INVITE_REQUEST_SENT")) { + throw new Error("Join request sent. An admin of that channel must approve it before it can be added."); + } + // Already a member is fine + if (!msg.includes("USER_ALREADY_PARTICIPANT") && !msg.includes("INVITE_HASH_EXPIRED")) { + throw new Error(`Failed to join via invite link: ${msg}`); + } + // If already a participant, we need to get chat info from the link + // Try checkChatInviteLink to get the chat id + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const checkResult = (await client.invoke({ + _: "checkChatInviteLink", + invite_link: linkInfo.link, + })) as any; + if (checkResult.chat_id) { + joinResult = { id: checkResult.chat_id }; + } else { + throw joinErr; + } + } catch { + throw joinErr; + } + } + + // Get full chat info + const chatId = joinResult?.id ?? joinResult?.chat_id; + if (!chatId) { + throw new Error("Joined channel but could not determine chat ID."); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const chat = (await client.invoke({ _: "getChat", chat_id: chatId })) as any; + let type: string = "other"; + let isForum = false; + + if (chat.type?._ === "chatTypeSupergroup") { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const sg = (await client.invoke({ + _: "getSupergroup", + supergroup_id: chat.type.supergroup_id, + })) as any; + type = sg.is_channel ? "channel" : "supergroup"; + isForum = sg.is_forum ?? false; + } catch { + type = "supergroup"; + } + } else if (chat.type?._ === "chatTypeBasicGroup") { + type = "group"; + } + + if (type !== "channel" && type !== "supergroup") { + throw new Error(`The joined chat is a ${type}, not a channel or group. Only channels and supergroups are supported.`); + } + + chatInfo = { chatId: BigInt(chatId), title: chat.title ?? "Unknown", type, isForum }; + } + + // Upsert channel in DB (active as source by default since user explicitly added it) + const channel = await upsertChannel({ + telegramId: chatInfo.chatId, + title: chatInfo.title, + type: "SOURCE", + isForum: chatInfo.isForum, + isActive: true, + }); + + // Link the account as READER + await ensureAccountChannelLink(account.id, channel.id, "READER"); + + log.info( + { channelId: channel.id, telegramId: chatInfo.chatId.toString(), title: chatInfo.title }, + "Channel joined and added" + ); + + await updateFetchRequestStatus(requestId!, "COMPLETED", { + resultJson: JSON.stringify({ + channelId: channel.id, + telegramId: chatInfo.chatId.toString(), + title: chatInfo.title, + type: chatInfo.type, + isForum: chatInfo.isForum, + }), + }); + } finally { + await closeTdlibClient(client); + } + }); + } catch (err) { + log.error({ err, payload }, "Failed to join channel"); + if (requestId) { + try { + await updateFetchRequestStatus(requestId, "FAILED", { + error: err instanceof Error ? err.message : String(err), + }); + } catch { + // Best-effort + } + } + } + }); +} + +// ── Archive extract handler ── + +function handleArchiveExtract(requestId: string): void { + fetchQueue = fetchQueue.then(async () => { + try { + log.info({ requestId }, "Archive extract request received"); + await processExtractRequest(requestId); + } catch (err) { + log.error({ err, requestId }, "Failed to process archive extract request"); + } + }); +} + // ── Ingestion trigger handler ── function handleIngestionTrigger(): void { @@ -272,3 +497,17 @@ function handleIngestionTrigger(): void { } }); } + +// ── Package database rebuild handler ── + +function handleRebuildPackages(requestId: string): void { + fetchQueue = fetchQueue.then(async () => { + try { + await withTdlibMutex("rebuild-packages", () => + rebuildPackageDatabase(requestId) + ); + } catch (err) { + log.error({ err, requestId }, "Failed to rebuild package database"); + } + }); +} diff --git a/worker/src/index.ts b/worker/src/index.ts index c02c741..dc2e556 100644 --- a/worker/src/index.ts +++ b/worker/src/index.ts @@ -3,6 +3,7 @@ import { config } from "./util/config.js"; import { logger } from "./util/logger.js"; import { markStaleRunsAsFailed } from "./db/queries.js"; import { cleanupTempDir } from "./worker.js"; +import { recoverIncompleteUploads } from "./recovery.js"; import { startScheduler, stopScheduler } from "./scheduler.js"; import { startFetchListener, stopFetchListener } from "./fetch-listener.js"; import { db, pool } from "./db/client.js"; @@ -26,6 +27,10 @@ async function main(): Promise { await cleanupTempDir(); await markStaleRunsAsFailed(); + // Verify destination messages exist for all "uploaded" packages. + // Resets any packages whose dest message is missing so they get re-processed. + await recoverIncompleteUploads(); + // Start the fetch listener (pg_notify for on-demand channel fetching) await startFetchListener(); diff --git a/worker/src/preview/extract.ts b/worker/src/preview/extract.ts new file mode 100644 index 0000000..005fcbb --- /dev/null +++ b/worker/src/preview/extract.ts @@ -0,0 +1,111 @@ +import { execFile } from "child_process"; +import { promisify } from "util"; +import { childLogger } from "../util/logger.js"; +import type { FileEntry } from "../archive/zip-reader.js"; + +const execFileAsync = promisify(execFile); +const log = childLogger("preview-extract"); + +/** Max bytes we'll accept for an extracted preview image (2MB). */ +const MAX_PREVIEW_BYTES = 2 * 1024 * 1024; + +/** Image extensions we consider valid previews, in priority order. */ +const IMAGE_EXTENSIONS = new Set(["jpg", "jpeg", "png"]); + +/** + * Pick the best preview image from the file entries list. + * + * Prefers files that look like dedicated preview images (01.jpg, insta.jpg, + * preview.jpg) over arbitrary images buried in subdirectories. + * Skips images that are suspiciously large (>2MB uncompressed). + */ +export function pickPreviewFile(entries: FileEntry[]): FileEntry | null { + const candidates = entries.filter((e) => { + if (!e.extension || !IMAGE_EXTENSIONS.has(e.extension.toLowerCase())) return false; + // Skip very large images — they're probably textures, not previews + if (e.uncompressedSize > BigInt(MAX_PREVIEW_BYTES)) return false; + return true; + }); + + if (candidates.length === 0) return null; + + // Score candidates: lower depth + preview-like names win + const scored = candidates.map((entry) => { + const depth = entry.path.split("/").length - 1; + const nameLower = entry.fileName.toLowerCase(); + + let nameScore = 10; // default + // Known preview-like names get priority + if (/^(preview|thumb|cover|insta)\b/i.test(nameLower)) { + nameScore = 0; + } else if (/^0*[1-2]\.(jpe?g|png)$/i.test(nameLower)) { + // 01.jpg, 1.jpg, 02.jpg — common preview filenames + nameScore = 1; + } else if (/^0*[3-9]\.(jpe?g|png)$/i.test(nameLower)) { + nameScore = 2; + } + + return { entry, score: nameScore + depth }; + }); + + scored.sort((a, b) => a.score - b.score); + return scored[0].entry; +} + +/** + * Extract a single file from an archive and return its contents as a Buffer. + * + * Uses the appropriate CLI tool based on archive type: + * - ZIP: unzip -p + * - RAR: unrar p -inul + * - 7Z: 7z e -so + */ +export async function extractPreviewImage( + archivePath: string, + archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT", + filePath: string +): Promise { + if (archiveType === "DOCUMENT") return null; + + try { + let stdout: Buffer; + + if (archiveType === "ZIP") { + const result = await execFileAsync("unzip", ["-p", archivePath, filePath], { + timeout: 15000, + maxBuffer: MAX_PREVIEW_BYTES, + encoding: "buffer", + }); + stdout = result.stdout as unknown as Buffer; + } else if (archiveType === "RAR") { + const result = await execFileAsync("unrar", ["p", "-inul", archivePath, filePath], { + timeout: 15000, + maxBuffer: MAX_PREVIEW_BYTES, + encoding: "buffer", + }); + stdout = result.stdout as unknown as Buffer; + } else { + // SEVEN_Z + const result = await execFileAsync("7z", ["e", "-so", archivePath, filePath], { + timeout: 15000, + maxBuffer: MAX_PREVIEW_BYTES, + encoding: "buffer", + }); + stdout = result.stdout as unknown as Buffer; + } + + if (stdout.length === 0) { + log.warn({ archivePath, filePath }, "Extracted preview image is empty"); + return null; + } + + log.debug( + { archivePath, filePath, bytes: stdout.length }, + "Extracted preview image from archive" + ); + return stdout; + } catch (err) { + log.warn({ err, archivePath, filePath }, "Failed to extract preview image from archive"); + return null; + } +} diff --git a/worker/src/rebuild.ts b/worker/src/rebuild.ts new file mode 100644 index 0000000..8116ca7 --- /dev/null +++ b/worker/src/rebuild.ts @@ -0,0 +1,411 @@ +import type { Client } from "tdl"; +import { config } from "./util/config.js"; +import { childLogger } from "./util/logger.js"; +import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; +import { invokeWithTimeout, MAX_SCAN_PAGES } from "./tdlib/download.js"; +import { isArchiveAttachment } from "./archive/detect.js"; +import { extractCreatorFromFileName } from "./archive/creator.js"; +import { groupArchiveSets } from "./archive/multipart.js"; +import type { TelegramMessage } from "./archive/multipart.js"; +import { + getActiveAccounts, + getGlobalDestinationChannel, +} from "./db/queries.js"; +import { db } from "./db/client.js"; + +const log = childLogger("rebuild"); + +export interface RebuildProgress { + status: "PENDING" | "IN_PROGRESS" | "COMPLETED" | "FAILED"; + messagesScanned: number; + documentsFound: number; + packagesCreated: number; + packagesSkipped: number; + error?: string; +} + +/** + * Scan the destination channel for uploaded archive files and rebuild + * the package database from what's actually there. + * + * Uses searchChatMessages (not getChatHistory) because the destination + * channel may be a hidden-history supergroup. + * + * For each document found: + * 1. Check if a Package record with that destMessageId already exists -> skip + * 2. Try to match by fileName to an existing package without destMessageId -> update it + * 3. Otherwise create a minimal Package record (no file listing, no content hash) + * + * This is a "best-effort" rebuild. It restores the mapping between destination + * messages and package records so that the bot can deliver files. It does NOT + * re-download archives or rebuild file listings (those require the source channel). + */ +export async function rebuildPackageDatabase( + requestId: string +): Promise { + log.info({ requestId }, "Starting package database rebuild"); + + try { + await db.channelFetchRequest.update({ + where: { id: requestId }, + data: { status: "IN_PROGRESS" }, + }); + + // Get an authenticated account for TDLib + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + throw new Error("No authenticated accounts available"); + } + + const destChannel = await getGlobalDestinationChannel(); + if (!destChannel) { + throw new Error("No destination channel configured"); + } + + const account = accounts[0]; + const client = await createTdlibClient({ + id: account.id, + phone: account.phone, + }); + + try { + const progress: RebuildProgress = { + status: "IN_PROGRESS", + messagesScanned: 0, + documentsFound: 0, + packagesCreated: 0, + packagesSkipped: 0, + }; + + // Write initial progress + await updateRebuildProgress(requestId, progress); + + // Scan the destination channel for all document messages + const archiveMessages = await scanDestinationChannel( + client, + destChannel.telegramId, + async (scanned) => { + progress.messagesScanned = scanned; + await updateRebuildProgress(requestId, progress); + } + ); + + progress.documentsFound = archiveMessages.length; + await updateRebuildProgress(requestId, progress); + + log.info( + { + messagesScanned: progress.messagesScanned, + documentsFound: archiveMessages.length, + }, + "Destination channel scan complete" + ); + + // Group into archive sets (handles multipart) + const archiveSets = groupArchiveSets(archiveMessages); + + log.info( + { archiveSets: archiveSets.length, totalMessages: archiveMessages.length }, + "Grouped into archive sets" + ); + + // Get ALL source channels so we can try to match + const sourceChannels = await db.telegramChannel.findMany({ + where: { type: "SOURCE" }, + select: { id: true, title: true }, + }); + // Use the first source channel as a fallback for unmatched packages + const fallbackSourceId = sourceChannels[0]?.id ?? null; + + // Process each archive set + for (const archiveSet of archiveSets) { + const firstPart = archiveSet.parts[0]; + const fileName = firstPart.fileName; + const destMessageId = firstPart.id; + const totalSize = archiveSet.parts.reduce( + (sum, p) => sum + p.fileSize, + 0n + ); + + // 1. Check if a package with this destMessageId already exists + const existingByDest = await db.package.findFirst({ + where: { + destChannelId: destChannel.id, + destMessageId, + }, + select: { id: true }, + }); + + if (existingByDest) { + progress.packagesSkipped++; + await updateRebuildProgress(requestId, progress); + continue; + } + + // 2. Try to match by fileName to an existing package without destMessageId + const existingByName = await db.package.findFirst({ + where: { + fileName, + destMessageId: null, + }, + select: { id: true }, + }); + + if (existingByName) { + // Update existing record with destination info + await db.package.update({ + where: { id: existingByName.id }, + data: { + destChannelId: destChannel.id, + destMessageId, + isMultipart: archiveSet.parts.length > 1, + partCount: archiveSet.parts.length, + }, + }); + progress.packagesCreated++; + log.debug({ fileName, destMessageId: Number(destMessageId) }, "Updated existing package with dest info"); + await updateRebuildProgress(requestId, progress); + continue; + } + + // 3. Create a new minimal Package record + // We don't have the source message or content hash, so generate a placeholder hash + const placeholderHash = `rebuild:${destChannel.id}:${destMessageId}`; + const creator = extractCreatorFromFileName(fileName) ?? null; + const archiveType = archiveSet.type; + + // We need a sourceChannelId (required FK). Use fallback if available. + if (!fallbackSourceId) { + log.warn( + { fileName }, + "No source channels exist — cannot create package record without a source channel" + ); + progress.packagesSkipped++; + await updateRebuildProgress(requestId, progress); + continue; + } + + try { + await db.package.create({ + data: { + contentHash: placeholderHash, + fileName, + fileSize: totalSize, + archiveType, + sourceChannelId: fallbackSourceId, + sourceMessageId: 0n, // Unknown — rebuilt from destination + destChannelId: destChannel.id, + destMessageId, + isMultipart: archiveSet.parts.length > 1, + partCount: archiveSet.parts.length, + fileCount: 0, + creator, + }, + }); + progress.packagesCreated++; + log.debug( + { fileName, destMessageId: Number(destMessageId), creator }, + "Created new package from destination" + ); + } catch (err) { + // Unique constraint on contentHash — might be a race or duplicate + if (err instanceof Error && err.message.includes("Unique constraint")) { + log.debug({ fileName, placeholderHash }, "Package already exists (hash conflict), skipping"); + progress.packagesSkipped++; + } else { + throw err; + } + } + + await updateRebuildProgress(requestId, progress); + } + + // Done + progress.status = "COMPLETED"; + await updateRebuildProgress(requestId, progress); + + await db.channelFetchRequest.update({ + where: { id: requestId }, + data: { + status: "COMPLETED", + resultJson: JSON.stringify(progress), + }, + }); + + log.info( + { + messagesScanned: progress.messagesScanned, + documentsFound: progress.documentsFound, + packagesCreated: progress.packagesCreated, + packagesSkipped: progress.packagesSkipped, + }, + "Package database rebuild complete" + ); + } finally { + await closeTdlibClient(client); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.error({ err, requestId }, "Package database rebuild failed"); + + await db.channelFetchRequest.update({ + where: { id: requestId }, + data: { + status: "FAILED", + error: message, + resultJson: JSON.stringify({ + status: "FAILED", + error: message, + }), + }, + }); + } +} + +/** + * Scan the destination channel for document messages using searchChatMessages. + * Returns archive messages in chronological order (oldest first). + */ +async function scanDestinationChannel( + client: Client, + chatId: bigint, + onProgress?: (messagesScanned: number) => Promise +): Promise { + const archives: TelegramMessage[] = []; + let currentFromId = 0; + let totalScanned = 0; + let pageCount = 0; + let lastProgressUpdate = 0; + + // eslint-disable-next-line no-constant-condition + while (true) { + if (pageCount >= MAX_SCAN_PAGES) { + log.warn( + { chatId: chatId.toString(), pageCount, totalScanned }, + "Hit max page limit for destination scan, stopping" + ); + break; + } + pageCount++; + + const previousFromId = currentFromId; + + const result = await invokeWithTimeout<{ + messages?: { + id: number; + date: number; + content: { + _: string; + document?: { + file_name?: string; + document?: { + id: number; + size: number; + }; + }; + }; + }[]; + }>(client, { + _: "searchChatMessages", + chat_id: Number(chatId), + query: "", + from_message_id: currentFromId, + offset: 0, + limit: 100, + filter: { _: "searchMessagesFilterDocument" }, + sender_id: null, + message_thread_id: 0, + saved_messages_topic_id: 0, + }); + + if (!result.messages || result.messages.length === 0) break; + + totalScanned += result.messages.length; + + for (const msg of result.messages) { + const doc = msg.content?.document; + if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) { + archives.push({ + id: BigInt(msg.id), + fileName: doc.file_name, + fileId: String(doc.document.id), + fileSize: BigInt(doc.document.size), + date: new Date(msg.date * 1000), + }); + } + } + + // Throttle progress updates to every 2 seconds + const now = Date.now(); + if (onProgress && now - lastProgressUpdate >= 2000) { + lastProgressUpdate = now; + await onProgress(totalScanned); + } + + currentFromId = result.messages[result.messages.length - 1].id; + + // Stuck detection + if (currentFromId === previousFromId) { + log.warn( + { chatId: chatId.toString(), currentFromId, totalScanned }, + "Pagination stuck, breaking" + ); + break; + } + + if (result.messages.length < 100) break; + + await sleep(config.apiDelayMs); + } + + // Final progress update + if (onProgress) { + await onProgress(totalScanned); + } + + log.info( + { + chatId: chatId.toString(), + archives: archives.length, + totalScanned, + pages: pageCount, + }, + "Destination channel scan complete" + ); + + // Reverse to chronological order (oldest first) + return archives.reverse(); +} + +/** + * Update the rebuild progress in the fetch request's resultJson field. + * Throttled to avoid excessive DB writes. + */ +let lastUpdateTime = 0; +async function updateRebuildProgress( + requestId: string, + progress: RebuildProgress +): Promise { + const now = Date.now(); + // Throttle to every 2 seconds, but always write for status changes + if ( + progress.status !== "IN_PROGRESS" || + now - lastUpdateTime >= 2000 + ) { + lastUpdateTime = now; + try { + await db.channelFetchRequest.update({ + where: { id: requestId }, + data: { + resultJson: JSON.stringify(progress), + }, + }); + } catch { + // Best-effort + } + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/worker/src/recovery.ts b/worker/src/recovery.ts new file mode 100644 index 0000000..cabe2f8 --- /dev/null +++ b/worker/src/recovery.ts @@ -0,0 +1,187 @@ +import { childLogger } from "./util/logger.js"; +import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; +import { withFloodWait } from "./util/retry.js"; +import { + getActiveAccounts, + getPackagesWithDestMessage, + resetPackageDestination, + getGlobalDestinationChannel, +} from "./db/queries.js"; +import type { Client } from "tdl"; + +const log = childLogger("recovery"); + +/** + * Verify that destination messages still exist in Telegram for all + * packages that claim to be uploaded. If a message is missing (deleted + * or never actually committed), reset the package so the next ingestion + * run will re-download and re-upload it. + * + * This handles the case where the worker crashed mid-upload: TDLib may + * have returned a temporary message ID that was stored as destMessageId + * but the upload never completed server-side, or the message was later + * deleted from the destination channel. + * + * Called once on worker startup, before the scheduler begins. + */ +export async function recoverIncompleteUploads(): Promise { + const packages = await getPackagesWithDestMessage(); + if (packages.length === 0) { + log.debug("No packages with destination messages to verify"); + return; + } + + // We need a TDLib client to verify messages. Use the first active account. + const accounts = await getActiveAccounts(); + if (accounts.length === 0) { + log.info("No active accounts available for upload verification, skipping recovery"); + return; + } + + const destChannel = await getGlobalDestinationChannel(); + if (!destChannel) { + log.info("No destination channel configured, skipping recovery"); + return; + } + + // Group packages by destChannelId for efficient verification + const byChannel = new Map(); + for (const pkg of packages) { + const channelId = pkg.destChannelId!; + if (!byChannel.has(channelId)) { + byChannel.set(channelId, []); + } + byChannel.get(channelId)!.push(pkg); + } + + log.info( + { totalPackages: packages.length, channels: byChannel.size }, + "Verifying destination messages exist in Telegram" + ); + + const account = accounts[0]; + let client: Client | undefined; + + try { + client = await createTdlibClient({ id: account.id, phone: account.phone }); + + // Load the chat list so TDLib can resolve chat IDs + try { + await client.invoke({ + _: "getChats", + chat_list: { _: "chatListMain" }, + limit: 1000, + }); + } catch { + // May already be loaded + } + + let resetCount = 0; + let verifiedCount = 0; + + for (const [, channelPackages] of byChannel) { + for (const pkg of channelPackages) { + const exists = await verifyMessageExists( + client, + destChannel.telegramId, + pkg.destMessageId! + ); + + if (exists) { + verifiedCount++; + } else { + log.warn( + { + packageId: pkg.id, + fileName: pkg.fileName, + destMessageId: Number(pkg.destMessageId), + }, + "Destination message missing in Telegram, resetting package for re-upload" + ); + await resetPackageDestination(pkg.id); + resetCount++; + } + } + } + + if (resetCount > 0) { + log.info( + { resetCount, verifiedCount, totalChecked: packages.length }, + "Upload recovery complete — packages reset for re-processing" + ); + } else { + log.info( + { verifiedCount, totalChecked: packages.length }, + "Upload recovery complete — all destination messages verified" + ); + } + } catch (err) { + log.error({ err }, "Upload recovery failed (non-fatal, will retry next startup)"); + } finally { + if (client) { + await closeTdlibClient(client); + } + } +} + +/** + * Check whether a message exists in a Telegram chat. + * Returns false if the message was deleted or never existed. + */ +async function verifyMessageExists( + client: Client, + chatTelegramId: bigint, + messageId: bigint +): Promise { + try { + const result = await withFloodWait( + () => + client.invoke({ + _: "getMessage", + chat_id: Number(chatTelegramId), + message_id: Number(messageId), + }), + "getMessage:verify" + ); + + // TDLib returns the message object if it exists. + // A deleted message may return with content type "messageChatDeleteMessage" + // or the call may throw. Check that we got a real message with content. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const msg = result as any; + if (!msg || !msg.content) { + return false; + } + + // Check that the message has document content (our uploads are documents) + // A message that exists but has no document content was likely cleared/replaced + if (msg.content._ !== "messageDocument") { + log.debug( + { + messageId: Number(messageId), + contentType: msg.content._, + }, + "Destination message exists but is not a document" + ); + return false; + } + + return true; + } catch (err) { + // TDLib throws "Message not found" (error code 404) for deleted messages + const message = err instanceof Error ? err.message : String(err); + const code = (err as { code?: number })?.code; + + if (code === 404 || message.includes("not found") || message.includes("Not Found")) { + return false; + } + + // For other errors (network issues, etc.), assume the message exists + // to avoid incorrectly resetting packages due to transient failures + log.warn( + { err, messageId: Number(messageId) }, + "Could not verify message (assuming it exists)" + ); + return true; + } +} diff --git a/worker/src/tdlib/chats.ts b/worker/src/tdlib/chats.ts index 1929816..0787ed7 100644 --- a/worker/src/tdlib/chats.ts +++ b/worker/src/tdlib/chats.ts @@ -176,6 +176,63 @@ export async function joinChatByInviteLink( log.info({ inviteLink }, "Joined chat by invite link"); } +/** + * Search for a public chat by username. + * Returns the chat info if found, or null if not found. + */ +export async function searchPublicChat( + client: Client, + username: string +): Promise { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const chat = (await withFloodWait( + () => client.invoke({ + _: "searchPublicChat", + username, + }), + "searchPublicChat" + )) as any; + + const chatType = chat.type?._; + let type: TelegramChatInfo["type"] = "other"; + let isForum = false; + + if (chatType === "chatTypeSupergroup") { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const sg = (await withFloodWait( + () => client.invoke({ + _: "getSupergroup", + supergroup_id: chat.type.supergroup_id, + }), + "getSupergroup" + )) as any; + + type = sg.is_channel ? "channel" : "supergroup"; + isForum = sg.is_forum ?? false; + } catch { + type = "supergroup"; + } + } else if (chatType === "chatTypeBasicGroup") { + type = "group"; + } else if (chatType === "chatTypePrivate" || chatType === "chatTypeSecret") { + type = "private"; + } + + log.info({ username, chatId: chat.id, type }, "Found public chat"); + return { + chatId: BigInt(chat.id), + title: chat.title ?? username, + type, + isForum, + }; + } catch (err) { + log.warn({ username, err }, "Public chat not found"); + return null; + } +} + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index bc6b0a7..1ee30f2 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -34,6 +34,7 @@ import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdli import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js"; import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js"; import { matchPreviewToArchive } from "./preview/match.js"; +import { pickPreviewFile, extractPreviewImage } from "./preview/extract.js"; import { groupArchiveSets } from "./archive/multipart.js"; import type { ArchiveSet } from "./archive/multipart.js"; import { extractCreatorFromFileName, extractCreatorFromChannelTitle } from "./archive/creator.js"; @@ -971,6 +972,23 @@ async function processOneArchiveSet( previewMsgId = matchedPhoto.id; } + // ── Fallback: extract preview image from inside the archive ── + if (!previewData && entries.length > 0 && archiveSet.type !== "DOCUMENT") { + const previewEntry = pickPreviewFile(entries); + if (previewEntry) { + accountLog.debug( + { fileName: archiveName, previewFile: previewEntry.path }, + "Attempting to extract preview image from archive" + ); + const archiveTypeForExtract = archiveSet.type === "7Z" ? "SEVEN_Z" as const : archiveSet.type as "ZIP" | "RAR"; + previewData = await extractPreviewImage( + tempPaths[0], + archiveTypeForExtract, + previewEntry.path + ); + } + } + // ── Resolve creator: topic name > filename extraction > channel title > null ── const creator = topicCreator ?? extractCreatorFromFileName(archiveName)