diff --git a/worker/src/worker.ts b/worker/src/worker.ts index a4bd5cf..7953871 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -36,10 +36,16 @@ import { getRetryableSkippedMessageIds, updatePackageTopicContext, upsertChannelScanState, + upsertTopicScanState, } from "./db/queries.js"; import type { ActivityUpdate } from "./db/queries.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; -import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js"; +import { + getAccountChats, + joinChatByInviteLink, + getChannelLastMessageId, + getForumTopicLastMessageId, +} from "./tdlib/chats.js"; import { getCurrentCycle } from "./scheduler.js"; import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js"; import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js"; @@ -592,6 +598,49 @@ export async function runWorkerForAccount( } } + // ── Topic-scan-skip guard ── + // Same three-signal decision as the non-forum branch, but + // scoped to a single topic. Uses `progress` for the persisted + // scan-state fields (lastScannedAt etc). + try { + const retryableForTopic = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + if (retryableForTopic.length === 0 && progress?.lastScannedAt) { + const sinceLastScanMs = Date.now() - progress.lastScannedAt.getTime(); + const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs; + const inBackoff = + (progress.consecutiveEmptyScans ?? 0) >= config.emptyScanBackoffThreshold; + const backoffSkipsThisCycle = + inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0; + + if ( + (withinRecencyWindow && !progress.lastScanFoundArchives) || + backoffSkipsThisCycle + ) { + accountLog.debug( + { + channel: channel.title, + topic: topic.name, + sinceLastScanMs, + consecutiveEmptyScans: progress.consecutiveEmptyScans, + reason: withinRecencyWindow ? "recent-idle" : "backoff", + }, + "Skipping topic — recently scanned and idle, or in backoff" + ); + continue; + } + } + } catch (skipErr) { + accountLog.warn( + { err: skipErr, topic: topic.name }, + "Topic skip guard failed, proceeding with scan" + ); + } + // ── SkippedPackage retry pass ── // If we have failed messages in this topic with attemptCount // below the cap, pull the watermark back below the lowest of @@ -642,6 +691,38 @@ export async function runWorkerForAccount( ? ` (topic ${tIdx + 1}/${topics.length})` : ""; + // ── getForumTopic short-circuit ── + // After the retry pass has settled the effective watermark, + // ask TDLib for the topic's last_message_id. If it's <= our + // watermark, no new content — skip the paginated search. + const topicLastId = await getForumTopicLastMessageId( + client, + channel.telegramId, + topic.topicId + ); + const effectiveTopicWatermark = progress?.lastProcessedMessageId ?? null; + if ( + topicLastId !== null + && effectiveTopicWatermark !== null + && topicLastId <= effectiveTopicWatermark + ) { + accountLog.info( + { + channel: channel.title, + topic: topic.name, + topicLastId: topicLastId.toString(), + watermark: effectiveTopicWatermark.toString(), + }, + "Topic caught up via getForumTopic — skipping searchChatMessages" + ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: effectiveTopicWatermark, + lastScanFoundArchives: false, + consecutiveEmptyScans: (progress?.consecutiveEmptyScans ?? 0) + 1, + }); + continue; + } + await updateRunActivity(activeRunId, { currentActivity: `Scanning "${topicLabel}"${topicProgress}`, currentStep: "scanning", @@ -679,14 +760,25 @@ export async function runWorkerForAccount( { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned }, "No new archives in topic" ); - // Still advance topic watermark so we don't re-scan these messages next cycle + // Still advance topic watermark so we don't re-scan these + // messages next cycle. Truly idle only when no retryable + // SkippedPackages are pending for this topic — chronically- + // failing archives must NOT push a topic into backoff. + const retryableTopicNoArchives = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + const topicTrulyIdleNoArchives = retryableTopicNoArchives.length === 0; if (scanResult.maxScannedMessageId) { - await upsertTopicProgress( - mapping.id, - topic.topicId, - topic.name, - scanResult.maxScannedMessageId - ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: scanResult.maxScannedMessageId, + lastScanFoundArchives: !topicTrulyIdleNoArchives, + consecutiveEmptyScans: topicTrulyIdleNoArchives + ? (progress?.consecutiveEmptyScans ?? 0) + 1 + : 0, + }); } continue; } @@ -727,13 +819,27 @@ export async function runWorkerForAccount( if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) { topicWatermark = minFailedId - 1n; } + // trulyIdle: no archives this scan AND no failures AND no + // retryable pending. Same definition as the non-forum branch. + const retryableTopicPendingNow = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + const topicTrulyIdle = + scanResult.archives.length === 0 + && minFailedId === null + && retryableTopicPendingNow.length === 0; + const newTopicConsecutive = topicTrulyIdle + ? (progress?.consecutiveEmptyScans ?? 0) + 1 + : 0; if (topicWatermark !== null) { - await upsertTopicProgress( - mapping.id, - topic.topicId, - topic.name, - topicWatermark - ); + await upsertTopicScanState(mapping.id, topic.topicId, topic.name, { + lastProcessedMessageId: topicWatermark, + lastScanFoundArchives: !topicTrulyIdle, + consecutiveEmptyScans: newTopicConsecutive, + }); } } catch (topicErr) { accountLog.warn(