feat: add retry server actions for skipped/failed packages

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 16:22:31 +01:00
parent 71c3228e44
commit dcc1c97053

View File

@@ -177,3 +177,148 @@ export async function setPreviewFromExtract(
return { success: false, error: "Failed to set preview from archive image" };
}
}
export async function retrySkippedPackageAction(
id: string
): Promise<ActionResult> {
const session = await auth();
if (!session?.user?.id) return { success: false, error: "Unauthorized" };
try {
const skipped = await prisma.skippedPackage.findUnique({
where: { id },
});
if (!skipped) return { success: false, error: "Skipped package not found" };
// Find the AccountChannelMap and reset watermark if needed
const mapping = await prisma.accountChannelMap.findUnique({
where: {
accountId_channelId: {
accountId: skipped.accountId,
channelId: skipped.sourceChannelId,
},
},
});
if (mapping) {
const targetId = skipped.sourceMessageId - 1n;
// Only reset if the watermark is past this message
if (mapping.lastProcessedMessageId && mapping.lastProcessedMessageId >= skipped.sourceMessageId) {
await prisma.accountChannelMap.update({
where: { id: mapping.id },
data: { lastProcessedMessageId: targetId },
});
}
// Also reset TopicProgress if this was a forum topic message
if (skipped.sourceTopicId) {
const topicProgress = await prisma.topicProgress.findFirst({
where: {
accountChannelMapId: mapping.id,
topicId: skipped.sourceTopicId,
},
});
if (topicProgress && topicProgress.lastProcessedMessageId && topicProgress.lastProcessedMessageId >= skipped.sourceMessageId) {
await prisma.topicProgress.update({
where: { id: topicProgress.id },
data: { lastProcessedMessageId: targetId },
});
}
}
}
// Delete the skip record
await prisma.skippedPackage.delete({ where: { id } });
revalidatePath("/stls");
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to retry skipped package" };
}
}
export async function retryAllSkippedPackagesAction(
reason?: "SIZE_LIMIT" | "DOWNLOAD_FAILED" | "EXTRACT_FAILED" | "UPLOAD_FAILED"
): Promise<ActionResult> {
const session = await auth();
if (!session?.user?.id) return { success: false, error: "Unauthorized" };
try {
const where: Record<string, unknown> = {};
if (reason) where.reason = reason;
const skippedItems = await prisma.skippedPackage.findMany({ where });
if (skippedItems.length === 0) {
return { success: true, data: undefined };
}
// Group by (accountId, channelId) to find minimum messageId per channel
const channelResets = new Map<string, { mappingKey: { accountId: string; channelId: string }; minMessageId: bigint; topicResets: Map<bigint, bigint> }>();
for (const item of skippedItems) {
const key = `${item.accountId}:${item.sourceChannelId}`;
const existing = channelResets.get(key);
const targetId = item.sourceMessageId - 1n;
if (!existing) {
const topicResets = new Map<bigint, bigint>();
if (item.sourceTopicId) {
topicResets.set(item.sourceTopicId, targetId);
}
channelResets.set(key, {
mappingKey: { accountId: item.accountId, channelId: item.sourceChannelId },
minMessageId: targetId,
topicResets,
});
} else {
if (targetId < existing.minMessageId) {
existing.minMessageId = targetId;
}
if (item.sourceTopicId) {
const existingTopic = existing.topicResets.get(item.sourceTopicId);
if (!existingTopic || targetId < existingTopic) {
existing.topicResets.set(item.sourceTopicId, targetId);
}
}
}
}
// Reset watermarks
for (const reset of channelResets.values()) {
const mapping = await prisma.accountChannelMap.findUnique({
where: { accountId_channelId: reset.mappingKey },
});
if (!mapping) continue;
if (mapping.lastProcessedMessageId && mapping.lastProcessedMessageId > reset.minMessageId) {
await prisma.accountChannelMap.update({
where: { id: mapping.id },
data: { lastProcessedMessageId: reset.minMessageId },
});
}
// Reset topic progress
for (const [topicId, targetId] of reset.topicResets) {
const topicProgress = await prisma.topicProgress.findFirst({
where: { accountChannelMapId: mapping.id, topicId },
});
if (topicProgress && topicProgress.lastProcessedMessageId && topicProgress.lastProcessedMessageId > targetId) {
await prisma.topicProgress.update({
where: { id: topicProgress.id },
data: { lastProcessedMessageId: targetId },
});
}
}
}
// Delete all matching skip records
await prisma.skippedPackage.deleteMany({ where });
revalidatePath("/stls");
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to retry skipped packages" };
}
}