feat(worker): non-forum channel-scan-skip + getChat short-circuit

For non-forum channels in runWorkerForAccount, three guards:

  1. Top-of-loop recency/backoff skip — if recently scanned with no
     pending work, or in backoff and not its turn, skip entirely.
     Bypassed when retryable SkippedPackages exist.

  2. After the SkippedPackage retry pass, a getChat short-circuit —
     if TDLib's local cache says the channel's last_message.id <= our
     effective watermark, skip the paginated searchChatMessages.

  3. End-of-scan persists lastScannedAt + lastScanFoundArchives +
     consecutiveEmptyScans via the new upsertChannelScanState helper.
     trulyIdle requires: no archives, no failures, no retryable pending.

scheduler.ts exposes getCurrentCycle() so the backoff "every Nth cycle"
modulo can be applied.

Forum-topic branch lands in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-26 19:56:54 +02:00
parent c6b23715e8
commit 1a4bc6f9f3
2 changed files with 121 additions and 5 deletions

View File

@@ -19,6 +19,12 @@ let activeCyclePromise: Promise<void> | null = null;
*/
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
/** Read-only access to the current cycle counter for code that needs to
* apply per-cycle modulo logic (e.g. the cold-channel backoff). */
export function getCurrentCycle(): number {
return cycleCount;
}
/**
* Run one ingestion cycle:
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)

View File

@@ -35,10 +35,12 @@ import {
findPackageByRemoteUniqueId,
getRetryableSkippedMessageIds,
updatePackageTopicContext,
upsertChannelScanState,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js";
import { getCurrentCycle } from "./scheduler.js";
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
@@ -741,6 +743,51 @@ export async function runWorkerForAccount(
}
}
} else {
// ── Channel-scan-skip guard ──
// Before any TDLib call, decide whether this channel can be
// skipped entirely this cycle. Three signals (in order):
// 1. retryable SkippedPackages exist → MUST scan
// 2. lastScannedAt within window AND last scan was idle → skip
// 3. in backoff AND not the Nth cycle → skip
try {
const retryable = await getRetryableSkippedMessageIds({
accountId: account.id,
sourceChannelId: channel.id,
topicId: null,
cap: config.maxSkipAttempts,
});
if (retryable.length === 0 && mapping.lastScannedAt) {
const sinceLastScanMs = Date.now() - mapping.lastScannedAt.getTime();
const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs;
const inBackoff = mapping.consecutiveEmptyScans >= config.emptyScanBackoffThreshold;
const backoffSkipsThisCycle =
inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0;
if (
(withinRecencyWindow && !mapping.lastScanFoundArchives) ||
backoffSkipsThisCycle
) {
accountLog.debug(
{
channel: channel.title,
sinceLastScanMs,
consecutiveEmptyScans: mapping.consecutiveEmptyScans,
reason: withinRecencyWindow ? "recent-idle" : "backoff",
},
"Skipping channel — recently scanned and idle, or in backoff"
);
continue;
}
}
} catch (skipErr) {
// Skip guard is best-effort. If the retryable query fails,
// fall through and do the normal scan.
accountLog.warn(
{ err: skipErr, channel: channel.title },
"Skip guard failed, proceeding with scan"
);
}
// ── Non-forum channel: flat scan (existing behavior) ──
await updateRunActivity(activeRunId, {
currentActivity: `Scanning "${channelLabel}" for new archives`,
@@ -797,6 +844,34 @@ export async function runWorkerForAccount(
);
}
// ── getChat short-circuit ──
// After the retry pass has settled the effective watermark, ask
// TDLib for the channel's last_message.id. If it's <= our watermark,
// no new content exists since last cycle — skip the paginated
// searchChatMessages entirely. Still update scan-state so the
// recent-scan skip can kick in next cycle.
const channelLastId = await getChannelLastMessageId(client, channel.telegramId);
if (
channelLastId !== null
&& effectiveChannelWatermark !== null
&& channelLastId <= effectiveChannelWatermark
) {
accountLog.info(
{
channel: channel.title,
channelLastId: channelLastId.toString(),
watermark: effectiveChannelWatermark.toString(),
},
"Channel caught up via getChat — skipping searchChatMessages"
);
await upsertChannelScanState(mapping.id, {
lastProcessedMessageId: effectiveChannelWatermark,
lastScanFoundArchives: false,
consecutiveEmptyScans: (mapping.consecutiveEmptyScans ?? 0) + 1,
});
continue;
}
const scanResult = await getChannelMessages(
client,
channel.telegramId,
@@ -817,10 +892,24 @@ export async function runWorkerForAccount(
if (scanResult.archives.length === 0) {
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
// Still advance watermark to highest scanned message so we don't
// re-scan these messages next cycle
// Truly idle requires no retryable SkippedPackages — a channel
// with a chronically-failing archive must NOT enter backoff just
// because no NEW archives showed up this scan.
const retryableNoArchives = await getRetryableSkippedMessageIds({
accountId: account.id,
sourceChannelId: channel.id,
topicId: null,
cap: config.maxSkipAttempts,
});
const channelTrulyIdleNoArchives = retryableNoArchives.length === 0;
if (scanResult.maxScannedMessageId) {
await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
await upsertChannelScanState(mapping.id, {
lastProcessedMessageId: scanResult.maxScannedMessageId,
lastScanFoundArchives: !channelTrulyIdleNoArchives,
consecutiveEmptyScans: channelTrulyIdleNoArchives
? (mapping.consecutiveEmptyScans ?? 0) + 1
: 0,
});
}
continue;
}
@@ -856,8 +945,29 @@ export async function runWorkerForAccount(
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
channelWatermark = minFailedId - 1n;
}
// trulyIdle: nothing new this scan AND nothing failed AND no
// retryable SkippedPackages pending. The retryable check matters —
// a chronically-failing archive should NEVER let the channel back
// off, even though zipsFound stays at 0 for it.
const retryablePendingNow = await getRetryableSkippedMessageIds({
accountId: account.id,
sourceChannelId: channel.id,
topicId: null,
cap: config.maxSkipAttempts,
});
const trulyIdle =
scanResult.archives.length === 0
&& minFailedId === null
&& retryablePendingNow.length === 0;
const newConsecutive = trulyIdle
? (mapping.consecutiveEmptyScans ?? 0) + 1
: 0;
if (channelWatermark !== null) {
await updateLastProcessedMessage(mapping.id, channelWatermark);
await upsertChannelScanState(mapping.id, {
lastProcessedMessageId: channelWatermark,
lastScanFoundArchives: !trulyIdle,
consecutiveEmptyScans: newConsecutive,
});
}
}
} catch (channelErr) {