fix: advance channel/topic watermark incrementally per successful set

Diagnosed from production logs for the main (Premium) account:

  RUNNING   2026-05-21 → in progress, 22h     ingested: 0
  FAILED    2026-05-14 → 2026-05-21 (7.4d)    ingested: 5,426 (killed by restart)
  FAILED    2026-05-06 → 2026-05-14 (7.7d)    ingested: 8,300 (killed by restart)

Main's two source channels have 378k+ messages each. A full scan takes
days, but the worker gets restarted (container update, cycle timeout,
etc.) every few days. updateLastProcessedMessage was only called at the
END of a channel's scan — so the watermark on AccountChannelMap stayed
NULL through restart after restart, and every new run re-scanned from
message 0.
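The restart loop described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: `runsToFinish`, `Persist`, and the "sets per run" budget are hypothetical names invented for illustration, a watermark of 0 stands in for NULL, and every run is assumed to be killed after a fixed number of archive sets.

```typescript
// Toy model (hypothetical, not the worker's real code): how many runs does
// a scan need if every run is killed after `setsPerRun` archive sets?
type Persist = "end-of-scan" | "per-set";

function runsToFinish(
  totalSets: number,
  setsPerRun: number,
  mode: Persist,
  maxRuns = 100
): number | null {
  let watermark = 0; // 0 stands in for a NULL watermark
  for (let run = 1; run <= maxRuns; run++) {
    let position = watermark; // each run resumes from the persisted watermark
    for (let i = 0; i < setsPerRun && position < totalSets; i++) {
      position++;
      // Per-set mode persists progress after every set.
      if (mode === "per-set") watermark = position;
    }
    if (position >= totalSets) return run; // scan completed in this run
    // Otherwise the run is killed here. In end-of-scan mode nothing was
    // persisted, so the next run starts again from 0.
  }
  return null; // never finished within maxRuns
}

console.log(runsToFinish(10, 4, "per-set"));     // 3
console.log(runsToFinish(10, 4, "end-of-scan")); // null
```

In this model, end-of-scan persistence only completes when a whole scan fits inside a single run, which matches the behaviour seen in the logs.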

That explains the user's symptom: "main wasn't uploading although it
said it did". The dashboard showed currentStep alternating through
downloading / hashing / deduplicating, but zipsIngested stayed at 0
because every archive the run encountered was already a hash-duplicate
of something uploaded by a previous run.

Fix: processArchiveSets now accepts an onWatermarkAdvance callback.
After each successful set (ingested OR confirmed duplicate), the callback
fires with a watermark capped below the current minFailedId. Both call
sites (forum/topic and non-forum) wire it to upsertTopicProgress /
updateLastProcessedMessage. The end-of-scan write is retained for the
no-archives and all-failures-with-fallback cases.
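A minimal sketch of the capped callback pattern described above, assuming each archive set can be reduced to a single message ID. `capWatermark`, `processSets`, and `SetOutcome` are hypothetical stand-ins for illustration; the real `processArchiveSets` takes a pipeline context and a scan result, and tracks failures in more detail.

```typescript
// Hypothetical helper: keep the persisted watermark below any failed ID.
function capWatermark(setMaxId: bigint, minFailedId: bigint | null): bigint {
  return minFailedId !== null && setMaxId >= minFailedId
    ? minFailedId - 1n
    : setMaxId;
}

type WatermarkCallback = (messageId: bigint) => Promise<void>;

// Simplification: one ID per set, and `ok` covers "ingested or duplicate".
interface SetOutcome {
  id: bigint;
  ok: boolean;
}

async function processSets(
  sets: SetOutcome[],
  onWatermarkAdvance?: WatermarkCallback
): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> {
  let maxProcessedId: bigint | null = null;
  let minFailedId: bigint | null = null;
  for (const set of sets) {
    if (!set.ok) {
      // Remember the lowest failed ID so it is retried on the next cycle.
      if (minFailedId === null || set.id < minFailedId) minFailedId = set.id;
      continue;
    }
    if (maxProcessedId === null || set.id > maxProcessedId) maxProcessedId = set.id;
    if (onWatermarkAdvance) {
      const safe = capWatermark(set.id, minFailedId);
      if (safe > 0n) {
        try {
          await onWatermarkAdvance(safe);
        } catch {
          // Best effort: the end-of-scan write is the fallback.
        }
      }
    }
  }
  return { maxProcessedId, minFailedId };
}
```

With sets `10 (ok)`, `20 (failed)`, `30 (ok)` processed in that order, the callback in this sketch receives `10n` and then `19n`, so the failed set at 20 stays above the persisted watermark.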

Worst-case progress loss on restart is now one in-flight archive set,
not the entire scan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:20:20 +02:00
parent 3b327eb3f3
commit 77aeb4cc00

@@ -606,14 +606,24 @@ export async function runWorkerForAccount(
pipelineCtx,
scanResult,
run.id,
-progress?.lastProcessedMessageId
+progress?.lastProcessedMessageId,
+// Incremental watermark advance — saves progress per-set so a
+// worker restart mid-scan doesn't lose all work.
+async (messageId) => {
+await upsertTopicProgress(
+mapping.id,
+topic.topicId,
+topic.name,
+messageId
+);
+}
);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
-// Advance progress: prefer archive watermark, fall back to scan watermark.
-// Cap at one less than the lowest failed message ID so failed sets stay
-// above the next scan boundary and get retried on the next cycle.
+// Final watermark write at the end of the scan (covers the
+// no-archives-found and all-failures-with-fallback cases).
+// The incremental updates above already handle the success path.
let topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) {
topicWatermark = minFailedId - 1n;
@@ -695,14 +705,19 @@ export async function runWorkerForAccount(
pipelineCtx,
scanResult,
run.id,
-mapping.lastProcessedMessageId
+mapping.lastProcessedMessageId,
+// Incremental watermark advance — saves progress per-set so a
+// worker restart mid-scan doesn't lose all work.
+async (messageId) => {
+await updateLastProcessedMessage(mapping.id, messageId);
+}
);
// Sync client back in case it was recreated during upload stall recovery
client = pipelineCtx.client;
-// Advance progress: prefer archive watermark, fall back to scan watermark.
-// Cap at one less than the lowest failed message ID so failed sets stay
-// above the next scan boundary and get retried on the next cycle.
+// Final watermark write at the end of the scan (covers the
+// no-archives-found and all-failures-with-fallback cases).
+// The incremental updates above already handle the success path.
let channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
channelWatermark = minFailedId - 1n;
@@ -810,7 +825,12 @@ async function processArchiveSets(
ctx: PipelineContext,
scanResult: ChannelScanResult,
ingestionRunId: string,
-lastProcessedMessageId?: bigint | null
+lastProcessedMessageId?: bigint | null,
+/** Called after each successful set with a safe watermark value (capped
+ * below any failed message ID in this scan). Used by the caller to
+ * advance the channel/topic watermark incrementally — otherwise a long
+ * scan that gets killed by worker restart loses all progress. */
+onWatermarkAdvance?: (messageId: bigint) => Promise<void>
): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> {
const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx;
@@ -916,6 +936,27 @@ async function processArchiveSets(
maxProcessedId = setMaxId;
}
+// Persist watermark immediately so a worker restart or cycle timeout
+// doesn't throw away progress. We only advance below minFailedId so a
+// later-encountered failure (out of order, e.g., multipart spanning)
+// doesn't get buried by an earlier success in this scan. In practice
+// sets are processed oldest-first, so setMaxId rarely exceeds
+// minFailedId, but the cap keeps the invariant if it ever does.
+if (onWatermarkAdvance) {
+const safeWatermark =
+minFailedId !== null && setMaxId >= minFailedId
+? minFailedId - 1n
+: setMaxId;
+if (safeWatermark > 0n) {
+await onWatermarkAdvance(safeWatermark).catch((err) => {
+accountLog.warn(
+{ err, setMaxId: setMaxId.toString() },
+"Failed to persist incremental watermark (will retry at end of scan)"
+);
+});
+}
+}
// Reset stall counter on any successful upload
ctx.consecutiveStalls = 0;
} catch (setErr) {