Fix multiple issues

This commit is contained in:
2026-03-07 21:33:40 +01:00
parent 6926df9a2c
commit 2763de2711
11 changed files with 524 additions and 156 deletions

View File

@@ -1,8 +1,16 @@
import type pg from "pg";
import { pool } from "./client.js";
import { childLogger } from "../util/logger.js";
const log = childLogger("locks");
/**
* Holds the pooled connection for each active advisory lock.
* Session-level advisory locks are tied to the specific PostgreSQL connection,
* so we MUST keep the same connection checked out for the entire lock duration.
*/
const heldConnections = new Map<string, pg.PoolClient>();
/**
* Derive a stable 32-bit integer lock ID from an account ID string.
* PostgreSQL advisory locks use bigint, but we use 32-bit for safety.
@@ -20,6 +28,9 @@ function hashToLockId(accountId: string): number {
/**
* Try to acquire a PostgreSQL advisory lock for an account.
* Returns true if acquired, false if already held by another session.
*
* IMPORTANT: The pooled connection is kept checked out for the duration
* of the lock. You MUST call releaseLock() when done to return it to the pool.
*/
export async function tryAcquireLock(accountId: string): Promise<boolean> {
const lockId = hashToLockId(accountId);
@@ -31,26 +42,40 @@ export async function tryAcquireLock(accountId: string): Promise<boolean> {
);
const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
if (acquired) {
// Keep the connection checked out — lock is tied to this connection
heldConnections.set(accountId, client);
log.debug({ accountId, lockId }, "Advisory lock acquired");
return true;
} else {
// Lock not acquired — release the connection back to the pool
client.release();
log.debug({ accountId, lockId }, "Advisory lock already held");
return false;
}
return acquired;
} finally {
} catch (err) {
client.release();
throw err;
}
}
/**
* Release the advisory lock for an account.
* Uses the SAME connection that acquired the lock, then returns it to the pool.
*/
export async function releaseLock(accountId: string): Promise<void> {
const lockId = hashToLockId(accountId);
const client = await pool.connect();
const client = heldConnections.get(accountId);
if (!client) {
log.warn({ accountId, lockId }, "No held connection for lock release — lock may have already been released");
return;
}
try {
await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
log.debug({ accountId, lockId }, "Advisory lock released");
} finally {
heldConnections.delete(accountId);
client.release();
}
}

View File

@@ -1,6 +1,7 @@
import type { Client } from "tdl";
import { childLogger } from "../util/logger.js";
import { config } from "../util/config.js";
import { withFloodWait } from "../util/retry.js";
const log = childLogger("chats");
@@ -29,11 +30,14 @@ export async function getAccountChats(
while (hasMore) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "getChats",
chat_list: { _: "chatListMain" },
limit: 100,
})) as { chat_ids: number[] };
const result = (await withFloodWait(
() => client.invoke({
_: "getChats",
chat_list: { _: "chatListMain" },
limit: 100,
}),
"getChats"
)) as { chat_ids: number[] };
if (!result.chat_ids || result.chat_ids.length === 0) {
break;
@@ -42,10 +46,13 @@ export async function getAccountChats(
for (const chatId of result.chat_ids) {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const chat = (await client.invoke({
_: "getChat",
chat_id: chatId,
})) as any;
const chat = (await withFloodWait(
() => client.invoke({
_: "getChat",
chat_id: chatId,
}),
"getChat"
)) as any;
const chatType = chat.type?._;
let type: TelegramChatInfo["type"] = "other";
@@ -55,10 +62,13 @@ export async function getAccountChats(
// Get supergroup details to check if it's a channel or group
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const sg = (await client.invoke({
_: "getSupergroup",
supergroup_id: chat.type.supergroup_id,
})) as any;
const sg = (await withFloodWait(
() => client.invoke({
_: "getSupergroup",
supergroup_id: chat.type.supergroup_id,
}),
"getSupergroup"
)) as any;
type = sg.is_channel ? "channel" : "supergroup";
isForum = sg.is_forum ?? false;
@@ -109,12 +119,15 @@ export async function generateInviteLink(
chatId: bigint
): Promise<string> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "createChatInviteLink",
chat_id: Number(chatId),
name: "DragonsStash Auto-Join",
creates_join_request: false,
})) as any;
const result = (await withFloodWait(
() => client.invoke({
_: "createChatInviteLink",
chat_id: Number(chatId),
name: "DragonsStash Auto-Join",
creates_join_request: false,
}),
"createChatInviteLink"
)) as any;
const link = result.invite_link as string;
log.info({ chatId: chatId.toString(), link }, "Generated invite link");
@@ -130,13 +143,16 @@ export async function createSupergroup(
title: string
): Promise<{ chatId: bigint; title: string }> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "createNewSupergroupChat",
title,
is_forum: false,
is_channel: false,
description: "DragonsStash archive destination — all accounts write here",
})) as any;
const result = (await withFloodWait(
() => client.invoke({
_: "createNewSupergroupChat",
title,
is_forum: false,
is_channel: false,
description: "DragonsStash archive destination — all accounts write here",
}),
"createNewSupergroupChat"
)) as any;
const chatId = BigInt(result.id);
log.info({ chatId: chatId.toString(), title }, "Created new supergroup");
@@ -150,10 +166,13 @@ export async function joinChatByInviteLink(
client: Client,
inviteLink: string
): Promise<void> {
await client.invoke({
_: "joinChatByInviteLink",
invite_link: inviteLink,
});
await withFloodWait(
() => client.invoke({
_: "joinChatByInviteLink",
invite_link: inviteLink,
}),
"joinChatByInviteLink"
);
log.info({ inviteLink }, "Joined chat by invite link");
}

View File

@@ -2,6 +2,7 @@ import type { Client } from "tdl";
import { readFile, rename, copyFile, unlink, stat } from "fs/promises";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
@@ -78,8 +79,12 @@ export interface ChannelScanResult {
export type ScanProgressCallback = (messagesScanned: number) => void;
/**
* Invoke a TDLib method with a timeout to prevent indefinite hangs.
* Invoke a TDLib method with a timeout to prevent indefinite hangs,
* and automatic retry on FLOOD_WAIT rate-limit errors.
*
* If TDLib does not respond within the timeout, the promise rejects.
* If Telegram returns a rate limit error, sleeps for the required
* duration and retries (up to maxRetries times).
*/
export async function invokeWithTimeout<T>(
client: Client,
@@ -87,32 +92,40 @@ export async function invokeWithTimeout<T>(
request: Record<string, any>,
timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
return new Promise<T>((resolve, reject) => {
let settled = false;
return withFloodWait(
() =>
new Promise<T>((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
}
}, timeoutMs);
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(
new Error(
`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`
)
);
}
}, timeoutMs);
(client.invoke(request) as Promise<T>)
.then((result) => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(result);
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
(client.invoke(request) as Promise<T>)
.then((result) => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(result);
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
}),
`TDLib:${request._}`
);
}
/**
@@ -415,15 +428,20 @@ export async function downloadFile(
client.on("update", handleUpdate);
// Start async download (non-blocking — progress via updateFile events)
client
.invoke({
_: "downloadFile",
file_id: numericId,
priority: 32,
offset: 0,
limit: 0,
synchronous: false,
})
// Wrapped in withFloodWait: if the initial invoke is rate-limited,
// it will sleep and retry before the download event loop begins.
withFloodWait(
() =>
client.invoke({
_: "downloadFile",
file_id: numericId,
priority: 32,
offset: 0,
limit: 0,
synchronous: false,
}),
`downloadFile:${fileName}`
)
.then((result: unknown) => {
// If the file was already cached locally, invoke returns immediately
const file = result as TdFile | undefined;

View File

@@ -3,6 +3,7 @@ import { stat } from "fs/promises";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
const log = childLogger("upload");
@@ -84,24 +85,29 @@ async function sendAndWaitForUpload(
fileName: string,
fileSizeMB: number
): Promise<bigint> {
// Send the message — this returns a temporary message immediately
const tempMsg = (await client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
})) as { id: number };
// Send the message — this returns a temporary message immediately.
// Wrapped in withFloodWait to handle Telegram rate limits on upload.
const tempMsg = (await withFloodWait(
() =>
client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
}),
"sendMessage:upload"
)) as { id: number };
const tempMsgId = tempMsg.id;

109
worker/src/util/retry.ts Normal file
View File

@@ -0,0 +1,109 @@
import { childLogger } from "./logger.js";
import { config } from "./config.js";
const log = childLogger("retry");
/**
* Extract the FLOOD_WAIT duration (in seconds) from a TDLib error.
*
* TDLib errors for rate limiting look like:
* - Error message: "Too Many Requests: retry after 30"
* - Error message: "FLOOD_WAIT_30"
* - Error code: 429
*/
export function extractFloodWaitSeconds(err: unknown): number | null {
if (!err || typeof err !== "object") return null;
const message = (err as { message?: string }).message ?? "";
const code = (err as { code?: number }).code;
// Match "FLOOD_WAIT_<seconds>" pattern
const floodMatch = message.match(/FLOOD_WAIT_(\d+)/i);
if (floodMatch) {
return parseInt(floodMatch[1], 10);
}
// Match "retry after <seconds>" pattern (from Telegram HTTP API style errors)
const retryMatch = message.match(/retry after (\d+)/i);
if (retryMatch) {
return parseInt(retryMatch[1], 10);
}
// If error code is 429 but no explicit wait time, default to 30 seconds
if (code === 429) {
return 30;
}
return null;
}
/**
* Sleep for a given number of milliseconds, with a descriptive log message.
*/
function sleepMs(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Wraps a TDLib invoke operation with FLOOD_WAIT-aware retry logic.
*
* When Telegram returns a rate limit error (FLOOD_WAIT / 429), this:
* 1. Extracts the required wait time from the error
* 2. Logs a warning with the wait duration
* 3. Sleeps for the required duration + small jitter
* 4. Retries the operation (up to maxRetries times)
*
* Non-rate-limit errors are re-thrown immediately.
*
* Usage:
* const result = await withFloodWait(() => client.invoke({ ... }));
*/
export async function withFloodWait<T>(
fn: () => Promise<T>,
context?: string,
maxRetries?: number
): Promise<T> {
const limit = maxRetries ?? config.maxRetries;
let lastError: unknown;
for (let attempt = 0; attempt <= limit; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
const waitSeconds = extractFloodWaitSeconds(err);
if (waitSeconds === null) {
// Not a rate limit error — re-throw immediately
throw err;
}
if (attempt >= limit) {
log.error(
{ context, attempt, waitSeconds },
"Rate limit exceeded max retries — giving up"
);
throw err;
}
// Add small jitter (15 seconds) to avoid multiple clients retrying simultaneously
const jitter = 1000 + Math.random() * 4000;
const totalWaitMs = waitSeconds * 1000 + jitter;
log.warn(
{
context,
attempt: attempt + 1,
maxRetries: limit,
waitSeconds,
totalWaitMs: Math.round(totalWaitMs),
},
`Rate-limited by Telegram — sleeping ${waitSeconds}s before retry`
);
await sleepMs(totalWaitMs);
}
}
throw lastError;
}

View File

@@ -716,6 +716,29 @@ async function processOneArchiveSet(
return;
}
// ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
const totalArchiveSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
const maxSizeBytes = BigInt(config.maxZipSizeMB) * 1024n * 1024n;
if (totalArchiveSize > maxSizeBytes) {
accountLog.warn(
{
fileName: archiveName,
totalSizeMB: Number(totalArchiveSize / (1024n * 1024n)),
maxSizeMB: config.maxZipSizeMB,
},
"Archive exceeds max size limit, skipping"
);
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (exceeds ${config.maxZipSizeMB}MB limit)`,
currentStep: "skipping",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
return;
}
const tempPaths: string[] = [];
let splitPaths: string[] = [];