From 22bcacf3bdfcb2adc28712368a330668fb28da07 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Thu, 5 Mar 2026 08:33:26 +0000
Subject: [PATCH] Add live message scanning progress, channel/topic counters to
worker activity
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- Add progress callbacks to getChannelMessages and getTopicMessages that
fire after each page of messages is fetched
- Worker now shows channel progress (e.g. "[2/5] Channel Name") when
processing multiple source channels
- Worker now shows topic progress (e.g. "topic 3/12") when scanning forums
- Worker now shows live message scanning count during channel/topic scans
(e.g. "Scanning Channel — 300 messages scanned")
- UI stats line now shows the messagesScanned count whenever it is greater than zero
- messagesScanned counter now increments during the scanning phase, not
just during archive processing
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
---
.../_components/worker-status-panel.tsx | 5 ++
worker/src/tdlib/download.ts | 13 +++-
worker/src/tdlib/topics.ts | 13 +++-
worker/src/worker.ts | 62 ++++++++++++++++---
4 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/src/app/(app)/telegram/_components/worker-status-panel.tsx b/src/app/(app)/telegram/_components/worker-status-panel.tsx
index 282ebef..ef7601d 100644
--- a/src/app/(app)/telegram/_components/worker-status-panel.tsx
+++ b/src/app/(app)/telegram/_components/worker-status-panel.tsx
@@ -233,6 +233,11 @@ function RunningStatus({
)}
+ {run.messagesScanned > 0 && (
+
+ {run.messagesScanned} messages
+
+ )}
{run.zipsIngested > 0 && (
{run.zipsIngested} ingested
diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts
index 4f40eca..c405c55 100644
--- a/worker/src/tdlib/download.ts
+++ b/worker/src/tdlib/download.ts
@@ -68,6 +68,8 @@ export interface ChannelScanResult {
photos: TelegramPhoto[];
}
+export type ScanProgressCallback = (messagesScanned: number) => void;
+
/**
* Fetch messages from a channel, stopping once we've scanned past the
* last-processed boundary (with one page of lookback for multipart safety).
@@ -82,13 +84,15 @@ export async function getChannelMessages(
client: Client,
chatId: bigint,
lastProcessedMessageId?: bigint | null,
- limit = 100
+ limit = 100,
+ onProgress?: ScanProgressCallback
): Promise {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
+ let totalScanned = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
@@ -103,6 +107,8 @@ export async function getChannelMessages(
if (!result.messages || result.messages.length === 0) break;
+ totalScanned += result.messages.length;
+
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -132,6 +138,9 @@ export async function getChannelMessages(
}
}
+ // Report scanning progress after each page
+ onProgress?.(totalScanned);
+
currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback)
@@ -144,7 +153,7 @@ export async function getChannelMessages(
}
log.info(
- { chatId: chatId.toString(), archives: archives.length, photos: photos.length },
+ { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
"Channel scan complete"
);
diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts
index 23f02c5..1c8e7ce 100644
--- a/worker/src/tdlib/topics.ts
+++ b/worker/src/tdlib/topics.ts
@@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
-import type { ChannelScanResult } from "./download.js";
+import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
const log = childLogger("topics");
@@ -140,13 +140,15 @@ export async function getTopicMessages(
chatId: bigint,
topicId: bigint,
lastProcessedMessageId?: bigint | null,
- limit = 100
+ limit = 100,
+ onProgress?: ScanProgressCallback
): Promise {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
+ let totalScanned = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
@@ -190,6 +192,8 @@ export async function getTopicMessages(
if (!result.messages || result.messages.length === 0) break;
+ totalScanned += result.messages.length;
+
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -219,6 +223,9 @@ export async function getTopicMessages(
}
}
+ // Report scanning progress after each page
+ onProgress?.(totalScanned);
+
currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback)
@@ -230,7 +237,7 @@ export async function getTopicMessages(
}
log.info(
- { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
+ { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
"Topic scan complete"
);
diff --git a/worker/src/worker.ts b/worker/src/worker.ts
index 945e447..e77c4e4 100644
--- a/worker/src/worker.ts
+++ b/worker/src/worker.ts
@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
throw new Error("No global destination channel configured — set one in the admin UI");
}
- for (const mapping of channelMappings) {
+ const totalChannels = channelMappings.length;
+
+ for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
+ const mapping = channelMappings[chIdx];
const channel = mapping.channel;
+ const channelLabel = totalChannels > 1
+ ? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
+ : channel.title;
try {
// ── Check if channel is a forum ──
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
if (forum) {
// ── Forum channel: scan per-topic ──
await updateRunActivity(activeRunId, {
- currentActivity: `Enumerating topics in "${channel.title}"`,
+ currentActivity: `Enumerating topics in "${channelLabel}"`,
currentStep: "scanning",
- currentChannel: channel.title,
+ currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
+ messagesScanned: counters.messagesScanned,
});
const topics = await getForumTopicList(client, channel.telegramId);
@@ -399,31 +406,51 @@ export async function runWorkerForAccount(
"Scanning forum channel by topic"
);
- for (const topic of topics) {
+ for (let tIdx = 0; tIdx < topics.length; tIdx++) {
+ const topic = topics[tIdx];
try {
const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId
);
+ const topicLabel = `${channel.title} › ${topic.name}`;
+ const topicProgress = topics.length > 1
+ ? ` (topic ${tIdx + 1}/${topics.length})`
+ : "";
+
await updateRunActivity(activeRunId, {
- currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
+ currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
currentStep: "scanning",
- currentChannel: `${channel.title} › ${topic.name}`,
+ currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
+ messagesScanned: counters.messagesScanned,
});
const scanResult = await getTopicMessages(
client,
channel.telegramId,
topic.topicId,
- progress?.lastProcessedMessageId
+ progress?.lastProcessedMessageId,
+ 100,
+ (scanned) => {
+ throttled.update({
+ currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
+ currentStep: "scanning",
+ currentChannel: channelLabel,
+ messagesScanned: counters.messagesScanned + scanned,
+ });
+ }
);
+ // Add scanned messages to global counter
+ const topicMsgCount = scanResult.archives.length + scanResult.photos.length;
+ counters.messagesScanned += topicMsgCount;
+
if (scanResult.archives.length === 0) {
accountLog.debug(
{ channelId: channel.id, topic: topic.name },
@@ -463,15 +490,16 @@ export async function runWorkerForAccount(
} else {
// ── Non-forum channel: flat scan (existing behavior) ──
await updateRunActivity(activeRunId, {
- currentActivity: `Scanning "${channel.title}" for new archives`,
+ currentActivity: `Scanning "${channelLabel}" for new archives`,
currentStep: "scanning",
- currentChannel: channel.title,
+ currentChannel: channelLabel,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
+ messagesScanned: counters.messagesScanned,
});
accountLog.info(
@@ -482,9 +510,22 @@ export async function runWorkerForAccount(
const scanResult = await getChannelMessages(
client,
channel.telegramId,
- mapping.lastProcessedMessageId
+ mapping.lastProcessedMessageId,
+ 100,
+ (scanned) => {
+ throttled.update({
+ currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
+ currentStep: "scanning",
+ currentChannel: channelLabel,
+ messagesScanned: counters.messagesScanned + scanned,
+ });
+ }
);
+ // Add scanned messages to global counter
+ const channelMsgCount = scanResult.archives.length + scanResult.photos.length;
+ counters.messagesScanned += channelMsgCount;
+
if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives");
continue;
@@ -593,6 +634,7 @@ async function processArchiveSets(
currentChannel: channelTitle,
totalFiles: archiveSets.length,
zipsFound: counters.zipsFound,
+ messagesScanned: counters.messagesScanned,
});
// Track the highest message ID that was successfully processed