diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 455c0a3..3cf193d 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -455,6 +455,73 @@ export async function updateLastProcessedMessage( }); } +export interface ScanStateUpdate { + /** New watermark to persist. Use the same value the caller would have + * passed to updateLastProcessedMessage / upsertTopicProgress. */ + lastProcessedMessageId: bigint | null; + /** True if the scan found archives OR has retryable SkippedPackages + * pending. The caller computes this via the trulyIdle formula. */ + lastScanFoundArchives: boolean; + /** Pre-incremented value of consecutiveEmptyScans. Caller passes: + * trulyIdle ? prev + 1 : 0 + * We do the arithmetic outside the helper so the helper stays a pure + * setter — easier to reason about. */ + consecutiveEmptyScans: number; +} + +/** + * Atomically update an AccountChannelMap's watermark and scan-state fields. + * Replaces the older updateLastProcessedMessage for the post-scan write. + * Sets lastScannedAt = NOW() server-side. + */ +export async function upsertChannelScanState( + mappingId: string, + update: ScanStateUpdate +) { + return db.accountChannelMap.update({ + where: { id: mappingId }, + data: { + lastProcessedMessageId: update.lastProcessedMessageId ?? undefined, + lastScannedAt: new Date(), + lastScanFoundArchives: update.lastScanFoundArchives, + consecutiveEmptyScans: update.consecutiveEmptyScans, + }, + }); +} + +/** + * Atomically upsert a TopicProgress row with the new watermark + scan-state + * fields. Same semantics as upsertChannelScanState but for forum topics. + */ +export async function upsertTopicScanState( + accountChannelMapId: string, + topicId: bigint, + topicName: string | null, + update: ScanStateUpdate +) { + return db.topicProgress.upsert({ + where: { + accountChannelMapId_topicId: { accountChannelMapId, topicId }, + }, + create: { + accountChannelMapId, + topicId, + topicName, + lastProcessedMessageId: update.lastProcessedMessageId, + lastScannedAt: new Date(), + lastScanFoundArchives: update.lastScanFoundArchives, + consecutiveEmptyScans: update.consecutiveEmptyScans, + }, + update: { + topicName, + lastProcessedMessageId: update.lastProcessedMessageId ?? undefined, + lastScannedAt: new Date(), + lastScanFoundArchives: update.lastScanFoundArchives, + consecutiveEmptyScans: update.consecutiveEmptyScans, + }, + }); +} + export async function markStaleRunsAsFailed() { return db.ingestionRun.updateMany({ where: { status: "RUNNING" },