diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 7461d5e..e2592ca 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -207,7 +207,11 @@ export async function findRepostedPackage( sourceChannelId: string, fileName: string, fileSize: bigint -): Promise<{ id: string; destMessageId: bigint | null } | null> { +): Promise<{ + id: string; + destMessageId: bigint | null; + sourceTopicId: bigint | null; +} | null> { return db.package.findFirst({ where: { sourceChannelId, @@ -215,7 +219,12 @@ export async function findRepostedPackage( fileSize, destMessageId: { not: null }, }, - select: { id: true, destMessageId: true }, + // Prefer the existing Package with the most specific (non-NULL) + // sourceTopicId, so when the user is re-scanning and the file already + // exists in a specific topic, the audit log shows the most informative + // match. NULLS LAST in DESC order achieves that for any non-null IDs. + orderBy: { sourceTopicId: { sort: "desc", nulls: "last" } }, + select: { id: true, destMessageId: true, sourceTopicId: true }, }); } @@ -702,6 +711,63 @@ export async function deleteSkippedPackage( }); } +/** + * Find SkippedPackages for a given account+channel that are still eligible + * for auto-retry (attemptCount below the cap). Used at the start of a scan + * to pull the watermark back so we don't strand failed messages forever + * after the watermark has advanced past them. + * + * For non-forum channels, pass `topicId: null` to get rows with NULL topic. + * For forum channels, pass the topic ID to scope to that topic only. 
+ */ +export async function getRetryableSkippedMessageIds(args: { + accountId: string; + sourceChannelId: string; + topicId: bigint | null; + cap: number; +}): Promise<bigint[]> { + const rows = await db.skippedPackage.findMany({ + where: { + accountId: args.accountId, + sourceChannelId: args.sourceChannelId, + sourceTopicId: args.topicId, + attemptCount: { lt: args.cap }, + }, + select: { sourceMessageId: true }, + orderBy: { sourceMessageId: "asc" }, + }); + return rows.map((r) => r.sourceMessageId); +} + +/** + * Update a Package's source topic when a more specific topic context is + * discovered for the same content. Used when findRepostedPackage matches + * an existing Package whose topic is less specific (e.g., "General") than + * the topic we just encountered the file in. + * + * Also sets the creator to the new topic name whenever that name is + * non-empty and not "General". The existing creator is not inspected, so + * whatever value it held before is replaced in that case. + */ +export async function updatePackageTopicContext( + packageId: string, + newTopicId: bigint, + newTopicName: string | null +): Promise<void> { + await db.package.update({ + where: { id: packageId }, + data: { + sourceTopicId: newTopicId, + // Only overwrite creator if the new topic name is meaningful (non-empty, + // non-General). NOTE(review): the current creator is not read here, so + // an explicit creator from filename or admin input is overwritten too. + ...(newTopicName && newTopicName !== "General" + ? 
{ creator: newTopicName } + : {}), + }, + }); +} + export async function createOrFindPackageGroup(input: { mediaAlbumId: string; sourceChannelId: string; diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 3c06cc4..723b5be 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -32,6 +32,8 @@ import { deleteSkippedPackage, getCappedSkippedMessageIds, findRepostedPackage, + getRetryableSkippedMessageIds, + updatePackageTopicContext, } from "./db/queries.js"; import type { ActivityUpdate } from "./db/queries.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; @@ -524,21 +526,77 @@ export async function runWorkerForAccount( messagesScanned: counters.messagesScanned, }); - const topics = await getForumTopicList(client, channel.telegramId); + const rawTopics = await getForumTopicList(client, channel.telegramId); const topicProgressList = await getTopicProgress(mapping.id); + // Process more-specific topics BEFORE "General" so the first + // encounter of any file is in its most specific context. This makes + // newly-created Packages carry useful topic info (e.g., a campaign + // name) instead of just "General". + const topics = [...rawTopics].sort((a, b) => { + const aIsGeneral = a.name === "General"; + const bIsGeneral = b.name === "General"; + if (aIsGeneral === bIsGeneral) return 0; + return aIsGeneral ? 
+ 1 : -1; + }); + accountLog.info( { channelId: channel.id, title: channel.title, topicCount: topics.length }, - "Scanning forum channel by topic" + "Scanning forum channel by topic (specific topics first, General last)" ); for (let tIdx = 0; tIdx < topics.length; tIdx++) { const topic = topics[tIdx]; try { - const progress = topicProgressList.find( + let progress = topicProgressList.find( (tp) => tp.topicId === topic.topicId ); + // ── SkippedPackage retry pass ── + // If we have failed messages in this topic with attemptCount + // below the cap, pull the watermark back below the lowest of + // them so the scan re-picks them up. Without this, a message + // that failed before the watermark cap fix (or had the watermark + // advanced past it via the all-failures fallback) is stuck in + // SkippedPackage forever. + try { + const retryable = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: topic.topicId, + cap: config.maxSkipAttempts, + }); + if (retryable.length > 0) { + const lowest = retryable[0]; + const currentWatermark = progress?.lastProcessedMessageId ?? null; + if (currentWatermark !== null && currentWatermark >= lowest) { + const resetTo = lowest - 1n; + await upsertTopicProgress( + mapping.id, + topic.topicId, + topic.name, + resetTo + ); + accountLog.info( + { + topic: topic.name, + retryableCount: retryable.length, + lowestSkippedMsgId: lowest.toString(), + oldWatermark: currentWatermark.toString(), + newWatermark: resetTo.toString(), + }, + "Resetting topic watermark to retry skipped messages" + ); + progress = { ...(progress ?? 
{ id: "", accountChannelMapId: mapping.id, topicId: topic.topicId, topicName: topic.name }), lastProcessedMessageId: resetTo } as typeof progress; + } + } + } catch (retryErr) { + accountLog.warn( + { err: retryErr, topic: topic.name }, + "SkippedPackage retry pass failed (non-fatal)" + ); + } + const topicLabel = `${channel.title} › ${topic.name}`; const topicProgress = topics.length > 1 ? ` (topic ${tIdx + 1}/${topics.length})` @@ -664,10 +722,47 @@ export async function runWorkerForAccount( "Processing source channel" ); + // ── SkippedPackage retry pass ── + // Pull the watermark back below the lowest still-retryable + // SkippedPackage so they get picked up by the scan. See the matching + // block in the forum branch for the rationale. + let effectiveChannelWatermark = mapping.lastProcessedMessageId; + try { + const retryable = await getRetryableSkippedMessageIds({ + accountId: account.id, + sourceChannelId: channel.id, + topicId: null, + cap: config.maxSkipAttempts, + }); + if (retryable.length > 0) { + const lowest = retryable[0]; + if (effectiveChannelWatermark !== null && effectiveChannelWatermark >= lowest) { + const resetTo = lowest - 1n; + await updateLastProcessedMessage(mapping.id, resetTo); + accountLog.info( + { + channel: channel.title, + retryableCount: retryable.length, + lowestSkippedMsgId: lowest.toString(), + oldWatermark: effectiveChannelWatermark.toString(), + newWatermark: resetTo.toString(), + }, + "Resetting channel watermark to retry skipped messages" + ); + effectiveChannelWatermark = resetTo; + } + } + } catch (retryErr) { + accountLog.warn( + { err: retryErr, channel: channel.title }, + "SkippedPackage retry pass failed (non-fatal)" + ); + } + const scanResult = await getChannelMessages( client, channel.telegramId, - mapping.lastProcessedMessageId, + effectiveChannelWatermark, 100, (scanned) => { throttled.update({ @@ -706,7 +801,7 @@ export async function runWorkerForAccount( pipelineCtx, scanResult, run.id, - 
mapping.lastProcessedMessageId, + effectiveChannelWatermark, // Incremental watermark advance — saves progress per-set so a // worker restart mid-scan doesn't lose all work. async (messageId) => { @@ -1183,12 +1278,44 @@ async function processOneArchiveSet( ); if (reposted) { counters.zipsDuplicate++; + + // Backfill topic context onto the existing Package when we encounter the + // same file in a named topic (e.g., "Artisan Guild January 2022"), so the + // user gets richer metadata. We update when the current scan is in a + // non-"General" topic AND the existing sourceTopicId differs from it. + // NOTE(review): the existing topic's name is never checked, so a Package + // that already carries another specific topic is re-pointed as well. + const currentTopicName = ctx.topicCreator; // == topic.name for forum scans + const currentTopicId = ctx.sourceTopicId; + const isCurrentSpecific = currentTopicName !== null && currentTopicName !== "General"; + const existingTopicDiffers = reposted.sourceTopicId !== currentTopicId; + if (isCurrentSpecific && currentTopicId !== null && existingTopicDiffers) { + try { + await updatePackageTopicContext(reposted.id, currentTopicId, currentTopicName); + accountLog.info( + { + fileName: archiveName, + packageId: reposted.id, + existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null, + newTopicId: Number(currentTopicId), + newTopicName: currentTopicName, + }, + "Updated existing Package with more specific topic context" + ); + } catch (updErr) { + accountLog.warn({ err: updErr, packageId: reposted.id }, "Failed to update Package topic context (non-fatal)"); + } + } + accountLog.info( { fileName: archiveName, sourceMessageId: Number(archiveSet.parts[0].id), existingPackageId: reposted.id, existingDestMessageId: reposted.destMessageId ? Number(reposted.destMessageId) : null, + existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null, + currentTopicId: currentTopicId ? 
Number(currentTopicId) : null, + currentTopicName, totalSize: Number(totalArchiveSize), }, "Skipping repost — same fileName + size already uploaded in this channel"