From 1a4bc6f9f36380550e661c4641e19057a26b4482 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Tue, 26 May 2026 19:56:54 +0200 Subject: [PATCH] feat(worker): non-forum channel-scan-skip + getChat short-circuit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For non-forum channels in runWorkerForAccount, three guards: 1. Top-of-loop recency/backoff skip — if recently scanned with no pending work, or in backoff and not its turn, skip entirely. Bypassed when retryable SkippedPackages exist. 2. After the SkippedPackage retry pass, a getChat short-circuit — if TDLib's local cache says the channel's last_message.id <= our effective watermark, skip the paginated searchChatMessages. 3. End-of-scan persists lastScannedAt + lastScanFoundArchives + consecutiveEmptyScans via the new upsertChannelScanState helper. trulyIdle requires: no archives, no failures, no retryable pending. scheduler.ts exposes getCurrentCycle() so the backoff "every Nth cycle" modulo can be applied. Forum-topic branch lands in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/src/scheduler.ts | 6 ++ worker/src/worker.ts | 120 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index 4162cf3..99210de 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -19,6 +19,12 @@ let activeCyclePromise: Promise | null = null; */ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000; +/** Read-only access to the current cycle counter for code that needs to + * apply per-cycle modulo logic (e.g. the cold-channel backoff). */ +export function getCurrentCycle(): number { + return cycleCount; +} + /** * Run one ingestion cycle: * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) diff --git a/worker/src/worker.ts b/worker/src/worker.ts index c4e2c66..a4bd5cf 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -35,10 +35,12 @@ import { findPackageByRemoteUniqueId, getRetryableSkippedMessageIds, updatePackageTopicContext, + upsertChannelScanState, } from "./db/queries.js"; import type { ActivityUpdate } from "./db/queries.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; -import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js"; +import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js"; +import { getCurrentCycle } from "./scheduler.js"; import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js"; import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js"; import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js"; @@ -741,6 +743,51 @@ export async function runWorkerForAccount( } } } else { + // ── Channel-scan-skip guard ── + // Before any TDLib call, decide whether this channel can be + // skipped entirely this cycle. Three signals (in order): + // 1. retryable SkippedPackages exist → MUST scan + // 2. lastScannedAt within window AND last scan was idle → skip + // 3. in backoff AND not the Nth cycle → skip + try { + const retryable = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: null, + cap: config.maxSkipAttempts, + }); + if (retryable.length === 0 && mapping.lastScannedAt) { + const sinceLastScanMs = Date.now() - mapping.lastScannedAt.getTime(); + const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs; + const inBackoff = mapping.consecutiveEmptyScans >= config.emptyScanBackoffThreshold; + const backoffSkipsThisCycle = + inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0; + + if ( + (withinRecencyWindow && !mapping.lastScanFoundArchives) || + backoffSkipsThisCycle + ) { + accountLog.debug( + { + channel: channel.title, + sinceLastScanMs, + consecutiveEmptyScans: mapping.consecutiveEmptyScans, + reason: withinRecencyWindow ? "recent-idle" : "backoff", + }, + "Skipping channel — recently scanned and idle, or in backoff" + ); + continue; + } + } + } catch (skipErr) { + // Skip guard is best-effort. If the retryable query fails, + // fall through and do the normal scan. + accountLog.warn( + { err: skipErr, channel: channel.title }, + "Skip guard failed, proceeding with scan" + ); + } + // ── Non-forum channel: flat scan (existing behavior) ── await updateRunActivity(activeRunId, { currentActivity: `Scanning "${channelLabel}" for new archives`, @@ -797,6 +844,34 @@ export async function runWorkerForAccount( ); } + // ── getChat short-circuit ── + // After the retry pass has settled the effective watermark, ask + // TDLib for the channel's last_message.id. If it's <= our watermark, + // no new content exists since last cycle — skip the paginated + // searchChatMessages entirely. Still update scan-state so the + // recent-scan skip can kick in next cycle. + const channelLastId = await getChannelLastMessageId(client, channel.telegramId); + if ( + channelLastId !== null + && effectiveChannelWatermark !== null + && channelLastId <= effectiveChannelWatermark + ) { + accountLog.info( + { + channel: channel.title, + channelLastId: channelLastId.toString(), + watermark: effectiveChannelWatermark.toString(), + }, + "Channel caught up via getChat — skipping searchChatMessages" + ); + await upsertChannelScanState(mapping.id, { + lastProcessedMessageId: effectiveChannelWatermark, + lastScanFoundArchives: false, + consecutiveEmptyScans: (mapping.consecutiveEmptyScans ?? 0) + 1, + }); + continue; + } + const scanResult = await getChannelMessages( client, channel.telegramId, @@ -817,10 +892,24 @@ export async function runWorkerForAccount( if (scanResult.archives.length === 0) { accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel"); - // Still advance watermark to highest scanned message so we don't - // re-scan these messages next cycle + // Truly idle requires no retryable SkippedPackages — a channel + // with a chronically-failing archive must NOT enter backoff just + // because no NEW archives showed up this scan. + const retryableNoArchives = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: null, + cap: config.maxSkipAttempts, + }); + const channelTrulyIdleNoArchives = retryableNoArchives.length === 0; if (scanResult.maxScannedMessageId) { - await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId); + await upsertChannelScanState(mapping.id, { + lastProcessedMessageId: scanResult.maxScannedMessageId, + lastScanFoundArchives: !channelTrulyIdleNoArchives, + consecutiveEmptyScans: channelTrulyIdleNoArchives + ? (mapping.consecutiveEmptyScans ?? 0) + 1 + : 0, + }); } continue; } @@ -856,8 +945,29 @@ export async function runWorkerForAccount( if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) { channelWatermark = minFailedId - 1n; } + // trulyIdle: nothing new this scan AND nothing failed AND no + // retryable SkippedPackages pending. The retryable check matters — + // a chronically-failing archive should NEVER let the channel back + // off, even though zipsFound stays at 0 for it. + const retryablePendingNow = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: null, + cap: config.maxSkipAttempts, + }); + const trulyIdle = + scanResult.archives.length === 0 + && minFailedId === null + && retryablePendingNow.length === 0; + const newConsecutive = trulyIdle + ? (mapping.consecutiveEmptyScans ?? 0) + 1 + : 0; if (channelWatermark !== null) { - await updateLastProcessedMessage(mapping.id, channelWatermark); + await upsertChannelScanState(mapping.id, { + lastProcessedMessageId: channelWatermark, + lastScanFoundArchives: !trulyIdle, + consecutiveEmptyScans: newConsecutive, + }); } } } catch (channelErr) {