From 77c26adb3151add01ca4ab0637f0243e50184b6d Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Mon, 4 May 2026 20:37:42 +0200 Subject: [PATCH] perf: set watermarks even when no archives found to prevent re-scanning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, channels/topics with no new archives never had their watermark updated. This meant every cycle re-scanned all messages from scratch just to discover nothing new — especially costly for the 1079- topic Model Printing Emporium forum. - Add maxScannedMessageId to ChannelScanResult (highest msg ID seen) - Set channel watermark to scan boundary when no archives are found - Set topic watermark to scan boundary when no archives are found - Fall back to scan watermark when archive processing doesn't advance it After one full cycle, subsequent cycles will skip already-scanned messages via the early-exit boundary check, dramatically reducing TDLib API calls on channels with mostly non-archive content. Co-Authored-By: Claude Opus 4.6 (1M context) --- worker/src/tdlib/download.ts | 10 ++++++++++ worker/src/tdlib/topics.ts | 8 ++++++++ worker/src/worker.ts | 28 ++++++++++++++++++++++------ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index dc31ae2..f340c4f 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -79,6 +79,8 @@ export interface ChannelScanResult { archives: TelegramMessage[]; photos: TelegramPhoto[]; totalScanned: number; + /** Highest message ID seen during scan (for watermark, even when no archives found). */ + maxScannedMessageId: bigint | null; } export type ScanProgressCallback = (messagesScanned: number) => void; @@ -158,6 +160,7 @@ export async function getChannelMessages( const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; + let maxScannedMessageId: bigint | null = null; // Open the chat so TDLib can access it try { @@ -204,6 +207,12 @@ export async function getChannelMessages( totalScanned += result.messages.length; + // Track highest message ID (first message in batch = newest, since results are newest-first) + const batchMaxId = BigInt(result.messages[0].id); + if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) { + maxScannedMessageId = batchMaxId; + } + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -271,6 +280,7 @@ export async function getChannelMessages( archives: archives.reverse(), photos: photos.reverse(), totalScanned, + maxScannedMessageId, }; } diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index cc6b4b1..58b1e8c 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -178,6 +178,7 @@ export async function getTopicMessages( const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; + let maxScannedMessageId: bigint | null = null; let currentFromId = 0; let totalScanned = 0; @@ -239,6 +240,12 @@ export async function getTopicMessages( totalScanned += result.messages.length; + // Track highest message ID (first message = newest, since results are newest-first) + const batchMaxId = BigInt(result.messages[0].id); + if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) { + maxScannedMessageId = batchMaxId; + } + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -302,6 +309,7 @@ export async function getTopicMessages( archives: archives.reverse(), photos: photos.reverse(), totalScanned, + maxScannedMessageId, }; } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 940d58d..66de9d8 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -538,6 +538,15 @@ export async function runWorkerForAccount( { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned }, "No new archives in topic" ); + // Still advance topic watermark so we don't re-scan these messages next cycle + if (scanResult.maxScannedMessageId) { + await upsertTopicProgress( + mapping.id, + topic.topicId, + topic.name, + scanResult.maxScannedMessageId + ); + } continue; } @@ -555,13 +564,14 @@ export async function runWorkerForAccount( // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Only advance progress to the highest successfully processed message - if (maxProcessedId) { + // Advance progress: use archive watermark if available, fall back to scan watermark + const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; + if (topicWatermark) { await upsertTopicProgress( mapping.id, topic.topicId, topic.name, - maxProcessedId + topicWatermark ); } } catch (topicErr) { @@ -611,6 +621,11 @@ export async function runWorkerForAccount( if (scanResult.archives.length === 0) { accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel"); + // Still advance watermark to highest scanned message so we don't + // re-scan these messages next cycle + if (scanResult.maxScannedMessageId) { + await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId); + } continue; } @@ -628,9 +643,10 @@ export async function runWorkerForAccount( // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Only advance progress to the highest successfully processed message - if (maxProcessedId) { - await updateLastProcessedMessage(mapping.id, maxProcessedId); + // Advance progress: use archive watermark if available, fall back to scan watermark + const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; + if (channelWatermark) { + await updateLastProcessedMessage(mapping.id, channelWatermark); } } } catch (channelErr) {