diff --git a/worker/src/worker.ts b/worker/src/worker.ts index ca4bc1b..46244ac 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -606,14 +606,24 @@ export async function runWorkerForAccount( pipelineCtx, scanResult, run.id, - progress?.lastProcessedMessageId + progress?.lastProcessedMessageId, + // Incremental watermark advance — saves progress per-set so a + // worker restart mid-scan doesn't lose all work. + async (messageId) => { + await upsertTopicProgress( + mapping.id, + topic.topicId, + topic.name, + messageId + ); + } ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: prefer archive watermark, fall back to scan watermark. - // Cap at one less than the lowest failed message ID so failed sets stay - // above the next scan boundary and get retried on the next cycle. + // Final watermark write at the end of the scan (covers the + // no-archives-found and all-failures-with-fallback cases). + // The incremental updates above already handle the success path. let topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) { topicWatermark = minFailedId - 1n; @@ -695,14 +705,19 @@ export async function runWorkerForAccount( pipelineCtx, scanResult, run.id, - mapping.lastProcessedMessageId + mapping.lastProcessedMessageId, + // Incremental watermark advance — saves progress per-set so a + // worker restart mid-scan doesn't lose all work. + async (messageId) => { + await updateLastProcessedMessage(mapping.id, messageId); + } ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: prefer archive watermark, fall back to scan watermark. - // Cap at one less than the lowest failed message ID so failed sets stay - // above the next scan boundary and get retried on the next cycle. 
+ // Final watermark write at the end of the scan (covers the + // no-archives-found and all-failures-with-fallback cases). + // The incremental updates above already handle the success path. let channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) { channelWatermark = minFailedId - 1n; @@ -810,7 +825,12 @@ async function processArchiveSets( ctx: PipelineContext, scanResult: ChannelScanResult, ingestionRunId: string, - lastProcessedMessageId?: bigint | null + lastProcessedMessageId?: bigint | null, + /** Called after each successful set with a safe watermark value (capped + * below any failed message ID in this scan). Used by the caller to + * advance the channel/topic watermark incrementally — otherwise a long + * scan that gets killed by worker restart loses all progress. */ + onWatermarkAdvance?: (messageId: bigint) => Promise<void> ): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> { const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx; @@ -916,6 +936,27 @@ async function processArchiveSets( maxProcessedId = setMaxId; } + // Persist watermark immediately so a worker restart or cycle timeout + // doesn't throw away progress. We only advance below minFailedId so a + // later-encountered failure (out of order, e.g., multipart spanning) + // doesn't get buried by an earlier success in this scan. In practice + // sets are processed oldest-first, so setMaxId rarely exceeds + // minFailedId, but the cap keeps the invariant if it ever does. + if (onWatermarkAdvance) { + const safeWatermark = + minFailedId !== null && setMaxId >= minFailedId + ? 
minFailedId - 1n + : setMaxId; + if (safeWatermark > 0n) { + await onWatermarkAdvance(safeWatermark).catch((err) => { + accountLog.warn( + { err, setMaxId: setMaxId.toString() }, + "Failed to persist incremental watermark (will retry at end of scan)" + ); + }); + } + } + // Reset stall counter on any successful upload ctx.consecutiveStalls = 0; } catch (setErr) {