mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-13 04:31:16 +00:00
feat(worker): non-forum channel-scan-skip + getChat short-circuit
For non-forum channels in runWorkerForAccount, three guards:
1. Top-of-loop recency/backoff skip — if recently scanned with no
pending work, or in backoff and not its turn, skip entirely.
Bypassed when retryable SkippedPackages exist.
2. After the SkippedPackage retry pass, a getChat short-circuit —
if TDLib's local cache says the channel's last_message.id <= our
effective watermark, skip the paginated searchChatMessages.
3. End-of-scan persists lastScannedAt + lastScanFoundArchives +
consecutiveEmptyScans via the new upsertChannelScanState helper.
trulyIdle requires: no archives, no failures, no retryable pending.
scheduler.ts exposes getCurrentCycle() so the backoff "every Nth cycle"
modulo can be applied.
Forum-topic branch lands in the next commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,6 +19,12 @@ let activeCyclePromise: Promise<void> | null = null;
|
|||||||
*/
|
*/
|
||||||
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
|
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
|
||||||
|
|
||||||
|
/** Read-only access to the current cycle counter for code that needs to
|
||||||
|
* apply per-cycle modulo logic (e.g. the cold-channel backoff). */
|
||||||
|
export function getCurrentCycle(): number {
|
||||||
|
return cycleCount;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run one ingestion cycle:
|
* Run one ingestion cycle:
|
||||||
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
||||||
|
|||||||
@@ -35,10 +35,12 @@ import {
|
|||||||
findPackageByRemoteUniqueId,
|
findPackageByRemoteUniqueId,
|
||||||
getRetryableSkippedMessageIds,
|
getRetryableSkippedMessageIds,
|
||||||
updatePackageTopicContext,
|
updatePackageTopicContext,
|
||||||
|
upsertChannelScanState,
|
||||||
} from "./db/queries.js";
|
} from "./db/queries.js";
|
||||||
import type { ActivityUpdate } from "./db/queries.js";
|
import type { ActivityUpdate } from "./db/queries.js";
|
||||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||||
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
|
import { getAccountChats, joinChatByInviteLink, getChannelLastMessageId } from "./tdlib/chats.js";
|
||||||
|
import { getCurrentCycle } from "./scheduler.js";
|
||||||
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
|
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
|
||||||
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
|
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
|
||||||
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
|
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
|
||||||
@@ -741,6 +743,51 @@ export async function runWorkerForAccount(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// ── Channel-scan-skip guard ──
|
||||||
|
// Before any TDLib call, decide whether this channel can be
|
||||||
|
// skipped entirely this cycle. Three signals (in order):
|
||||||
|
// 1. retryable SkippedPackages exist → MUST scan
|
||||||
|
// 2. lastScannedAt within window AND last scan was idle → skip
|
||||||
|
// 3. in backoff AND not the Nth cycle → skip
|
||||||
|
try {
|
||||||
|
const retryable = await getRetryableSkippedMessageIds({
|
||||||
|
accountId: account.id,
|
||||||
|
sourceChannelId: channel.id,
|
||||||
|
topicId: null,
|
||||||
|
cap: config.maxSkipAttempts,
|
||||||
|
});
|
||||||
|
if (retryable.length === 0 && mapping.lastScannedAt) {
|
||||||
|
const sinceLastScanMs = Date.now() - mapping.lastScannedAt.getTime();
|
||||||
|
const withinRecencyWindow = sinceLastScanMs < config.skipRecentScanWindowMs;
|
||||||
|
const inBackoff = mapping.consecutiveEmptyScans >= config.emptyScanBackoffThreshold;
|
||||||
|
const backoffSkipsThisCycle =
|
||||||
|
inBackoff && getCurrentCycle() % config.emptyScanBackoffEveryNth !== 0;
|
||||||
|
|
||||||
|
if (
|
||||||
|
(withinRecencyWindow && !mapping.lastScanFoundArchives) ||
|
||||||
|
backoffSkipsThisCycle
|
||||||
|
) {
|
||||||
|
accountLog.debug(
|
||||||
|
{
|
||||||
|
channel: channel.title,
|
||||||
|
sinceLastScanMs,
|
||||||
|
consecutiveEmptyScans: mapping.consecutiveEmptyScans,
|
||||||
|
reason: withinRecencyWindow ? "recent-idle" : "backoff",
|
||||||
|
},
|
||||||
|
"Skipping channel — recently scanned and idle, or in backoff"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (skipErr) {
|
||||||
|
// Skip guard is best-effort. If the retryable query fails,
|
||||||
|
// fall through and do the normal scan.
|
||||||
|
accountLog.warn(
|
||||||
|
{ err: skipErr, channel: channel.title },
|
||||||
|
"Skip guard failed, proceeding with scan"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||||
@@ -797,6 +844,34 @@ export async function runWorkerForAccount(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── getChat short-circuit ──
|
||||||
|
// After the retry pass has settled the effective watermark, ask
|
||||||
|
// TDLib for the channel's last_message.id. If it's <= our watermark,
|
||||||
|
// no new content exists since last cycle — skip the paginated
|
||||||
|
// searchChatMessages entirely. Still update scan-state so the
|
||||||
|
// recent-scan skip can kick in next cycle.
|
||||||
|
const channelLastId = await getChannelLastMessageId(client, channel.telegramId);
|
||||||
|
if (
|
||||||
|
channelLastId !== null
|
||||||
|
&& effectiveChannelWatermark !== null
|
||||||
|
&& channelLastId <= effectiveChannelWatermark
|
||||||
|
) {
|
||||||
|
accountLog.info(
|
||||||
|
{
|
||||||
|
channel: channel.title,
|
||||||
|
channelLastId: channelLastId.toString(),
|
||||||
|
watermark: effectiveChannelWatermark.toString(),
|
||||||
|
},
|
||||||
|
"Channel caught up via getChat — skipping searchChatMessages"
|
||||||
|
);
|
||||||
|
await upsertChannelScanState(mapping.id, {
|
||||||
|
lastProcessedMessageId: effectiveChannelWatermark,
|
||||||
|
lastScanFoundArchives: false,
|
||||||
|
consecutiveEmptyScans: (mapping.consecutiveEmptyScans ?? 0) + 1,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
const scanResult = await getChannelMessages(
|
const scanResult = await getChannelMessages(
|
||||||
client,
|
client,
|
||||||
channel.telegramId,
|
channel.telegramId,
|
||||||
@@ -817,10 +892,24 @@ export async function runWorkerForAccount(
|
|||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
||||||
// Still advance watermark to highest scanned message so we don't
|
// Truly idle requires no retryable SkippedPackages — a channel
|
||||||
// re-scan these messages next cycle
|
// with a chronically-failing archive must NOT enter backoff just
|
||||||
|
// because no NEW archives showed up this scan.
|
||||||
|
const retryableNoArchives = await getRetryableSkippedMessageIds({
|
||||||
|
accountId: account.id,
|
||||||
|
sourceChannelId: channel.id,
|
||||||
|
topicId: null,
|
||||||
|
cap: config.maxSkipAttempts,
|
||||||
|
});
|
||||||
|
const channelTrulyIdleNoArchives = retryableNoArchives.length === 0;
|
||||||
if (scanResult.maxScannedMessageId) {
|
if (scanResult.maxScannedMessageId) {
|
||||||
await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
|
await upsertChannelScanState(mapping.id, {
|
||||||
|
lastProcessedMessageId: scanResult.maxScannedMessageId,
|
||||||
|
lastScanFoundArchives: !channelTrulyIdleNoArchives,
|
||||||
|
consecutiveEmptyScans: channelTrulyIdleNoArchives
|
||||||
|
? (mapping.consecutiveEmptyScans ?? 0) + 1
|
||||||
|
: 0,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -856,8 +945,29 @@ export async function runWorkerForAccount(
|
|||||||
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
|
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
|
||||||
channelWatermark = minFailedId - 1n;
|
channelWatermark = minFailedId - 1n;
|
||||||
}
|
}
|
||||||
|
// trulyIdle: nothing new this scan AND nothing failed AND no
|
||||||
|
// retryable SkippedPackages pending. The retryable check matters —
|
||||||
|
// a chronically-failing archive should NEVER let the channel back
|
||||||
|
// off, even though zipsFound stays at 0 for it.
|
||||||
|
const retryablePendingNow = await getRetryableSkippedMessageIds({
|
||||||
|
accountId: account.id,
|
||||||
|
sourceChannelId: channel.id,
|
||||||
|
topicId: null,
|
||||||
|
cap: config.maxSkipAttempts,
|
||||||
|
});
|
||||||
|
const trulyIdle =
|
||||||
|
scanResult.archives.length === 0
|
||||||
|
&& minFailedId === null
|
||||||
|
&& retryablePendingNow.length === 0;
|
||||||
|
const newConsecutive = trulyIdle
|
||||||
|
? (mapping.consecutiveEmptyScans ?? 0) + 1
|
||||||
|
: 0;
|
||||||
if (channelWatermark !== null) {
|
if (channelWatermark !== null) {
|
||||||
await updateLastProcessedMessage(mapping.id, channelWatermark);
|
await upsertChannelScanState(mapping.id, {
|
||||||
|
lastProcessedMessageId: channelWatermark,
|
||||||
|
lastScanFoundArchives: !trulyIdle,
|
||||||
|
consecutiveEmptyScans: newConsecutive,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (channelErr) {
|
} catch (channelErr) {
|
||||||
|
|||||||
Reference in New Issue
Block a user