From 4f6a6f0f755aa3aacb884a0ce1cb7d4189d75ce2 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Tue, 26 May 2026 19:58:44 +0200 Subject: [PATCH] feat(worker): forum-topic scan-skip + getForumTopic short-circuit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror of the non-forum guards from 1a4bc6f, scoped to forum topics inside the topic loop: - Top-of-topic-loop recency/backoff skip - getForumTopic short-circuit after the SkippedPackage retry pass - upsertTopicScanState for end-of-scan persistence (both the archives-found path and the no-archives path) Same trulyIdle definition throughout: no archives this scan, no failures this scan, no retryable SkippedPackage rows pending. Topics with chronic failures stay out of backoff because any scan with failures or pending retryable SkippedPackage rows resets their consecutiveEmptyScans counter to 0 instead of incrementing it. For MPE specifically (1,086 forum topics), per-cycle searchChatMessages calls drop from ~1,086 to roughly the count of topics with new activity in the last 5 minutes — typically <50. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/src/worker.ts | 134 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 14 deletions(-) diff --git a/worker/src/worker.ts b/worker/src/worker.ts index a4bd5cf..7953871 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -36,10 +36,16 @@ import { getRetryableSkippedMessageIds, updatePackageTopicContext, upsertChannelScanState, + upsertTopicScanState, } from "./db/queries.js"; import type { ActivityUpdate } from "./db/queries.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; -import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js"; +import { + getAccountChats, + joinChatByInviteLink, + getChannelLastMessageId, + getForumTopicLastMessageId, +} from "./tdlib/chats.js"; import { getCurrentCycle } from "./scheduler.js"; import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js"; import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js"; @@ -592,6 +598,49 @@ export async function runWorkerForAccount( } } + // ── Topic-scan-skip guard ── + // Same three-signal decision as the non-forum branch, but + // scoped to a single topic. Uses `progress` for the persisted + // scan-state fields (lastScannedAt etc). + try { + const retryableForTopic = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + if (retryableForTopic.length === 0 && progress?.lastScannedAt) { + const sinceLastScanMs = Date.now() - progress.lastScannedAt.getTime(); + const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs; + const inBackoff = + (progress.consecutiveEmptyScans ?? 
0) >= config.emptyScanBackoffThreshold; + const backoffSkipsThisCycle = + inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0; + + if ( + (withinRecencyWindow && !progress.lastScanFoundArchives) || + backoffSkipsThisCycle + ) { + accountLog.debug( + { + channel: channel.title, + topic: topic.name, + sinceLastScanMs, + consecutiveEmptyScans: progress.consecutiveEmptyScans, + reason: withinRecencyWindow ? "recent-idle" : "backoff", + }, + "Skipping topic — recently scanned and idle, or in backoff" + ); + continue; + } + } + } catch (skipErr) { + accountLog.warn( + { err: skipErr, topic: topic.name }, + "Topic skip guard failed, proceeding with scan" + ); + } + // ── SkippedPackage retry pass ── // If we have failed messages in this topic with attemptCount // below the cap, pull the watermark back below the lowest of @@ -642,6 +691,38 @@ export async function runWorkerForAccount( ? ` (topic ${tIdx + 1}/${topics.length})` : ""; + // ── getForumTopic short-circuit ── + // After the retry pass has settled the effective watermark, + // ask TDLib for the topic's last_message_id. If it's <= our + // watermark, no new content — skip the paginated search. + const topicLastId = await getForumTopicLastMessageId( + client, + channel.telegramId, + topic.topicId + ); + const effectiveTopicWatermark = progress?.lastProcessedMessageId ?? null; + if ( + topicLastId !== null + && effectiveTopicWatermark !== null + && topicLastId <= effectiveTopicWatermark + ) { + accountLog.info( + { + channel: channel.title, + topic: topic.name, + topicLastId: topicLastId.toString(), + watermark: effectiveTopicWatermark.toString(), + }, + "Topic caught up via getForumTopic — skipping searchChatMessages" + ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: effectiveTopicWatermark, + lastScanFoundArchives: false, + consecutiveEmptyScans: (progress?.consecutiveEmptyScans ?? 
0) + 1, + }); + continue; + } + await updateRunActivity(activeRunId, { currentActivity: `Scanning "${topicLabel}"${topicProgress}`, currentStep: "scanning", @@ -679,14 +760,25 @@ export async function runWorkerForAccount( { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned }, "No new archives in topic" ); - // Still advance topic watermark so we don't re-scan these messages next cycle + // Still advance topic watermark so we don't re-scan these + // messages next cycle. Truly idle only when no retryable + // SkippedPackages are pending for this topic — chronically- + // failing archives must NOT push a topic into backoff. + const retryableTopicNoArchives = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + const topicTrulyIdleNoArchives = retryableTopicNoArchives.length === 0; if (scanResult.maxScannedMessageId) { - await upsertTopicProgress( - mapping.id, - topic.topicId, - topic.name, - scanResult.maxScannedMessageId - ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: scanResult.maxScannedMessageId, + lastScanFoundArchives: !topicTrulyIdleNoArchives, + consecutiveEmptyScans: topicTrulyIdleNoArchives + ? (progress?.consecutiveEmptyScans ?? 0) + 1 + : 0, + }); } continue; } @@ -727,13 +819,27 @@ export async function runWorkerForAccount( if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) { topicWatermark = minFailedId - 1n; } + // trulyIdle: no archives this scan AND no failures AND no + // retryable pending. Same definition as the non-forum branch. 
+ const retryableTopicPendingNow = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + const topicTrulyIdle = + scanResult.archives.length === 0 + && minFailedId === null + && retryableTopicPendingNow.length === 0; + const newTopicConsecutive = topicTrulyIdle + ? (progress?.consecutiveEmptyScans ?? 0) + 1 + : 0; if (topicWatermark !== null) { - await upsertTopicProgress( - mapping.id, - topic.topicId, - topic.name, - topicWatermark - ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: topicWatermark, + lastScanFoundArchives: !topicTrulyIdle, + consecutiveEmptyScans: newTopicConsecutive, + }); } } catch (topicErr) { accountLog.warn(