mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-09 18:51:16 +00:00
feat(worker): non-forum channel-scan-skip + getChat short-circuit
For non-forum channels in runWorkerForAccount, this commit adds three guards:
1. Top-of-loop recency/backoff skip — if the last scan was within the
recency window and found nothing, or the channel is in empty-scan
backoff and this is not an Nth cycle, skip it entirely.
Bypassed when retryable SkippedPackages exist.
2. After the SkippedPackage retry pass, a getChat short-circuit —
if TDLib's local cache says the channel's last_message.id <= our
effective watermark, skip the paginated searchChatMessages.
3. End-of-scan persists lastScannedAt + lastScanFoundArchives +
consecutiveEmptyScans via the new upsertChannelScanState helper.
trulyIdle requires all of: no new archives, no failed packages, and no
retryable SkippedPackages pending.
scheduler.ts exposes getCurrentCycle() so the backoff "every Nth cycle"
modulo can be applied.
Forum-topic branch lands in the next commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,6 +19,12 @@ let activeCyclePromise: Promise<void> | null = null;
|
||||
*/
|
||||
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
|
||||
|
||||
/** Read-only access to the current cycle counter for code that needs to
|
||||
* apply per-cycle modulo logic (e.g. the cold-channel backoff).
*
* @returns The current value of `cycleCount`, which is declared outside
* this function; this accessor only reads it.
*/
|
||||
export function getCurrentCycle(): number {
|
||||
return cycleCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run one ingestion cycle:
|
||||
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
||||
|
||||
@@ -35,10 +35,12 @@ import {
|
||||
findPackageByRemoteUniqueId,
|
||||
getRetryableSkippedMessageIds,
|
||||
updatePackageTopicContext,
|
||||
upsertChannelScanState,
|
||||
} from "./db/queries.js";
|
||||
import type { ActivityUpdate } from "./db/queries.js";
|
||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
|
||||
import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js";
|
||||
import { getCurrentCycle } from "./scheduler.js";
|
||||
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
|
||||
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
|
||||
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
|
||||
@@ -741,6 +743,51 @@ export async function runWorkerForAccount(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// ── Channel-scan-skip guard ──
|
||||
// Before any TDLib call, decide whether this channel can be
|
||||
// skipped entirely this cycle. Three signals (in order):
|
||||
// 1. retryable SkippedPackages exist → MUST scan
|
||||
// 2. lastScannedAt within window AND last scan was idle → skip
|
||||
// 3. in backoff AND not the Nth cycle → skip
|
||||
try {
|
||||
const retryable = await getRetryableSkippedMessageIds({
|
||||
accountId: account.id,
|
||||
sourceChannelId: channel.id,
|
||||
topicId: null,
|
||||
cap: config.maxSkipAttempts,
|
||||
});
|
||||
if (retryable.length === 0 && mapping.lastScannedAt) {
|
||||
const sinceLastScanMs = Date.now() - mapping.lastScannedAt.getTime();
|
||||
const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs;
|
||||
const inBackoff = mapping.consecutiveEmptyScans >= config.emptyScanBackoffThreshold;
|
||||
const backoffSkipsThisCycle =
|
||||
inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0;
|
||||
|
||||
if (
|
||||
(withinRecencyWindow && !mapping.lastScanFoundArchives) ||
|
||||
backoffSkipsThisCycle
|
||||
) {
|
||||
accountLog.debug(
|
||||
{
|
||||
channel: channel.title,
|
||||
sinceLastScanMs,
|
||||
consecutiveEmptyScans: mapping.consecutiveEmptyScans,
|
||||
reason: withinRecencyWindow ? "recent-idle" : "backoff",
|
||||
},
|
||||
"Skipping channel — recently scanned and idle, or in backoff"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch (skipErr) {
|
||||
// Skip guard is best-effort. If the retryable query fails,
|
||||
// fall through and do the normal scan.
|
||||
accountLog.warn(
|
||||
{ err: skipErr, channel: channel.title },
|
||||
"Skip guard failed, proceeding with scan"
|
||||
);
|
||||
}
|
||||
|
||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||
@@ -797,6 +844,34 @@ export async function runWorkerForAccount(
|
||||
);
|
||||
}
|
||||
|
||||
// ── getChat short-circuit ──
|
||||
// After the retry pass has settled the effective watermark, ask
|
||||
// TDLib for the channel's last_message.id. If it's <= our watermark,
|
||||
// no new content exists since last cycle — skip the paginated
|
||||
// searchChatMessages entirely. Still update scan-state so the
|
||||
// recent-scan skip can kick in next cycle.
|
||||
const channelLastId = await getChannelLastMessageId(client, channel.telegramId);
|
||||
if (
|
||||
channelLastId !== null
|
||||
&& effectiveChannelWatermark !== null
|
||||
&& channelLastId <= effectiveChannelWatermark
|
||||
) {
|
||||
accountLog.info(
|
||||
{
|
||||
channel: channel.title,
|
||||
channelLastId: channelLastId.toString(),
|
||||
watermark: effectiveChannelWatermark.toString(),
|
||||
},
|
||||
"Channel caught up via getChat — skipping searchChatMessages"
|
||||
);
|
||||
await upsertChannelScanState(mapping.id, {
|
||||
lastProcessedMessageId: effectiveChannelWatermark,
|
||||
lastScanFoundArchives: false,
|
||||
consecutiveEmptyScans: (mapping.consecutiveEmptyScans ?? 0) + 1,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
const scanResult = await getChannelMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
@@ -817,10 +892,24 @@ export async function runWorkerForAccount(
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
||||
// Still advance watermark to highest scanned message so we don't
|
||||
// re-scan these messages next cycle
|
||||
// Truly idle requires no retryable SkippedPackages — a channel
|
||||
// with a chronically-failing archive must NOT enter backoff just
|
||||
// because no NEW archives showed up this scan.
|
||||
const retryableNoArchives = await getRetryableSkippedMessageIds({
|
||||
accountId: account.id,
|
||||
sourceChannelId: channel.id,
|
||||
topicId: null,
|
||||
cap: config.maxSkipAttempts,
|
||||
});
|
||||
const channelTrulyIdleNoArchives = retryableNoArchives.length === 0;
|
||||
if (scanResult.maxScannedMessageId) {
|
||||
await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
|
||||
await upsertChannelScanState(mapping.id, {
|
||||
lastProcessedMessageId: scanResult.maxScannedMessageId,
|
||||
lastScanFoundArchives: !channelTrulyIdleNoArchives,
|
||||
consecutiveEmptyScans: channelTrulyIdleNoArchives
|
||||
? (mapping.consecutiveEmptyScans ?? 0) + 1
|
||||
: 0,
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -856,8 +945,29 @@ export async function runWorkerForAccount(
|
||||
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
|
||||
channelWatermark = minFailedId - 1n;
|
||||
}
|
||||
// trulyIdle: nothing new this scan AND nothing failed AND no
|
||||
// retryable SkippedPackages pending. The retryable check matters —
|
||||
// a chronically-failing archive should NEVER let the channel back
|
||||
// off, even though zipsFound stays at 0 for it.
|
||||
const retryablePendingNow = await getRetryableSkippedMessageIds({
|
||||
accountId: account.id,
|
||||
sourceChannelId: channel.id,
|
||||
topicId: null,
|
||||
cap: config.maxSkipAttempts,
|
||||
});
|
||||
const trulyIdle =
|
||||
scanResult.archives.length === 0
|
||||
&& minFailedId === null
|
||||
&& retryablePendingNow.length === 0;
|
||||
const newConsecutive = trulyIdle
|
||||
? (mapping.consecutiveEmptyScans ?? 0) + 1
|
||||
: 0;
|
||||
if (channelWatermark !== null) {
|
||||
await updateLastProcessedMessage(mapping.id, channelWatermark);
|
||||
await upsertChannelScanState(mapping.id, {
|
||||
lastProcessedMessageId: channelWatermark,
|
||||
lastScanFoundArchives: !trulyIdle,
|
||||
consecutiveEmptyScans: newConsecutive,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (channelErr) {
|
||||
|
||||
Reference in New Issue
Block a user