Mirror of https://github.com/xCyanGrizzly/DragonsStash.git (synced 2026-05-11 06:11:15 +00:00)

Compare commits: 7e48131f67...main (4 commits)

Commits in range:
- 59038889ae
- 77c26adb31
- 35cce3151c
- d6c82ede1e
@@ -5,7 +5,14 @@ import { config } from "../util/config.js";
 const pool = new pg.Pool({
   connectionString: config.databaseUrl,
-  max: 5,
+  // Pool needs headroom for: 2 account advisory locks (held for entire cycle),
+  // up to 2 concurrent hash locks, plus Prisma operations from both accounts.
+  // Previously max=5 caused pool exhaustion and indefinite hangs.
+  max: 15,
+  // Prevent pool.connect() from blocking forever when pool is exhausted.
+  // Throws an error after 30s so the operation can fail and retry instead of
+  // silently hanging for hours (as happened with the Turnbase.7z stall).
+  connectionTimeoutMillis: 30_000,
 });
 
 const adapter = new PrismaPg(pool);
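A note on the timeout semantics above: with `connectionTimeoutMillis` set, `pool.connect()` rejects once no slot frees up in time, instead of queueing forever. A minimal sketch of the acquire/release discipline this relies on — the `withClient` helper is illustrative, not from this repo:

```ts
import pg from "pg";

// Tiny pool so exhaustion is easy to reproduce; same options as the diff above.
const pool = new pg.Pool({ max: 1, connectionTimeoutMillis: 30_000 });

async function withClient<T>(fn: (c: pg.PoolClient) => Promise<T>): Promise<T> {
  const client = await pool.connect(); // rejects after 30s if the pool stays exhausted
  try {
    return await fn(client);
  } finally {
    client.release(); // a missed release permanently burns one of the `max` slots
  }
}
```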
@@ -27,6 +27,33 @@ async function main(): Promise<void> {
   await cleanupTempDir();
   await markStaleRunsAsFailed();
 
+  // Release any advisory locks orphaned by a previous worker instance.
+  // When Docker kills a container, PostgreSQL may keep the session alive
+  // (zombie connections), holding advisory locks that block the new worker.
+  try {
+    const result = await pool.query(`
+      SELECT pid, state, left(query, 80) as query, age(clock_timestamp(), state_change) as idle_time
+      FROM pg_stat_activity
+      WHERE datname = current_database()
+        AND pid != pg_backend_pid()
+        AND state = 'idle'
+        AND query LIKE '%pg_try_advisory_lock%'
+        AND state_change < clock_timestamp() - interval '5 minutes'
+    `);
+    for (const row of result.rows) {
+      log.warn(
+        { pid: row.pid, idleTime: row.idle_time, query: row.query },
+        "Terminating stale advisory lock session from previous worker"
+      );
+      await pool.query("SELECT pg_terminate_backend($1)", [row.pid]);
+    }
+    if (result.rows.length > 0) {
+      log.info({ terminated: result.rows.length }, "Cleaned up stale advisory lock sessions");
+    }
+  } catch (err) {
+    log.warn({ err }, "Failed to clean up stale advisory locks (non-fatal)");
+  }
+
   // Verify destination messages exist for all "uploaded" packages.
   // Resets any packages whose dest message is missing so they get re-processed.
   await recoverIncompleteUploads();
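For context, the cleanup above targets session-level advisory locks: `pg_try_advisory_lock` holds the lock until the session ends, so a zombie backend keeps it even after the worker process dies. A sketch of the acquisition side implied by the `LIKE '%pg_try_advisory_lock%'` filter — the worker's actual helper is not part of this diff, so the function name here is assumed:

```ts
// Assumed helper, not from this diff: take a per-account session lock.
// hashtext() folds the string key into the integer advisory-lock keyspace.
async function tryAcquireAccountLock(accountId: string): Promise<boolean> {
  const { rows } = await pool.query(
    "SELECT pg_try_advisory_lock(hashtext($1)) AS acquired",
    [accountId]
  );
  return rows[0].acquired === true; // false = another session holds it
}
```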
@@ -1,6 +1,6 @@
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { withTdlibMutex } from "./util/mutex.js";
+import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
 import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
 import { runWorkerForAccount, authenticateAccount } from "./worker.js";
 import { runIntegrityAudit } from "./audit.js";
@@ -90,10 +90,18 @@ async function runCycle(): Promise<void> {
 
   for (let i = 0; i < results.length; i++) {
     if (results[i].status === "rejected") {
+      const reason = (results[i] as PromiseRejectedResult).reason;
       log.error(
-        { phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason },
+        { phone: accounts[i].phone, err: reason },
         "Account ingestion failed"
       );
+      // If the cycle timed out, force-release the mutex so the next cycle
+      // (or other operations like fetch-channels) can proceed immediately
+      // instead of waiting 30 minutes for the mutex timeout.
+      const errMsg = reason instanceof Error ? reason.message : String(reason);
+      if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
+        forceReleaseMutex(accounts[i].phone);
+      }
     }
   }
 
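The `as PromiseRejectedResult` cast exists because TypeScript does not carry a narrowing from `results[i].status === "rejected"` back through repeated index accesses with a mutable `i`. Binding each element once lets the discriminated union narrow naturally; a sketch with placeholder `tasks`, `accounts`, and `handleFailure` names:

```ts
// Equivalent loop without the cast: a const binding narrows on `status`.
const results = await Promise.allSettled(tasks);
for (const [i, result] of results.entries()) {
  if (result.status === "rejected") {
    // result is PromiseRejectedResult here, so .reason is typed directly
    handleFailure(accounts[i].phone, result.reason);
  }
}
```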
@@ -79,6 +79,8 @@ export interface ChannelScanResult {
   archives: TelegramMessage[];
   photos: TelegramPhoto[];
   totalScanned: number;
+  /** Highest message ID seen during scan (for watermark, even when no archives found). */
+  maxScannedMessageId: bigint | null;
 }
 
 export type ScanProgressCallback = (messagesScanned: number) => void;
@@ -158,6 +160,7 @@ export async function getChannelMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
 
   // Open the chat so TDLib can access it
   try {
@@ -204,6 +207,12 @@ export async function getChannelMessages(
 
     totalScanned += result.messages.length;
 
+    // Track highest message ID (first message in batch = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
+
     for (const msg of result.messages) {
       // Check for archive documents
       const doc = msg.content?.document;
@@ -246,6 +255,11 @@ export async function getChannelMessages(
     fromMessageId = result.messages[result.messages.length - 1].id;
     if (result.messages.length < Math.min(limit, 100)) break;
+
+    // Early exit: searchChatMessages returns newest-first. Once the oldest
+    // message on this page is at or below the boundary, all remaining pages
+    // are even older — no new messages exist, stop scanning immediately.
+    if (boundary && fromMessageId <= boundary) break;
 
     await sleep(config.apiDelayMs);
   }
 }
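A worked model of why the early exit is safe: pages arrive newest-first, so the oldest id on the current page bounds everything that follows. A self-contained sketch, where `fetchPage` stands in for TDLib's searchChatMessages:

```ts
interface Msg { id: number }

// Placeholder for the real TDLib call; returns up to 100 messages, newest-first.
declare function fetchPage(fromMessageId: number): Promise<Msg[]>;

// With boundary = 95 and pages [120..101], [100..81], ...: page 2 ends at 81,
// which is <= 95, so every later page sits entirely below the watermark.
async function scanNewMessages(boundary: number | null): Promise<Msg[]> {
  const fresh: Msg[] = [];
  let fromMessageId = 0; // 0 = start at the newest message
  for (;;) {
    const page = await fetchPage(fromMessageId);
    if (page.length === 0) break;
    fresh.push(...page.filter((m) => boundary === null || m.id > boundary));
    fromMessageId = page[page.length - 1].id; // oldest id on this page
    if (boundary !== null && fromMessageId <= boundary) break; // early exit
  }
  return fresh;
}
```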
@@ -266,6 +280,7 @@ export async function getChannelMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }
@@ -178,6 +178,7 @@ export async function getTopicMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
 
   let currentFromId = 0;
   let totalScanned = 0;
@@ -239,6 +240,12 @@ export async function getTopicMessages(
 
     totalScanned += result.messages.length;
 
+    // Track highest message ID (first message = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
+
     for (const msg of result.messages) {
       // Check for archive documents
      const doc = msg.content?.document;
@@ -302,6 +309,7 @@ export async function getTopicMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }
@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
 
 const log = childLogger("upload");
 
+/**
+ * Custom error class to distinguish upload stalls from other errors.
+ * When consecutive stalls occur, the caller can use this signal to
+ * recreate the TDLib client (whose event stream may have degraded).
+ */
+export class UploadStallError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = "UploadStallError";
+  }
+}
+
 export interface UploadResult {
   messageId: bigint;
   messageIds: bigint[];
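One caveat worth knowing about this pattern: the `setErr instanceof UploadStallError` check later in this compare depends on the prototype chain surviving compilation. On ES2015+ targets the class above works as written; if the build ever targets ES5, `extends Error` loses the subclass prototype. A defensive variant, shown only as optional hardening rather than something this repo necessarily needs:

```ts
export class UploadStallError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "UploadStallError";
    // Only needed when compiling to ES5, where `extends Error` resets the
    // prototype and breaks `err instanceof UploadStallError`.
    Object.setPrototypeOf(this, UploadStallError.prototype);
  }
}
```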
@@ -109,13 +121,21 @@ async function sendWithRetry(
 
       // Stall or timeout — retry with a cooldown
       const errMsg = err instanceof Error ? err.message : "";
-      if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) {
-        log.warn(
-          { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
-          "Upload stalled/timed out — retrying"
-        );
-        await sleep(10_000);
-        continue;
+      if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
+        if (!isLastAttempt) {
+          log.warn(
+            { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
+            "Upload stalled/timed out — retrying"
+          );
+          await sleep(10_000);
+          continue;
+        }
+        // All stall retries exhausted — throw UploadStallError so the caller
+        // knows the TDLib client's event stream is likely degraded and can
+        // recreate the client before continuing.
+        throw new UploadStallError(
+          `Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
+        );
       }
 
       throw err;
@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
     }
   }, timeoutMs);
 
-  // Stall detection: no progress for 5 minutes after upload started → reject
-  const STALL_TIMEOUT_MS = 5 * 60_000;
+  // Stall detection: no progress for 3 minutes after upload started → reject
+  // (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
+  // a 3min silence strongly indicates a degraded TDLib event stream)
+  const STALL_TIMEOUT_MS = 3 * 60_000;
   const stallChecker = setInterval(() => {
     if (settled || !uploadStarted) return;
     const stallMs = Date.now() - lastProgressTime;
|||||||
@@ -11,6 +11,29 @@ const queues = new Map<
|
|||||||
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
|
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
|
||||||
>();
|
>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force-release a stuck mutex.
|
||||||
|
* This should only be called when the holder is known to be stuck (e.g. after
|
||||||
|
* a cycle timeout). It releases the lock and lets the next queued waiter proceed.
|
||||||
|
*/
|
||||||
|
export function forceReleaseMutex(key: string): void {
|
||||||
|
if (!locks.has(key)) return;
|
||||||
|
|
||||||
|
const holder = holders.get(key);
|
||||||
|
log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
|
||||||
|
|
||||||
|
locks.delete(key);
|
||||||
|
holders.delete(key);
|
||||||
|
const next = queues.get(key)?.shift();
|
||||||
|
if (next) {
|
||||||
|
log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
|
||||||
|
next.resolve();
|
||||||
|
} else {
|
||||||
|
queues.delete(key);
|
||||||
|
log.info({ key }, "TDLib mutex force-released (no waiters)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
|
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
|
||||||
* Different keys run concurrently — this allows two accounts to ingest in parallel
|
* Different keys run concurrently — this allows two accounts to ingest in parallel
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
|
|||||||
import { readRarContents } from "./archive/rar-reader.js";
|
import { readRarContents } from "./archive/rar-reader.js";
|
||||||
import { read7zContents } from "./archive/sevenz-reader.js";
|
import { read7zContents } from "./archive/sevenz-reader.js";
|
||||||
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
|
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
|
||||||
import { uploadToChannel } from "./upload/channel.js";
|
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
|
||||||
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
|
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
|
||||||
import { db } from "./db/client.js";
|
import { db } from "./db/client.js";
|
||||||
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
|
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
|
||||||
@@ -286,6 +286,7 @@ interface PipelineContext {
   client: Client;
   runId: string;
   accountId: string;
+  accountPhone: string;
   channelTitle: string;
   channel: TelegramChannel;
   destChannelTelegramId: bigint;
@@ -303,6 +304,8 @@ interface PipelineContext {
   sourceTopicId: bigint | null;
   accountLog: ReturnType<typeof childLogger>;
   maxUploadSize: bigint;
+  /** How many consecutive upload stalls have occurred (resets on success). */
+  consecutiveStalls: number;
 }
 
 /**
@@ -338,7 +341,8 @@ export async function runWorkerForAccount(
     currentStep: "connecting",
   });
 
-  const { client, isPremium } = await createTdlibClient({
+  // Use let so the client can be replaced on TDLib recreation after stalls
+  let { client, isPremium } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });
@@ -448,6 +452,7 @@ export async function runWorkerForAccount(
     client,
     runId: activeRunId,
     accountId: account.id,
+    accountPhone: account.phone,
     channelTitle: channel.title,
     channel,
     destChannelTelegramId: destChannel.telegramId,
@@ -458,6 +463,7 @@ export async function runWorkerForAccount(
     sourceTopicId: null,
     accountLog,
     maxUploadSize,
+    consecutiveStalls: 0,
   };
 
   if (forum) {
@@ -532,6 +538,15 @@ export async function runWorkerForAccount(
         { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
         "No new archives in topic"
       );
+      // Still advance topic watermark so we don't re-scan these messages next cycle
+      if (scanResult.maxScannedMessageId) {
+        await upsertTopicProgress(
+          mapping.id,
+          topic.topicId,
+          topic.name,
+          scanResult.maxScannedMessageId
+        );
+      }
       continue;
     }
 
@@ -546,14 +561,17 @@ export async function runWorkerForAccount(
     pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`;
 
     const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
+    // Sync client back in case it was recreated during upload stall recovery
+    client = pipelineCtx.client;
 
-    // Only advance progress to the highest successfully processed message
-    if (maxProcessedId) {
+    // Advance progress: use archive watermark if available, fall back to scan watermark
+    const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+    if (topicWatermark) {
       await upsertTopicProgress(
         mapping.id,
         topic.topicId,
         topic.name,
-        maxProcessedId
+        topicWatermark
       );
     }
   } catch (topicErr) {
@@ -603,6 +621,11 @@ export async function runWorkerForAccount(
 
   if (scanResult.archives.length === 0) {
     accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
+    // Still advance watermark to highest scanned message so we don't
+    // re-scan these messages next cycle
+    if (scanResult.maxScannedMessageId) {
+      await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
+    }
     continue;
   }
 
@@ -617,10 +640,13 @@ export async function runWorkerForAccount(
     pipelineCtx.channelTitle = channel.title;
 
     const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
+    // Sync client back in case it was recreated during upload stall recovery
+    client = pipelineCtx.client;
 
-    // Only advance progress to the highest successfully processed message
-    if (maxProcessedId) {
-      await updateLastProcessedMessage(mapping.id, maxProcessedId);
+    // Advance progress: use archive watermark if available, fall back to scan watermark
+    const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+    if (channelWatermark) {
+      await updateLastProcessedMessage(mapping.id, channelWatermark);
     }
   }
 } catch (channelErr) {
@@ -760,12 +786,68 @@ async function processArchiveSets(
       if (setMaxId > (maxProcessedId ?? 0n)) {
         maxProcessedId = setMaxId;
       }
+
+      // Reset stall counter on any successful upload
+      ctx.consecutiveStalls = 0;
     } catch (setErr) {
       // If a set fails, do NOT advance the watermark past it
       accountLog.warn(
         { err: setErr, baseName: archiveSets[setIdx].baseName },
         "Archive set failed, watermark will not advance past this set"
       );
+
+      // ── TDLib client recreation on repeated upload stalls ──
+      // When the TDLib event stream degrades, uploads complete (bytes sent)
+      // but confirmations never arrive. Retrying with the same broken client
+      // is futile. Recreate the client to get a fresh connection.
+      if (setErr instanceof UploadStallError) {
+        ctx.consecutiveStalls++;
+        accountLog.warn(
+          { consecutiveStalls: ctx.consecutiveStalls },
+          "Upload stall detected — TDLib event stream may be degraded"
+        );
+
+        // After 1 stalled set (= 3 failed retry attempts already), recreate the client
+        if (ctx.consecutiveStalls >= 1) {
+          accountLog.info("Recreating TDLib client after consecutive upload stalls");
+          try {
+            await closeTdlibClient(ctx.client);
+          } catch (closeErr) {
+            accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
+          }
+
+          try {
+            const { client: newClient } = await createTdlibClient({
+              id: ctx.accountId,
+              phone: ctx.accountPhone,
+            });
+            ctx.client = newClient;
+
+            // Reload chats so the new client can access channels
+            try {
+              for (let page = 0; page < 500; page++) {
+                await newClient.invoke({
+                  _: "loadChats",
+                  chat_list: { _: "chatListMain" },
+                  limit: 100,
+                });
+              }
+            } catch {
+              // 404 = all loaded (expected)
+            }
+
+            ctx.consecutiveStalls = 0;
+            accountLog.info("TDLib client recreated successfully — continuing ingestion");
+          } catch (recreateErr) {
+            accountLog.error(
+              { err: recreateErr },
+              "Failed to recreate TDLib client — aborting remaining uploads"
+            );
+            break;
+          }
+        }
+      }
+
       // Record the failure for visibility in the UI
       try {
         const archiveSet = archiveSets[setIdx];
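On the bounded `loadChats` loop in the recreation path above: TDLib signals "everything loaded" by returning error 404 from loadChats, which is why the bare catch is the expected exit rather than an error path. A variant that distinguishes that sentinel from genuine failures; the error-object shape (a `code` property on the thrown error) is an assumption about the tdl-style client wrapper:

```ts
// Unbounded variant: rely on TDLib's contract that loadChats returns
// error 404 once all chats in the list have been loaded.
try {
  for (;;) {
    await client.invoke({
      _: "loadChats",
      chat_list: { _: "chatListMain" },
      limit: 100,
    });
  }
} catch (err) {
  const code = (err as { code?: number }).code;
  if (code !== 404) throw err; // 404 = done; anything else is a real failure
}
```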