diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 66de9d8..53e77b2 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -560,13 +560,23 @@ export async function runWorkerForAccount( pipelineCtx.sourceTopicId = topic.topicId; pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`; - const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId); + const { maxProcessedId, minFailedId } = await processArchiveSets( + pipelineCtx, + scanResult, + run.id, + progress?.lastProcessedMessageId + ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: use archive watermark if available, fall back to scan watermark - const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; - if (topicWatermark) { + // Advance progress: prefer archive watermark, fall back to scan watermark. + // Cap at one less than the lowest failed message ID so failed sets stay + // above the next scan boundary and get retried on the next cycle. + let topicWatermark = maxProcessedId ?? 
scanResult.maxScannedMessageId; + if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) { + topicWatermark = minFailedId - 1n; + } + if (topicWatermark !== null) { await upsertTopicProgress( mapping.id, topic.topicId, @@ -639,13 +649,23 @@ export async function runWorkerForAccount( pipelineCtx.sourceTopicId = null; pipelineCtx.channelTitle = channel.title; - const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId); + const { maxProcessedId, minFailedId } = await processArchiveSets( + pipelineCtx, + scanResult, + run.id, + mapping.lastProcessedMessageId + ); // Sync client back in case it was recreated during upload stall recovery client = pipelineCtx.client; - // Advance progress: use archive watermark if available, fall back to scan watermark - const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId; - if (channelWatermark) { + // Advance progress: prefer archive watermark, fall back to scan watermark. + // Cap at one less than the lowest failed message ID so failed sets stay + // above the next scan boundary and get retried on the next cycle. + let channelWatermark = maxProcessedId ?? 
scanResult.maxScannedMessageId; + if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) { + channelWatermark = minFailedId - 1n; + } + if (channelWatermark !== null) { await updateLastProcessedMessage(mapping.id, channelWatermark); } } @@ -705,7 +725,7 @@ async function processArchiveSets( scanResult: ChannelScanResult, ingestionRunId: string, lastProcessedMessageId?: bigint | null -): Promise<bigint | null> { +): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> { const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx; // Group into archive sets @@ -754,8 +774,11 @@ async function processArchiveSets( messagesScanned: counters.messagesScanned, }); - // Track the highest message ID that was successfully processed + // Track the highest message ID that was successfully processed and the + // lowest message ID of any failed set. The caller uses minFailedId to cap + // the watermark so failures get retried on the next cycle. let maxProcessedId: bigint | null = null; + let minFailedId: bigint | null = null; const indexedPackageRefs: IndexedPackageRef[] = []; for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) { @@ -796,6 +819,16 @@ async function processArchiveSets( "Archive set failed, watermark will not advance past this set" ); + // Record the lowest part ID of this set as a failure boundary so the + // caller can cap the watermark below it and the next scan re-picks it up. + const setMinId = archiveSets[setIdx].parts.reduce( + (min, p) => (p.id < min ? p.id : min), + archiveSets[setIdx].parts[0].id + ); + if (minFailedId === null || setMinId < minFailedId) { + minFailedId = setMinId; + } + // ── TDLib client recreation on repeated upload stalls ── // When the TDLib event stream degrades, uploads complete (bytes sent) // but confirmations never arrive. 
Retrying with the same broken client @@ -929,7 +962,7 @@ async function processArchiveSets( await detectGroupingConflicts(channel.id, indexedPackageRefs); } - return maxProcessedId; + return { maxProcessedId, minFailedId }; } /**