diff --git a/src/app/(app)/telegram/_components/worker-status-panel.tsx b/src/app/(app)/telegram/_components/worker-status-panel.tsx index 282ebef..ef7601d 100644 --- a/src/app/(app)/telegram/_components/worker-status-panel.tsx +++ b/src/app/(app)/telegram/_components/worker-status-panel.tsx @@ -233,6 +233,11 @@ function RunningStatus({ )} + {run.messagesScanned > 0 && ( + + {run.messagesScanned} messages + + )} {run.zipsIngested > 0 && ( {run.zipsIngested} ingested diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 4f40eca..c405c55 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -68,6 +68,8 @@ export interface ChannelScanResult { photos: TelegramPhoto[]; } +export type ScanProgressCallback = (messagesScanned: number) => void; + /** * Fetch messages from a channel, stopping once we've scanned past the * last-processed boundary (with one page of lookback for multipart safety). @@ -82,13 +84,15 @@ export async function getChannelMessages( client: Client, chatId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -103,6 +107,8 @@ export async function getChannelMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -132,6 +138,9 @@ export async function getChannelMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -144,7 +153,7 @@ export async function getChannelMessages( } log.info( - { chatId: chatId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Channel scan complete" ); diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 23f02c5..1c8e7ce 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js"; import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; -import type { ChannelScanResult } from "./download.js"; +import type { ChannelScanResult, ScanProgressCallback } from "./download.js"; const log = childLogger("topics"); @@ -140,13 +140,15 @@ export async function getTopicMessages( chatId: bigint, topicId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -190,6 +192,8 @@ export async function getTopicMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -219,6 +223,9 @@ export async function getTopicMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -230,7 +237,7 @@ export async function getTopicMessages( } log.info( - { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Topic scan complete" ); diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 945e447..e77c4e4 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -349,8 +349,14 @@ export async function runWorkerForAccount( throw new Error("No global destination channel configured — set one in the admin UI"); } - for (const mapping of channelMappings) { + const totalChannels = channelMappings.length; + + for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { + const mapping = channelMappings[chIdx]; const channel = mapping.channel; + const channelLabel = totalChannels > 1 + ? `[${chIdx + 1}/${totalChannels}] ${channel.title}` + : channel.title; try { // ── Check if channel is a forum ── @@ -380,15 +386,16 @@ export async function runWorkerForAccount( if (forum) { // ── Forum channel: scan per-topic ── await updateRunActivity(activeRunId, { - currentActivity: `Enumerating topics in "${channel.title}"`, + currentActivity: `Enumerating topics in "${channelLabel}"`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const topics = await getForumTopicList(client, channel.telegramId); @@ -399,31 +406,51 @@ export async function runWorkerForAccount( "Scanning forum channel by topic" ); - for (const topic of topics) { + for (let tIdx = 0; tIdx < topics.length; tIdx++) { + const topic = topics[tIdx]; try { const progress = topicProgressList.find( (tp) => tp.topicId === topic.topicId ); + const topicLabel = `${channel.title} › ${topic.name}`; + const topicProgress = topics.length > 1 + ? ` (topic ${tIdx + 1}/${topics.length})` + : ""; + await updateRunActivity(activeRunId, { - currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`, + currentActivity: `Scanning "${topicLabel}"${topicProgress}`, currentStep: "scanning", - currentChannel: `${channel.title} › ${topic.name}`, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const scanResult = await getTopicMessages( client, channel.telegramId, topic.topicId, - progress?.lastProcessedMessageId + progress?.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + const topicMsgCount = scanResult.archives.length + scanResult.photos.length; + counters.messagesScanned += topicMsgCount; + if (scanResult.archives.length === 0) { accountLog.debug( { channelId: channel.id, topic: topic.name }, @@ -463,15 +490,16 @@ export async function runWorkerForAccount( } else { // ── Non-forum channel: flat scan (existing behavior) ── await updateRunActivity(activeRunId, { - currentActivity: `Scanning "${channel.title}" for new archives`, + currentActivity: `Scanning "${channelLabel}" for new archives`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); accountLog.info( @@ -482,9 +510,22 @@ export async function runWorkerForAccount( const scanResult = await getChannelMessages( client, channel.telegramId, - mapping.lastProcessedMessageId + mapping.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + const channelMsgCount = scanResult.archives.length + scanResult.photos.length; + counters.messagesScanned += channelMsgCount; + if (scanResult.archives.length === 0) { accountLog.debug({ channelId: channel.id }, "No new archives"); continue; @@ -593,6 +634,7 @@ async function processArchiveSets( currentChannel: channelTitle, totalFiles: archiveSets.length, zipsFound: counters.zipsFound, + messagesScanned: counters.messagesScanned, }); // Track the highest message ID that was successfully processed