mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Compare commits
5 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49b82a352b | ||
|
|
2e242912af | ||
|
|
9adbdb2a77 | ||
|
|
ad71346468 | ||
|
|
e19a80897d |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -18,6 +18,7 @@ worker/node_modules
|
|||||||
|
|
||||||
# production
|
# production
|
||||||
/build
|
/build
|
||||||
|
worker/dist
|
||||||
|
|
||||||
# misc
|
# misc
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
|||||||
@@ -10,6 +10,13 @@ let running = false;
|
|||||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||||
let cycleCount = 0;
|
let cycleCount = 0;
|
||||||
|
|
||||||
|
/** Cycle timeout used when WORKER_CYCLE_TIMEOUT_MINUTES is unset or invalid (4 hours). */
const DEFAULT_CYCLE_TIMEOUT_MINUTES = 240;

/**
 * Parse the cycle timeout (in minutes) from its raw environment value.
 *
 * @param raw - Raw value of WORKER_CYCLE_TIMEOUT_MINUTES, if set.
 * @returns The parsed number of minutes, or the 240-minute default when the
 *   value is missing, not a number, or not strictly positive.
 */
function parseCycleTimeoutMinutes(raw: string | undefined): number {
  const minutes = Number.parseInt(raw ?? "", 10);
  // A NaN timeout would make every `elapsed > timeout` comparison false and
  // silently disable the limit; a non-positive one would end every cycle
  // before any account is processed. Fall back to the default in both cases.
  return Number.isFinite(minutes) && minutes > 0 ? minutes : DEFAULT_CYCLE_TIMEOUT_MINUTES;
}

/**
 * Maximum time for a single ingestion cycle (ms).
 * After this, new accounts won't be started (in-progress work finishes).
 * Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES; invalid or
 * non-positive values fall back to the default.
 */
const CYCLE_TIMEOUT_MS = parseCycleTimeoutMinutes(process.env.WORKER_CYCLE_TIMEOUT_MINUTES) * 60 * 1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run one ingestion cycle:
|
* Run one ingestion cycle:
|
||||||
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
||||||
@@ -17,6 +24,10 @@ let cycleCount = 0;
|
|||||||
*
|
*
|
||||||
* All TDLib operations are wrapped in the mutex to ensure only one client
|
* All TDLib operations are wrapped in the mutex to ensure only one client
|
||||||
* runs at a time (also shared with the fetch listener for on-demand requests).
|
* runs at a time (also shared with the fetch listener for on-demand requests).
|
||||||
|
*
|
||||||
|
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
|
||||||
|
* Once the timeout elapses, no new accounts will be started but any in-progress
|
||||||
|
* account processing is allowed to finish its current archive set.
|
||||||
*/
|
*/
|
||||||
async function runCycle(): Promise<void> {
|
async function runCycle(): Promise<void> {
|
||||||
if (running) {
|
if (running) {
|
||||||
@@ -26,7 +37,8 @@ async function runCycle(): Promise<void> {
|
|||||||
|
|
||||||
running = true;
|
running = true;
|
||||||
cycleCount++;
|
cycleCount++;
|
||||||
log.info({ cycle: cycleCount }, "Starting ingestion cycle");
|
const cycleStart = Date.now();
|
||||||
|
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// ── Phase 1: Authenticate pending accounts ──
|
// ── Phase 1: Authenticate pending accounts ──
|
||||||
@@ -37,6 +49,10 @@ async function runCycle(): Promise<void> {
|
|||||||
"Found pending accounts, starting authentication"
|
"Found pending accounts, starting authentication"
|
||||||
);
|
);
|
||||||
for (const account of pendingAccounts) {
|
for (const account of pendingAccounts) {
|
||||||
|
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
|
||||||
|
log.warn("Cycle timeout reached during authentication phase, stopping");
|
||||||
|
break;
|
||||||
|
}
|
||||||
await withTdlibMutex(`auth:${account.phone}`, () =>
|
await withTdlibMutex(`auth:${account.phone}`, () =>
|
||||||
authenticateAccount(account)
|
authenticateAccount(account)
|
||||||
);
|
);
|
||||||
@@ -54,12 +70,22 @@ async function runCycle(): Promise<void> {
|
|||||||
log.info({ accountCount: accounts.length }, "Processing accounts");
|
log.info({ accountCount: accounts.length }, "Processing accounts");
|
||||||
|
|
||||||
for (const account of accounts) {
|
for (const account of accounts) {
|
||||||
|
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
|
||||||
|
log.warn(
|
||||||
|
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
|
||||||
|
"Cycle timeout reached, skipping remaining accounts"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
await withTdlibMutex(`ingest:${account.phone}`, () =>
|
await withTdlibMutex(`ingest:${account.phone}`, () =>
|
||||||
runWorkerForAccount(account)
|
runWorkerForAccount(account)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Ingestion cycle complete");
|
log.info(
|
||||||
|
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
|
||||||
|
"Ingestion cycle complete"
|
||||||
|
);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error({ err }, "Ingestion cycle failed");
|
log.error({ err }, "Ingestion cycle failed");
|
||||||
} finally {
|
} finally {
|
||||||
|
|||||||
@@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js";
|
|||||||
|
|
||||||
const log = childLogger("download");
|
const log = childLogger("download");
|
||||||
|
|
||||||
|
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
|
||||||
|
export const MAX_SCAN_PAGES = 5000;
|
||||||
|
|
||||||
|
/** Timeout for a single TDLib API call (ms) */
|
||||||
|
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
|
||||||
|
|
||||||
interface TdPhotoSize {
|
interface TdPhotoSize {
|
||||||
type: string;
|
type: string;
|
||||||
photo: {
|
photo: {
|
||||||
@@ -71,6 +77,44 @@ export interface ChannelScanResult {
|
|||||||
|
|
||||||
export type ScanProgressCallback = (messagesScanned: number) => void;
|
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||||
|
|
||||||
|
/**
 * Invoke a TDLib method with a timeout to prevent indefinite hangs.
 *
 * The returned promise settles with whichever happens first: the result (or
 * error) of `client.invoke(request)`, or a timeout error once `timeoutMs` has
 * elapsed. The timeout only stops waiting; the underlying TDLib request is not
 * cancelled and a late result is ignored.
 *
 * @param client - TDLib client used to send the request.
 * @param request - TDLib request object; its `_` field is used in the timeout message.
 * @param timeoutMs - Maximum time to wait in milliseconds (defaults to INVOKE_TIMEOUT_MS).
 * @returns The response of the TDLib call, typed as `T` (not validated at runtime).
 * @throws Error when TDLib does not respond within `timeoutMs`; otherwise
 *   whatever `client.invoke` throws or rejects with.
 */
export async function invokeWithTimeout<T>(
  client: Client,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  request: Record<string, any>,
  timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
    }, timeoutMs);
  });

  try {
    // Promise.race subscribes to both promises, so a late rejection from the
    // losing side never surfaces as an unhandled rejection.
    return await Promise.race([client.invoke(request) as Promise<T>, timeout]);
  } finally {
    // Runs on success, rejection, timeout, and a synchronous throw from
    // client.invoke, so the timer can never outlive the call.
    clearTimeout(timer);
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch messages from a channel, stopping once we've scanned past the
|
* Fetch messages from a channel, stopping once we've scanned past the
|
||||||
* last-processed boundary (with one page of lookback for multipart safety).
|
* last-processed boundary (with one page of lookback for multipart safety).
|
||||||
@@ -80,6 +124,11 @@ export type ScanProgressCallback = (messagesScanned: number) => void;
|
|||||||
* When `lastProcessedMessageId` is null (first run), scans everything.
|
* When `lastProcessedMessageId` is null (first run), scans everything.
|
||||||
* The worker applies a post-grouping filter to skip fully-processed sets,
|
* The worker applies a post-grouping filter to skip fully-processed sets,
|
||||||
* and keeps `packageExistsBySourceMessage` as a safety net.
|
* and keeps `packageExistsBySourceMessage` as a safety net.
|
||||||
|
*
|
||||||
|
* Safety features:
|
||||||
|
* - Max page limit to prevent infinite loops
|
||||||
|
* - Stuck detection: breaks if from_message_id stops advancing
|
||||||
|
* - Timeout on each TDLib API call
|
||||||
*/
|
*/
|
||||||
export async function getChannelMessages(
|
export async function getChannelMessages(
|
||||||
client: Client,
|
client: Client,
|
||||||
@@ -94,17 +143,29 @@ export async function getChannelMessages(
|
|||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
let totalScanned = 0;
|
let totalScanned = 0;
|
||||||
|
let pageCount = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
const result = (await client.invoke({
|
if (pageCount >= MAX_SCAN_PAGES) {
|
||||||
|
log.warn(
|
||||||
|
{ chatId: chatId.toString(), pageCount, totalScanned },
|
||||||
|
"Hit max page limit for channel scan, stopping"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pageCount++;
|
||||||
|
|
||||||
|
const previousFromId = currentFromId;
|
||||||
|
|
||||||
|
const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
|
||||||
_: "getChatHistory",
|
_: "getChatHistory",
|
||||||
chat_id: Number(chatId),
|
chat_id: Number(chatId),
|
||||||
from_message_id: currentFromId,
|
from_message_id: currentFromId,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
limit: Math.min(limit, 100),
|
limit: Math.min(limit, 100),
|
||||||
only_local: false,
|
only_local: false,
|
||||||
})) as { messages: TdMessage[] };
|
});
|
||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
@@ -144,17 +205,26 @@ export async function getChannelMessages(
|
|||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
|
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
|
||||||
|
if (currentFromId === previousFromId) {
|
||||||
|
log.warn(
|
||||||
|
{ chatId: chatId.toString(), currentFromId, totalScanned },
|
||||||
|
"Pagination stuck (from_message_id not advancing), breaking"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
if (boundary && currentFromId < boundary) break;
|
if (boundary && currentFromId < boundary) break;
|
||||||
|
|
||||||
if (result.messages.length < 100) break;
|
if (result.messages.length < Math.min(limit, 100)) break;
|
||||||
|
|
||||||
// Rate limit delay
|
// Rate limit delay
|
||||||
await sleep(config.apiDelayMs);
|
await sleep(config.apiDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
|
||||||
"Channel scan complete"
|
"Channel scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { isArchiveAttachment } from "../archive/detect.js";
|
|||||||
import type { TelegramMessage } from "../archive/multipart.js";
|
import type { TelegramMessage } from "../archive/multipart.js";
|
||||||
import type { TelegramPhoto } from "../preview/match.js";
|
import type { TelegramPhoto } from "../preview/match.js";
|
||||||
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
||||||
|
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
|
||||||
|
|
||||||
const log = childLogger("topics");
|
const log = childLogger("topics");
|
||||||
|
|
||||||
@@ -21,16 +22,16 @@ export async function isChatForum(
|
|||||||
chatId: bigint
|
chatId: bigint
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
try {
|
try {
|
||||||
const chat = (await client.invoke({
|
const chat = await invokeWithTimeout<{
|
||||||
_: "getChat",
|
|
||||||
chat_id: Number(chatId),
|
|
||||||
})) as {
|
|
||||||
type?: {
|
type?: {
|
||||||
_: string;
|
_: string;
|
||||||
supergroup_id?: number;
|
supergroup_id?: number;
|
||||||
is_forum?: boolean;
|
is_forum?: boolean;
|
||||||
};
|
};
|
||||||
};
|
}>(client, {
|
||||||
|
_: "getChat",
|
||||||
|
chat_id: Number(chatId),
|
||||||
|
});
|
||||||
|
|
||||||
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
|
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
|
||||||
return true;
|
return true;
|
||||||
@@ -38,10 +39,10 @@ export async function isChatForum(
|
|||||||
|
|
||||||
// Also check via getSupergroup for older TDLib versions
|
// Also check via getSupergroup for older TDLib versions
|
||||||
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
|
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
|
||||||
const sg = (await client.invoke({
|
const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
|
||||||
_: "getSupergroup",
|
_: "getSupergroup",
|
||||||
supergroup_id: chat.type.supergroup_id,
|
supergroup_id: chat.type.supergroup_id,
|
||||||
})) as { is_forum?: boolean };
|
});
|
||||||
return sg.is_forum === true;
|
return sg.is_forum === true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,6 +55,7 @@ export async function isChatForum(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all forum topics in a supergroup.
|
* Get all forum topics in a supergroup.
|
||||||
|
* Includes stuck detection and timeout protection on API calls.
|
||||||
*/
|
*/
|
||||||
export async function getForumTopicList(
|
export async function getForumTopicList(
|
||||||
client: Client,
|
client: Client,
|
||||||
@@ -63,18 +65,24 @@ export async function getForumTopicList(
|
|||||||
let offsetDate = 0;
|
let offsetDate = 0;
|
||||||
let offsetMessageId = 0;
|
let offsetMessageId = 0;
|
||||||
let offsetMessageThreadId = 0;
|
let offsetMessageThreadId = 0;
|
||||||
|
let pageCount = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
const result = (await client.invoke({
|
if (pageCount >= MAX_SCAN_PAGES) {
|
||||||
_: "getForumTopics",
|
log.warn(
|
||||||
chat_id: Number(chatId),
|
{ chatId: chatId.toString(), pageCount, topicCount: topics.length },
|
||||||
query: "",
|
"Hit max page limit for topic enumeration, stopping"
|
||||||
offset_date: offsetDate,
|
);
|
||||||
offset_message_id: offsetMessageId,
|
break;
|
||||||
offset_message_thread_id: offsetMessageThreadId,
|
}
|
||||||
limit: 100,
|
pageCount++;
|
||||||
})) as {
|
|
||||||
|
const prevOffsetDate = offsetDate;
|
||||||
|
const prevOffsetMessageId = offsetMessageId;
|
||||||
|
const prevOffsetMessageThreadId = offsetMessageThreadId;
|
||||||
|
|
||||||
|
const result = await invokeWithTimeout<{
|
||||||
topics?: {
|
topics?: {
|
||||||
info?: {
|
info?: {
|
||||||
message_thread_id?: number;
|
message_thread_id?: number;
|
||||||
@@ -85,7 +93,15 @@ export async function getForumTopicList(
|
|||||||
next_offset_date?: number;
|
next_offset_date?: number;
|
||||||
next_offset_message_id?: number;
|
next_offset_message_id?: number;
|
||||||
next_offset_message_thread_id?: number;
|
next_offset_message_thread_id?: number;
|
||||||
};
|
}>(client, {
|
||||||
|
_: "getForumTopics",
|
||||||
|
chat_id: Number(chatId),
|
||||||
|
query: "",
|
||||||
|
offset_date: offsetDate,
|
||||||
|
offset_message_id: offsetMessageId,
|
||||||
|
offset_message_thread_id: offsetMessageThreadId,
|
||||||
|
limit: 100,
|
||||||
|
});
|
||||||
|
|
||||||
if (!result.topics || result.topics.length === 0) break;
|
if (!result.topics || result.topics.length === 0) break;
|
||||||
|
|
||||||
@@ -113,6 +129,19 @@ export async function getForumTopicList(
|
|||||||
offsetMessageId = result.next_offset_message_id ?? 0;
|
offsetMessageId = result.next_offset_message_id ?? 0;
|
||||||
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
|
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
|
||||||
|
|
||||||
|
// Stuck detection: if offsets didn't advance, break
|
||||||
|
if (
|
||||||
|
offsetDate === prevOffsetDate &&
|
||||||
|
offsetMessageId === prevOffsetMessageId &&
|
||||||
|
offsetMessageThreadId === prevOffsetMessageThreadId
|
||||||
|
) {
|
||||||
|
log.warn(
|
||||||
|
{ chatId: chatId.toString(), topicCount: topics.length },
|
||||||
|
"Topic pagination stuck (offsets not advancing), breaking"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
await sleep(config.apiDelayMs);
|
await sleep(config.apiDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,6 +163,11 @@ export async function getForumTopicList(
|
|||||||
* When `lastProcessedMessageId` is null (first run), scans everything.
|
* When `lastProcessedMessageId` is null (first run), scans everything.
|
||||||
* The worker applies a post-grouping filter to skip fully-processed sets,
|
* The worker applies a post-grouping filter to skip fully-processed sets,
|
||||||
* and keeps `packageExistsBySourceMessage` as a safety net.
|
* and keeps `packageExistsBySourceMessage` as a safety net.
|
||||||
|
*
|
||||||
|
* Safety features:
|
||||||
|
* - Max page limit to prevent infinite loops
|
||||||
|
* - Stuck detection: breaks if from_message_id stops advancing
|
||||||
|
* - Timeout on each TDLib API call
|
||||||
*/
|
*/
|
||||||
export async function getTopicMessages(
|
export async function getTopicMessages(
|
||||||
client: Client,
|
client: Client,
|
||||||
@@ -149,22 +183,23 @@ export async function getTopicMessages(
|
|||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
let totalScanned = 0;
|
let totalScanned = 0;
|
||||||
|
let pageCount = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (pageCount >= MAX_SCAN_PAGES) {
|
||||||
|
log.warn(
|
||||||
|
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
|
||||||
|
"Hit max page limit for topic scan, stopping"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pageCount++;
|
||||||
|
|
||||||
|
const previousFromId = currentFromId;
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
const result = (await client.invoke({
|
const result = await invokeWithTimeout<{
|
||||||
_: "searchChatMessages",
|
|
||||||
chat_id: Number(chatId),
|
|
||||||
query: "",
|
|
||||||
message_thread_id: Number(topicId),
|
|
||||||
from_message_id: currentFromId,
|
|
||||||
offset: 0,
|
|
||||||
limit: Math.min(limit, 100),
|
|
||||||
filter: null,
|
|
||||||
sender_id: null,
|
|
||||||
saved_messages_topic_id: 0,
|
|
||||||
})) as {
|
|
||||||
messages?: {
|
messages?: {
|
||||||
id: number;
|
id: number;
|
||||||
date: number;
|
date: number;
|
||||||
@@ -188,7 +223,18 @@ export async function getTopicMessages(
|
|||||||
caption?: { text?: string };
|
caption?: { text?: string };
|
||||||
};
|
};
|
||||||
}[];
|
}[];
|
||||||
};
|
}>(client, {
|
||||||
|
_: "searchChatMessages",
|
||||||
|
chat_id: Number(chatId),
|
||||||
|
query: "",
|
||||||
|
message_thread_id: Number(topicId),
|
||||||
|
from_message_id: currentFromId,
|
||||||
|
offset: 0,
|
||||||
|
limit: Math.min(limit, 100),
|
||||||
|
filter: null,
|
||||||
|
sender_id: null,
|
||||||
|
saved_messages_topic_id: 0,
|
||||||
|
});
|
||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
@@ -228,16 +274,25 @@ export async function getTopicMessages(
|
|||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
|
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
|
||||||
|
if (currentFromId === previousFromId) {
|
||||||
|
log.warn(
|
||||||
|
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
|
||||||
|
"Topic pagination stuck (from_message_id not advancing), breaking"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
if (boundary && currentFromId < boundary) break;
|
if (boundary && currentFromId < boundary) break;
|
||||||
|
|
||||||
if (result.messages.length < 100) break;
|
if (result.messages.length < Math.min(limit, 100)) break;
|
||||||
|
|
||||||
await sleep(config.apiDelayMs);
|
await sleep(config.apiDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
|
||||||
"Topic scan complete"
|
"Topic scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -4,12 +4,21 @@ const log = childLogger("mutex");
|
|||||||
|
|
||||||
let locked = false;
|
let locked = false;
|
||||||
let holder = "";
|
let holder = "";
|
||||||
const queue: Array<{ resolve: () => void; label: string }> = [];
|
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum time to wait for the TDLib mutex (ms).
|
||||||
|
* If the mutex is not available within this time, the operation is rejected.
|
||||||
|
 * Fixed at 30 minutes (long enough for large downloads, short enough to detect hangs).
|
||||||
|
*/
|
||||||
|
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensures only one TDLib client runs at a time across the entire worker process.
|
* Ensures only one TDLib client runs at a time across the entire worker process.
|
||||||
* Both the scheduler (auth, ingestion) and the fetch listener acquire this
|
* Both the scheduler (auth, ingestion) and the fetch listener acquire this
|
||||||
* before creating any TDLib client.
|
* before creating any TDLib client.
|
||||||
|
*
|
||||||
|
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
|
||||||
*/
|
*/
|
||||||
export async function withTdlibMutex<T>(
|
export async function withTdlibMutex<T>(
|
||||||
label: string,
|
label: string,
|
||||||
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
|
|||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
if (locked) {
|
if (locked) {
|
||||||
log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
|
log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
|
||||||
await new Promise<void>((resolve) => queue.push({ resolve, label }));
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
const idx = queue.indexOf(entry);
|
||||||
|
if (idx !== -1) {
|
||||||
|
queue.splice(idx, 1);
|
||||||
|
reject(new Error(
|
||||||
|
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
|
||||||
|
`(waiting: ${label}, holder: ${holder})`
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}, MUTEX_WAIT_TIMEOUT_MS);
|
||||||
|
|
||||||
|
const entry = {
|
||||||
|
resolve: () => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve();
|
||||||
|
},
|
||||||
|
reject,
|
||||||
|
label,
|
||||||
|
};
|
||||||
|
queue.push(entry);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
locked = true;
|
locked = true;
|
||||||
|
|||||||
Reference in New Issue
Block a user