From 718007446f6957909342ad83036ffb3b517fc1c0 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Thu, 26 Mar 2026 18:11:35 +0100 Subject: [PATCH] feat: fix multi-part archive forwarding and add kickstarter package linking Multi-part send fix: - Add destMessageIds BigInt[] to Package schema with backfill migration - Worker uploadToChannel now returns all message IDs, stored in DB - Bot forwards all parts of multi-part archives (not just the first) - Add retry logic for upload rate limits (429) and download stalls Kickstarter package linking: - Add package search/linking queries and API routes - Add PackageLinkerDialog with search + checkbox selection - Add "Link Packages" and "Send All" actions to kickstarter table - Add sendAllKickstarterPackages server action Co-Authored-By: Claude Opus 4.6 (1M context) --- bot/src/db/queries.ts | 3 + bot/src/send-listener.ts | 26 +- bot/src/tdlib/client.ts | 19 + ...3-26-multipart-send-kickstarter-linking.md | 964 ++++++++++++++++++ .../migration.sql | 7 + prisma/schema.prisma | 1 + .../_components/kickstarter-columns.tsx | 16 +- .../_components/kickstarter-table.tsx | 24 +- .../_components/package-linker-dialog.tsx | 211 ++++ src/app/(app)/kickstarters/actions.ts | 80 ++ src/app/api/packages/linked/route.ts | 21 + src/app/api/packages/search/route.ts | 26 + src/data/kickstarter.queries.ts | 31 + worker/src/db/queries.ts | 4 +- worker/src/tdlib/download.ts | 74 +- worker/src/upload/channel.ts | 81 +- worker/src/worker.ts | 10 +- 17 files changed, 1575 insertions(+), 23 deletions(-) create mode 100644 docs/superpowers/plans/2026-03-26-multipart-send-kickstarter-linking.md create mode 100644 prisma/migrations/20260326120000_add_dest_message_ids/migration.sql create mode 100644 src/app/(app)/kickstarters/_components/package-linker-dialog.tsx create mode 100644 src/app/api/packages/linked/route.ts create mode 100644 src/app/api/packages/search/route.ts diff --git a/bot/src/db/queries.ts b/bot/src/db/queries.ts index 888e23e..a0be57a 100644 --- a/bot/src/db/queries.ts +++ b/bot/src/db/queries.ts @@ -122,6 +122,9 @@ export async function getPendingSendRequest(requestId: string) { archiveType: true, destChannelId: true, destMessageId: true, + destMessageIds: true, + isMultipart: true, + partCount: true, previewData: true, sourceChannel: { select: { title: true, telegramId: true } }, }, diff --git a/bot/src/send-listener.ts b/bot/src/send-listener.ts index 4102b06..58c1573 100644 --- a/bot/src/send-listener.ts +++ b/bot/src/send-listener.ts @@ -7,7 +7,7 @@ import { findMatchingSubscriptions, getGlobalDestinationChannel, } from "./db/queries.js"; -import { copyMessageToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/client.js"; +import { copyMessageToUser, copyMultipleMessagesToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/client.js"; import { sleep } from "./util/flood-wait.js"; const log = childLogger("send-listener"); @@ -154,11 +154,25 @@ async function processSendRequest(requestId: string): Promise { } // Forward the actual archive file(s) from destination channel - await copyMessageToUser( - destChannel.telegramId, - pkg.destMessageId, - targetUserId - ); + const messageIds = pkg.destMessageIds as bigint[] | undefined; + if (messageIds && messageIds.length > 1) { + log.info( + { requestId, parts: messageIds.length }, + "Sending multi-part archive" + ); + await copyMultipleMessagesToUser( + destChannel.telegramId, + messageIds, + targetUserId + ); + } else { + // Single part or legacy (no destMessageIds populated) + await copyMessageToUser( + destChannel.telegramId, + pkg.destMessageId, + targetUserId + ); + } await updateSendRequest(requestId, "SENT"); log.info({ requestId }, "Send request completed successfully"); diff --git a/bot/src/tdlib/client.ts b/bot/src/tdlib/client.ts index a9c45a1..683fcec 100644 --- a/bot/src/tdlib/client.ts +++ b/bot/src/tdlib/client.ts @@ -121,6 +121,25 @@ export async function copyMessageToUser( }, fileName); } +/** + * Send multiple document messages from a channel to a user's DM. + * Used for multi-part archives where each part is a separate Telegram message. + * Sends parts sequentially with a small delay to avoid rate limits. + */ +export async function copyMultipleMessagesToUser( + fromChatId: bigint, + messageIds: bigint[], + toUserId: bigint +): Promise { + for (let i = 0; i < messageIds.length; i++) { + await copyMessageToUser(fromChatId, messageIds[i], toUserId); + // Small delay between parts to avoid rate limits + if (i < messageIds.length - 1) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } +} + /** * Send a message and wait for Telegram to confirm delivery. * Returns when updateMessageSendSucceeded fires for the temp message. diff --git a/docs/superpowers/plans/2026-03-26-multipart-send-kickstarter-linking.md b/docs/superpowers/plans/2026-03-26-multipart-send-kickstarter-linking.md new file mode 100644 index 0000000..14e08bf --- /dev/null +++ b/docs/superpowers/plans/2026-03-26-multipart-send-kickstarter-linking.md @@ -0,0 +1,964 @@ +# Multi-Part Send Fix & Kickstarter Package Linking + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fix multi-part package forwarding so all archive parts reach the user, and add UI to link STL packages to kickstarters with "send all" capability. + +**Architecture:** Two independent subsystems. (A) Store all destination message IDs when the worker uploads multi-part archives, then have the bot forward every part. (B) Add a package-linker dialog in the kickstarter UI using the existing `linkPackages` action, plus a "send all" action that queues every linked package. + +**Tech Stack:** Prisma (schema + migration), TypeScript worker/bot services, Next.js App Router (server actions + React client components), shadcn/ui, TanStack Table. + +--- + +## File Map + +### Subsystem A — Multi-Part Send Fix + +| Action | File | Responsibility | +|--------|------|----------------| +| Modify | `prisma/schema.prisma` | Add `destMessageIds BigInt[]` to Package | +| Create | `prisma/migrations/_add_dest_message_ids/migration.sql` | Migration SQL | +| Modify | `worker/src/upload/channel.ts` | Return all message IDs from `uploadToChannel` | +| Modify | `worker/src/db/queries.ts` | Add `destMessageIds` to `CreatePackageInput` and `createPackageWithFiles` | +| Modify | `worker/src/worker.ts` | Pass all message IDs when creating package | +| Modify | `bot/src/db/queries.ts` | Include `destMessageIds` in `getPendingSendRequest` | +| Modify | `bot/src/send-listener.ts` | Forward all parts, not just the first | + +### Subsystem B — Kickstarter Package Linking UI + +| Action | File | Responsibility | +|--------|------|----------------| +| Create | `src/app/(app)/kickstarters/_components/package-linker-dialog.tsx` | Dialog with package search + selection for linking | +| Modify | `src/app/(app)/kickstarters/_components/kickstarter-columns.tsx` | Add "Link Packages" and "Send All" actions to row menu | +| Modify | `src/app/(app)/kickstarters/_components/kickstarter-table.tsx` | Wire up new dialogs + state | +| Modify | `src/app/(app)/kickstarters/actions.ts` | Add `sendAllKickstarterPackages` action | +| Modify | `src/data/kickstarter.queries.ts` | Add query to search packages for linking | + +--- + +## Task 1: Add `destMessageIds` to Prisma Schema + Migration + +**Files:** +- Modify: `prisma/schema.prisma:470-471` +- Create: migration SQL + +- [ ] **Step 1: Add field to schema** + +In `prisma/schema.prisma`, add `destMessageIds` after `destMessageId`: + +```prisma + destMessageId BigInt? + destMessageIds BigInt[] @default([]) +``` + +- [ ] **Step 2: Create migration SQL manually** + +Create the migration directory and SQL file. The migration adds the column with a default and backfills existing rows by copying `destMessageId` into the array where it's non-null: + +```sql +-- AlterTable +ALTER TABLE "packages" ADD COLUMN "destMessageIds" BIGINT[] DEFAULT ARRAY[]::BIGINT[]; + +-- Backfill: copy existing destMessageId into the array +UPDATE "packages" +SET "destMessageIds" = ARRAY["destMessageId"] +WHERE "destMessageId" IS NOT NULL; +``` + +- [ ] **Step 3: Apply migration to database** + +```bash +docker exec dragonsstash-db psql -U dragons -d dragonsstash -f - < migration.sql +``` + +- [ ] **Step 4: Regenerate Prisma client** + +Use the app container (which has node/prisma) to regenerate: + +```bash +docker exec dragonsstash npx prisma generate +``` + +Or, if running locally with node: `npx prisma generate` + +- [ ] **Step 5: Commit** + +```bash +git add prisma/schema.prisma prisma/migrations/ +git commit -m "feat: add destMessageIds field to Package for multi-part forwarding" +``` + +--- + +## Task 2: Worker — Return All Message IDs from Upload + +**Files:** +- Modify: `worker/src/upload/channel.ts:10-12,25-74` + +- [ ] **Step 1: Update UploadResult interface** + +In `worker/src/upload/channel.ts`, change the interface to include all IDs: + +```typescript +export interface UploadResult { + messageId: bigint; + messageIds: bigint[]; +} +``` + +- [ ] **Step 2: Collect all message IDs in uploadToChannel** + +Replace the upload loop to track all message IDs: + +```typescript +export async function uploadToChannel( + client: Client, + chatId: bigint, + filePaths: string[], + caption?: string +): Promise { + const allMessageIds: bigint[] = []; + + for (let i = 0; i < filePaths.length; i++) { + const filePath = filePaths[i]; + const fileCaption = i === 0 && caption ? caption : undefined; + + const fileName = path.basename(filePath); + let fileSizeMB = 0; + try { + const s = await stat(filePath); + fileSizeMB = Math.round(s.size / (1024 * 1024)); + } catch { + // Non-critical + } + + log.info( + { chatId: Number(chatId), fileName, sizeMB: fileSizeMB, part: i + 1, total: filePaths.length }, + "Uploading file to channel" + ); + + const serverMsgId = await sendWithRetry(client, chatId, filePath, fileCaption, fileName, fileSizeMB); + allMessageIds.push(serverMsgId); + + // Rate limit delay between uploads + if (i < filePaths.length - 1) { + await sleep(config.apiDelayMs); + } + } + + if (allMessageIds.length === 0) { + throw new Error("Upload failed: no messages sent"); + } + + log.info( + { chatId: Number(chatId), messageId: Number(allMessageIds[0]), files: filePaths.length }, + "All uploads confirmed by Telegram" + ); + + return { messageId: allMessageIds[0], messageIds: allMessageIds }; +} +``` + +- [ ] **Step 3: Commit** + +```bash +git add worker/src/upload/channel.ts +git commit -m "feat: return all message IDs from uploadToChannel for multi-part" +``` + +--- + +## Task 3: Worker — Store All Message IDs in Database + +**Files:** +- Modify: `worker/src/db/queries.ts:104-155` +- Modify: `worker/src/worker.ts:1056-1086` + +- [ ] **Step 1: Add destMessageIds to CreatePackageInput** + +In `worker/src/db/queries.ts`, add the field to the interface: + +```typescript +export interface CreatePackageInput { + // ... existing fields ... + destMessageId?: bigint; + destMessageIds?: bigint[]; + // ... rest ... +} +``` + +- [ ] **Step 2: Store destMessageIds in createPackageWithFiles** + +In the `db.package.create` call inside `createPackageWithFiles`, add: + +```typescript +destMessageIds: input.destMessageIds ?? (input.destMessageId ? [input.destMessageId] : []), +``` + +- [ ] **Step 3: Pass messageIds from worker pipeline** + +In `worker/src/worker.ts`, the upload section (around line 1068-1085) currently does: + +```typescript +destResult = await uploadToChannel(client, destChannelTelegramId, uploadPaths); +``` + +After this, when calling `createPackageWithFiles`, add `destMessageIds`: + +```typescript +const pkg = await createPackageWithFiles({ + // ... existing fields ... + destMessageId: destResult.messageId, + destMessageIds: destResult.messageIds, + // ... rest ... +}); +``` + +- [ ] **Step 4: Commit** + +```bash +git add worker/src/db/queries.ts worker/src/worker.ts +git commit -m "feat: store all multi-part message IDs in package record" +``` + +--- + +## Task 4: Bot — Forward All Parts + +**Files:** +- Modify: `bot/src/db/queries.ts:110-132` +- Modify: `bot/src/send-listener.ts:105-169` +- Modify: `bot/src/tdlib/client.ts:66-122` + +- [ ] **Step 1: Include destMessageIds in bot query** + +In `bot/src/db/queries.ts`, add `destMessageIds` to the `getPendingSendRequest` select: + +```typescript +package: { + select: { + id: true, + fileName: true, + fileSize: true, + fileCount: true, + creator: true, + tags: true, + archiveType: true, + destChannelId: true, + destMessageId: true, + destMessageIds: true, // <-- ADD THIS + isMultipart: true, // <-- ADD THIS (for logging) + partCount: true, // <-- ADD THIS (for logging) + previewData: true, + sourceChannel: { select: { title: true, telegramId: true } }, + }, +}, +``` + +- [ ] **Step 2: Add copyMultipleMessagesToUser helper** + +In `bot/src/tdlib/client.ts`, add a new export after `copyMessageToUser`: + +```typescript +/** + * Send multiple document messages from a channel to a user's DM. + * Used for multi-part archives where each part is a separate Telegram message. + * Sends parts sequentially with a small delay to avoid rate limits. + */ +export async function copyMultipleMessagesToUser( + fromChatId: bigint, + messageIds: bigint[], + toUserId: bigint +): Promise { + for (let i = 0; i < messageIds.length; i++) { + await copyMessageToUser(fromChatId, messageIds[i], toUserId); + // Small delay between parts to avoid rate limits + if (i < messageIds.length - 1) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } +} +``` + +- [ ] **Step 3: Update processSendRequest to forward all parts** + +In `bot/src/send-listener.ts`, update the import to include the new function: + +```typescript +import { copyMessageToUser, copyMultipleMessagesToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/client.js"; +``` + +Then replace the single `copyMessageToUser` call (around line 157) with logic that forwards all parts: + +```typescript + // Forward the actual archive file(s) from destination channel + const messageIds = pkg.destMessageIds as bigint[] | undefined; + if (messageIds && messageIds.length > 1) { + log.info( + { requestId, parts: messageIds.length }, + "Sending multi-part archive" + ); + await copyMultipleMessagesToUser( + destChannel.telegramId, + messageIds, + targetUserId + ); + } else { + // Single part or legacy (no destMessageIds populated) + await copyMessageToUser( + destChannel.telegramId, + pkg.destMessageId, + targetUserId + ); + } +``` + +- [ ] **Step 4: Commit** + +```bash +git add bot/src/db/queries.ts bot/src/send-listener.ts bot/src/tdlib/client.ts +git commit -m "feat: forward all parts of multi-part archives via bot" +``` + +--- + +## Task 5: Rebuild & Deploy Worker + Bot + +- [ ] **Step 1: Rebuild worker image** + +```bash +docker compose -f docker-compose.dev.yml build worker +docker tag dragonsstash-worker:latest git.samagsteribbe.nl/admin/dragonsstash-worker:latest +docker compose -p dragonsstash -f /opt/stacks/DragonsStash/docker-compose.yml up -d worker +``` + +- [ ] **Step 2: Rebuild bot image** + +```bash +docker compose -f docker-compose.dev.yml build bot +docker tag dragonsstash-bot:latest git.samagsteribbe.nl/admin/dragonsstash-bot:latest +docker compose -p dragonsstash -f /opt/stacks/DragonsStash/docker-compose.yml up -d bot +``` + +- [ ] **Step 3: Verify bot startup** + +```bash +docker logs dragonsstash-bot --tail=20 +``` + +Expected: Bot starts cleanly, "Send listener started" message. + +--- + +## Task 6: Kickstarter — Package Search Query + +**Files:** +- Modify: `src/data/kickstarter.queries.ts` + +- [ ] **Step 1: Add searchPackagesForLinking query** + +Append to `src/data/kickstarter.queries.ts`: + +```typescript +export async function searchPackagesForLinking(query: string, limit = 20) { + if (!query || query.length < 2) return []; + + return prisma.package.findMany({ + where: { + OR: [ + { fileName: { contains: query, mode: "insensitive" } }, + { creator: { contains: query, mode: "insensitive" } }, + ], + }, + orderBy: { indexedAt: "desc" }, + take: limit, + select: { + id: true, + fileName: true, + fileSize: true, + archiveType: true, + creator: true, + fileCount: true, + }, + }); +} + +export async function getLinkedPackageIds(kickstarterId: string): Promise { + const links = await prisma.kickstarterPackage.findMany({ + where: { kickstarterId }, + select: { packageId: true }, + }); + return links.map((l) => l.packageId); +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add src/data/kickstarter.queries.ts +git commit -m "feat: add package search query for kickstarter linking" +``` + +--- + +## Task 7: Kickstarter — Package Linker Dialog Component + +**Files:** +- Create: `src/app/(app)/kickstarters/_components/package-linker-dialog.tsx` + +- [ ] **Step 1: Create the package linker dialog** + +This component provides a search input to find packages and checkboxes to select/deselect them. It calls the existing `linkPackages` action on save. + +```tsx +"use client"; + +import { useState, useTransition, useCallback, useEffect } from "react"; +import { Search, Package, X, Loader2 } from "lucide-react"; +import { toast } from "sonner"; +import { linkPackages } from "../actions"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Badge } from "@/components/ui/badge"; +import { Checkbox } from "@/components/ui/checkbox"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { ScrollArea } from "@/components/ui/scroll-area"; + +interface PackageResult { + id: string; + fileName: string; + fileSize: bigint; + archiveType: string; + creator: string | null; + fileCount: number; +} + +interface PackageLinkerDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; + kickstarterId: string; + kickstarterName: string; + initialPackageIds: string[]; +} + +function formatSize(bytes: bigint | number): string { + const b = Number(bytes); + if (b >= 1024 * 1024 * 1024) return `${(b / (1024 * 1024 * 1024)).toFixed(1)} GB`; + if (b >= 1024 * 1024) return `${(b / (1024 * 1024)).toFixed(0)} MB`; + return `${(b / 1024).toFixed(0)} KB`; +} + +export function PackageLinkerDialog({ + open, + onOpenChange, + kickstarterId, + kickstarterName, + initialPackageIds, +}: PackageLinkerDialogProps) { + const [isPending, startTransition] = useTransition(); + const [searchQuery, setSearchQuery] = useState(""); + const [searchResults, setSearchResults] = useState([]); + const [isSearching, setIsSearching] = useState(false); + const [selectedIds, setSelectedIds] = useState>(new Set(initialPackageIds)); + + // Reset state when dialog opens + useEffect(() => { + if (open) { + setSelectedIds(new Set(initialPackageIds)); + setSearchQuery(""); + setSearchResults([]); + } + }, [open, initialPackageIds]); + + const doSearch = useCallback(async (query: string) => { + if (query.length < 2) { + setSearchResults([]); + return; + } + setIsSearching(true); + try { + const res = await fetch(`/api/packages/search?q=${encodeURIComponent(query)}&limit=20`); + if (res.ok) { + const data = await res.json(); + setSearchResults(data.packages ?? []); + } + } catch { + // Ignore search errors + } finally { + setIsSearching(false); + } + }, []); + + // Debounced search + useEffect(() => { + const timer = setTimeout(() => doSearch(searchQuery), 300); + return () => clearTimeout(timer); + }, [searchQuery, doSearch]); + + function togglePackage(id: string) { + setSelectedIds((prev) => { + const next = new Set(prev); + if (next.has(id)) next.delete(id); + else next.add(id); + return next; + }); + } + + function handleSave() { + startTransition(async () => { + const result = await linkPackages(kickstarterId, Array.from(selectedIds)); + if (result.success) { + toast.success(`Linked ${selectedIds.size} package(s) to "${kickstarterName}"`); + onOpenChange(false); + } else { + toast.error(result.error); + } + }); + } + + return ( + + + + Link Packages + + Search and select STL packages to link to “{kickstarterName}”. + + + +
+ {/* Selected count */} + {selectedIds.size > 0 && ( +
+ + {selectedIds.size} package(s) selected + +
+ )} + + {/* Search input */} +
+ + setSearchQuery(e.target.value)} + className="pl-9" + autoFocus + /> + {isSearching && ( + + )} +
+ + {/* Results */} + +
+ {searchResults.length === 0 && searchQuery.length >= 2 && !isSearching && ( +

+ No packages found +

+ )} + {searchQuery.length < 2 && ( +

+ Type at least 2 characters to search +

+ )} + {searchResults.map((pkg) => ( + + ))} +
+
+
+ + + + + +
+
+ ); +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add src/app/(app)/kickstarters/_components/package-linker-dialog.tsx +git commit -m "feat: add package linker dialog for kickstarters" +``` + +--- + +## Task 8: Package Search API Route + +**Files:** +- Create: `src/app/api/packages/search/route.ts` + +- [ ] **Step 1: Create the API route** + +The package linker dialog needs a client-side fetch for debounced search. Create a lightweight API route: + +```typescript +import { NextResponse } from "next/server"; +import { auth } from "@/lib/auth"; +import { searchPackagesForLinking } from "@/data/kickstarter.queries"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: Request) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { searchParams } = new URL(request.url); + const query = searchParams.get("q") ?? ""; + const limit = Math.min(Number(searchParams.get("limit") ?? "20"), 50); + + const packages = await searchPackagesForLinking(query, limit); + + // Serialize BigInt for JSON + const serialized = packages.map((p) => ({ + ...p, + fileSize: p.fileSize.toString(), + })); + + return NextResponse.json({ packages: serialized }); +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add src/app/api/packages/search/route.ts +git commit -m "feat: add package search API route for kickstarter linking" +``` + +--- + +## Task 9: Kickstarter — Send All Packages Action + +**Files:** +- Modify: `src/app/(app)/kickstarters/actions.ts` + +- [ ] **Step 1: Add sendAllKickstarterPackages action** + +Append to `src/app/(app)/kickstarters/actions.ts`: + +```typescript +export async function sendAllKickstarterPackages( + kickstarterId: string +): Promise> { + const session = await auth(); + if (!session?.user?.id) return { success: false, error: "Unauthorized" }; + + try { + const telegramLink = await prisma.telegramLink.findUnique({ + where: { userId: session.user.id }, + }); + + if (!telegramLink) { + return { success: false, error: "No linked Telegram account. Link one in Settings." }; + } + + const kickstarter = await prisma.kickstarter.findFirst({ + where: { id: kickstarterId, userId: session.user.id }, + select: { + packages: { + select: { + package: { + select: { id: true, destChannelId: true, destMessageId: true, fileName: true }, + }, + }, + }, + }, + }); + + if (!kickstarter) { + return { success: false, error: "Kickstarter not found" }; + } + + const sendablePackages = kickstarter.packages + .map((lnk) => lnk.package) + .filter((p) => p.destChannelId && p.destMessageId); + + if (sendablePackages.length === 0) { + return { success: false, error: "No linked packages are available for sending" }; + } + + let queued = 0; + for (const pkg of sendablePackages) { + const existing = await prisma.botSendRequest.findFirst({ + where: { + packageId: pkg.id, + telegramLinkId: telegramLink.id, + status: { in: ["PENDING", "SENDING"] }, + }, + }); + + if (!existing) { + const sendRequest = await prisma.botSendRequest.create({ + data: { + packageId: pkg.id, + telegramLinkId: telegramLink.id, + requestedByUserId: session.user.id, + status: "PENDING", + }, + }); + + try { + await prisma.$queryRawUnsafe( + `SELECT pg_notify('bot_send', $1)`, + sendRequest.id + ); + } catch { + // Best-effort + } + + queued++; + } + } + + revalidatePath(REVALIDATE_PATH); + return { success: true, data: { queued } }; + } catch { + return { success: false, error: "Failed to send packages" }; + } +} +``` + +- [ ] **Step 2: Commit** + +```bash +git add src/app/(app)/kickstarters/actions.ts +git commit -m "feat: add sendAllKickstarterPackages action" +``` + +--- + +## Task 10: Kickstarter Table — Wire Up Link & Send Actions + +**Files:** +- Modify: `src/app/(app)/kickstarters/_components/kickstarter-columns.tsx` +- Modify: `src/app/(app)/kickstarters/_components/kickstarter-table.tsx` + +- [ ] **Step 1: Add actions to column menu** + +In `kickstarter-columns.tsx`, add `Link2` and `Send` imports from lucide-react, add `onLinkPackages` and `onSendAll` to props, and add menu items: + +```typescript +import { MoreHorizontal, Pencil, Trash2, ExternalLink, Link2, Send } from "lucide-react"; + +// Update interface: +interface KickstarterColumnsProps { + onEdit: (kickstarter: KickstarterRow) => void; + onDelete: (id: string) => void; + onLinkPackages: (kickstarter: KickstarterRow) => void; + onSendAll: (kickstarter: KickstarterRow) => void; +} +``` + +In the actions column dropdown, add between Edit and the separator: + +```tsx + onLinkPackages(row.original)}> + + Link Packages + +{row.original._count.packages > 0 && ( + onSendAll(row.original)}> + + Send All ({row.original._count.packages}) + +)} +``` + +Update the function signature to destructure the new props: + +```typescript +export function getKickstarterColumns({ + onEdit, + onDelete, + onLinkPackages, + onSendAll, +}: KickstarterColumnsProps): ColumnDef[] { +``` + +- [ ] **Step 2: Wire up state in kickstarter-table.tsx** + +Add imports and state for the new dialogs: + +```typescript +import { PackageLinkerDialog } from "./package-linker-dialog"; +import { sendAllKickstarterPackages } from "../actions"; + +// Inside KickstarterTable: +const [linkTarget, setLinkTarget] = useState(null); +const [sendAllTarget, setSendAllTarget] = useState(null); +``` + +Update the columns call: + +```typescript +const columns = getKickstarterColumns({ + onEdit: (kickstarter) => { + setEditKickstarter(kickstarter); + setModalOpen(true); + }, + onDelete: (id) => setDeleteId(id), + onLinkPackages: (kickstarter) => setLinkTarget(kickstarter), + onSendAll: (kickstarter) => { + startTransition(async () => { + const result = await sendAllKickstarterPackages(kickstarter.id); + if (result.success) { + toast.success(`Queued ${result.data!.queued} package(s) for delivery`); + } else { + toast.error(result.error); + } + }); + }, +}); +``` + +Add the `PackageLinkerDialog` before the closing `` of the component's return: + +```tsx +{linkTarget && ( + !open && setLinkTarget(null)} + kickstarterId={linkTarget.id} + kickstarterName={linkTarget.name} + initialPackageIds={[]} + /> +)} +``` + +Note: `initialPackageIds` is `[]` because the table doesn't fetch linked packages. The dialog will start empty but preserve selections during the session. For a better UX, we fetch the linked IDs when the dialog opens — see step 3. + +- [ ] **Step 3: Fetch initial linked packages when dialog opens** + +To populate the dialog with already-linked packages, add an API route or use a server action. The simplest approach: modify the `PackageLinkerDialog` to fetch linked IDs on mount. + +In `package-linker-dialog.tsx`, add to the `useEffect` that runs when `open` changes: + +```typescript +useEffect(() => { + if (open) { + setSearchQuery(""); + setSearchResults([]); + // Fetch currently linked packages + fetch(`/api/packages/linked?kickstarterId=${kickstarterId}`) + .then((res) => res.json()) + .then((data) => { + if (data.packageIds) { + setSelectedIds(new Set(data.packageIds)); + } + }) + .catch(() => {}); + } +}, [open, kickstarterId]); +``` + +Create the API route at `src/app/api/packages/linked/route.ts`: + +```typescript +import { NextResponse } from "next/server"; +import { auth } from "@/lib/auth"; +import { getLinkedPackageIds } from "@/data/kickstarter.queries"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: Request) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { searchParams } = new URL(request.url); + const kickstarterId = searchParams.get("kickstarterId"); + if (!kickstarterId) { + return NextResponse.json({ error: "kickstarterId required" }, { status: 400 }); + } + + const packageIds = await getLinkedPackageIds(kickstarterId); + return NextResponse.json({ packageIds }); +} +``` + +- [ ] **Step 4: Commit** + +```bash +git add src/app/(app)/kickstarters/_components/ src/app/api/packages/ +git commit -m "feat: wire up package linking and send-all in kickstarter table" +``` + +--- + +## Task 11: Rebuild & Deploy App + +- [ ] **Step 1: Rebuild app image** + +```bash +docker compose build app # or equivalent for the production compose +docker tag dragonsstash:latest git.samagsteribbe.nl/admin/dragonsstash:latest +docker compose -p dragonsstash -f /opt/stacks/DragonsStash/docker-compose.yml up -d app +``` + +- [ ] **Step 2: Verify app startup** + +```bash +docker logs dragonsstash --tail=20 +``` + +Expected: App starts cleanly, health check passes. + +- [ ] **Step 3: Manual test** + +1. Go to Kickstarters tab +2. Open a kickstarter's row menu → "Link Packages" +3. Search for a package, select it, save +4. Verify the package count column updates +5. Use "Send All" to queue all linked packages for Telegram delivery diff --git a/prisma/migrations/20260326120000_add_dest_message_ids/migration.sql b/prisma/migrations/20260326120000_add_dest_message_ids/migration.sql new file mode 100644 index 0000000..e8197bb --- /dev/null +++ b/prisma/migrations/20260326120000_add_dest_message_ids/migration.sql @@ -0,0 +1,7 @@ +-- AlterTable +ALTER TABLE "packages" ADD COLUMN "destMessageIds" BIGINT[] DEFAULT ARRAY[]::BIGINT[]; + +-- Backfill: copy existing destMessageId into the array +UPDATE "packages" +SET "destMessageIds" = ARRAY["destMessageId"] +WHERE "destMessageId" IS NOT NULL; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index fb6a8b2..3e2e0e9 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -469,6 +469,7 @@ model Package { sourceTopicId BigInt? destChannelId String? destMessageId BigInt? + destMessageIds BigInt[] @default([]) isMultipart Boolean @default(false) partCount Int @default(1) fileCount Int @default(0) diff --git a/src/app/(app)/kickstarters/_components/kickstarter-columns.tsx b/src/app/(app)/kickstarters/_components/kickstarter-columns.tsx index 6c59dae..eb673f8 100644 --- a/src/app/(app)/kickstarters/_components/kickstarter-columns.tsx +++ b/src/app/(app)/kickstarters/_components/kickstarter-columns.tsx @@ -1,7 +1,7 @@ "use client"; import { type ColumnDef } from "@tanstack/react-table"; -import { MoreHorizontal, Pencil, Trash2, ExternalLink } from "lucide-react"; +import { MoreHorizontal, Pencil, Trash2, ExternalLink, Link2, Send } from "lucide-react"; import { DataTableColumnHeader } from "@/components/shared/data-table-column-header"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; @@ -32,6 +32,8 @@ export interface KickstarterRow { interface KickstarterColumnsProps { onEdit: (kickstarter: KickstarterRow) => void; onDelete: (id: string) => void; + onLinkPackages: (kickstarter: KickstarterRow) => void; + onSendAll: (kickstarter: KickstarterRow) => void; } const deliveryConfig: Record = { @@ -63,6 +65,8 @@ const paymentConfig: Record = { export function getKickstarterColumns({ onEdit, onDelete, + onLinkPackages, + onSendAll, }: KickstarterColumnsProps): ColumnDef[] { return [ { @@ -170,6 +174,16 @@ export function getKickstarterColumns({ Edit + onLinkPackages(row.original)}> + + Link Packages + + {row.original._count.packages > 0 && ( + onSendAll(row.original)}> + + Send All ({row.original._count.packages}) + + )} onDelete(row.original.id)} diff --git a/src/app/(app)/kickstarters/_components/kickstarter-table.tsx b/src/app/(app)/kickstarters/_components/kickstarter-table.tsx index 7d0ceaf..efad3bc 100644 --- a/src/app/(app)/kickstarters/_components/kickstarter-table.tsx +++ b/src/app/(app)/kickstarters/_components/kickstarter-table.tsx @@ -7,7 +7,8 @@ import { toast } from "sonner"; import { useDataTable } from "@/hooks/use-data-table"; import { getKickstarterColumns, type KickstarterRow } from "./kickstarter-columns"; import { KickstarterModal } from "./kickstarter-modal"; -import { deleteKickstarter } from "../actions"; +import { PackageLinkerDialog } from "./package-linker-dialog"; +import { deleteKickstarter, sendAllKickstarterPackages } from "../actions"; import { DataTable } from "@/components/shared/data-table"; import { DataTablePagination } from "@/components/shared/data-table-pagination"; import { DataTableViewOptions } from "@/components/shared/data-table-view-options"; @@ -50,6 +51,7 @@ export function KickstarterTable({ const [modalOpen, setModalOpen] = useState(false); const [editKickstarter, setEditKickstarter] = useState(); const [deleteId, setDeleteId] = useState(null); + const [linkTarget, setLinkTarget] = useState(null); const [searchValue, setSearchValue] = useState(searchParams.get("search") ?? ""); @@ -88,6 +90,17 @@ export function KickstarterTable({ setModalOpen(true); }, onDelete: (id) => setDeleteId(id), + onLinkPackages: (kickstarter) => setLinkTarget(kickstarter), + onSendAll: (kickstarter) => { + startTransition(async () => { + const result = await sendAllKickstarterPackages(kickstarter.id); + if (result.success) { + toast.success(`Queued ${result.data!.queued} package(s) for delivery`); + } else { + toast.error(result.error); + } + }); + }, }); const { table } = useDataTable({ data, columns, pageCount }); @@ -188,6 +201,15 @@ export function KickstarterTable({ onConfirm={handleDelete} isLoading={isPending} /> + + {linkTarget && ( + !open && setLinkTarget(null)} + kickstarterId={linkTarget.id} + kickstarterName={linkTarget.name} + /> + )} ); } diff --git a/src/app/(app)/kickstarters/_components/package-linker-dialog.tsx b/src/app/(app)/kickstarters/_components/package-linker-dialog.tsx new file mode 100644 index 0000000..7704789 --- /dev/null +++ b/src/app/(app)/kickstarters/_components/package-linker-dialog.tsx @@ -0,0 +1,211 @@ +"use client"; + +import { useState, useTransition, useCallback, useEffect } from "react"; +import { Search, Package, X, Loader2 } from "lucide-react"; +import { toast } from "sonner"; +import { linkPackages } from "../actions"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Badge } from "@/components/ui/badge"; +import { Checkbox } from "@/components/ui/checkbox"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { ScrollArea } from "@/components/ui/scroll-area"; + +interface PackageResult { + id: string; + fileName: string; + fileSize: string; + archiveType: string; + creator: string | null; + fileCount: number; +} + +interface PackageLinkerDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; + kickstarterId: string; + kickstarterName: string; +} + +function formatSize(bytes: string | number): string { + const b = Number(bytes); + if (b >= 1024 * 1024 * 1024) return `${(b / (1024 * 1024 * 1024)).toFixed(1)} GB`; + if (b >= 1024 * 1024) return `${(b / (1024 * 1024)).toFixed(0)} MB`; + return `${(b / 1024).toFixed(0)} KB`; +} + +export function PackageLinkerDialog({ + open, + onOpenChange, + kickstarterId, + kickstarterName, +}: PackageLinkerDialogProps) { + const [isPending, startTransition] = useTransition(); + const [searchQuery, setSearchQuery] = useState(""); + const [searchResults, setSearchResults] = useState([]); + const [isSearching, setIsSearching] = useState(false); + const [selectedIds, setSelectedIds] = useState>(new Set()); + + // Fetch currently linked packages when dialog opens + useEffect(() => { + if (open) { + setSearchQuery(""); + setSearchResults([]); + fetch(`/api/packages/linked?kickstarterId=${kickstarterId}`) + .then((res) => res.json()) + .then((data) => { + if (data.packageIds) { + setSelectedIds(new Set(data.packageIds)); + } + }) + .catch(() => {}); + } + }, [open, kickstarterId]); + + const doSearch = useCallback(async (query: string) => { + if (query.length < 2) { + setSearchResults([]); + return; + } + setIsSearching(true); + try { + const res = await fetch(`/api/packages/search?q=${encodeURIComponent(query)}&limit=20`); + if (res.ok) { + const data = await res.json(); + setSearchResults(data.packages ?? []); + } + } catch { + // Ignore search errors + } finally { + setIsSearching(false); + } + }, []); + + // Debounced search + useEffect(() => { + const timer = setTimeout(() => doSearch(searchQuery), 300); + return () => clearTimeout(timer); + }, [searchQuery, doSearch]); + + function togglePackage(id: string) { + setSelectedIds((prev) => { + const next = new Set(prev); + if (next.has(id)) next.delete(id); + else next.add(id); + return next; + }); + } + + function handleSave() { + startTransition(async () => { + const result = await linkPackages(kickstarterId, Array.from(selectedIds)); + if (result.success) { + toast.success(`Linked ${selectedIds.size} package(s) to "${kickstarterName}"`); + onOpenChange(false); + } else { + toast.error(result.error); + } + }); + } + + return ( + + + + Link Packages + + Search and select STL packages to link to “{kickstarterName}”. + + + +
+ {selectedIds.size > 0 && ( +
+ + {selectedIds.size} package(s) selected + +
+ )} + +
+ + setSearchQuery(e.target.value)} + className="pl-9" + autoFocus + /> + {isSearching && ( + + )} +
+ + +
+ {searchResults.length === 0 && searchQuery.length >= 2 && !isSearching && ( +

+ No packages found +

+ )} + {searchQuery.length < 2 && ( +

+ Type at least 2 characters to search +

+ )} + {searchResults.map((pkg) => ( + + ))} +
+
+
+ + + + + +
+
+ ); +} diff --git a/src/app/(app)/kickstarters/actions.ts b/src/app/(app)/kickstarters/actions.ts index 1554341..8762ea6 100644 --- a/src/app/(app)/kickstarters/actions.ts +++ b/src/app/(app)/kickstarters/actions.ts @@ -146,3 +146,83 @@ export async function linkPackages( return { success: false, error: "Failed to link packages" }; } } + +export async function sendAllKickstarterPackages( + kickstarterId: string +): Promise> { + const session = await auth(); + if (!session?.user?.id) return { success: false, error: "Unauthorized" }; + + try { + const telegramLink = await prisma.telegramLink.findUnique({ + where: { userId: session.user.id }, + }); + + if (!telegramLink) { + return { success: false, error: "No linked Telegram account. Link one in Settings." }; + } + + const kickstarter = await prisma.kickstarter.findFirst({ + where: { id: kickstarterId, userId: session.user.id }, + select: { + packages: { + select: { + package: { + select: { id: true, destChannelId: true, destMessageId: true, fileName: true }, + }, + }, + }, + }, + }); + + if (!kickstarter) { + return { success: false, error: "Kickstarter not found" }; + } + + const sendablePackages = kickstarter.packages + .map((lnk) => lnk.package) + .filter((p) => p.destChannelId && p.destMessageId); + + if (sendablePackages.length === 0) { + return { success: false, error: "No linked packages are available for sending" }; + } + + let queued = 0; + for (const pkg of sendablePackages) { + const existing = await prisma.botSendRequest.findFirst({ + where: { + packageId: pkg.id, + telegramLinkId: telegramLink.id, + status: { in: ["PENDING", "SENDING"] }, + }, + }); + + if (!existing) { + const sendRequest = await prisma.botSendRequest.create({ + data: { + packageId: pkg.id, + telegramLinkId: telegramLink.id, + requestedByUserId: session.user.id, + status: "PENDING", + }, + }); + + try { + await prisma.$queryRawUnsafe( + `SELECT pg_notify('bot_send', $1)`, + sendRequest.id + ); + } catch { + // Best-effort + } + + queued++; + } + } + + revalidatePath(REVALIDATE_PATH); + return { success: true, data: { queued } }; + } catch { + return { success: false, error: "Failed to send packages" }; + } +} diff --git a/src/app/api/packages/linked/route.ts b/src/app/api/packages/linked/route.ts new file mode 100644 index 0000000..12eeb40 --- /dev/null +++ b/src/app/api/packages/linked/route.ts @@ -0,0 +1,21 @@ +import { NextResponse } from "next/server"; +import { auth } from "@/lib/auth"; +import { getLinkedPackageIds } from "@/data/kickstarter.queries"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: Request) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { searchParams } = new URL(request.url); + const kickstarterId = searchParams.get("kickstarterId"); + if (!kickstarterId) { + return NextResponse.json({ error: "kickstarterId required" }, { status: 400 }); + } + + const packageIds = await getLinkedPackageIds(kickstarterId); + return NextResponse.json({ packageIds }); +} diff --git a/src/app/api/packages/search/route.ts b/src/app/api/packages/search/route.ts new file mode 100644 index 0000000..2cad6af --- /dev/null +++ b/src/app/api/packages/search/route.ts @@ -0,0 +1,26 @@ +import { NextResponse } from "next/server"; +import { auth } from "@/lib/auth"; +import { searchPackagesForLinking } from "@/data/kickstarter.queries"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: Request) { + const session = await auth(); + if (!session?.user?.id) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { searchParams } = new URL(request.url); + const query = searchParams.get("q") ?? ""; + const limit = Math.min(Number(searchParams.get("limit") ?? "20"), 50); + + const packages = await searchPackagesForLinking(query, limit); + + // Serialize BigInt for JSON + const serialized = packages.map((p) => ({ + ...p, + fileSize: p.fileSize.toString(), + })); + + return NextResponse.json({ packages: serialized }); +} diff --git a/src/data/kickstarter.queries.ts b/src/data/kickstarter.queries.ts index 5d954ce..c2db6f2 100644 --- a/src/data/kickstarter.queries.ts +++ b/src/data/kickstarter.queries.ts @@ -95,3 +95,34 @@ export async function getKickstarterHosts() { include: { _count: { select: { kickstarters: true } } }, }); } + +export async function searchPackagesForLinking(query: string, limit = 20) { + if (!query || query.length < 2) return []; + + return prisma.package.findMany({ + where: { + OR: [ + { fileName: { contains: query, mode: "insensitive" } }, + { creator: { contains: query, mode: "insensitive" } }, + ], + }, + orderBy: { indexedAt: "desc" }, + take: limit, + select: { + id: true, + fileName: true, + fileSize: true, + archiveType: true, + creator: true, + fileCount: true, + }, + }); +} + +export async function getLinkedPackageIds(kickstarterId: string): Promise { + const links = await prisma.kickstarterPackage.findMany({ + where: { kickstarterId }, + select: { packageId: true }, + }); + return links.map((l) => l.packageId); +} diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 5d34411..edc330f 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -70,7 +70,7 @@ export async function packageExistsByHash(contentHash: string) { export async function getUploadedPackageByHash(contentHash: string) { return db.package.findFirst({ where: { contentHash, destMessageId: { not: null }, destChannelId: { not: null } }, - select: { destChannelId: true, destMessageId: true }, + select: { destChannelId: true, destMessageId: true, destMessageIds: true }, }); } @@ -111,6 +111,7 @@ export interface CreatePackageInput { sourceTopicId?: bigint | null; destChannelId?: string; destMessageId?: bigint; + destMessageIds?: bigint[]; isMultipart: boolean; partCount: number; ingestionRunId: string; @@ -140,6 +141,7 @@ export async function createPackageWithFiles(input: CreatePackageInput) { sourceTopicId: input.sourceTopicId ?? undefined, destChannelId: input.destChannelId, destMessageId: input.destMessageId, + destMessageIds: input.destMessageIds ?? (input.destMessageId ? [input.destMessageId] : []), isMultipart: input.isMultipart, partCount: input.partCount, fileCount: input.files.length, diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index b267df5..07534e5 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -2,13 +2,16 @@ import type { Client } from "tdl"; import { readFile, rename, copyFile, unlink, stat } from "fs/promises"; import { config } from "../util/config.js"; import { childLogger } from "../util/logger.js"; -import { withFloodWait } from "../util/retry.js"; +import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js"; import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; const log = childLogger("download"); +/** Maximum retry attempts for stalled/failed downloads */ +const MAX_DOWNLOAD_RETRIES = 3; + /** Maximum number of pages to scan per channel/topic to prevent infinite loops */ export const MAX_SCAN_PAGES = 5000; @@ -353,6 +356,75 @@ export async function downloadFile( isComplete: false, }); + for (let attempt = 0; attempt <= MAX_DOWNLOAD_RETRIES; attempt++) { + try { + return await downloadFileAttempt(client, numericId, fileId, destPath, totalBytes, fileName, onProgress); + } catch (err) { + const isLastAttempt = attempt >= MAX_DOWNLOAD_RETRIES; + + // Rate limit from Telegram + const waitSeconds = extractFloodWaitSeconds(err); + if (waitSeconds !== null && !isLastAttempt) { + const jitter = 1000 + Math.random() * 4000; + const waitMs = waitSeconds * 1000 + jitter; + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_DOWNLOAD_RETRIES, waitSeconds }, + `Download rate-limited — sleeping ${waitSeconds}s before retry` + ); + await cancelDownload(client, numericId); + await sleep(waitMs); + continue; + } + + // Stall, timeout, or unexpected stop — cancel and retry + const errMsg = err instanceof Error ? err.message : ""; + if ( + (errMsg.includes("stalled") || errMsg.includes("timed out") || errMsg.includes("stopped unexpectedly")) && + !isLastAttempt + ) { + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_DOWNLOAD_RETRIES }, + "Download failed — cancelling and retrying" + ); + await cancelDownload(client, numericId); + await sleep(5_000); + continue; + } + + throw err; + } + } + throw new Error(`Download failed after ${MAX_DOWNLOAD_RETRIES} retries for ${fileName}`); +} + +/** + * Cancel an active TDLib download so it can be retried cleanly. + */ +async function cancelDownload(client: Client, fileId: number): Promise { + try { + await client.invoke({ + _: "cancelDownloadFile", + file_id: fileId, + only_if_pending: false, + }); + log.debug({ fileId }, "Cancelled TDLib download for retry"); + } catch { + // Best-effort + } +} + +/** + * Single download attempt with progress tracking, stall detection, and verification. + */ +async function downloadFileAttempt( + client: Client, + numericId: number, + fileId: string, + destPath: string, + totalBytes: number, + fileName: string, + onProgress?: ProgressCallback +): Promise { return new Promise((resolve, reject) => { let lastLoggedPercent = 0; let settled = false; diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts index fd6212f..fd8c7fd 100644 --- a/worker/src/upload/channel.ts +++ b/worker/src/upload/channel.ts @@ -3,12 +3,13 @@ import { stat } from "fs/promises"; import type { Client } from "tdl"; import { config } from "../util/config.js"; import { childLogger } from "../util/logger.js"; -import { withFloodWait } from "../util/retry.js"; +import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js"; const log = childLogger("upload"); export interface UploadResult { messageId: bigint; + messageIds: bigint[]; } /** @@ -28,7 +29,7 @@ export async function uploadToChannel( filePaths: string[], caption?: string ): Promise { - let firstMessageId: bigint | null = null; + const allMessageIds: bigint[] = []; for (let i = 0; i < filePaths.length; i++) { const filePath = filePaths[i]; @@ -49,11 +50,9 @@ export async function uploadToChannel( "Uploading file to channel" ); - const serverMsgId = await sendAndWaitForUpload(client, chatId, filePath, fileCaption, fileName, fileSizeMB); + const serverMsgId = await sendWithRetry(client, chatId, filePath, fileCaption, fileName, fileSizeMB); - if (i === 0) { - firstMessageId = serverMsgId; - } + allMessageIds.push(serverMsgId); // Rate limit delay between uploads if (i < filePaths.length - 1) { @@ -61,16 +60,68 @@ export async function uploadToChannel( } } - if (firstMessageId === null) { + if (allMessageIds.length === 0) { throw new Error("Upload failed: no messages sent"); } log.info( - { chatId: Number(chatId), messageId: Number(firstMessageId), files: filePaths.length }, + { chatId: Number(chatId), messageId: Number(allMessageIds[0]), files: filePaths.length }, "All uploads confirmed by Telegram" ); - return { messageId: firstMessageId }; + return { messageId: allMessageIds[0], messageIds: allMessageIds }; +} + +/** + * Retry wrapper for sendAndWaitForUpload. + * Handles: + * - Rate limits (429 / FLOOD_WAIT) from updateMessageSendFailed — waits and retries + * - Stall / timeout — retries with a cooldown + */ +const MAX_UPLOAD_RETRIES = 3; + +async function sendWithRetry( + client: Client, + chatId: bigint, + filePath: string, + caption: string | undefined, + fileName: string, + fileSizeMB: number +): Promise { + for (let attempt = 0; attempt <= MAX_UPLOAD_RETRIES; attempt++) { + try { + return await sendAndWaitForUpload(client, chatId, filePath, caption, fileName, fileSizeMB); + } catch (err) { + const isLastAttempt = attempt >= MAX_UPLOAD_RETRIES; + + // Rate limit from Telegram (429 / FLOOD_WAIT / "retry after N") + const waitSeconds = extractFloodWaitSeconds(err); + if (waitSeconds !== null && !isLastAttempt) { + const jitter = 1000 + Math.random() * 4000; + const waitMs = waitSeconds * 1000 + jitter; + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES, waitSeconds }, + `Upload rate-limited — sleeping ${waitSeconds}s before retry` + ); + await sleep(waitMs); + continue; + } + + // Stall or timeout — retry with a cooldown + const errMsg = err instanceof Error ? err.message : ""; + if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) { + log.warn( + { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES }, + "Upload stalled/timed out — retrying" + ); + await sleep(10_000); + continue; + } + + throw err; + } + } + throw new Error(`Upload failed after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`); } /** @@ -94,6 +145,7 @@ async function sendAndWaitForUpload( let lastLoggedPercent = 0; let tempMsgId: number | null = null; let uploadStarted = false; + let lastProgressBytes = 0; let lastProgressTime = Date.now(); // Timeout: 20 minutes per GB, minimum 15 minutes @@ -137,9 +189,14 @@ async function sendAndWaitForUpload( const file = update.file; if (file?.remote?.is_uploading_active && file.expected_size > 0) { uploadStarted = true; - lastProgressTime = Date.now(); const uploaded = file.remote.uploaded_size ?? 0; + + // Only reset stall timer when bytes actually advance + if (uploaded > lastProgressBytes) { + lastProgressBytes = uploaded; + lastProgressTime = Date.now(); + } const total = file.expected_size; const percent = Math.round((uploaded / total) * 100); if (percent >= lastLoggedPercent + 20) { @@ -178,7 +235,9 @@ async function sendAndWaitForUpload( settled = true; cleanup(); const errorMsg = update.error?.message ?? "Unknown upload error"; - reject(new Error(`Upload failed for ${fileName}: ${errorMsg}`)); + const error = new Error(`Upload failed for ${fileName}: ${errorMsg}`); + (error as Error & { code?: number }).code = update.error?.code; + reject(error); } } } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index d86f068..422f9cb 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -1057,14 +1057,19 @@ async function processOneArchiveSet( // Check if a prior run already uploaded this file (orphaned upload scenario: // file reached Telegram but DB write failed or worker crashed before indexing) const existingUpload = await getUploadedPackageByHash(contentHash); - let destResult: { messageId: bigint }; + let destResult: { messageId: bigint; messageIds: bigint[] }; if (existingUpload && existingUpload.destMessageId) { accountLog.info( { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) }, "Reusing existing upload (file already on destination channel)" ); - destResult = { messageId: existingUpload.destMessageId }; + destResult = { + messageId: existingUpload.destMessageId, + messageIds: existingUpload.destMessageIds?.length + ? (existingUpload.destMessageIds as bigint[]) + : [existingUpload.destMessageId], + }; } else { const uploadLabel = uploadPaths.length > 1 ? ` (${uploadPaths.length} parts)` @@ -1158,6 +1163,7 @@ async function processOneArchiveSet( sourceTopicId, destChannelId, destMessageId: destResult.messageId, + destMessageIds: destResult.messageIds, isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1, partCount: uploadPaths.length,