mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-09 18:51:16 +00:00
feat(worker): retry old SkippedPackages + prefer specific topics over General
Three connected safeguards driven by user feedback after deploying the
incremental watermark and repost-detection fixes.
1. SkippedPackage retry pass (watermark pull-back)
The auto-retry chain (d99a506 + watermark cap) only works for failures
that occur AFTER the fix is deployed. Pre-existing SkippedPackages may
sit below the current watermark — example from prod: secondary's
"Turnbase Delivery Folder.7z" at msgId 37,109,104,640 vs watermark
37,111,201,792. The auto-retry never sees it.
Before scanning each channel/topic, we now query SkippedPackages with
attemptCount < cap for that scope and pull the watermark back to
(lowestSkippedMsgId - 1n) when needed. Both forum and non-forum
branches handle this.
2. Topic scan order: specific topics first, General last
In forum channels, files often appear in both a specific topic (e.g.,
"Artisan Guild January 2022") AND in General. The first encounter
created the Package and locked in the topic context. If we happened
to scan General first, the Package recorded the less-informative
topic.
We now sort topics so General is processed last. New Packages get
the more specific topic name as their context by default.
3. Backfill specific topic on existing Packages
For Packages that were already created with General topic context,
when findRepostedPackage matches and the current scan is in a more
specific topic, update the existing Package's sourceTopicId (and
creator, if it was derived from "General") to the more specific one.
Audit log shows both old and new topic IDs.
The findRepostedPackage query also got an ORDER BY so it returns the
most-specific existing match (non-null sourceTopicId first) when
multiple Packages share the same filename + size in a channel — giving
the audit log richer context.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -207,7 +207,11 @@ export async function findRepostedPackage(
|
||||
sourceChannelId: string,
|
||||
fileName: string,
|
||||
fileSize: bigint
|
||||
): Promise<{ id: string; destMessageId: bigint | null } | null> {
|
||||
): Promise<{
|
||||
id: string;
|
||||
destMessageId: bigint | null;
|
||||
sourceTopicId: bigint | null;
|
||||
} | null> {
|
||||
return db.package.findFirst({
|
||||
where: {
|
||||
sourceChannelId,
|
||||
@@ -215,7 +219,12 @@ export async function findRepostedPackage(
|
||||
fileSize,
|
||||
destMessageId: { not: null },
|
||||
},
|
||||
select: { id: true, destMessageId: true },
|
||||
// Prefer the existing Package with the most specific (non-NULL)
|
||||
// sourceTopicId, so when the user is re-scanning and the file already
|
||||
// exists in a specific topic, the audit log shows the most informative
|
||||
// match. NULLS LAST in DESC order achieves that for any non-null IDs.
|
||||
orderBy: { sourceTopicId: { sort: "desc", nulls: "last" } },
|
||||
select: { id: true, destMessageId: true, sourceTopicId: true },
|
||||
});
|
||||
}
|
||||
|
||||
@@ -702,6 +711,63 @@ export async function deleteSkippedPackage(
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * List the source message IDs of SkippedPackage rows in one scan scope that
 * have not yet exhausted their retry budget (`attemptCount < cap`).
 *
 * The worker calls this before scanning a channel or topic: when the current
 * watermark is at or past the lowest returned ID, it rewinds the watermark to
 * just below that ID so the failed messages are picked up again.
 *
 * Scope selection:
 *  - non-forum channel: pass `topicId: null` to match rows whose topic is NULL;
 *  - forum channel: pass the topic ID to match rows of that topic only.
 *
 * @param args.accountId       Account that owns the skipped rows.
 * @param args.sourceChannelId Source channel the rows belong to.
 * @param args.topicId         Forum topic ID, or `null` for non-forum scope.
 * @param args.cap             Exclusive upper bound on `attemptCount`.
 * @returns Matching source message IDs in ascending order, so element 0 is
 *          the lowest; empty when nothing is retryable.
 */
export async function getRetryableSkippedMessageIds(args: {
  accountId: string;
  sourceChannelId: string;
  topicId: bigint | null;
  cap: number;
}): Promise<bigint[]> {
  const { accountId, sourceChannelId, topicId, cap } = args;

  const retryableRows = await db.skippedPackage.findMany({
    where: {
      accountId,
      sourceChannelId,
      sourceTopicId: topicId,
      attemptCount: { lt: cap },
    },
    select: { sourceMessageId: true },
    // Ascending order lets callers read the lowest ID from index 0.
    orderBy: { sourceMessageId: "asc" },
  });

  const messageIds: bigint[] = [];
  for (const row of retryableRows) {
    messageIds.push(row.sourceMessageId);
  }
  return messageIds;
}
|
||||
|
||||
/**
 * Re-point an existing Package at a more specific source topic.
 *
 * Called when findRepostedPackage matches a Package that was recorded under
 * a less specific topic (e.g., "General") and the same file has now been
 * seen in a named topic.
 *
 * `sourceTopicId` is always set to `newTopicId`. `creator` is additionally
 * set to `newTopicName` whenever that name is non-empty and not "General";
 * otherwise `creator` is left untouched.
 *
 * NOTE(review): this function never reads the Package's current creator, so
 * a creator that came from a filename or admin input is overwritten too
 * whenever the new topic name is specific. If such values must be preserved,
 * the caller (or this query) needs an explicit check — confirm the intent.
 *
 * @param packageId    ID of the Package row to update.
 * @param newTopicId   Topic ID to store as the Package's source topic.
 * @param newTopicName Name of that topic; `null`, empty, or "General" means
 *                     the creator is not changed.
 */
export async function updatePackageTopicContext(
  packageId: string,
  newTopicId: bigint,
  newTopicName: string | null
): Promise<void> {
  const data: { sourceTopicId: bigint; creator?: string } = {
    sourceTopicId: newTopicId,
  };

  // Only a meaningful topic name (non-empty, non-General) replaces creator.
  const hasSpecificName = Boolean(newTopicName) && newTopicName !== "General";
  if (hasSpecificName && newTopicName) {
    data.creator = newTopicName;
  }

  await db.package.update({
    where: { id: packageId },
    data,
  });
}
|
||||
|
||||
export async function createOrFindPackageGroup(input: {
|
||||
mediaAlbumId: string;
|
||||
sourceChannelId: string;
|
||||
|
||||
@@ -32,6 +32,8 @@ import {
|
||||
deleteSkippedPackage,
|
||||
getCappedSkippedMessageIds,
|
||||
findRepostedPackage,
|
||||
getRetryableSkippedMessageIds,
|
||||
updatePackageTopicContext,
|
||||
} from "./db/queries.js";
|
||||
import type { ActivityUpdate } from "./db/queries.js";
|
||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||
@@ -524,21 +526,77 @@ export async function runWorkerForAccount(
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
const topics = await getForumTopicList(client, channel.telegramId);
|
||||
const rawTopics = await getForumTopicList(client, channel.telegramId);
|
||||
const topicProgressList = await getTopicProgress(mapping.id);
|
||||
|
||||
// Process more-specific topics BEFORE "General" so the first
|
||||
// encounter of any file is in its most specific context. This makes
|
||||
// newly-created Packages carry useful topic info (e.g., a campaign
|
||||
// name) instead of just "General".
|
||||
const topics = [...rawTopics].sort((a, b) => {
|
||||
const aIsGeneral = a.name === "General";
|
||||
const bIsGeneral = b.name === "General";
|
||||
if (aIsGeneral === bIsGeneral) return 0;
|
||||
return aIsGeneral ? 1 : -1;
|
||||
});
|
||||
|
||||
accountLog.info(
|
||||
{ channelId: channel.id, title: channel.title, topicCount: topics.length },
|
||||
"Scanning forum channel by topic"
|
||||
"Scanning forum channel by topic (specific topics first, General last)"
|
||||
);
|
||||
|
||||
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
|
||||
const topic = topics[tIdx];
|
||||
try {
|
||||
const progress = topicProgressList.find(
|
||||
let progress = topicProgressList.find(
|
||||
(tp) => tp.topicId === topic.topicId
|
||||
);
|
||||
|
||||
// ── SkippedPackage retry pass ──
|
||||
// If we have failed messages in this topic with attemptCount
|
||||
// below the cap, pull the watermark back below the lowest of
|
||||
// them so the scan re-picks them up. Without this, a message
|
||||
// that failed before my watermark cap fix (or had its watermark
|
||||
// advanced past it via the all-failures fallback) is stuck in
|
||||
// SkippedPackage forever.
|
||||
try {
|
||||
const retryable = await getRetryableSkippedMessageIds({
|
||||
accountId: account.id,
|
||||
sourceChannelId: channel.id,
|
||||
topicId: topic.topicId,
|
||||
cap: config.maxSkipAttempts,
|
||||
});
|
||||
if (retryable.length > 0) {
|
||||
const lowest = retryable[0];
|
||||
const currentWatermark = progress?.lastProcessedMessageId ?? null;
|
||||
if (currentWatermark !== null && currentWatermark >= lowest) {
|
||||
const resetTo = lowest - 1n;
|
||||
await upsertTopicProgress(
|
||||
mapping.id,
|
||||
topic.topicId,
|
||||
topic.name,
|
||||
resetTo
|
||||
);
|
||||
accountLog.info(
|
||||
{
|
||||
topic: topic.name,
|
||||
retryableCount: retryable.length,
|
||||
lowestSkippedMsgId: lowest.toString(),
|
||||
oldWatermark: currentWatermark.toString(),
|
||||
newWatermark: resetTo.toString(),
|
||||
},
|
||||
"Resetting topic watermark to retry skipped messages"
|
||||
);
|
||||
progress = { ...(progress ?? { id: "", accountChannelMapId: mapping.id, topicId: topic.topicId, topicName: topic.name }), lastProcessedMessageId: resetTo } as typeof progress;
|
||||
}
|
||||
}
|
||||
} catch (retryErr) {
|
||||
accountLog.warn(
|
||||
{ err: retryErr, topic: topic.name },
|
||||
"SkippedPackage retry pass failed (non-fatal)"
|
||||
);
|
||||
}
|
||||
|
||||
const topicLabel = `${channel.title} › ${topic.name}`;
|
||||
const topicProgress = topics.length > 1
|
||||
? ` (topic ${tIdx + 1}/${topics.length})`
|
||||
@@ -664,10 +722,47 @@ export async function runWorkerForAccount(
|
||||
"Processing source channel"
|
||||
);
|
||||
|
||||
// ── SkippedPackage retry pass ──
|
||||
// Pull the watermark back below the lowest still-retryable
|
||||
// SkippedPackage so they get picked up by the scan. See the matching
|
||||
// block in the forum branch for the rationale.
|
||||
let effectiveChannelWatermark = mapping.lastProcessedMessageId;
|
||||
try {
|
||||
const retryable = await getRetryableSkippedMessageIds({
|
||||
accountId: account.id,
|
||||
sourceChannelId: channel.id,
|
||||
topicId: null,
|
||||
cap: config.maxSkipAttempts,
|
||||
});
|
||||
if (retryable.length > 0) {
|
||||
const lowest = retryable[0];
|
||||
if (effectiveChannelWatermark !== null && effectiveChannelWatermark >= lowest) {
|
||||
const resetTo = lowest - 1n;
|
||||
await updateLastProcessedMessage(mapping.id, resetTo);
|
||||
accountLog.info(
|
||||
{
|
||||
channel: channel.title,
|
||||
retryableCount: retryable.length,
|
||||
lowestSkippedMsgId: lowest.toString(),
|
||||
oldWatermark: effectiveChannelWatermark.toString(),
|
||||
newWatermark: resetTo.toString(),
|
||||
},
|
||||
"Resetting channel watermark to retry skipped messages"
|
||||
);
|
||||
effectiveChannelWatermark = resetTo;
|
||||
}
|
||||
}
|
||||
} catch (retryErr) {
|
||||
accountLog.warn(
|
||||
{ err: retryErr, channel: channel.title },
|
||||
"SkippedPackage retry pass failed (non-fatal)"
|
||||
);
|
||||
}
|
||||
|
||||
const scanResult = await getChannelMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
mapping.lastProcessedMessageId,
|
||||
effectiveChannelWatermark,
|
||||
100,
|
||||
(scanned) => {
|
||||
throttled.update({
|
||||
@@ -706,7 +801,7 @@ export async function runWorkerForAccount(
|
||||
pipelineCtx,
|
||||
scanResult,
|
||||
run.id,
|
||||
mapping.lastProcessedMessageId,
|
||||
effectiveChannelWatermark,
|
||||
// Incremental watermark advance — saves progress per-set so a
|
||||
// worker restart mid-scan doesn't lose all work.
|
||||
async (messageId) => {
|
||||
@@ -1183,12 +1278,44 @@ async function processOneArchiveSet(
|
||||
);
|
||||
if (reposted) {
|
||||
counters.zipsDuplicate++;
|
||||
|
||||
// Backfill topic context onto the existing Package when we encounter the
|
||||
// same file in a more specific topic. If the existing Package was created
|
||||
// from "General" or a non-forum scan and we now see the file in a named
|
||||
// topic (e.g., "Artisan Guild January 2022"), update the Package so the
|
||||
// user gets richer metadata. We only update when the current scan is in
|
||||
// a specific topic AND the existing topic differs.
|
||||
const currentTopicName = ctx.topicCreator; // == topic.name for forum scans
|
||||
const currentTopicId = ctx.sourceTopicId;
|
||||
const isCurrentSpecific = currentTopicName !== null && currentTopicName !== "General";
|
||||
const existingTopicDiffers = reposted.sourceTopicId !== currentTopicId;
|
||||
if (isCurrentSpecific && currentTopicId !== null && existingTopicDiffers) {
|
||||
try {
|
||||
await updatePackageTopicContext(reposted.id, currentTopicId, currentTopicName);
|
||||
accountLog.info(
|
||||
{
|
||||
fileName: archiveName,
|
||||
packageId: reposted.id,
|
||||
existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null,
|
||||
newTopicId: Number(currentTopicId),
|
||||
newTopicName: currentTopicName,
|
||||
},
|
||||
"Updated existing Package with more specific topic context"
|
||||
);
|
||||
} catch (updErr) {
|
||||
accountLog.warn({ err: updErr, packageId: reposted.id }, "Failed to update Package topic context (non-fatal)");
|
||||
}
|
||||
}
|
||||
|
||||
accountLog.info(
|
||||
{
|
||||
fileName: archiveName,
|
||||
sourceMessageId: Number(archiveSet.parts[0].id),
|
||||
existingPackageId: reposted.id,
|
||||
existingDestMessageId: reposted.destMessageId ? Number(reposted.destMessageId) : null,
|
||||
existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null,
|
||||
currentTopicId: currentTopicId ? Number(currentTopicId) : null,
|
||||
currentTopicName,
|
||||
totalSize: Number(totalArchiveSize),
|
||||
},
|
||||
"Skipping repost — same fileName + size already uploaded in this channel"
|
||||
|
||||
Reference in New Issue
Block a user