mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-09 18:51:16 +00:00
fix: cap watermark below failed sets so failures retry next cycle
Previously the channel/topic watermark could advance past failed
archive sets in two ways:
1. A later successful set raised maxProcessedId past a failed earlier
set within the same scan.
2. scanResult.maxScannedMessageId was used as fallback even when
archives in the scan had failed (added in 77c26ad to prevent
re-scanning empty channels).
Both paths buried failed archives below the watermark on the next
cycle — they sat permanently in SkippedPackage with no auto-recovery.
Now processArchiveSets returns the lowest failed source message ID
alongside the highest processed one. The caller caps the watermark at
(minFailedId - 1n) so the next scan re-includes the failed messages
and processOneArchiveSet retries them. Successful sets above the
failure boundary are not re-uploaded — packageExistsBySourceMessage
early-skips them on the second pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -560,13 +560,23 @@ export async function runWorkerForAccount(
|
||||
pipelineCtx.sourceTopicId = topic.topicId;
|
||||
pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`;
|
||||
|
||||
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
|
||||
const { maxProcessedId, minFailedId } = await processArchiveSets(
|
||||
pipelineCtx,
|
||||
scanResult,
|
||||
run.id,
|
||||
progress?.lastProcessedMessageId
|
||||
);
|
||||
// Sync client back in case it was recreated during upload stall recovery
|
||||
client = pipelineCtx.client;
|
||||
|
||||
// Advance progress: use archive watermark if available, fall back to scan watermark
|
||||
const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (topicWatermark) {
|
||||
// Advance progress: prefer archive watermark, fall back to scan watermark.
|
||||
// Cap at one less than the lowest failed message ID so failed sets stay
|
||||
// above the next scan boundary and get retried on the next cycle.
|
||||
let topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) {
|
||||
topicWatermark = minFailedId - 1n;
|
||||
}
|
||||
if (topicWatermark !== null) {
|
||||
await upsertTopicProgress(
|
||||
mapping.id,
|
||||
topic.topicId,
|
||||
@@ -639,13 +649,23 @@ export async function runWorkerForAccount(
|
||||
pipelineCtx.sourceTopicId = null;
|
||||
pipelineCtx.channelTitle = channel.title;
|
||||
|
||||
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
|
||||
const { maxProcessedId, minFailedId } = await processArchiveSets(
|
||||
pipelineCtx,
|
||||
scanResult,
|
||||
run.id,
|
||||
mapping.lastProcessedMessageId
|
||||
);
|
||||
// Sync client back in case it was recreated during upload stall recovery
|
||||
client = pipelineCtx.client;
|
||||
|
||||
// Advance progress: use archive watermark if available, fall back to scan watermark
|
||||
const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (channelWatermark) {
|
||||
// Advance progress: prefer archive watermark, fall back to scan watermark.
|
||||
// Cap at one less than the lowest failed message ID so failed sets stay
|
||||
// above the next scan boundary and get retried on the next cycle.
|
||||
let channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
|
||||
if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
|
||||
channelWatermark = minFailedId - 1n;
|
||||
}
|
||||
if (channelWatermark !== null) {
|
||||
await updateLastProcessedMessage(mapping.id, channelWatermark);
|
||||
}
|
||||
}
|
||||
@@ -705,7 +725,7 @@ async function processArchiveSets(
|
||||
scanResult: ChannelScanResult,
|
||||
ingestionRunId: string,
|
||||
lastProcessedMessageId?: bigint | null
|
||||
): Promise<bigint | null> {
|
||||
): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> {
|
||||
const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx;
|
||||
|
||||
// Group into archive sets
|
||||
@@ -754,8 +774,11 @@ async function processArchiveSets(
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
// Track the highest message ID that was successfully processed
|
||||
// Track the highest message ID that was successfully processed and the
|
||||
// lowest message ID of any failed set. The caller uses minFailedId to cap
|
||||
// the watermark so failures get retried on the next cycle.
|
||||
let maxProcessedId: bigint | null = null;
|
||||
let minFailedId: bigint | null = null;
|
||||
const indexedPackageRefs: IndexedPackageRef[] = [];
|
||||
|
||||
for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
|
||||
@@ -796,6 +819,16 @@ async function processArchiveSets(
|
||||
"Archive set failed, watermark will not advance past this set"
|
||||
);
|
||||
|
||||
// Record the lowest part ID of this set as a failure boundary so the
|
||||
// caller can cap the watermark below it and the next scan re-picks it up.
|
||||
const setMinId = archiveSets[setIdx].parts.reduce(
|
||||
(min, p) => (p.id < min ? p.id : min),
|
||||
archiveSets[setIdx].parts[0].id
|
||||
);
|
||||
if (minFailedId === null || setMinId < minFailedId) {
|
||||
minFailedId = setMinId;
|
||||
}
|
||||
|
||||
// ── TDLib client recreation on repeated upload stalls ──
|
||||
// When the TDLib event stream degrades, uploads complete (bytes sent)
|
||||
// but confirmations never arrive. Retrying with the same broken client
|
||||
@@ -929,7 +962,7 @@ async function processArchiveSets(
|
||||
await detectGroupingConflicts(channel.id, indexedPackageRefs);
|
||||
}
|
||||
|
||||
return maxProcessedId;
|
||||
return { maxProcessedId, minFailedId };
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user