diff --git a/bot/src/send-listener.ts b/bot/src/send-listener.ts index 4389733..d50a116 100644 --- a/bot/src/send-listener.ts +++ b/bot/src/send-listener.ts @@ -12,29 +12,78 @@ import { copyMessageToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/cl const log = childLogger("send-listener"); let pgClient: pg.PoolClient | null = null; +let stopped = false; + +/** Delay (ms) before attempting to reconnect after a connection loss. */ +const RECONNECT_DELAY_MS = 5_000; /** * Start listening for pg_notify signals: * - `bot_send` — payload = requestId → send a package to a user * - `new_package` — payload = JSON { packageId, fileName, creator } → notify subscribers + * + * If the underlying connection is lost, the listener automatically reconnects + * so that pg_notify signals are never silently dropped. */ export async function startSendListener(): Promise { - pgClient = await pool.connect(); - await pgClient.query("LISTEN bot_send"); - await pgClient.query("LISTEN new_package"); + stopped = false; + await connectListener(); +} - pgClient.on("notification", (msg) => { - if (msg.channel === "bot_send" && msg.payload) { - handleBotSend(msg.payload); - } else if (msg.channel === "new_package" && msg.payload) { - handleNewPackage(msg.payload); +async function connectListener(): Promise { + try { + pgClient = await pool.connect(); + await pgClient.query("LISTEN bot_send"); + await pgClient.query("LISTEN new_package"); + + pgClient.on("notification", (msg) => { + if (msg.channel === "bot_send" && msg.payload) { + handleBotSend(msg.payload); + } else if (msg.channel === "new_package" && msg.payload) { + handleNewPackage(msg.payload); + } + }); + + // Reconnect automatically when the connection ends unexpectedly + pgClient.on("end", () => { + if (!stopped) { + log.warn("Send listener connection lost — reconnecting"); + pgClient = null; + scheduleReconnect(); + } + }); + + pgClient.on("error", (err) => { + log.error({ err }, "Send listener connection error"); + if (!stopped && pgClient) { + try { + pgClient.release(true); + } catch (releaseErr) { + log.debug({ err: releaseErr }, "Failed to release pg client after error"); + } + pgClient = null; + scheduleReconnect(); + } + }); + + log.info("Send listener started (bot_send, new_package)"); + } catch (err) { + log.error({ err }, "Failed to start send listener — retrying"); + scheduleReconnect(); + } +} + +function scheduleReconnect(): void { + if (stopped) return; + setTimeout(() => { + if (!stopped) { + connectListener(); } - }); - - log.info("Send listener started (bot_send, new_package)"); + }, RECONNECT_DELAY_MS); } export function stopSendListener(): void { + stopped = true; if (pgClient) { pgClient.release(); pgClient = null; diff --git a/bot/src/tdlib/client.ts b/bot/src/tdlib/client.ts index 19a2110..9508690 100644 --- a/bot/src/tdlib/client.ts +++ b/bot/src/tdlib/client.ts @@ -33,7 +33,7 @@ export async function createBotClient(): Promise { await client.login(() => ({ type: "bot", - token: config.botToken, + getToken: () => Promise.resolve(config.botToken), })); log.info("Bot client authenticated successfully"); @@ -54,7 +54,10 @@ export async function closeBotClient(): Promise { /** * Forward a message from a channel to a user's DM. - * Uses copyMessage to make it appear as sent by the bot. + * Uses forwardMessages with send_copy to make it appear as sent by the bot. + * + * The fromChatId is the TDLib chat ID stored in the DB — already in the correct + * format (negative for supergroups/channels, e.g. -1001234567890). */ export async function copyMessageToUser( fromChatId: bigint, @@ -63,14 +66,10 @@ export async function copyMessageToUser( ): Promise { if (!client) throw new Error("Bot client not initialized"); - // TDLib uses negative chat IDs for channels/supergroups - // The telegramId from the DB is the raw Telegram ID; for channels it needs -100 prefix - const fromChatIdNum = Number(-100n * 1n) + Number(fromChatId); - await client.invoke({ _: "forwardMessages", chat_id: Number(toUserId), - from_chat_id: Number(fromChatId) > 0 ? -Number(fromChatId) : Number(fromChatId), + from_chat_id: Number(fromChatId), message_ids: [Number(messageId)], send_copy: true, remove_caption: false, diff --git a/src/app/(app)/telegram/_components/channels-tab.tsx b/src/app/(app)/telegram/_components/channels-tab.tsx index ec57377..e8100db 100644 --- a/src/app/(app)/telegram/_components/channels-tab.tsx +++ b/src/app/(app)/telegram/_components/channels-tab.tsx @@ -94,7 +94,7 @@ export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsT return (
- +
- ); @@ -187,46 +222,59 @@ export function DestinationCard({ destination }: DestinationCardProps) { - ); } -function CreateDestinationDialog({ +function DestinationDialog({ open, onOpenChange, title, setTitle, - onSubmit, + onSubmitCreate, createState, isPending, + assignableChannels, + selectedChannelId, + setSelectedChannelId, + onSubmitAssign, }: { open: boolean; onOpenChange: (open: boolean) => void; title: string; setTitle: (v: string) => void; - onSubmit: () => void; + onSubmitCreate: () => void; createState: CreateState; isPending: boolean; + assignableChannels: ChannelRow[]; + selectedChannelId: string; + setSelectedChannelId: (v: string) => void; + onSubmitAssign: () => void; }) { const isCreating = createState.phase === "creating"; + const hasAssignable = assignableChannels.length > 0; return ( - Create Destination Channel + Set Destination Channel - A private Telegram group will be created automatically using one of - your authenticated accounts. All accounts will write archives here. + Choose an existing channel or create a new private group. All + accounts will write archives to this destination. @@ -241,46 +289,111 @@ function CreateDestinationDialog({

) : ( -
- {createState.phase === "error" && ( -
-

{createState.message}

+ + + + + Use Existing + + + + Create New + + + + + {createState.phase === "error" && ( +
+

{createState.message}

+
+ )} + +
+ + +

+ The selected channel will become the destination. All accounts + will be linked as writers automatically. +

- )} -
- - setTitle(e.target.value)} - /> -

- This will be the name of the Telegram group. You can rename it later in Telegram. -

-
-
+ + + + + + + + {createState.phase === "error" && ( +
+

{createState.message}

+
+ )} + +
+ + setTitle(e.target.value)} + /> +

+ A new private Telegram group will be created using one of your + authenticated accounts. You can rename it later in Telegram. +

+
+ + + + + +
+ )} - - - - - ); diff --git a/src/app/(app)/telegram/actions.ts b/src/app/(app)/telegram/actions.ts index 27baa57..4520cde 100644 --- a/src/app/(app)/telegram/actions.ts +++ b/src/app/(app)/telegram/actions.ts @@ -270,6 +270,13 @@ export async function setChannelType( if (!existing) return { success: false, error: "Channel not found" }; try { + if (type === "DESTINATION") { + // Setting as destination: use the full global destination logic + // so it updates the global settings key, creates WRITER links, etc. + return await setGlobalDestination(id); + } + + // Setting as SOURCE — just change the type await prisma.telegramChannel.update({ where: { id }, data: { type }, diff --git a/worker/src/db/locks.ts b/worker/src/db/locks.ts index 51df04f..c5a384c 100644 --- a/worker/src/db/locks.ts +++ b/worker/src/db/locks.ts @@ -1,8 +1,16 @@ +import type pg from "pg"; import { pool } from "./client.js"; import { childLogger } from "../util/logger.js"; const log = childLogger("locks"); +/** + * Holds the pooled connection for each active advisory lock. + * Session-level advisory locks are tied to the specific PostgreSQL connection, + * so we MUST keep the same connection checked out for the entire lock duration. + */ +const heldConnections = new Map(); + /** * Derive a stable 32-bit integer lock ID from an account ID string. * PostgreSQL advisory locks use bigint, but we use 32-bit for safety. @@ -20,6 +28,9 @@ function hashToLockId(accountId: string): number { /** * Try to acquire a PostgreSQL advisory lock for an account. * Returns true if acquired, false if already held by another session. + * + * IMPORTANT: The pooled connection is kept checked out for the duration + * of the lock. You MUST call releaseLock() when done to return it to the pool. */ export async function tryAcquireLock(accountId: string): Promise { const lockId = hashToLockId(accountId); @@ -31,26 +42,40 @@ export async function tryAcquireLock(accountId: string): Promise { ); const acquired = result.rows[0]?.pg_try_advisory_lock ?? false; if (acquired) { + // Keep the connection checked out — lock is tied to this connection + heldConnections.set(accountId, client); log.debug({ accountId, lockId }, "Advisory lock acquired"); + return true; } else { + // Lock not acquired — release the connection back to the pool + client.release(); log.debug({ accountId, lockId }, "Advisory lock already held"); + return false; } - return acquired; - } finally { + } catch (err) { client.release(); + throw err; } } /** * Release the advisory lock for an account. + * Uses the SAME connection that acquired the lock, then returns it to the pool. */ export async function releaseLock(accountId: string): Promise { const lockId = hashToLockId(accountId); - const client = await pool.connect(); + const client = heldConnections.get(accountId); + + if (!client) { + log.warn({ accountId, lockId }, "No held connection for lock release — lock may have already been released"); + return; + } + try { await client.query("SELECT pg_advisory_unlock($1)", [lockId]); log.debug({ accountId, lockId }, "Advisory lock released"); } finally { + heldConnections.delete(accountId); client.release(); } } diff --git a/worker/src/tdlib/chats.ts b/worker/src/tdlib/chats.ts index cabfbce..1929816 100644 --- a/worker/src/tdlib/chats.ts +++ b/worker/src/tdlib/chats.ts @@ -1,6 +1,7 @@ import type { Client } from "tdl"; import { childLogger } from "../util/logger.js"; import { config } from "../util/config.js"; +import { withFloodWait } from "../util/retry.js"; const log = childLogger("chats"); @@ -29,11 +30,14 @@ export async function getAccountChats( while (hasMore) { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const result = (await client.invoke({ - _: "getChats", - chat_list: { _: "chatListMain" }, - limit: 100, - })) as { chat_ids: number[] }; + const result = (await withFloodWait( + () => client.invoke({ + _: "getChats", + chat_list: { _: "chatListMain" }, + limit: 100, + }), + "getChats" + )) as { chat_ids: number[] }; if (!result.chat_ids || result.chat_ids.length === 0) { break; @@ -42,10 +46,13 @@ export async function getAccountChats( for (const chatId of result.chat_ids) { try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const chat = (await client.invoke({ - _: "getChat", - chat_id: chatId, - })) as any; + const chat = (await withFloodWait( + () => client.invoke({ + _: "getChat", + chat_id: chatId, + }), + "getChat" + )) as any; const chatType = chat.type?._; let type: TelegramChatInfo["type"] = "other"; @@ -55,10 +62,13 @@ export async function getAccountChats( // Get supergroup details to check if it's a channel or group try { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const sg = (await client.invoke({ - _: "getSupergroup", - supergroup_id: chat.type.supergroup_id, - })) as any; + const sg = (await withFloodWait( + () => client.invoke({ + _: "getSupergroup", + supergroup_id: chat.type.supergroup_id, + }), + "getSupergroup" + )) as any; type = sg.is_channel ? "channel" : "supergroup"; isForum = sg.is_forum ?? false; @@ -109,12 +119,15 @@ export async function generateInviteLink( chatId: bigint ): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const result = (await client.invoke({ - _: "createChatInviteLink", - chat_id: Number(chatId), - name: "DragonsStash Auto-Join", - creates_join_request: false, - })) as any; + const result = (await withFloodWait( + () => client.invoke({ + _: "createChatInviteLink", + chat_id: Number(chatId), + name: "DragonsStash Auto-Join", + creates_join_request: false, + }), + "createChatInviteLink" + )) as any; const link = result.invite_link as string; log.info({ chatId: chatId.toString(), link }, "Generated invite link"); @@ -130,13 +143,16 @@ export async function createSupergroup( title: string ): Promise<{ chatId: bigint; title: string }> { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const result = (await client.invoke({ - _: "createNewSupergroupChat", - title, - is_forum: false, - is_channel: false, - description: "DragonsStash archive destination — all accounts write here", - })) as any; + const result = (await withFloodWait( + () => client.invoke({ + _: "createNewSupergroupChat", + title, + is_forum: false, + is_channel: false, + description: "DragonsStash archive destination — all accounts write here", + }), + "createNewSupergroupChat" + )) as any; const chatId = BigInt(result.id); log.info({ chatId: chatId.toString(), title }, "Created new supergroup"); @@ -150,10 +166,13 @@ export async function joinChatByInviteLink( client: Client, inviteLink: string ): Promise { - await client.invoke({ - _: "joinChatByInviteLink", - invite_link: inviteLink, - }); + await withFloodWait( + () => client.invoke({ + _: "joinChatByInviteLink", + invite_link: inviteLink, + }), + "joinChatByInviteLink" + ); log.info({ inviteLink }, "Joined chat by invite link"); } diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 600c7d7..f953bd2 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -2,6 +2,7 @@ import type { Client } from "tdl"; import { readFile, rename, copyFile, unlink, stat } from "fs/promises"; import { config } from "../util/config.js"; import { childLogger } from "../util/logger.js"; +import { withFloodWait } from "../util/retry.js"; import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; @@ -78,8 +79,12 @@ export interface ChannelScanResult { export type ScanProgressCallback = (messagesScanned: number) => void; /** - * Invoke a TDLib method with a timeout to prevent indefinite hangs. + * Invoke a TDLib method with a timeout to prevent indefinite hangs, + * and automatic retry on FLOOD_WAIT rate-limit errors. + * * If TDLib does not respond within the timeout, the promise rejects. + * If Telegram returns a rate limit error, sleeps for the required + * duration and retries (up to maxRetries times). */ export async function invokeWithTimeout( client: Client, @@ -87,32 +92,40 @@ export async function invokeWithTimeout( request: Record, timeoutMs = INVOKE_TIMEOUT_MS ): Promise { - return new Promise((resolve, reject) => { - let settled = false; + return withFloodWait( + () => + new Promise((resolve, reject) => { + let settled = false; - const timer = setTimeout(() => { - if (!settled) { - settled = true; - reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`)); - } - }, timeoutMs); + const timer = setTimeout(() => { + if (!settled) { + settled = true; + reject( + new Error( + `TDLib invoke timed out after ${timeoutMs}ms for ${request._}` + ) + ); + } + }, timeoutMs); - (client.invoke(request) as Promise) - .then((result) => { - if (!settled) { - settled = true; - clearTimeout(timer); - resolve(result); - } - }) - .catch((err) => { - if (!settled) { - settled = true; - clearTimeout(timer); - reject(err); - } - }); - }); + (client.invoke(request) as Promise) + .then((result) => { + if (!settled) { + settled = true; + clearTimeout(timer); + resolve(result); + } + }) + .catch((err) => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(err); + } + }); + }), + `TDLib:${request._}` + ); } /** @@ -415,15 +428,20 @@ export async function downloadFile( client.on("update", handleUpdate); // Start async download (non-blocking — progress via updateFile events) - client - .invoke({ - _: "downloadFile", - file_id: numericId, - priority: 32, - offset: 0, - limit: 0, - synchronous: false, - }) + // Wrapped in withFloodWait: if the initial invoke is rate-limited, + // it will sleep and retry before the download event loop begins. + withFloodWait( + () => + client.invoke({ + _: "downloadFile", + file_id: numericId, + priority: 32, + offset: 0, + limit: 0, + synchronous: false, + }), + `downloadFile:${fileName}` + ) .then((result: unknown) => { // If the file was already cached locally, invoke returns immediately const file = result as TdFile | undefined; diff --git a/worker/src/upload/channel.ts b/worker/src/upload/channel.ts index 79f8978..b7132ba 100644 --- a/worker/src/upload/channel.ts +++ b/worker/src/upload/channel.ts @@ -3,6 +3,7 @@ import { stat } from "fs/promises"; import type { Client } from "tdl"; import { config } from "../util/config.js"; import { childLogger } from "../util/logger.js"; +import { withFloodWait } from "../util/retry.js"; const log = childLogger("upload"); @@ -84,24 +85,29 @@ async function sendAndWaitForUpload( fileName: string, fileSizeMB: number ): Promise { - // Send the message — this returns a temporary message immediately - const tempMsg = (await client.invoke({ - _: "sendMessage", - chat_id: Number(chatId), - input_message_content: { - _: "inputMessageDocument", - document: { - _: "inputFileLocal", - path: filePath, - }, - caption: caption - ? { - _: "formattedText", - text: caption, - } - : undefined, - }, - })) as { id: number }; + // Send the message — this returns a temporary message immediately. + // Wrapped in withFloodWait to handle Telegram rate limits on upload. + const tempMsg = (await withFloodWait( + () => + client.invoke({ + _: "sendMessage", + chat_id: Number(chatId), + input_message_content: { + _: "inputMessageDocument", + document: { + _: "inputFileLocal", + path: filePath, + }, + caption: caption + ? { + _: "formattedText", + text: caption, + } + : undefined, + }, + }), + "sendMessage:upload" + )) as { id: number }; const tempMsgId = tempMsg.id; diff --git a/worker/src/util/retry.ts b/worker/src/util/retry.ts new file mode 100644 index 0000000..636a0b7 --- /dev/null +++ b/worker/src/util/retry.ts @@ -0,0 +1,109 @@ +import { childLogger } from "./logger.js"; +import { config } from "./config.js"; + +const log = childLogger("retry"); + +/** + * Extract the FLOOD_WAIT duration (in seconds) from a TDLib error. + * + * TDLib errors for rate limiting look like: + * - Error message: "Too Many Requests: retry after 30" + * - Error message: "FLOOD_WAIT_30" + * - Error code: 429 + */ +export function extractFloodWaitSeconds(err: unknown): number | null { + if (!err || typeof err !== "object") return null; + + const message = (err as { message?: string }).message ?? ""; + const code = (err as { code?: number }).code; + + // Match "FLOOD_WAIT_" pattern + const floodMatch = message.match(/FLOOD_WAIT_(\d+)/i); + if (floodMatch) { + return parseInt(floodMatch[1], 10); + } + + // Match "retry after " pattern (from Telegram HTTP API style errors) + const retryMatch = message.match(/retry after (\d+)/i); + if (retryMatch) { + return parseInt(retryMatch[1], 10); + } + + // If error code is 429 but no explicit wait time, default to 30 seconds + if (code === 429) { + return 30; + } + + return null; +} + +/** + * Sleep for a given number of milliseconds, with a descriptive log message. + */ +function sleepMs(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Wraps a TDLib invoke operation with FLOOD_WAIT-aware retry logic. + * + * When Telegram returns a rate limit error (FLOOD_WAIT / 429), this: + * 1. Extracts the required wait time from the error + * 2. Logs a warning with the wait duration + * 3. Sleeps for the required duration + small jitter + * 4. Retries the operation (up to maxRetries times) + * + * Non-rate-limit errors are re-thrown immediately. + * + * Usage: + * const result = await withFloodWait(() => client.invoke({ ... })); + */ +export async function withFloodWait( + fn: () => Promise, + context?: string, + maxRetries?: number +): Promise { + const limit = maxRetries ?? config.maxRetries; + let lastError: unknown; + + for (let attempt = 0; attempt <= limit; attempt++) { + try { + return await fn(); + } catch (err) { + lastError = err; + const waitSeconds = extractFloodWaitSeconds(err); + + if (waitSeconds === null) { + // Not a rate limit error — re-throw immediately + throw err; + } + + if (attempt >= limit) { + log.error( + { context, attempt, waitSeconds }, + "Rate limit exceeded max retries — giving up" + ); + throw err; + } + + // Add small jitter (1–5 seconds) to avoid multiple clients retrying simultaneously + const jitter = 1000 + Math.random() * 4000; + const totalWaitMs = waitSeconds * 1000 + jitter; + + log.warn( + { + context, + attempt: attempt + 1, + maxRetries: limit, + waitSeconds, + totalWaitMs: Math.round(totalWaitMs), + }, + `Rate-limited by Telegram — sleeping ${waitSeconds}s before retry` + ); + + await sleepMs(totalWaitMs); + } + } + + throw lastError; +} diff --git a/worker/src/worker.ts b/worker/src/worker.ts index f72955e..fb472d2 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -716,6 +716,29 @@ async function processOneArchiveSet( return; } + // ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ── + const totalArchiveSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n); + const maxSizeBytes = BigInt(config.maxZipSizeMB) * 1024n * 1024n; + if (totalArchiveSize > maxSizeBytes) { + accountLog.warn( + { + fileName: archiveName, + totalSizeMB: Number(totalArchiveSize / (1024n * 1024n)), + maxSizeMB: config.maxZipSizeMB, + }, + "Archive exceeds max size limit, skipping" + ); + await updateRunActivity(runId, { + currentActivity: `Skipped ${archiveName} (exceeds ${config.maxZipSizeMB}MB limit)`, + currentStep: "skipping", + currentChannel: channelTitle, + currentFile: archiveName, + currentFileNum: setIdx + 1, + totalFiles: totalSets, + }); + return; + } + const tempPaths: string[] = []; let splitPaths: string[] = [];