mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Compare commits
4 Commits
7e48131f67
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 59038889ae | |||
| 77c26adb31 | |||
| 35cce3151c | |||
| d6c82ede1e |
@@ -5,7 +5,14 @@ import { config } from "../util/config.js";
|
||||
|
||||
const pool = new pg.Pool({
|
||||
connectionString: config.databaseUrl,
|
||||
max: 5,
|
||||
// Pool needs headroom for: 2 account advisory locks (held for entire cycle),
|
||||
// up to 2 concurrent hash locks, plus Prisma operations from both accounts.
|
||||
// Previously max=5 caused pool exhaustion and indefinite hangs.
|
||||
max: 15,
|
||||
// Prevent pool.connect() from blocking forever when pool is exhausted.
|
||||
// Throws an error after 30s so the operation can fail and retry instead of
|
||||
// silently hanging for hours (as happened with the Turnbase.7z stall).
|
||||
connectionTimeoutMillis: 30_000,
|
||||
});
|
||||
|
||||
const adapter = new PrismaPg(pool);
|
||||
|
||||
@@ -27,6 +27,33 @@ async function main(): Promise<void> {
|
||||
await cleanupTempDir();
|
||||
await markStaleRunsAsFailed();
|
||||
|
||||
// Release any advisory locks orphaned by a previous worker instance.
|
||||
// When Docker kills a container, PostgreSQL may keep the session alive
|
||||
// (zombie connections), holding advisory locks that block the new worker.
|
||||
try {
|
||||
const result = await pool.query(`
|
||||
SELECT pid, state, left(query, 80) as query, age(clock_timestamp(), state_change) as idle_time
|
||||
FROM pg_stat_activity
|
||||
WHERE datname = current_database()
|
||||
AND pid != pg_backend_pid()
|
||||
AND state = 'idle'
|
||||
AND query LIKE '%pg_try_advisory_lock%'
|
||||
AND state_change < clock_timestamp() - interval '5 minutes'
|
||||
`);
|
||||
for (const row of result.rows) {
|
||||
log.warn(
|
||||
{ pid: row.pid, idleTime: row.idle_time, query: row.query },
|
||||
"Terminating stale advisory lock session from previous worker"
|
||||
);
|
||||
await pool.query("SELECT pg_terminate_backend($1)", [row.pid]);
|
||||
}
|
||||
if (result.rows.length > 0) {
|
||||
log.info({ terminated: result.rows.length }, "Cleaned up stale advisory lock sessions");
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn({ err }, "Failed to clean up stale advisory locks (non-fatal)");
|
||||
}
|
||||
|
||||
// Verify destination messages exist for all "uploaded" packages.
|
||||
// Resets any packages whose dest message is missing so they get re-processed.
|
||||
await recoverIncompleteUploads();
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { config } from "./util/config.js";
|
||||
import { childLogger } from "./util/logger.js";
|
||||
import { withTdlibMutex } from "./util/mutex.js";
|
||||
import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
|
||||
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
|
||||
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
|
||||
import { runIntegrityAudit } from "./audit.js";
|
||||
@@ -90,10 +90,18 @@ async function runCycle(): Promise<void> {
|
||||
|
||||
for (let i = 0; i < results.length; i++) {
|
||||
if (results[i].status === "rejected") {
|
||||
const reason = (results[i] as PromiseRejectedResult).reason;
|
||||
log.error(
|
||||
{ phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason },
|
||||
{ phone: accounts[i].phone, err: reason },
|
||||
"Account ingestion failed"
|
||||
);
|
||||
// If the cycle timed out, force-release the mutex so the next cycle
|
||||
// (or other operations like fetch-channels) can proceed immediately
|
||||
// instead of waiting 30 minutes for the mutex timeout.
|
||||
const errMsg = reason instanceof Error ? reason.message : String(reason);
|
||||
if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
|
||||
forceReleaseMutex(accounts[i].phone);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -79,6 +79,8 @@ export interface ChannelScanResult {
|
||||
archives: TelegramMessage[];
|
||||
photos: TelegramPhoto[];
|
||||
totalScanned: number;
|
||||
/** Highest message ID seen during scan (for watermark, even when no archives found). */
|
||||
maxScannedMessageId: bigint | null;
|
||||
}
|
||||
|
||||
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||
@@ -158,6 +160,7 @@ export async function getChannelMessages(
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
let maxScannedMessageId: bigint | null = null;
|
||||
|
||||
// Open the chat so TDLib can access it
|
||||
try {
|
||||
@@ -204,6 +207,12 @@ export async function getChannelMessages(
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
// Track highest message ID (first message in batch = newest, since results are newest-first)
|
||||
const batchMaxId = BigInt(result.messages[0].id);
|
||||
if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
|
||||
maxScannedMessageId = batchMaxId;
|
||||
}
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -246,6 +255,11 @@ export async function getChannelMessages(
|
||||
fromMessageId = result.messages[result.messages.length - 1].id;
|
||||
if (result.messages.length < Math.min(limit, 100)) break;
|
||||
|
||||
// Early exit: searchChatMessages returns newest-first. Once the oldest
|
||||
// message on this page is at or below the boundary, all remaining pages
|
||||
// are even older — no new messages exist, stop scanning immediately.
|
||||
if (boundary && fromMessageId <= boundary) break;
|
||||
|
||||
await sleep(config.apiDelayMs);
|
||||
}
|
||||
}
|
||||
@@ -266,6 +280,7 @@ export async function getChannelMessages(
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
maxScannedMessageId,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -178,6 +178,7 @@ export async function getTopicMessages(
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
let maxScannedMessageId: bigint | null = null;
|
||||
|
||||
let currentFromId = 0;
|
||||
let totalScanned = 0;
|
||||
@@ -239,6 +240,12 @@ export async function getTopicMessages(
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
// Track highest message ID (first message = newest, since results are newest-first)
|
||||
const batchMaxId = BigInt(result.messages[0].id);
|
||||
if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
|
||||
maxScannedMessageId = batchMaxId;
|
||||
}
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -302,6 +309,7 @@ export async function getTopicMessages(
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
maxScannedMessageId,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
|
||||
|
||||
const log = childLogger("upload");
|
||||
|
||||
/**
|
||||
* Custom error class to distinguish upload stalls from other errors.
|
||||
* When consecutive stalls occur, the caller can use this signal to
|
||||
* recreate the TDLib client (whose event stream may have degraded).
|
||||
*/
|
||||
export class UploadStallError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "UploadStallError";
|
||||
}
|
||||
}
|
||||
|
||||
export interface UploadResult {
|
||||
messageId: bigint;
|
||||
messageIds: bigint[];
|
||||
@@ -109,13 +121,21 @@ async function sendWithRetry(
|
||||
|
||||
// Stall or timeout — retry with a cooldown
|
||||
const errMsg = err instanceof Error ? err.message : "";
|
||||
if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) {
|
||||
log.warn(
|
||||
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
|
||||
"Upload stalled/timed out — retrying"
|
||||
if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
|
||||
if (!isLastAttempt) {
|
||||
log.warn(
|
||||
{ fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
|
||||
"Upload stalled/timed out — retrying"
|
||||
);
|
||||
await sleep(10_000);
|
||||
continue;
|
||||
}
|
||||
// All stall retries exhausted — throw UploadStallError so the caller
|
||||
// knows the TDLib client's event stream is likely degraded and can
|
||||
// recreate the client before continuing.
|
||||
throw new UploadStallError(
|
||||
`Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
|
||||
);
|
||||
await sleep(10_000);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw err;
|
||||
@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
// Stall detection: no progress for 5 minutes after upload started → reject
|
||||
const STALL_TIMEOUT_MS = 5 * 60_000;
|
||||
// Stall detection: no progress for 3 minutes after upload started → reject
|
||||
// (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
|
||||
// a 3min silence strongly indicates a degraded TDLib event stream)
|
||||
const STALL_TIMEOUT_MS = 3 * 60_000;
|
||||
const stallChecker = setInterval(() => {
|
||||
if (settled || !uploadStarted) return;
|
||||
const stallMs = Date.now() - lastProgressTime;
|
||||
|
||||
@@ -11,6 +11,29 @@ const queues = new Map<
|
||||
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
|
||||
>();
|
||||
|
||||
/**
|
||||
* Force-release a stuck mutex.
|
||||
* This should only be called when the holder is known to be stuck (e.g. after
|
||||
* a cycle timeout). It releases the lock and lets the next queued waiter proceed.
|
||||
*/
|
||||
export function forceReleaseMutex(key: string): void {
|
||||
if (!locks.has(key)) return;
|
||||
|
||||
const holder = holders.get(key);
|
||||
log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
|
||||
|
||||
locks.delete(key);
|
||||
holders.delete(key);
|
||||
const next = queues.get(key)?.shift();
|
||||
if (next) {
|
||||
log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
|
||||
next.resolve();
|
||||
} else {
|
||||
queues.delete(key);
|
||||
log.info({ key }, "TDLib mutex force-released (no waiters)");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
|
||||
* Different keys run concurrently — this allows two accounts to ingest in parallel
|
||||
|
||||
@@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
|
||||
import { readRarContents } from "./archive/rar-reader.js";
|
||||
import { read7zContents } from "./archive/sevenz-reader.js";
|
||||
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
|
||||
import { uploadToChannel } from "./upload/channel.js";
|
||||
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
|
||||
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
|
||||
import { db } from "./db/client.js";
|
||||
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
|
||||
@@ -286,6 +286,7 @@ interface PipelineContext {
|
||||
client: Client;
|
||||
runId: string;
|
||||
accountId: string;
|
||||
accountPhone: string;
|
||||
channelTitle: string;
|
||||
channel: TelegramChannel;
|
||||
destChannelTelegramId: bigint;
|
||||
@@ -303,6 +304,8 @@ interface PipelineContext {
|
||||
sourceTopicId: bigint | null;
|
||||
accountLog: ReturnType<typeof childLogger>;
|
||||
maxUploadSize: bigint;
|
||||
/** How many consecutive upload stalls have occurred (resets on success). */
|
||||
consecutiveStalls: number;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -338,7 +341,8 @@ export async function runWorkerForAccount(
|
||||
currentStep: "connecting",
|
||||
});
|
||||
|
||||
const { client, isPremium } = await createTdlibClient({
|
||||
// Use let so the client can be replaced on TDLib recreation after stalls
|
||||
let { client, isPremium } = await createTdlibClient({
|
||||
id: account.id,
|
||||
phone: account.phone,
|
||||
});
|
||||
@@ -448,6 +452,7 @@ export async function runWorkerForAccount(
|
||||
client,
|
||||
runId: activeRunId,
|
||||
accountId: account.id,
|
||||
accountPhone: account.phone,
|
||||
channelTitle: channel.title,
|
||||
channel,
|
||||
destChannelTelegramId: destChannel.telegramId,
|
||||
@@ -458,6 +463,7 @@ export async function runWorkerForAccount(
|
||||
sourceTopicId: null,
|
||||
accountLog,
|
||||
maxUploadSize,
|
||||
consecutiveStalls: 0,
|
||||
};
|
||||
|
||||
if (forum) {
|
||||
@@ -532,6 +538,15 @@ export async function runWorkerForAccount(
|
||||
{ channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
|
||||
"No new archives in topic"
|
||||
);
|
||||
// Still advance topic watermark so we don't re-scan these messages next cycle
|
||||
if (scanResult.maxScannedMessageId) {
|
||||
await upsertTopicProgress(
|
||||
mapping.id,
|
||||
topic.topicId,
|
||||
topic.name,
|
||||
scanResult.maxScannedMessageId
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -546,14 +561,17 @@ export async function runWorkerForAccount(
|
||||
pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`;
|
||||
|
||||
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
|
||||
// Sync client back in case it was recreated during upload stall recovery
|
||||
client = pipelineCtx.client;
|
||||
|
||||
// Only advance progress to the highest successfully processed message
|
||||
if (maxProcessedId) {
|
||||
// Advance progress: use archive watermark if available, fall back to scan watermark
|
||||
const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (topicWatermark) {
|
||||
await upsertTopicProgress(
|
||||
mapping.id,
|
||||
topic.topicId,
|
||||
topic.name,
|
||||
maxProcessedId
|
||||
topicWatermark
|
||||
);
|
||||
}
|
||||
} catch (topicErr) {
|
||||
@@ -603,6 +621,11 @@ export async function runWorkerForAccount(
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
||||
// Still advance watermark to highest scanned message so we don't
|
||||
// re-scan these messages next cycle
|
||||
if (scanResult.maxScannedMessageId) {
|
||||
await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -617,10 +640,13 @@ export async function runWorkerForAccount(
|
||||
pipelineCtx.channelTitle = channel.title;
|
||||
|
||||
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
|
||||
// Sync client back in case it was recreated during upload stall recovery
|
||||
client = pipelineCtx.client;
|
||||
|
||||
// Only advance progress to the highest successfully processed message
|
||||
if (maxProcessedId) {
|
||||
await updateLastProcessedMessage(mapping.id, maxProcessedId);
|
||||
// Advance progress: use archive watermark if available, fall back to scan watermark
|
||||
const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (channelWatermark) {
|
||||
await updateLastProcessedMessage(mapping.id, channelWatermark);
|
||||
}
|
||||
}
|
||||
} catch (channelErr) {
|
||||
@@ -760,12 +786,68 @@ async function processArchiveSets(
|
||||
if (setMaxId > (maxProcessedId ?? 0n)) {
|
||||
maxProcessedId = setMaxId;
|
||||
}
|
||||
|
||||
// Reset stall counter on any successful upload
|
||||
ctx.consecutiveStalls = 0;
|
||||
} catch (setErr) {
|
||||
// If a set fails, do NOT advance the watermark past it
|
||||
accountLog.warn(
|
||||
{ err: setErr, baseName: archiveSets[setIdx].baseName },
|
||||
"Archive set failed, watermark will not advance past this set"
|
||||
);
|
||||
|
||||
// ── TDLib client recreation on repeated upload stalls ──
|
||||
// When the TDLib event stream degrades, uploads complete (bytes sent)
|
||||
// but confirmations never arrive. Retrying with the same broken client
|
||||
// is futile. Recreate the client to get a fresh connection.
|
||||
if (setErr instanceof UploadStallError) {
|
||||
ctx.consecutiveStalls++;
|
||||
accountLog.warn(
|
||||
{ consecutiveStalls: ctx.consecutiveStalls },
|
||||
"Upload stall detected — TDLib event stream may be degraded"
|
||||
);
|
||||
|
||||
// After 1 stalled set (= 3 failed retry attempts already), recreate the client
|
||||
if (ctx.consecutiveStalls >= 1) {
|
||||
accountLog.info("Recreating TDLib client after consecutive upload stalls");
|
||||
try {
|
||||
await closeTdlibClient(ctx.client);
|
||||
} catch (closeErr) {
|
||||
accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
|
||||
}
|
||||
|
||||
try {
|
||||
const { client: newClient } = await createTdlibClient({
|
||||
id: ctx.accountId,
|
||||
phone: ctx.accountPhone,
|
||||
});
|
||||
ctx.client = newClient;
|
||||
|
||||
// Reload chats so the new client can access channels
|
||||
try {
|
||||
for (let page = 0; page < 500; page++) {
|
||||
await newClient.invoke({
|
||||
_: "loadChats",
|
||||
chat_list: { _: "chatListMain" },
|
||||
limit: 100,
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// 404 = all loaded (expected)
|
||||
}
|
||||
|
||||
ctx.consecutiveStalls = 0;
|
||||
accountLog.info("TDLib client recreated successfully — continuing ingestion");
|
||||
} catch (recreateErr) {
|
||||
accountLog.error(
|
||||
{ err: recreateErr },
|
||||
"Failed to recreate TDLib client — aborting remaining uploads"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Record the failure for visibility in the UI
|
||||
try {
|
||||
const archiveSet = archiveSets[setIdx];
|
||||
|
||||
Reference in New Issue
Block a user