feat: fix multi-part archive forwarding and add kickstarter package linking
All checks were successful
continuous-integration/drone/push Build is passing

Multi-part send fix:
- Add destMessageIds BigInt[] to Package schema with backfill migration
- Worker uploadToChannel now returns all message IDs, stored in DB
- Bot forwards all parts of multi-part archives (not just the first)
- Add retry logic for upload rate limits (429) and download stalls

Kickstarter package linking:
- Add package search/linking queries and API routes
- Add PackageLinkerDialog with search + checkbox selection
- Add "Link Packages" and "Send All" actions to kickstarter table
- Add sendAllKickstarterPackages server action

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-26 18:11:35 +01:00
parent 527aca7c25
commit 718007446f
17 changed files with 1575 additions and 23 deletions

View File

@@ -70,7 +70,7 @@ export async function packageExistsByHash(contentHash: string) {
/**
 * Looks up a package whose content hash is already present on the
 * destination channel (i.e. the upload was confirmed: both destination
 * fields are non-null).
 *
 * @param contentHash - Hash of the archive contents to match on.
 * @returns The destination channel ID, the first message ID, and the full
 *          per-part message ID list — or null when no confirmed upload exists.
 */
export async function getUploadedPackageByHash(contentHash: string) {
  return db.package.findFirst({
    where: { contentHash, destMessageId: { not: null }, destChannelId: { not: null } },
    // Stripped diff markers left both the pre- and post-change `select` lines
    // in this span; only the updated one (with destMessageIds) is kept.
    select: { destChannelId: true, destMessageId: true, destMessageIds: true },
  });
}
@@ -111,6 +111,7 @@ export interface CreatePackageInput {
sourceTopicId?: bigint | null;
destChannelId?: string;
destMessageId?: bigint;
destMessageIds?: bigint[];
isMultipart: boolean;
partCount: number;
ingestionRunId: string;
@@ -140,6 +141,7 @@ export async function createPackageWithFiles(input: CreatePackageInput) {
sourceTopicId: input.sourceTopicId ?? undefined,
destChannelId: input.destChannelId,
destMessageId: input.destMessageId,
destMessageIds: input.destMessageIds ?? (input.destMessageId ? [input.destMessageId] : []),
isMultipart: input.isMultipart,
partCount: input.partCount,
fileCount: input.files.length,

View File

@@ -2,13 +2,16 @@ import type { Client } from "tdl";
import { readFile, rename, copyFile, unlink, stat } from "fs/promises";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
const log = childLogger("download");
/** Maximum retry attempts for stalled/failed downloads */
const MAX_DOWNLOAD_RETRIES = 3;
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
export const MAX_SCAN_PAGES = 5000;
@@ -353,6 +356,75 @@ export async function downloadFile(
isComplete: false,
});
for (let attempt = 0; attempt <= MAX_DOWNLOAD_RETRIES; attempt++) {
try {
return await downloadFileAttempt(client, numericId, fileId, destPath, totalBytes, fileName, onProgress);
} catch (err) {
const isLastAttempt = attempt >= MAX_DOWNLOAD_RETRIES;
// Rate limit from Telegram
const waitSeconds = extractFloodWaitSeconds(err);
if (waitSeconds !== null && !isLastAttempt) {
const jitter = 1000 + Math.random() * 4000;
const waitMs = waitSeconds * 1000 + jitter;
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_DOWNLOAD_RETRIES, waitSeconds },
`Download rate-limited — sleeping ${waitSeconds}s before retry`
);
await cancelDownload(client, numericId);
await sleep(waitMs);
continue;
}
// Stall, timeout, or unexpected stop — cancel and retry
const errMsg = err instanceof Error ? err.message : "";
if (
(errMsg.includes("stalled") || errMsg.includes("timed out") || errMsg.includes("stopped unexpectedly")) &&
!isLastAttempt
) {
log.warn(
{ fileName, attempt: attempt + 1, maxRetries: MAX_DOWNLOAD_RETRIES },
"Download failed — cancelling and retrying"
);
await cancelDownload(client, numericId);
await sleep(5_000);
continue;
}
throw err;
}
}
throw new Error(`Download failed after ${MAX_DOWNLOAD_RETRIES} retries for ${fileName}`);
}
/**
 * Best-effort cancellation of an in-flight TDLib download, so the next
 * retry attempt starts from a clean state.
 *
 * Errors are deliberately swallowed: a failed cancel must never mask the
 * original download failure that triggered the retry.
 */
async function cancelDownload(client: Client, fileId: number): Promise<void> {
  try {
    await client.invoke({ _: "cancelDownloadFile", file_id: fileId, only_if_pending: false });
    log.debug({ fileId }, "Cancelled TDLib download for retry");
  } catch {
    // ignore — best-effort cleanup only
  }
}
/**
* Single download attempt with progress tracking, stall detection, and verification.
*/
async function downloadFileAttempt(
client: Client,
numericId: number,
fileId: string,
destPath: string,
totalBytes: number,
fileName: string,
onProgress?: ProgressCallback
): Promise<void> {
return new Promise<void>((resolve, reject) => {
let lastLoggedPercent = 0;
let settled = false;

View File

@@ -3,12 +3,13 @@ import { stat } from "fs/promises";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
const log = childLogger("upload");
export interface UploadResult {
  /** ID of the first message sent — kept for existing single-ID callers. */
  messageId: bigint;
  /** IDs of every message sent, one per uploaded part, in send order. */
  messageIds: bigint[];
}
/**
@@ -28,7 +29,7 @@ export async function uploadToChannel(
filePaths: string[],
caption?: string
): Promise<UploadResult> {
let firstMessageId: bigint | null = null;
const allMessageIds: bigint[] = [];
for (let i = 0; i < filePaths.length; i++) {
const filePath = filePaths[i];
@@ -49,11 +50,9 @@ export async function uploadToChannel(
"Uploading file to channel"
);
const serverMsgId = await sendAndWaitForUpload(client, chatId, filePath, fileCaption, fileName, fileSizeMB);
const serverMsgId = await sendWithRetry(client, chatId, filePath, fileCaption, fileName, fileSizeMB);
if (i === 0) {
firstMessageId = serverMsgId;
}
allMessageIds.push(serverMsgId);
// Rate limit delay between uploads
if (i < filePaths.length - 1) {
@@ -61,16 +60,68 @@ export async function uploadToChannel(
}
}
if (firstMessageId === null) {
if (allMessageIds.length === 0) {
throw new Error("Upload failed: no messages sent");
}
log.info(
{ chatId: Number(chatId), messageId: Number(firstMessageId), files: filePaths.length },
{ chatId: Number(chatId), messageId: Number(allMessageIds[0]), files: filePaths.length },
"All uploads confirmed by Telegram"
);
return { messageId: firstMessageId };
return { messageId: allMessageIds[0], messageIds: allMessageIds };
}
/** Maximum retry attempts per file before the upload error is surfaced. */
const MAX_UPLOAD_RETRIES = 3;

/**
 * Calls sendAndWaitForUpload with automatic retries.
 *
 * Retryable failures:
 * - Telegram rate limits (429 / FLOOD_WAIT / "retry after N") — sleeps for
 *   the advertised wait plus 1–5s of jitter, then tries again.
 * - Stalled or timed-out uploads — sleeps 10s, then tries again.
 *
 * Any other error, or a retryable error on the final attempt, is rethrown.
 *
 * @returns The server-assigned message ID of the sent file.
 */
async function sendWithRetry(
  client: Client,
  chatId: bigint,
  filePath: string,
  caption: string | undefined,
  fileName: string,
  fileSizeMB: number
): Promise<bigint> {
  let attempt = 0;
  while (attempt <= MAX_UPLOAD_RETRIES) {
    try {
      return await sendAndWaitForUpload(client, chatId, filePath, caption, fileName, fileSizeMB);
    } catch (err) {
      const retriesLeft = attempt < MAX_UPLOAD_RETRIES;

      // Telegram asked us to back off.
      const waitSeconds = extractFloodWaitSeconds(err);
      if (waitSeconds !== null && retriesLeft) {
        const jitter = 1000 + Math.random() * 4000;
        log.warn(
          { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES, waitSeconds },
          `Upload rate-limited — sleeping ${waitSeconds}s before retry`
        );
        await sleep(waitSeconds * 1000 + jitter);
        attempt++;
        continue;
      }

      // Upload made no progress or exceeded its deadline.
      const message = err instanceof Error ? err.message : "";
      if ((message.includes("stalled") || message.includes("timed out")) && retriesLeft) {
        log.warn(
          { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
          "Upload stalled/timed out — retrying"
        );
        await sleep(10_000);
        attempt++;
        continue;
      }

      throw err;
    }
  }
  // Unreachable in practice (the loop either returns or rethrows); kept as a safety net.
  throw new Error(`Upload failed after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`);
}
/**
@@ -94,6 +145,7 @@ async function sendAndWaitForUpload(
let lastLoggedPercent = 0;
let tempMsgId: number | null = null;
let uploadStarted = false;
let lastProgressBytes = 0;
let lastProgressTime = Date.now();
// Timeout: 20 minutes per GB, minimum 15 minutes
@@ -137,9 +189,14 @@ async function sendAndWaitForUpload(
const file = update.file;
if (file?.remote?.is_uploading_active && file.expected_size > 0) {
uploadStarted = true;
lastProgressTime = Date.now();
const uploaded = file.remote.uploaded_size ?? 0;
// Only reset stall timer when bytes actually advance
if (uploaded > lastProgressBytes) {
lastProgressBytes = uploaded;
lastProgressTime = Date.now();
}
const total = file.expected_size;
const percent = Math.round((uploaded / total) * 100);
if (percent >= lastLoggedPercent + 20) {
@@ -178,7 +235,9 @@ async function sendAndWaitForUpload(
settled = true;
cleanup();
const errorMsg = update.error?.message ?? "Unknown upload error";
reject(new Error(`Upload failed for ${fileName}: ${errorMsg}`));
const error = new Error(`Upload failed for ${fileName}: ${errorMsg}`);
(error as Error & { code?: number }).code = update.error?.code;
reject(error);
}
}
}

View File

@@ -1057,14 +1057,19 @@ async function processOneArchiveSet(
// Check if a prior run already uploaded this file (orphaned upload scenario:
// file reached Telegram but DB write failed or worker crashed before indexing)
const existingUpload = await getUploadedPackageByHash(contentHash);
let destResult: { messageId: bigint };
let destResult: { messageId: bigint; messageIds: bigint[] };
if (existingUpload && existingUpload.destMessageId) {
accountLog.info(
{ fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
"Reusing existing upload (file already on destination channel)"
);
destResult = { messageId: existingUpload.destMessageId };
destResult = {
messageId: existingUpload.destMessageId,
messageIds: existingUpload.destMessageIds?.length
? (existingUpload.destMessageIds as bigint[])
: [existingUpload.destMessageId],
};
} else {
const uploadLabel = uploadPaths.length > 1
? ` (${uploadPaths.length} parts)`
@@ -1158,6 +1163,7 @@ async function processOneArchiveSet(
sourceTopicId,
destChannelId,
destMessageId: destResult.messageId,
destMessageIds: destResult.messageIds,
isMultipart:
archiveSet.parts.length > 1 || uploadPaths.length > 1,
partCount: uploadPaths.length,