From 77aeb4cc00fd650835b4b962d5004734ee0d8be6 Mon Sep 17 00:00:00 2001 From: xCyanGrizzly Date: Fri, 22 May 2026 23:20:20 +0200 Subject: [PATCH] fix: advance channel/topic watermark incrementally per successful set MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Diagnosed from production logs for the main (Premium) account: RUNNING 2026-05-21 → in progress, 22h ingested: 0 FAILED 2026-05-14 → 2026-05-21 (7.4d) ingested: 5,426 (killed by restart) FAILED 2026-05-06 → 2026-05-14 (7.7d) ingested: 8,300 (killed by restart) Main's two source channels have 378k+ messages each. A full scan takes days, but the worker gets restarted (container update, cycle timeout, etc.) every few days. updateLastProcessedMessage was only called at the END of a channel's scan — so the watermark on AccountChannelMap stayed NULL through restart after restart, and every new run re-scanned from message 0. That explains the user's symptom: "main wasn't uploading although it said it did". The dashboard showed currentStep cycling through downloading / hashing / deduplicating, but zipsIngested stayed at 0 because every archive the run encountered was already a hash-duplicate of something uploaded by a previous run. Fix: processArchiveSets now accepts an onWatermarkAdvance callback. After each successful set (ingested OR confirmed duplicate), the callback fires with a watermark capped below the current minFailedId. Both call sites (forum/topic and non-forum) wire it to upsertTopicProgress / updateLastProcessedMessage. The end-of-scan write is retained for the no-archives and all-failures-with-fallback cases. Worst-case progress loss on restart is now one in-flight archive set, not the entire scan. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- worker/src/worker.ts | 59 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/worker/src/worker.ts b/worker/src/worker.ts index ca4bc1b..46244ac 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -606,14 +606,24 @@ export async function runWorkerForAccount( pipelineCtx, scanResult, run.id, - progress?.lastProcessedMessageId + progress?.lastProcessedMessageId, + // Incremental watermark advance — saves progress per-set so a + // worker restart mid-scan doesn't lose all work. + async (messageId) => { + await upsertTopicProgress( + mapping.id, + topic.topicId, + topic.name, + messageId + ); + } ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: prefer archive watermark, fall back to scan watermark. - // Cap at one less than the lowest failed message ID so failed sets stay - // above the next scan boundary and get retried on the next cycle. + // Final watermark write at the end of the scan (covers the + // no-archives-found and all-failures-with-fallback cases). + // The incremental updates above already handle the success path. let topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) { topicWatermark = minFailedId - 1n; @@ -695,14 +705,19 @@ export async function runWorkerForAccount( pipelineCtx, scanResult, run.id, - mapping.lastProcessedMessageId + mapping.lastProcessedMessageId, + // Incremental watermark advance — saves progress per-set so a + // worker restart mid-scan doesn't lose all work. + async (messageId) => { + await updateLastProcessedMessage(mapping.id, messageId); + } ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: prefer archive watermark, fall back to scan watermark. 
- // Cap at one less than the lowest failed message ID so failed sets stay - // above the next scan boundary and get retried on the next cycle. + // Final watermark write at the end of the scan (covers the + // no-archives-found and all-failures-with-fallback cases). + // The incremental updates above already handle the success path. let channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) { channelWatermark = minFailedId - 1n; @@ -810,7 +825,12 @@ async function processArchiveSets( ctx: PipelineContext, scanResult: ChannelScanResult, ingestionRunId: string, - lastProcessedMessageId?: bigint | null + lastProcessedMessageId?: bigint | null, + /** Called after each successful set with a safe watermark value (capped + * below any failed message ID in this scan). Used by the caller to + * advance the channel/topic watermark incrementally — otherwise a long + * scan that gets killed by worker restart loses all progress. */ + onWatermarkAdvance?: (messageId: bigint) => Promise ): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> { const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx; @@ -916,6 +936,27 @@ async function processArchiveSets( maxProcessedId = setMaxId; } + // Persist watermark immediately so a worker restart or cycle timeout + // doesn't throw away progress. We only advance below minFailedId so a + // later-encountered failure (out of order, e.g., multipart spanning) + // doesn't get buried by an earlier success in this scan. In practice + // sets are processed oldest-first, so setMaxId rarely exceeds + // minFailedId, but the cap keeps the invariant if it ever does. + if (onWatermarkAdvance) { + const safeWatermark = + minFailedId !== null && setMaxId >= minFailedId + ? 
minFailedId - 1n + : setMaxId; + if (safeWatermark > 0n) { + await onWatermarkAdvance(safeWatermark).catch((err) => { + accountLog.warn( + { err, setMaxId: setMaxId.toString() }, + "Failed to persist incremental watermark (will retry at end of scan)" + ); + }); + } + } + // Reset stall counter on any successful upload ctx.consecutiveStalls = 0; } catch (setErr) {