10 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
49b82a352b Fix review issues: race condition in invokeWithTimeout and mutex queue entry
- Add settled flag to invokeWithTimeout to prevent double-settling
- Create mutex queue entry with wrapped resolve before pushing to queue

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:17:56 +00:00
copilot-swe-agent[bot]
2e242912af Remove worker/dist build artifacts from git, add to .gitignore
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:15:17 +00:00
copilot-swe-agent[bot]
9adbdb2a77 Fix worker getting stuck during sync: add timeouts, stuck detection, and safety limits
- Add invokeWithTimeout wrapper for TDLib API calls (2min timeout per call)
- Add stuck detection to getChannelMessages: break if from_message_id doesn't advance
- Add stuck detection to getTopicMessages: same protection for topic scanning
- Add stuck detection to getForumTopicList: break if pagination offsets don't advance
- Add max page limit (5000) to all scanning loops to prevent infinite pagination
- Add mutex wait timeout (30min) to prevent indefinite blocking when holder hangs
- Add cycle timeout (4h default, configurable via WORKER_CYCLE_TIMEOUT_MINUTES)
- Fix end-of-page detection to use actual limit value instead of hardcoded 100

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:14:53 +00:00
copilot-swe-agent[bot]
ad71346468 Initial plan 2026-03-05 13:02:41 +00:00
xCyanGrizzly
e19a80897d Merge pull request #12 from xCyanGrizzly/copilot/fix-worker-functionality-visibility
Fix worker activity tracking, add scan progress, default channels to disabled
2026-03-05 09:42:48 +01:00
copilot-swe-agent[bot]
22da4dfad2 Fix messagesScanned consistency: use totalScanned from scan results, remove double-counting
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:36:40 +00:00
copilot-swe-agent[bot]
22bcacf3bd Add live message scanning progress, channel/topic counters to worker activity
- Add progress callbacks to getChannelMessages and getTopicMessages that
  fire after each page of messages is fetched
- Worker now shows channel progress (e.g. "[2/5] Channel Name") when
  processing multiple source channels
- Worker now shows topic progress (e.g. "topic 3/12") when scanning forums
- Worker now shows live message scanning count during channel/topic scans
  (e.g. "Scanning Channel — 300 messages scanned")
- UI stats line now always shows messagesScanned count
- messagesScanned counter now increments during the scanning phase, not
  just during archive processing

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:33:26 +00:00
copilot-swe-agent[bot]
15da57b8c0 Fix worker stuck on "Working..." and default channels to disabled
1. Worker trigger: Add ingestion_trigger pg_notify listener so the worker
   picks up on-demand triggers from the UI and runs an immediate cycle with
   full activity tracking (currentActivity, currentStep, etc).

2. Remove orphaned IngestionRun creation from triggerIngestion server action.
   Previously the UI created RUNNING runs without activity fields, causing
   the UI to show "Working..." with no details. Now only the worker creates
   runs with proper activity tracking.

3. Default channels to disabled (isActive: false) in schema and all creation
   paths. Destination channels are explicitly set to active since they must
   receive uploads. Includes Prisma migration.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:27:37 +00:00
copilot-swe-agent[bot]
8f1a912ccb Initial plan 2026-03-05 08:11:20 +00:00
xCyanGrizzly
81b65912aa Merge pull request #11 from xCyanGrizzly/copilot/fix-nextjs-deployment-issues
Fix Docker deployment: file permissions, missing env vars, healthcheck timing
2026-03-04 23:24:05 +01:00
12 changed files with 352 additions and 74 deletions

1
.gitignore vendored
View File

@@ -18,6 +18,7 @@ worker/node_modules
# production
/build
worker/dist
# misc
.DS_Store

View File

@@ -0,0 +1,3 @@
-- Change the default for new channels to disabled (isActive = false).
-- Existing channels are not affected — admins can manually enable/disable them.
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;

View File

@@ -417,7 +417,7 @@ model TelegramChannel {
title String
type ChannelType
isForum Boolean @default(false)
isActive Boolean @default(true)
isActive Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

View File

@@ -233,6 +233,11 @@ function RunningStatus({
</span>
</span>
)}
{run.messagesScanned > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
</span>
)}
{run.zipsIngested > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested

View File

@@ -173,6 +173,7 @@ export async function createChannel(
telegramId: BigInt(parsed.data.telegramId),
title: parsed.data.title,
type: parsed.data.type,
isActive: false,
},
});
revalidatePath(REVALIDATE_PATH);
@@ -371,19 +372,8 @@ export async function triggerIngestion(
return { success: false, error: "No eligible accounts found" };
}
// Create ingestion runs — the worker picks these up
for (const account of accounts) {
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// pg_notify for immediate worker pickup
// Signal the worker to run an immediate ingestion cycle via pg_notify.
// The worker will create its own IngestionRun records with proper activity tracking.
try {
await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`,
@@ -417,7 +407,7 @@ export async function saveChannelSelections(
try {
let linked = 0;
for (const ch of channels) {
// Upsert the channel record
// Upsert the channel record (new channels default to disabled)
const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(ch.telegramId) },
create: {
@@ -425,6 +415,7 @@ export async function saveChannelSelections(
title: ch.title,
type: "SOURCE",
isForum: ch.isForum,
isActive: false,
},
update: {
title: ch.title,
@@ -467,10 +458,10 @@ export async function setGlobalDestination(
if (!channel) return { success: false, error: "Channel not found" };
try {
// Set the channel type to DESTINATION
// Set the channel type to DESTINATION and ensure it's active
await prisma.telegramChannel.update({
where: { id: channelId },
data: { type: "DESTINATION" },
data: { type: "DESTINATION", isActive: true },
});
// Save as global destination
@@ -521,17 +512,19 @@ export async function createDestinationChannel(
if (!admin.success) return admin;
try {
// Create the channel as DESTINATION
// Create the channel as DESTINATION (active by default — needed for uploads)
const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(telegramId) },
create: {
telegramId: BigInt(telegramId),
title,
type: "DESTINATION",
isActive: true,
},
update: {
title,
type: "DESTINATION",
isActive: true,
},
});

View File

@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
title: string;
type: "SOURCE" | "DESTINATION";
isForum: boolean;
isActive?: boolean;
}
/**
* Upsert a channel by telegramId. Returns the channel record.
* If it already exists, update title and forum status.
* New channels default to disabled (isActive: false) so the admin must
* explicitly enable them before the worker processes them.
* Pass isActive: true for DESTINATION channels that must be active immediately.
*/
export async function upsertChannel(input: UpsertChannelInput) {
return db.telegramChannel.upsert({
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
title: input.title,
type: input.type,
isForum: input.isForum,
isActive: input.isActive ?? false,
},
update: {
title: input.title,

View File

@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
import { processFetchRequest } from "./worker.js";
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { triggerImmediateCycle } from "./scheduler.js";
import {
getGlobalDestinationChannel,
getGlobalSetting,
@@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null;
* - `channel_fetch` — payload = requestId → fetch channels for an account
* - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle
*/
export async function startFetchListener(): Promise<void> {
pgClient = await pool.connect();
await pgClient.query("LISTEN channel_fetch");
await pgClient.query("LISTEN generate_invite");
await pgClient.query("LISTEN create_destination");
await pgClient.query("LISTEN ingestion_trigger");
pgClient.on("notification", (msg) => {
if (msg.channel === "channel_fetch" && msg.payload) {
@@ -39,10 +42,12 @@ export async function startFetchListener(): Promise<void> {
handleGenerateInvite(msg.payload);
} else if (msg.channel === "create_destination" && msg.payload) {
handleCreateDestination(msg.payload);
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger();
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
}
export function stopFetchListener(): void {
@@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void {
const result = await createSupergroup(client, parsed.title);
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
// Upsert it as a DESTINATION channel in the DB
// Upsert it as a DESTINATION channel in the DB (active by default)
const channel = await upsertChannel({
telegramId: result.chatId,
title: result.title,
type: "DESTINATION",
isForum: false,
isActive: true,
});
// Set as global destination
@@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void {
}
});
}
// ── Ingestion trigger handler ──
function handleIngestionTrigger(): void {
fetchQueue = fetchQueue.then(async () => {
try {
log.info("Ingestion trigger received from UI");
await triggerImmediateCycle();
} catch (err) {
log.error({ err }, "Failed to trigger immediate ingestion cycle");
}
});
}

View File

@@ -10,6 +10,13 @@ let running = false;
let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0;
/**
* Maximum time for a single ingestion cycle (ms).
* After this, new accounts won't be started (in-progress work finishes).
* Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES.
*/
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
/**
* Run one ingestion cycle:
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
@@ -17,6 +24,10 @@ let cycleCount = 0;
*
* All TDLib operations are wrapped in the mutex to ensure only one client
* runs at a time (also shared with the fetch listener for on-demand requests).
*
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
* Once the timeout elapses, no new accounts will be started but any in-progress
* account processing is allowed to finish its current archive set.
*/
async function runCycle(): Promise<void> {
if (running) {
@@ -26,7 +37,8 @@ async function runCycle(): Promise<void> {
running = true;
cycleCount++;
log.info({ cycle: cycleCount }, "Starting ingestion cycle");
const cycleStart = Date.now();
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
try {
// ── Phase 1: Authenticate pending accounts ──
@@ -37,6 +49,10 @@ async function runCycle(): Promise<void> {
"Found pending accounts, starting authentication"
);
for (const account of pendingAccounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn("Cycle timeout reached during authentication phase, stopping");
break;
}
await withTdlibMutex(`auth:${account.phone}`, () =>
authenticateAccount(account)
);
@@ -54,12 +70,22 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn(
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
"Cycle timeout reached, skipping remaining accounts"
);
break;
}
await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account)
);
}
log.info("Ingestion cycle complete");
log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
"Ingestion cycle complete"
);
} catch (err) {
log.error({ err }, "Ingestion cycle failed");
} finally {
@@ -105,6 +131,19 @@ export async function startScheduler(): Promise<void> {
scheduleNext();
}
/**
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
* If a cycle is already running, this is a no-op.
*/
export async function triggerImmediateCycle(): Promise<void> {
if (running) {
log.info("Cycle already running, ignoring trigger");
return;
}
log.info("Immediate cycle triggered via UI");
await runCycle();
}
/**
* Stop the scheduler gracefully.
*/

View File

@@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js";
const log = childLogger("download");
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
export const MAX_SCAN_PAGES = 5000;
/** Timeout for a single TDLib API call (ms) */
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
interface TdPhotoSize {
type: string;
photo: {
@@ -66,6 +72,47 @@ interface TdFile {
export interface ChannelScanResult {
archives: TelegramMessage[];
photos: TelegramPhoto[];
totalScanned: number;
}
export type ScanProgressCallback = (messagesScanned: number) => void;
/**
* Invoke a TDLib method with a timeout to prevent indefinite hangs.
* If TDLib does not respond within the timeout, the promise rejects.
*/
export async function invokeWithTimeout<T>(
client: Client,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: Record<string, any>,
timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
return new Promise<T>((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
}
}, timeoutMs);
(client.invoke(request) as Promise<T>)
.then((result) => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(result);
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
}
/**
@@ -77,32 +124,53 @@ export interface ChannelScanResult {
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/
export async function getChannelMessages(
client: Client,
chatId: bigint,
lastProcessedMessageId?: bigint | null,
limit = 100
limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const result = (await client.invoke({
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, totalScanned },
"Hit max page limit for channel scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
_: "getChatHistory",
chat_id: Number(chatId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
only_local: false,
})) as { messages: TdMessage[] };
});
if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -132,19 +200,31 @@ export async function getChannelMessages(
}
}
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), currentFromId, totalScanned },
"Pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break;
if (result.messages.length < Math.min(limit, 100)) break;
// Rate limit delay
await sleep(config.apiDelayMs);
}
log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Channel scan complete"
);
@@ -152,6 +232,7 @@ export async function getChannelMessages(
return {
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
};
}

View File

@@ -4,7 +4,8 @@ import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
import type { ChannelScanResult } from "./download.js";
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
const log = childLogger("topics");
@@ -21,16 +22,16 @@ export async function isChatForum(
chatId: bigint
): Promise<boolean> {
try {
const chat = (await client.invoke({
_: "getChat",
chat_id: Number(chatId),
})) as {
const chat = await invokeWithTimeout<{
type?: {
_: string;
supergroup_id?: number;
is_forum?: boolean;
};
};
}>(client, {
_: "getChat",
chat_id: Number(chatId),
});
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
return true;
@@ -38,10 +39,10 @@ export async function isChatForum(
// Also check via getSupergroup for older TDLib versions
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
const sg = (await client.invoke({
const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
_: "getSupergroup",
supergroup_id: chat.type.supergroup_id,
})) as { is_forum?: boolean };
});
return sg.is_forum === true;
}
@@ -54,6 +55,7 @@ export async function isChatForum(
/**
* Get all forum topics in a supergroup.
* Includes stuck detection and timeout protection on API calls.
*/
export async function getForumTopicList(
client: Client,
@@ -63,18 +65,24 @@ export async function getForumTopicList(
let offsetDate = 0;
let offsetMessageId = 0;
let offsetMessageThreadId = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const result = (await client.invoke({
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
})) as {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, topicCount: topics.length },
"Hit max page limit for topic enumeration, stopping"
);
break;
}
pageCount++;
const prevOffsetDate = offsetDate;
const prevOffsetMessageId = offsetMessageId;
const prevOffsetMessageThreadId = offsetMessageThreadId;
const result = await invokeWithTimeout<{
topics?: {
info?: {
message_thread_id?: number;
@@ -85,7 +93,15 @@ export async function getForumTopicList(
next_offset_date?: number;
next_offset_message_id?: number;
next_offset_message_thread_id?: number;
};
}>(client, {
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
});
if (!result.topics || result.topics.length === 0) break;
@@ -113,6 +129,19 @@ export async function getForumTopicList(
offsetMessageId = result.next_offset_message_id ?? 0;
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
// Stuck detection: if offsets didn't advance, break
if (
offsetDate === prevOffsetDate &&
offsetMessageId === prevOffsetMessageId &&
offsetMessageThreadId === prevOffsetMessageThreadId
) {
log.warn(
{ chatId: chatId.toString(), topicCount: topics.length },
"Topic pagination stuck (offsets not advancing), breaking"
);
break;
}
await sleep(config.apiDelayMs);
}
@@ -134,35 +163,43 @@ export async function getForumTopicList(
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/
export async function getTopicMessages(
client: Client,
chatId: bigint,
topicId: bigint,
lastProcessedMessageId?: bigint | null,
limit = 100
limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
"Hit max page limit for topic scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
})) as {
const result = await invokeWithTimeout<{
messages?: {
id: number;
date: number;
@@ -186,10 +223,23 @@ export async function getTopicMessages(
caption?: { text?: string };
};
}[];
};
}>(client, {
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
});
if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -219,18 +269,30 @@ export async function getTopicMessages(
}
}
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
"Topic pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break;
if (result.messages.length < Math.min(limit, 100)) break;
await sleep(config.apiDelayMs);
}
log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Topic scan complete"
);
@@ -238,6 +300,7 @@ export async function getTopicMessages(
return {
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
};
}

View File

@@ -4,12 +4,21 @@ const log = childLogger("mutex");
let locked = false;
let holder = "";
const queue: Array<{ resolve: () => void; label: string }> = [];
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
/**
* Ensures only one TDLib client runs at a time across the entire worker process.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this
* before creating any TDLib client.
*
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
*/
export async function withTdlibMutex<T>(
label: string,
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
): Promise<T> {
if (locked) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
await new Promise<void>((resolve) => queue.push({ resolve, label }));
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
const idx = queue.indexOf(entry);
if (idx !== -1) {
queue.splice(idx, 1);
reject(new Error(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
`(waiting: ${label}, holder: ${holder})`
));
}
}, MUTEX_WAIT_TIMEOUT_MS);
const entry = {
resolve: () => {
clearTimeout(timer);
resolve();
},
reject,
label,
};
queue.push(entry);
});
}
locked = true;

View File

@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
throw new Error("No global destination channel configured — set one in the admin UI");
}
for (const mapping of channelMappings) {
const totalChannels = channelMappings.length;
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
const mapping = channelMappings[chIdx];
const channel = mapping.channel;
const channelLabel = totalChannels > 1
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
: channel.title;
try {
// ── Check if channel is a forum ──
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
if (forum) {
// ── Forum channel: scan per-topic ──
await updateRunActivity(activeRunId, {
currentActivity: `Enumerating topics in "${channel.title}"`,
currentActivity: `Enumerating topics in "${channelLabel}"`,
currentStep: "scanning",
currentChannel: channel.title,
currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
messagesScanned: counters.messagesScanned,
});
const topics = await getForumTopicList(client, channel.telegramId);
@@ -399,31 +406,50 @@ export async function runWorkerForAccount(
"Scanning forum channel by topic"
);
for (const topic of topics) {
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
const topic = topics[tIdx];
try {
const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId
);
const topicLabel = `${channel.title} ${topic.name}`;
const topicProgress = topics.length > 1
? ` (topic ${tIdx + 1}/${topics.length})`
: "";
await updateRunActivity(activeRunId, {
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
currentStep: "scanning",
currentChannel: `${channel.title} ${topic.name}`,
currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
messagesScanned: counters.messagesScanned,
});
const scanResult = await getTopicMessages(
client,
channel.telegramId,
topic.topicId,
progress?.lastProcessedMessageId
progress?.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
);
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) {
accountLog.debug(
{ channelId: channel.id, topic: topic.name },
@@ -463,15 +489,16 @@ export async function runWorkerForAccount(
} else {
// ── Non-forum channel: flat scan (existing behavior) ──
await updateRunActivity(activeRunId, {
currentActivity: `Scanning "${channel.title}" for new archives`,
currentActivity: `Scanning "${channelLabel}" for new archives`,
currentStep: "scanning",
currentChannel: channel.title,
currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
messagesScanned: counters.messagesScanned,
});
accountLog.info(
@@ -482,9 +509,21 @@ export async function runWorkerForAccount(
const scanResult = await getChannelMessages(
client,
channel.telegramId,
mapping.lastProcessedMessageId
mapping.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
);
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives");
continue;
@@ -593,6 +632,7 @@ async function processArchiveSets(
currentChannel: channelTitle,
totalFiles: archiveSets.length,
zipsFound: counters.zipsFound,
messagesScanned: counters.messagesScanned,
});
// Track the highest message ID that was successfully processed
@@ -646,7 +686,6 @@ async function processOneArchiveSet(
throttled, counters, topicCreator, sourceTopicId, accountLog,
} = ctx;
counters.messagesScanned += archiveSet.parts.length;
const archiveName = archiveSet.parts[0].fileName;
// ── Early skip: check if this archive set was already ingested ──