feat(db): add upsertChannelScanState / upsertTopicScanState helpers

Wraps the existing watermark write with the three new scan-state
columns from the previous commit. Single transaction, sets
lastScannedAt=NOW() server-side. Caller is responsible for computing
the trulyIdle bool and the new consecutiveEmptyScans value
(pre-increment vs reset).

Existing updateLastProcessedMessage / upsertTopicProgress are kept for
callers that don't need the new fields (the SkippedPackage retry pass,
which only adjusts the watermark).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-26 19:53:50 +02:00
parent 6652fb8bc4
commit 3111d658f8

View File

@@ -455,6 +455,73 @@ export async function updateLastProcessedMessage(
});
}
export interface ScanStateUpdate {
/** New watermark to persist. Use the same value the caller would have
* passed to updateLastProcessedMessage / upsertTopicProgress. */
lastProcessedMessageId: bigint | null;
/** True if the scan found archives OR has retryable SkippedPackages
* pending. The caller computes this via the trulyIdle formula. */
lastScanFoundArchives: boolean;
/** Pre-incremented value of consecutiveEmptyScans. Caller passes:
* trulyIdle ? prev + 1 : 0
* We do the arithmetic outside the helper so the helper stays a pure
* setter — easier to reason about. */
consecutiveEmptyScans: number;
}
/**
* Atomically update an AccountChannelMap's watermark and scan-state fields.
* Replaces the older updateLastProcessedMessage for the post-scan write.
* Sets lastScannedAt = NOW() server-side.
*/
export async function upsertChannelScanState(
mappingId: string,
update: ScanStateUpdate
) {
return db.accountChannelMap.update({
where: { id: mappingId },
data: {
lastProcessedMessageId: update.lastProcessedMessageId ?? undefined,
lastScannedAt: new Date(),
lastScanFoundArchives: update.lastScanFoundArchives,
consecutiveEmptyScans: update.consecutiveEmptyScans,
},
});
}
/**
* Atomically upsert a TopicProgress row with the new watermark + scan-state
* fields. Same semantics as upsertChannelScanState but for forum topics.
*/
export async function upsertTopicScanState(
accountChannelMapId: string,
topicId: bigint,
topicName: string | null,
update: ScanStateUpdate
) {
return db.topicProgress.upsert({
where: {
accountChannelMapId_topicId: { accountChannelMapId, topicId },
},
create: {
accountChannelMapId,
topicId,
topicName,
lastProcessedMessageId: update.lastProcessedMessageId,
lastScannedAt: new Date(),
lastScanFoundArchives: update.lastScanFoundArchives,
consecutiveEmptyScans: update.consecutiveEmptyScans,
},
update: {
topicName,
lastProcessedMessageId: update.lastProcessedMessageId ?? undefined,
lastScannedAt: new Date(),
lastScanFoundArchives: update.lastScanFoundArchives,
consecutiveEmptyScans: update.consecutiveEmptyScans,
},
});
}
export async function markStaleRunsAsFailed() {
return db.ingestionRun.updateMany({
where: { status: "RUNNING" },