mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Add live message scanning progress, channel/topic counters to worker activity
- Add progress callbacks to getChannelMessages and getTopicMessages that fire after each page of messages is fetched - Worker now shows channel progress (e.g. "[2/5] Channel Name") when processing multiple source channels - Worker now shows topic progress (e.g. "topic 3/12") when scanning forums - Worker now shows live message scanning count during channel/topic scans (e.g. "Scanning Channel — 300 messages scanned") - UI stats line now always shows messagesScanned count - messagesScanned counter now increments during the scanning phase, not just during archive processing Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
This commit is contained in:
@@ -233,6 +233,11 @@ function RunningStatus({
|
|||||||
</span>
|
</span>
|
||||||
</span>
|
</span>
|
||||||
)}
|
)}
|
||||||
|
{run.messagesScanned > 0 && (
|
||||||
|
<span>
|
||||||
|
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
|
||||||
|
</span>
|
||||||
|
)}
|
||||||
{run.zipsIngested > 0 && (
|
{run.zipsIngested > 0 && (
|
||||||
<span>
|
<span>
|
||||||
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
||||||
|
|||||||
@@ -68,6 +68,8 @@ export interface ChannelScanResult {
|
|||||||
photos: TelegramPhoto[];
|
photos: TelegramPhoto[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch messages from a channel, stopping once we've scanned past the
|
* Fetch messages from a channel, stopping once we've scanned past the
|
||||||
* last-processed boundary (with one page of lookback for multipart safety).
|
* last-processed boundary (with one page of lookback for multipart safety).
|
||||||
@@ -82,13 +84,15 @@ export async function getChannelMessages(
|
|||||||
client: Client,
|
client: Client,
|
||||||
chatId: bigint,
|
chatId: bigint,
|
||||||
lastProcessedMessageId?: bigint | null,
|
lastProcessedMessageId?: bigint | null,
|
||||||
limit = 100
|
limit = 100,
|
||||||
|
onProgress?: ScanProgressCallback
|
||||||
): Promise<ChannelScanResult> {
|
): Promise<ChannelScanResult> {
|
||||||
const archives: TelegramMessage[] = [];
|
const archives: TelegramMessage[] = [];
|
||||||
const photos: TelegramPhoto[] = [];
|
const photos: TelegramPhoto[] = [];
|
||||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
|
let totalScanned = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
@@ -103,6 +107,8 @@ export async function getChannelMessages(
|
|||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
|
totalScanned += result.messages.length;
|
||||||
|
|
||||||
for (const msg of result.messages) {
|
for (const msg of result.messages) {
|
||||||
// Check for archive documents
|
// Check for archive documents
|
||||||
const doc = msg.content?.document;
|
const doc = msg.content?.document;
|
||||||
@@ -132,6 +138,9 @@ export async function getChannelMessages(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report scanning progress after each page
|
||||||
|
onProgress?.(totalScanned);
|
||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
@@ -144,7 +153,7 @@ export async function getChannelMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
|
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||||
"Channel scan complete"
|
"Channel scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js";
|
|||||||
import { isArchiveAttachment } from "../archive/detect.js";
|
import { isArchiveAttachment } from "../archive/detect.js";
|
||||||
import type { TelegramMessage } from "../archive/multipart.js";
|
import type { TelegramMessage } from "../archive/multipart.js";
|
||||||
import type { TelegramPhoto } from "../preview/match.js";
|
import type { TelegramPhoto } from "../preview/match.js";
|
||||||
import type { ChannelScanResult } from "./download.js";
|
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
||||||
|
|
||||||
const log = childLogger("topics");
|
const log = childLogger("topics");
|
||||||
|
|
||||||
@@ -140,13 +140,15 @@ export async function getTopicMessages(
|
|||||||
chatId: bigint,
|
chatId: bigint,
|
||||||
topicId: bigint,
|
topicId: bigint,
|
||||||
lastProcessedMessageId?: bigint | null,
|
lastProcessedMessageId?: bigint | null,
|
||||||
limit = 100
|
limit = 100,
|
||||||
|
onProgress?: ScanProgressCallback
|
||||||
): Promise<ChannelScanResult> {
|
): Promise<ChannelScanResult> {
|
||||||
const archives: TelegramMessage[] = [];
|
const archives: TelegramMessage[] = [];
|
||||||
const photos: TelegramPhoto[] = [];
|
const photos: TelegramPhoto[] = [];
|
||||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
|
let totalScanned = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
@@ -190,6 +192,8 @@ export async function getTopicMessages(
|
|||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
|
totalScanned += result.messages.length;
|
||||||
|
|
||||||
for (const msg of result.messages) {
|
for (const msg of result.messages) {
|
||||||
// Check for archive documents
|
// Check for archive documents
|
||||||
const doc = msg.content?.document;
|
const doc = msg.content?.document;
|
||||||
@@ -219,6 +223,9 @@ export async function getTopicMessages(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report scanning progress after each page
|
||||||
|
onProgress?.(totalScanned);
|
||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
@@ -230,7 +237,7 @@ export async function getTopicMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
|
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||||
"Topic scan complete"
|
"Topic scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
|
|||||||
throw new Error("No global destination channel configured — set one in the admin UI");
|
throw new Error("No global destination channel configured — set one in the admin UI");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const mapping of channelMappings) {
|
const totalChannels = channelMappings.length;
|
||||||
|
|
||||||
|
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
||||||
|
const mapping = channelMappings[chIdx];
|
||||||
const channel = mapping.channel;
|
const channel = mapping.channel;
|
||||||
|
const channelLabel = totalChannels > 1
|
||||||
|
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
|
||||||
|
: channel.title;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// ── Check if channel is a forum ──
|
// ── Check if channel is a forum ──
|
||||||
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
|
|||||||
if (forum) {
|
if (forum) {
|
||||||
// ── Forum channel: scan per-topic ──
|
// ── Forum channel: scan per-topic ──
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Enumerating topics in "${channel.title}"`,
|
currentActivity: `Enumerating topics in "${channelLabel}"`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: channel.title,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
const topics = await getForumTopicList(client, channel.telegramId);
|
const topics = await getForumTopicList(client, channel.telegramId);
|
||||||
@@ -399,31 +406,51 @@ export async function runWorkerForAccount(
|
|||||||
"Scanning forum channel by topic"
|
"Scanning forum channel by topic"
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const topic of topics) {
|
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
|
||||||
|
const topic = topics[tIdx];
|
||||||
try {
|
try {
|
||||||
const progress = topicProgressList.find(
|
const progress = topicProgressList.find(
|
||||||
(tp) => tp.topicId === topic.topicId
|
(tp) => tp.topicId === topic.topicId
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const topicLabel = `${channel.title} › ${topic.name}`;
|
||||||
|
const topicProgress = topics.length > 1
|
||||||
|
? ` (topic ${tIdx + 1}/${topics.length})`
|
||||||
|
: "";
|
||||||
|
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
|
currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: `${channel.title} › ${topic.name}`,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
const scanResult = await getTopicMessages(
|
const scanResult = await getTopicMessages(
|
||||||
client,
|
client,
|
||||||
channel.telegramId,
|
channel.telegramId,
|
||||||
topic.topicId,
|
topic.topicId,
|
||||||
progress?.lastProcessedMessageId
|
progress?.lastProcessedMessageId,
|
||||||
|
100,
|
||||||
|
(scanned) => {
|
||||||
|
throttled.update({
|
||||||
|
currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
|
||||||
|
currentStep: "scanning",
|
||||||
|
currentChannel: channelLabel,
|
||||||
|
messagesScanned: counters.messagesScanned + scanned,
|
||||||
|
});
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Add scanned messages to global counter
|
||||||
|
const topicMsgCount = scanResult.archives.length + scanResult.photos.length;
|
||||||
|
counters.messagesScanned += topicMsgCount;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug(
|
accountLog.debug(
|
||||||
{ channelId: channel.id, topic: topic.name },
|
{ channelId: channel.id, topic: topic.name },
|
||||||
@@ -463,15 +490,16 @@ export async function runWorkerForAccount(
|
|||||||
} else {
|
} else {
|
||||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Scanning "${channel.title}" for new archives`,
|
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: channel.title,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
accountLog.info(
|
accountLog.info(
|
||||||
@@ -482,9 +510,22 @@ export async function runWorkerForAccount(
|
|||||||
const scanResult = await getChannelMessages(
|
const scanResult = await getChannelMessages(
|
||||||
client,
|
client,
|
||||||
channel.telegramId,
|
channel.telegramId,
|
||||||
mapping.lastProcessedMessageId
|
mapping.lastProcessedMessageId,
|
||||||
|
100,
|
||||||
|
(scanned) => {
|
||||||
|
throttled.update({
|
||||||
|
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
|
||||||
|
currentStep: "scanning",
|
||||||
|
currentChannel: channelLabel,
|
||||||
|
messagesScanned: counters.messagesScanned + scanned,
|
||||||
|
});
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Add scanned messages to global counter
|
||||||
|
const channelMsgCount = scanResult.archives.length + scanResult.photos.length;
|
||||||
|
counters.messagesScanned += channelMsgCount;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug({ channelId: channel.id }, "No new archives");
|
accountLog.debug({ channelId: channel.id }, "No new archives");
|
||||||
continue;
|
continue;
|
||||||
@@ -593,6 +634,7 @@ async function processArchiveSets(
|
|||||||
currentChannel: channelTitle,
|
currentChannel: channelTitle,
|
||||||
totalFiles: archiveSets.length,
|
totalFiles: archiveSets.length,
|
||||||
zipsFound: counters.zipsFound,
|
zipsFound: counters.zipsFound,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Track the highest message ID that was successfully processed
|
// Track the highest message ID that was successfully processed
|
||||||
|
|||||||
Reference in New Issue
Block a user