Files
dragonsstash/worker/src/worker.ts
xCyanGrizzly 901f32ff41 feat(worker): retry old SkippedPackages + prefer specific topics over General
Three connected safeguards driven by user feedback after deploying the
incremental watermark and repost-detection fixes.

1. SkippedPackage retry pass (watermark pull-back)
   The auto-retry chain (d99a506 + watermark cap) only works for failures
   that occur AFTER the fix is deployed. Pre-existing SkippedPackages may
   sit below the current watermark — example from prod: secondary's
   "Turnbase Delivery Folder.7z" at msgId 37,109,104,640 vs watermark
   37,111,201,792. The auto-retry never sees it.

   Before scanning each channel/topic, we now query SkippedPackages with
   attemptCount < cap for that scope and pull the watermark back to
   (lowestSkippedMsgId - 1n) when needed. Both forum and non-forum
   branches handle this.

2. Topic scan order: specific topics first, General last
   In forum channels, files often appear in both a specific topic (e.g.,
   "Artisan Guild January 2022") AND in General. The first encounter
   created the Package and locked in the topic context. If we happened
   to scan General first, the Package recorded the less-informative
   topic.

   We now sort topics so General is processed last. New Packages get
   the more specific topic name as their context by default.

3. Backfill specific topic on existing Packages
   For Packages that were already created with General topic context,
   when findRepostedPackage matches and the current scan is in a more
   specific topic, update the existing Package's sourceTopicId (and
   creator, if it was derived from "General") to the more specific one.
   Audit log shows both old and new topic IDs.

The findRepostedPackage query also got an ORDER BY so it returns the
most-specific existing match (non-null sourceTopicId first) when
multiple Packages share the same filename + size in a channel — giving
the audit log richer context.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 09:02:54 +02:00

1820 lines
67 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import path from "path";
import { unlink, readdir, mkdir, rm } from "fs/promises";
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
import {
getSourceChannelMappings,
getGlobalDestinationChannel,
packageExistsByHash,
packageExistsBySourceMessage,
createPackageStub,
updatePackageWithMetadata,
createIngestionRun,
completeIngestionRun,
failIngestionRun,
updateLastProcessedMessage,
updateRunActivity,
setChannelForum,
getTopicProgress,
upsertTopicProgress,
upsertChannel,
ensureAccountChannelLink,
getGlobalSetting,
getChannelFetchRequest,
updateFetchRequestStatus,
getAccountLinkedChannelIds,
getExistingChannelsByTelegramId,
getAccountById,
deleteOrphanedPackageByHash,
getUploadedPackageByHash,
upsertSkippedPackage,
deleteSkippedPackage,
getCappedSkippedMessageIds,
findRepostedPackage,
getRetryableSkippedMessageIds,
updatePackageTopicContext,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
import { matchPreviewToArchive } from "./preview/match.js";
import { pickPreviewFile, extractPreviewImage } from "./preview/extract.js";
import { groupArchiveSets } from "./archive/multipart.js";
import type { ArchiveSet } from "./archive/multipart.js";
import { extractCreatorFromFileName, extractCreatorFromChannelTitle } from "./archive/creator.js";
import { hashParts } from "./archive/hash.js";
import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
import { db } from "./db/client.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
import type { Client } from "tdl";
// Module-level logger created with the "worker" label. The functions below
// build their own child loggers carrying account / request context.
const log = childLogger("worker");
/**
 * Drive the login flow for a PENDING account by opening a TDLib client.
 *
 * Per the original notes, TDLib sends an SMS code to the phone number and the
 * login callbacks move the account to AWAITING_CODE; the admin then supplies
 * the code through the UI and pollForAuthCode completes the login.
 *
 * Once the client is available this function:
 *  1. Builds the channel list via createAutoFetchRequest so the admin can
 *     pick source channels in the UI.
 *  2. If a "destination_invite_link" setting exists, tries to join the
 *     destination group and links the account to the global destination
 *     channel with the WRITER role. The link is attempted even when the join
 *     call throws.
 *
 * Failures are logged, never rethrown; the TDLib client is always closed.
 *
 * @param account - The account row to authenticate.
 */
export async function authenticateAccount(
  account: TelegramAccount
): Promise<void> {
  const authLog = childLogger("auth", { accountId: account.id, phone: account.phone });
  authLog.info("Starting authentication flow");

  // Link this account to the global destination channel (when one is
  // configured) as WRITER. Resolves to the channel, or null when none exists.
  const linkAsWriter = async () => {
    const destination = await getGlobalDestinationChannel();
    if (!destination) {
      return null;
    }
    await ensureAccountChannelLink(account.id, destination.id, "WRITER");
    return destination;
  };

  let tdClient: Client | undefined;
  try {
    const session = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });
    tdClient = session.client;
    authLog.info("Authentication successful");

    // Populate the channel picker right away.
    authLog.info("Fetching channels from Telegram...");
    await createAutoFetchRequest(tdClient, account.id, authLog);

    // Join the destination group when an invite link has been configured.
    const inviteLink = await getGlobalSetting("destination_invite_link");
    if (inviteLink) {
      authLog.info("Attempting to join destination group via invite link...");
      try {
        await joinChatByInviteLink(tdClient, inviteLink);
        const linked = await linkAsWriter();
        if (linked) {
          authLog.info({ destChannel: linked.title }, "Joined destination group and linked as WRITER");
        }
      } catch (err) {
        // Joining can fail simply because the account is already a member,
        // so the WRITER link is still attempted.
        authLog.warn({ err }, "Could not join destination group (may already be a member)");
        await linkAsWriter();
      }
    }
  } catch (err) {
    authLog.error({ err }, "Authentication failed");
  } finally {
    if (tdClient) {
      await closeTdlibClient(tdClient);
    }
  }
}
/**
 * Process a ChannelFetchRequest: fetch the account's chats from Telegram,
 * enrich them with DB state, and store the result JSON on the request.
 *
 * The original note says this is invoked by the fetch listener (pg_notify);
 * the caller is not visible in this file section.
 *
 * Lifecycle: a request that is missing or not PENDING is skipped. Otherwise
 * it is marked IN_PROGRESS and ends as COMPLETED (with `resultJson`) or
 * FAILED (with the error message).
 *
 * The TDLib client is created inside the guarded section. Previously a
 * failure while creating the client happened after the request was already
 * marked IN_PROGRESS, leaving it stuck in that state; it is now recorded as
 * FAILED like any other error.
 *
 * @param requestId - ID of the ChannelFetchRequest to process.
 */
export async function processFetchRequest(requestId: string): Promise<void> {
  const aLog = childLogger("fetch-request", { requestId });
  const request = await getChannelFetchRequest(requestId);
  if (!request || request.status !== "PENDING") {
    aLog.warn("Fetch request not found or not pending, skipping");
    return;
  }
  await updateFetchRequestStatus(requestId, "IN_PROGRESS");
  aLog.info({ accountId: request.accountId }, "Processing fetch request");

  let client: Client | undefined;
  try {
    client = (await createTdlibClient({
      id: request.account.id,
      phone: request.account.phone,
    })).client;

    const chats = await getAccountChats(client);

    // Enrich with DB state
    const linkedTelegramIds = await getAccountLinkedChannelIds(request.accountId);
    const existingChannels = await getExistingChannelsByTelegramId();
    const enrichedChats = chats.map((chat) => {
      const telegramIdStr = chat.chatId.toString();
      return {
        chatId: telegramIdStr,
        title: chat.title,
        type: chat.type,
        isForum: chat.isForum,
        memberCount: chat.memberCount ?? null,
        alreadyLinked: linkedTelegramIds.has(telegramIdStr),
        existingChannelId: existingChannels.get(telegramIdStr) ?? null,
      };
    });

    // Also upsert channel metadata while we have the data. Sequential on
    // purpose; one failing chat must not abort the others.
    for (const chat of chats) {
      try {
        await upsertChannel({
          telegramId: chat.chatId,
          title: chat.title,
          type: "SOURCE",
          isForum: chat.isForum,
        });
      } catch {
        // Non-critical — metadata sync can fail silently
      }
    }

    await updateFetchRequestStatus(requestId, "COMPLETED", {
      resultJson: JSON.stringify(enrichedChats),
    });
    aLog.info(
      { total: chats.length, linked: [...linkedTelegramIds].length },
      "Fetch request completed"
    );
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    aLog.error({ err }, "Fetch request failed");
    await updateFetchRequestStatus(requestId, "FAILED", { error: message });
  } finally {
    // The client is undefined when creation itself failed.
    if (client) {
      await closeTdlibClient(client);
    }
  }
}
/**
 * Post-authentication helper: stores an already-COMPLETED fetch request
 * holding the account's channel list, so the UI can show the picker
 * immediately.
 *
 * @param client - TDLib client used to list the account's chats.
 * @param accountId - Account the fetch request is recorded for.
 * @param aLog - Logger used for the completion message.
 */
async function createAutoFetchRequest(
  client: Client,
  accountId: string,
  aLog: ReturnType<typeof childLogger>
): Promise<void> {
  const accountChats = await getAccountChats(client);
  const linkedIds = await getAccountLinkedChannelIds(accountId);
  const knownChannels = await getExistingChannelsByTelegramId();

  // One row per chat; key order is kept stable because the rows are
  // serialised to JSON below.
  const pickerRows = accountChats.map(({ chatId, title, type, isForum, memberCount }) => {
    const idKey = chatId.toString();
    return {
      chatId: idKey,
      title,
      type,
      isForum,
      memberCount: memberCount ?? null,
      alreadyLinked: linkedIds.has(idKey),
      existingChannelId: knownChannels.get(idKey) ?? null,
    };
  });

  // Refresh channel metadata one chat at a time; individual failures are
  // ignored.
  for (const { chatId, title, isForum } of accountChats) {
    try {
      await upsertChannel({ telegramId: chatId, title, type: "SOURCE", isForum });
    } catch {
      // Non-critical
    }
  }

  // Persist the request with its result already filled in.
  const dbModule = await import("./db/client.js");
  await dbModule.db.channelFetchRequest.create({
    data: {
      accountId,
      status: "COMPLETED",
      resultJson: JSON.stringify(pickerRows),
    },
  });

  aLog.info(
    { total: accountChats.length },
    "Auto-fetch request created with channel list"
  );
}
/**
 * Throttle DB writes for run activity (e.g. download progress) to avoid
 * hammering the DB.
 *
 * `update()` remembers only the latest activity. It is written immediately
 * when at least `minIntervalMs` has passed since the last write; otherwise a
 * single trailing timer is armed to write it once the interval has elapsed.
 * `flush()` writes whatever is pending right away. Write errors are ignored
 * (best effort).
 *
 * @param runId - Ingestion run whose activity row is updated.
 * @param minIntervalMs - Minimum spacing between writes (default 2000 ms).
 * @returns `update` to queue an activity and `flush` to force the pending
 *          write.
 */
function createThrottledActivityUpdater(runId: string, minIntervalMs = 2000) {
  let lastWriteTime = 0;
  let pendingUpdate: ActivityUpdate | null = null;
  let flushTimer: ReturnType<typeof setTimeout> | null = null;

  const flush = async (): Promise<void> => {
    if (!pendingUpdate) {
      return;
    }
    const update = pendingUpdate;
    // Clear state before awaiting so updates arriving during the write are
    // queued as new pending work rather than lost.
    pendingUpdate = null;
    lastWriteTime = Date.now();
    await updateRunActivity(runId, update).catch(() => {});
  };

  return {
    update: (activity: ActivityUpdate): void => {
      pendingUpdate = activity;
      const elapsed = Date.now() - lastWriteTime;
      if (elapsed >= minIntervalMs) {
        if (flushTimer) {
          clearTimeout(flushTimer);
          // Forget the cancelled handle. A stale non-null handle would stop
          // every later update inside the throttle window from arming a
          // trailing timer, leaving the newest activity unwritten.
          flushTimer = null;
        }
        void flush();
      } else if (!flushTimer) {
        flushTimer = setTimeout(() => {
          flushTimer = null;
          void flush();
        }, minIntervalMs - elapsed);
      }
    },
    flush,
  };
}
/**
 * State handed to the archive processing pipeline for one channel (or one
 * forum topic) scan. The worker builds it once per channel and mutates the
 * topic-related fields between topics.
 */
interface PipelineContext {
  // ── Run and account identity ──
  runId: string;
  accountId: string;
  accountPhone: string;
  /** Logger carrying the account context. */
  accountLog: ReturnType<typeof childLogger>;

  // ── Telegram session ──
  /** TDLib client; the worker re-reads it after processing because it may have been recreated. */
  client: Client;

  // ── Source being scanned ──
  channel: TelegramChannel;
  /** Display label: the channel title, or channel title plus topic name for forum topics. */
  channelTitle: string;
  /** Creator from forum topic name (null for non-forum). */
  topicCreator: string | null;
  /** Forum topic ID (null for non-forum). */
  sourceTopicId: bigint | null;

  // ── Upload destination ──
  destChannelTelegramId: bigint;
  destChannelId: string;
  /** Upload size limit in bytes, chosen by the worker from the account's premium status. */
  maxUploadSize: bigint;

  // ── Progress reporting ──
  throttled: ReturnType<typeof createThrottledActivityUpdater>;
  /** Running totals for the ingestion run; the same object is shared across channels. */
  counters: {
    messagesScanned: number;
    zipsFound: number;
    zipsDuplicate: number;
    zipsIngested: number;
  };
  /** How many consecutive upload stalls have occurred (resets on success). */
  consecutiveStalls: number;
}
/**
 * Run a full ingestion cycle for a single Telegram account.
 * Every step writes live activity to the DB so the admin UI can display it.
 *
 * Flow:
 *  1. Take the per-account advisory lock; return immediately if it is held.
 *  2. Create an ingestion run plus a throttled activity writer for it.
 *  3. Open a TDLib client and page `loadChats` over the main, archive and
 *     folder chat lists so later chat lookups resolve.
 *  4. Verify the global destination chat is reachable, then scan every mapped
 *     source channel — per topic for forums, flat otherwise — and pass each
 *     scan result to `processArchiveSets`.
 *
 * Error containment: a failing topic is logged and skipped; a failing channel
 * is logged and skipped (with a deduplicated notification for access errors);
 * any other failure inside the run is logged and recorded through
 * `failIngestionRun`. The lock is always released.
 *
 * Changes relative to the previous version:
 *  - The forum scan progress text now has the " — " separator before the
 *    scanned count, matching the non-forum text.
 *  - The CHANNEL_ACCESS_LOST dedupe check filters on channelId AND accountId,
 *    as its comment always claimed.
 *  - The TDLib client is closed even if setup after client creation throws.
 *
 * @param account - The Telegram account to ingest for.
 */
export async function runWorkerForAccount(
  account: TelegramAccount
): Promise<void> {
  const accountLog = childLogger("worker", { accountId: account.id, phone: account.phone });

  // 1. Acquire advisory lock
  const acquired = await tryAcquireLock(account.id);
  if (!acquired) {
    accountLog.info("Account already locked, skipping");
    return;
  }

  let runId: string | undefined;
  try {
    // 2. Create ingestion run
    const run = await createIngestionRun(account.id);
    runId = run.id;
    const activeRunId = runId;
    accountLog.info({ runId }, "Ingestion run started");
    const throttled = createThrottledActivityUpdater(activeRunId);

    // 3. Initialize TDLib client
    await updateRunActivity(activeRunId, {
      currentActivity: "Connecting to Telegram",
      currentStep: "connecting",
    });
    // Use let so the client can be replaced on TDLib recreation after stalls
    let { client, isPremium } = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });

    // Everything after client creation runs inside this try so the client is
    // closed even when setup (size computation, chat loading) throws.
    try {
      const maxUploadSize = isPremium
        ? 3950n * 1024n * 1024n
        : BigInt(config.maxPartSizeMB) * 1024n * 1024n;

      // Load all chats into TDLib's local cache using loadChats (the recommended API).
      // Without this, getChat/searchChatMessages fail with "Chat not found".
      // loadChats returns a 404 when all chats have been loaded — that's the stop signal.
      // Load from main, archive, AND chat folders to cover all chat types.
      {
        // Discover chat folders first
        const folderLists: { _: "chatListFolder"; chat_folder_id: number }[] = [];
        try {
          const folders = await client.invoke({ _: "getChatFolders" }) as {
            chat_folders?: { id: number }[];
          };
          if (folders.chat_folders) {
            for (const f of folders.chat_folders) {
              folderLists.push({ _: "chatListFolder", chat_folder_id: f.id });
            }
          }
        } catch {
          // getChatFolders may not be available in older TDLib versions
        }
        const chatLists: Record<string, unknown>[] = [
          { _: "chatListMain" },
          { _: "chatListArchive" },
          ...folderLists,
        ];
        for (const chatList of chatLists) {
          try {
            // Hard upper bound of 500 pages per list; the loop normally ends
            // via the exception below.
            for (let page = 0; page < 500; page++) {
              await client.invoke({
                _: "loadChats",
                chat_list: chatList,
                limit: 100,
              });
              // loadChats returns ok — keep going until 404
            }
          } catch {
            // 404 = all chats loaded (expected), or unsupported list type
          }
        }
      }

      const counters = {
        messagesScanned: 0,
        zipsFound: 0,
        zipsDuplicate: 0,
        zipsIngested: 0,
      };

      // 4. Get assigned source channels and global destination
      const channelMappings = await getSourceChannelMappings(account.id);
      const destChannel = await getGlobalDestinationChannel();
      if (!destChannel) {
        throw new Error("No global destination channel configured — set one in the admin UI");
      }

      // ── Ensure TDLib knows about the destination chat ──
      // Source channels get an explicit getChat below, but the destination was
      // previously only loaded via loadChats — which can miss it if the account
      // archived/moved it. Failing here surfaces the problem clearly instead of
      // letting every upload fail with a cryptic "Chat not found".
      try {
        await client.invoke({
          _: "getChat",
          chat_id: Number(destChannel.telegramId),
        });
      } catch (destErr) {
        accountLog.error(
          { err: destErr, destChannel: destChannel.title, telegramId: destChannel.telegramId.toString() },
          "Destination chat is not accessible to this account — uploads will fail. Re-join via invite link or remove this account."
        );
        // Surface as a persistent notification so the admin sees it in the UI
        try {
          await db.systemNotification.create({
            data: {
              type: "UPLOAD_FAILED",
              severity: "ERROR",
              title: `Destination chat unreachable for ${account.phone}`,
              message: `Account ${account.phone} cannot access the destination chat "${destChannel.title}". Uploads for this account will fail until access is restored. Re-join via the invite link in admin settings.`,
              context: {
                accountId: account.id,
                accountPhone: account.phone,
                destChannelId: destChannel.id,
                destChannelTitle: destChannel.title,
              },
            },
          });
        } catch {
          // Best-effort notification
        }
        // Skip this account's ingestion cycle entirely — there's no point
        // scanning + downloading if we can't upload.
        throw new Error(
          `Destination chat "${destChannel.title}" is not accessible to account ${account.phone}`
        );
      }

      const totalChannels = channelMappings.length;
      if (totalChannels === 0) {
        accountLog.info("No active source channels linked to this account — nothing to ingest");
      }

      for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
        const mapping = channelMappings[chIdx];
        const channel = mapping.channel;
        const channelLabel = totalChannels > 1
          ? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
          : channel.title;
        try {
          // ── Ensure TDLib knows about this chat ──
          // getChats may not have loaded all channels (pagination, archive folder, etc.)
          // so we explicitly load each channel before scanning.
          try {
            await client.invoke({
              _: "getChat",
              chat_id: Number(channel.telegramId),
            });
          } catch (chatErr) {
            accountLog.warn(
              { err: chatErr, channelId: channel.id, title: channel.title, telegramId: channel.telegramId.toString() },
              "TDLib does not know about this chat — it may not be accessible to this account. Skipping."
            );
            continue;
          }

          // ── Check if channel is a forum ──
          const forum = await isChatForum(client, channel.telegramId);
          if (forum !== channel.isForum) {
            await setChannelForum(channel.id, forum);
            accountLog.info(
              { channelId: channel.id, title: channel.title, isForum: forum },
              "Updated channel forum status"
            );
          }

          const pipelineCtx: PipelineContext = {
            client,
            runId: activeRunId,
            accountId: account.id,
            accountPhone: account.phone,
            channelTitle: channel.title,
            channel,
            destChannelTelegramId: destChannel.telegramId,
            destChannelId: destChannel.id,
            throttled,
            counters,
            topicCreator: null,
            sourceTopicId: null,
            accountLog,
            maxUploadSize,
            consecutiveStalls: 0,
          };

          if (forum) {
            // ── Forum channel: scan per-topic ──
            await updateRunActivity(activeRunId, {
              currentActivity: `Enumerating topics in "${channelLabel}"`,
              currentStep: "scanning",
              currentChannel: channelLabel,
              currentFile: null,
              currentFileNum: null,
              totalFiles: null,
              downloadedBytes: null,
              totalBytes: null,
              downloadPercent: null,
              messagesScanned: counters.messagesScanned,
            });
            const rawTopics = await getForumTopicList(client, channel.telegramId);
            const topicProgressList = await getTopicProgress(mapping.id);

            // Process more-specific topics BEFORE "General" so the first
            // encounter of any file is in its most specific context. This makes
            // newly-created Packages carry useful topic info (e.g., a campaign
            // name) instead of just "General". The comparator returns 0 for
            // topics in the same group, leaving their relative order untouched.
            const topics = [...rawTopics].sort((a, b) => {
              const aIsGeneral = a.name === "General";
              const bIsGeneral = b.name === "General";
              if (aIsGeneral === bIsGeneral) return 0;
              return aIsGeneral ? 1 : -1;
            });
            accountLog.info(
              { channelId: channel.id, title: channel.title, topicCount: topics.length },
              "Scanning forum channel by topic (specific topics first, General last)"
            );

            for (let tIdx = 0; tIdx < topics.length; tIdx++) {
              const topic = topics[tIdx];
              try {
                let progress = topicProgressList.find(
                  (tp) => tp.topicId === topic.topicId
                );

                // ── SkippedPackage retry pass ──
                // If this topic has failed messages whose attemptCount is
                // still below the cap, pull the watermark back below the
                // lowest of them so the scan picks them up again. Without
                // this, a message that failed before the watermark-cap fix
                // (or whose watermark advanced past it via the all-failures
                // fallback) would stay in SkippedPackage forever.
                try {
                  const retryable = await getRetryableSkippedMessageIds({
                    accountId: account.id,
                    sourceChannelId: channel.id,
                    topicId: topic.topicId,
                    cap: config.maxSkipAttempts,
                  });
                  if (retryable.length > 0) {
                    // NOTE(review): assumes the query returns IDs in ascending
                    // order so the first entry is the lowest — confirm in queries.ts.
                    const lowest = retryable[0];
                    const currentWatermark = progress?.lastProcessedMessageId ?? null;
                    if (currentWatermark !== null && currentWatermark >= lowest) {
                      const resetTo = lowest - 1n;
                      await upsertTopicProgress(
                        mapping.id,
                        topic.topicId,
                        topic.name,
                        resetTo
                      );
                      accountLog.info(
                        {
                          topic: topic.name,
                          retryableCount: retryable.length,
                          lowestSkippedMsgId: lowest.toString(),
                          oldWatermark: currentWatermark.toString(),
                          newWatermark: resetTo.toString(),
                        },
                        "Resetting topic watermark to retry skipped messages"
                      );
                      // Mirror the DB change in the local copy used for the scan below.
                      progress = { ...(progress ?? { id: "", accountChannelMapId: mapping.id, topicId: topic.topicId, topicName: topic.name }), lastProcessedMessageId: resetTo } as typeof progress;
                    }
                  }
                } catch (retryErr) {
                  accountLog.warn(
                    { err: retryErr, topic: topic.name },
                    "SkippedPackage retry pass failed (non-fatal)"
                  );
                }

                const topicLabel = `${channel.title} ${topic.name}`;
                const topicProgress = topics.length > 1
                  ? ` (topic ${tIdx + 1}/${topics.length})`
                  : "";
                await updateRunActivity(activeRunId, {
                  currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
                  currentStep: "scanning",
                  currentChannel: channelLabel,
                  currentFile: null,
                  currentFileNum: null,
                  totalFiles: null,
                  downloadedBytes: null,
                  totalBytes: null,
                  downloadPercent: null,
                  messagesScanned: counters.messagesScanned,
                });
                const scanResult = await getTopicMessages(
                  client,
                  channel.telegramId,
                  topic.topicId,
                  progress?.lastProcessedMessageId,
                  100,
                  (scanned) => {
                    throttled.update({
                      // Separator added: the count used to be glued directly
                      // onto the label / topic counter.
                      currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
                      currentStep: "scanning",
                      currentChannel: channelLabel,
                      messagesScanned: counters.messagesScanned + scanned,
                    });
                  }
                );
                // Add scanned messages to global counter
                counters.messagesScanned += scanResult.totalScanned;

                if (scanResult.archives.length === 0) {
                  accountLog.info(
                    { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
                    "No new archives in topic"
                  );
                  // Still advance topic watermark so we don't re-scan these messages next cycle
                  if (scanResult.maxScannedMessageId) {
                    await upsertTopicProgress(
                      mapping.id,
                      topic.topicId,
                      topic.name,
                      scanResult.maxScannedMessageId
                    );
                  }
                  continue;
                }
                accountLog.info(
                  { topic: topic.name, archives: scanResult.archives.length, photos: scanResult.photos.length },
                  "Found messages in topic"
                );

                // Process archives with topic creator
                pipelineCtx.topicCreator = topic.name;
                pipelineCtx.sourceTopicId = topic.topicId;
                pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
                const { maxProcessedId, minFailedId } = await processArchiveSets(
                  pipelineCtx,
                  scanResult,
                  run.id,
                  progress?.lastProcessedMessageId,
                  // Incremental watermark advance — saves progress per-set so a
                  // worker restart mid-scan doesn't lose all work.
                  async (messageId) => {
                    await upsertTopicProgress(
                      mapping.id,
                      topic.topicId,
                      topic.name,
                      messageId
                    );
                  }
                );
                // Sync client back in case it was recreated during upload stall recovery
                client = pipelineCtx.client;

                // Final watermark write at the end of the scan (covers the
                // all-failures-with-fallback case). The incremental updates
                // above already handle the success path. The watermark is kept
                // strictly below the lowest failed message so it is retried.
                let topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
                if (minFailedId !== null && topicWatermark !== null && topicWatermark >= minFailedId) {
                  topicWatermark = minFailedId - 1n;
                }
                if (topicWatermark !== null) {
                  await upsertTopicProgress(
                    mapping.id,
                    topic.topicId,
                    topic.name,
                    topicWatermark
                  );
                }
              } catch (topicErr) {
                accountLog.warn(
                  { err: topicErr, channelId: channel.id, topic: topic.name, topicId: topic.topicId.toString() },
                  "Failed to process topic, skipping"
                );
              }
            }
          } else {
            // ── Non-forum channel: flat scan (existing behavior) ──
            await updateRunActivity(activeRunId, {
              currentActivity: `Scanning "${channelLabel}" for new archives`,
              currentStep: "scanning",
              currentChannel: channelLabel,
              currentFile: null,
              currentFileNum: null,
              totalFiles: null,
              downloadedBytes: null,
              totalBytes: null,
              downloadPercent: null,
              messagesScanned: counters.messagesScanned,
            });
            accountLog.info(
              { channelId: channel.id, title: channel.title },
              "Processing source channel"
            );

            // ── SkippedPackage retry pass ──
            // Pull the watermark back below the lowest still-retryable
            // SkippedPackage so the scan picks it up again; same rationale as
            // the per-topic retry pass in the forum branch.
            let effectiveChannelWatermark = mapping.lastProcessedMessageId;
            try {
              const retryable = await getRetryableSkippedMessageIds({
                accountId: account.id,
                sourceChannelId: channel.id,
                topicId: null,
                cap: config.maxSkipAttempts,
              });
              if (retryable.length > 0) {
                // NOTE(review): assumes ascending order, as in the forum branch.
                const lowest = retryable[0];
                if (effectiveChannelWatermark !== null && effectiveChannelWatermark >= lowest) {
                  const resetTo = lowest - 1n;
                  await updateLastProcessedMessage(mapping.id, resetTo);
                  accountLog.info(
                    {
                      channel: channel.title,
                      retryableCount: retryable.length,
                      lowestSkippedMsgId: lowest.toString(),
                      oldWatermark: effectiveChannelWatermark.toString(),
                      newWatermark: resetTo.toString(),
                    },
                    "Resetting channel watermark to retry skipped messages"
                  );
                  effectiveChannelWatermark = resetTo;
                }
              }
            } catch (retryErr) {
              accountLog.warn(
                { err: retryErr, channel: channel.title },
                "SkippedPackage retry pass failed (non-fatal)"
              );
            }

            const scanResult = await getChannelMessages(
              client,
              channel.telegramId,
              effectiveChannelWatermark,
              100,
              (scanned) => {
                throttled.update({
                  currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
                  currentStep: "scanning",
                  currentChannel: channelLabel,
                  messagesScanned: counters.messagesScanned + scanned,
                });
              }
            );
            // Add scanned messages to global counter
            counters.messagesScanned += scanResult.totalScanned;

            if (scanResult.archives.length === 0) {
              accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
              // Still advance watermark to highest scanned message so we don't
              // re-scan these messages next cycle
              if (scanResult.maxScannedMessageId) {
                await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
              }
              continue;
            }
            accountLog.info(
              { archives: scanResult.archives.length, photos: scanResult.photos.length },
              "Found messages in channel"
            );

            // For non-forum, creator comes from filename (set to null, resolved per-archive)
            pipelineCtx.topicCreator = null;
            pipelineCtx.sourceTopicId = null;
            pipelineCtx.channelTitle = channel.title;
            const { maxProcessedId, minFailedId } = await processArchiveSets(
              pipelineCtx,
              scanResult,
              run.id,
              effectiveChannelWatermark,
              // Incremental watermark advance — saves progress per-set so a
              // worker restart mid-scan doesn't lose all work.
              async (messageId) => {
                await updateLastProcessedMessage(mapping.id, messageId);
              }
            );
            // Sync client back in case it was recreated during upload stall recovery
            client = pipelineCtx.client;

            // Final watermark write at the end of the scan (covers the
            // all-failures-with-fallback case). The incremental updates above
            // already handle the success path. The watermark is kept strictly
            // below the lowest failed message so it is retried.
            let channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
            if (minFailedId !== null && channelWatermark !== null && channelWatermark >= minFailedId) {
              channelWatermark = minFailedId - 1n;
            }
            if (channelWatermark !== null) {
              await updateLastProcessedMessage(mapping.id, channelWatermark);
            }
          }
        } catch (channelErr) {
          accountLog.warn(
            { err: channelErr, channelId: channel.id, title: channel.title },
            "Failed to process channel, skipping to next"
          );
          // If the channel is no longer accessible (account got removed,
          // channel deleted, etc.), surface a persistent notification so the
          // admin can decide what to do. Dedupe by (channelId, accountId)
          // within the last 24h so we don't flood the notifications list every
          // cycle.
          const errMsg = channelErr instanceof Error ? channelErr.message : String(channelErr);
          const isAccessError =
            errMsg.includes("Can't access the chat") ||
            errMsg.includes("CHAT_FORBIDDEN") ||
            errMsg.includes("CHANNEL_PRIVATE") ||
            errMsg.includes("Chat not found");
          if (isAccessError) {
            try {
              const recent = await db.systemNotification.findFirst({
                where: {
                  type: "CHANNEL_ACCESS_LOST",
                  // Match both keys of the stored JSON context. Filtering on
                  // channelId alone let one account's notification suppress
                  // another account's for the same channel.
                  AND: [
                    { context: { path: ["channelId"], equals: channel.id } },
                    { context: { path: ["accountId"], equals: account.id } },
                  ],
                  createdAt: { gte: new Date(Date.now() - 24 * 60 * 60 * 1000) },
                },
                select: { id: true },
              });
              if (!recent) {
                await db.systemNotification.create({
                  data: {
                    type: "CHANNEL_ACCESS_LOST",
                    severity: "WARNING",
                    title: `Lost access to "${channel.title}" for ${account.phone}`,
                    message: `Account ${account.phone} can no longer access source channel "${channel.title}". The worker is skipping this channel every cycle. Re-join the channel or unlink it from this account in admin settings.`,
                    context: {
                      channelId: channel.id,
                      channelTitle: channel.title,
                      telegramId: channel.telegramId.toString(),
                      accountId: account.id,
                      accountPhone: account.phone,
                      errorMessage: errMsg.slice(0, 200),
                    },
                  },
                });
              }
            } catch {
              // Best-effort notification
            }
          }
        }
      }

      // ── Done ──
      await throttled.flush();
      await completeIngestionRun(activeRunId, counters);
      accountLog.info({ counters }, "Ingestion run completed");
    } finally {
      // Write any queued activity, then close whichever client is current
      // (it may have been replaced during stall recovery).
      await throttled.flush();
      await closeTdlibClient(client);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    accountLog.error({ err }, "Ingestion run failed");
    if (runId) {
      await failIngestionRun(runId, message).catch((e) =>
        accountLog.error({ e }, "Failed to mark run as failed")
      );
    }
  } finally {
    await releaseLock(account.id);
  }
}
/**
 * Classify an error message into the SkipReason badge shown in the UI.
 *
 * Matching is case-insensitive substring search, checked in priority order:
 * upload-related keywords first, then extraction-related keywords; anything
 * else is reported as a download failure.
 *
 * @param errMsg - Error message text to classify.
 * @returns The inferred skip reason.
 */
function inferSkipReason(errMsg: string): "DOWNLOAD_FAILED" | "UPLOAD_FAILED" | "EXTRACT_FAILED" {
  const haystack = errMsg.toLowerCase();
  const mentionsAny = (...needles: string[]): boolean =>
    needles.some((needle) => haystack.includes(needle));

  if (mentionsAny("upload", "too many requests", "retry after", "send")) {
    return "UPLOAD_FAILED";
  }
  if (mentionsAny("extract", "metadata", "central directory", "archive")) {
    return "EXTRACT_FAILED";
  }
  return "DOWNLOAD_FAILED";
}
/**
* Process a scan result through the archive pipeline:
* group → download → hash → dedup → metadata → split → upload → preview → index.
*
* Returns the highest message ID that was successfully processed (ingested or
* confirmed duplicate). The caller should only advance the progress boundary
* to this value — never to the max of all scanned messages.
*/
async function processArchiveSets(
ctx: PipelineContext,
scanResult: ChannelScanResult,
ingestionRunId: string,
lastProcessedMessageId?: bigint | null,
/** Called after each successful set with a safe watermark value (capped
* below any failed message ID in this scan). Used by the caller to
* advance the channel/topic watermark incrementally — otherwise a long
* scan that gets killed by worker restart loses all progress. */
onWatermarkAdvance?: (messageId: bigint) => Promise<void>
): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> {
const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx;
// Group into archive sets
let archiveSets = groupArchiveSets(scanResult.archives);
// Filter out sets where ALL parts are at or below the boundary (already processed)
if (lastProcessedMessageId) {
const totalBefore = archiveSets.length;
archiveSets = archiveSets.filter((set) =>
set.parts.some((p) => p.id > lastProcessedMessageId)
);
const filtered = totalBefore - archiveSets.length;
if (filtered > 0) {
accountLog.info(
{ filtered, remaining: archiveSets.length },
"Filtered out already-processed archive sets"
);
}
}
// Filter out sets whose source message has hit the auto-retry cap — these are
// treated as "give up for now" so the watermark can advance past them.
// Removing them from archiveSets means they are NOT tracked in minFailedId,
// so the caller's watermark cap won't pin progress below them. The
// SkippedPackage record stays so the user can manually retry via the UI.
const cappedIds = await getCappedSkippedMessageIds(channel.id, config.maxSkipAttempts);
if (cappedIds.size > 0) {
const beforeCap = archiveSets.length;
archiveSets = archiveSets.filter(
(set) => !cappedIds.has(set.parts[0].id)
);
const cappedSkipped = beforeCap - archiveSets.length;
if (cappedSkipped > 0) {
accountLog.warn(
{ cappedSkipped, cap: config.maxSkipAttempts, remaining: archiveSets.length },
"Skipping archive sets that hit the auto-retry attempt cap — watermark will advance past them"
);
}
}
counters.zipsFound += archiveSets.length;
// Match preview photos to archive sets
const previewMatches = matchPreviewToArchive(
scanResult.photos,
archiveSets.map((s) => ({
baseName: s.baseName,
firstMessageId: s.parts[0].id,
firstMessageDate: s.parts[0].date,
}))
);
if (previewMatches.size > 0) {
accountLog.info(
{ matched: previewMatches.size, total: archiveSets.length },
"Matched preview photos to archives"
);
}
await updateRunActivity(runId, {
currentActivity: `Found ${archiveSets.length} archive(s) in "${channelTitle}"`,
currentStep: "scanning",
currentChannel: channelTitle,
totalFiles: archiveSets.length,
zipsFound: counters.zipsFound,
messagesScanned: counters.messagesScanned,
});
// Track the highest message ID that was successfully processed and the
// lowest message ID of any failed set. The caller uses minFailedId to cap
// the watermark so failures get retried on the next cycle.
let maxProcessedId: bigint | null = null;
let minFailedId: bigint | null = null;
const indexedPackageRefs: IndexedPackageRef[] = [];
for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
try {
const packageId = await processOneArchiveSet(
ctx,
archiveSets[setIdx],
setIdx,
archiveSets.length,
previewMatches,
ingestionRunId
);
if (packageId) {
const firstPart = archiveSets[setIdx].parts[0];
indexedPackageRefs.push({
packageId,
sourceMessageId: firstPart.id,
mediaAlbumId: firstPart.mediaAlbumId,
});
}
// Set completed (ingested or confirmed duplicate) — advance watermark
const setMaxId = archiveSets[setIdx].parts.reduce(
(max, p) => (p.id > max ? p.id : max),
0n
);
if (setMaxId > (maxProcessedId ?? 0n)) {
maxProcessedId = setMaxId;
}
// Persist watermark immediately so a worker restart or cycle timeout
// doesn't throw away progress. We only advance below minFailedId so a
// later-encountered failure (out of order, e.g., multipart spanning)
// doesn't get buried by an earlier success in this scan. In practice
// sets are processed oldest-first, so setMaxId rarely exceeds
// minFailedId, but the cap keeps the invariant if it ever does.
if (onWatermarkAdvance) {
const safeWatermark =
minFailedId !== null && setMaxId >= minFailedId
? minFailedId - 1n
: setMaxId;
if (safeWatermark > 0n) {
await onWatermarkAdvance(safeWatermark).catch((err) => {
accountLog.warn(
{ err, setMaxId: setMaxId.toString() },
"Failed to persist incremental watermark (will retry at end of scan)"
);
});
}
}
// Reset stall counter on any successful upload
ctx.consecutiveStalls = 0;
} catch (setErr) {
// If a set fails, do NOT advance the watermark past it
accountLog.warn(
{ err: setErr, baseName: archiveSets[setIdx].baseName },
"Archive set failed, watermark will not advance past this set"
);
// Record the lowest part ID of this set as a failure boundary so the
// caller can cap the watermark below it and the next scan re-picks it up.
const setMinId = archiveSets[setIdx].parts.reduce(
(min, p) => (p.id < min ? p.id : min),
archiveSets[setIdx].parts[0].id
);
if (minFailedId === null || setMinId < minFailedId) {
minFailedId = setMinId;
}
// ── TDLib client recreation on repeated upload stalls ──
// When the TDLib event stream degrades, uploads complete (bytes sent)
// but confirmations never arrive. Retrying with the same broken client
// is futile. Recreate the client to get a fresh connection.
if (setErr instanceof UploadStallError) {
ctx.consecutiveStalls++;
accountLog.warn(
{ consecutiveStalls: ctx.consecutiveStalls },
"Upload stall detected — TDLib event stream may be degraded"
);
// After 1 stalled set (= 3 failed retry attempts already), recreate the client
if (ctx.consecutiveStalls >= 1) {
accountLog.info("Recreating TDLib client after consecutive upload stalls");
try {
await closeTdlibClient(ctx.client);
} catch (closeErr) {
accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
}
try {
const { client: newClient } = await createTdlibClient({
id: ctx.accountId,
phone: ctx.accountPhone,
});
ctx.client = newClient;
// Reload chats so the new client can access channels
try {
for (let page = 0; page < 500; page++) {
await newClient.invoke({
_: "loadChats",
chat_list: { _: "chatListMain" },
limit: 100,
});
}
} catch {
// 404 = all loaded (expected)
}
ctx.consecutiveStalls = 0;
accountLog.info("TDLib client recreated successfully — continuing ingestion");
} catch (recreateErr) {
accountLog.error(
{ err: recreateErr },
"Failed to recreate TDLib client — aborting remaining uploads"
);
break;
}
}
}
// Record the failure for visibility in the UI
try {
const archiveSet = archiveSets[setIdx];
const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
const rawErrMsg = setErr instanceof Error ? setErr.message : String(setErr);
// Prefix with [<phone>] so the SkippedPackage / notification UI shows
// which account hit the error — important when two accounts share a
// source channel and only one is failing.
const errMsg = `[${ctx.accountPhone}] ${rawErrMsg}`;
await upsertSkippedPackage({
fileName: archiveSet.parts[0].fileName,
fileSize: totalSize,
reason: inferSkipReason(rawErrMsg),
errorMessage: errMsg,
sourceChannelId: ctx.channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId: ctx.sourceTopicId,
isMultipart: archiveSet.isMultipart,
partCount: archiveSet.parts.length,
accountId: ctx.accountId,
});
// Also create a persistent notification
await db.systemNotification.create({
data: {
type: inferSkipReason(rawErrMsg) === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED",
severity: "WARNING",
title: `Failed to process ${archiveSet.parts[0].fileName}`,
message: errMsg,
context: {
fileName: archiveSet.parts[0].fileName,
sourceChannelId: ctx.channel.id,
sourceMessageId: Number(archiveSet.parts[0].id),
channelTitle: ctx.channelTitle,
accountPhone: ctx.accountPhone,
reason: inferSkipReason(rawErrMsg),
},
},
});
} catch {
// Best-effort — don't fail the run if skip recording fails
}
}
}
// Post-processing: group packages by Telegram album ID
if (indexedPackageRefs.length > 0) {
await processAlbumGroups(
ctx.client,
channel.id,
indexedPackageRefs,
scanResult.photos
);
// Auto-grouping passes (gated by per-channel flag)
const channelRecord = await db.telegramChannel.findUnique({
where: { id: channel.id },
select: { autoGroupEnabled: true },
});
if (channelRecord?.autoGroupEnabled !== false) {
// Learned rule-based grouping (from manual overrides)
await processRuleBasedGroups(channel.id, indexedPackageRefs);
// Time-window grouping for remaining ungrouped packages
await processTimeWindowGroups(channel.id, indexedPackageRefs);
// Pattern-based grouping (date patterns, project slugs)
await processPatternGroups(channel.id, indexedPackageRefs);
// Creator-based grouping (3+ files from same creator)
await processCreatorGroups(channel.id, indexedPackageRefs);
// ZIP path prefix grouping (shared root folder inside archives)
await processZipPathGroups(channel.id, indexedPackageRefs);
// Reply chain grouping (messages replying to same root)
await processReplyChainGroups(channel.id, indexedPackageRefs);
// Caption fuzzy match grouping
await processCaptionGroups(channel.id, indexedPackageRefs);
}
// Check for potential grouping conflicts
await detectGroupingConflicts(channel.id, indexedPackageRefs);
}
return { maxProcessedId, minFailedId };
}
/**
 * Process a single archive set through the full pipeline.
 *
 * Stages, in order:
 *  1. Cheap pre-download skips: already ingested (same source message),
 *     repost (same fileName + total size in this channel), size limit.
 *  2. Download every part into a per-set temp directory.
 *  3. Hash the parts and de-duplicate by content hash, guarded by a hash lock
 *     so concurrent workers don't race on the same archive.
 *  4. Read the archive's file list (best-effort), split/repack if any upload
 *     would exceed `ctx.maxUploadSize`, then upload (or reuse a prior upload).
 *  5. Persist a Package stub right after upload, then attach preview and
 *     file entries in a second phase.
 *
 * The per-set temp directory and all temp/split files are removed in a
 * `finally`, regardless of outcome.
 *
 * @param ctx            Pipeline context (client, channel, counters, logger, …).
 * @param archiveSet     The set to process; `parts[0]` identifies the set.
 * @param setIdx         Zero-based index of this set, used for progress display.
 * @param totalSets      Total number of sets in this scan, used for progress display.
 * @param previewMatches Preview photos keyed by archive base name.
 * @param ingestionRunId Run ID; stored on the Package and used in the temp dir name.
 * @returns The new Package ID, or `null` when the set was skipped
 *          (already ingested, repost, over the size limit, duplicate hash,
 *          or hash lock held by another worker).
 * @throws Propagates download/upload/DB errors, and throws an `Error` when the
 *         split parts do not hash to the original content hash.
 */
async function processOneArchiveSet(
  ctx: PipelineContext,
  archiveSet: ArchiveSet,
  setIdx: number,
  totalSets: number,
  previewMatches: Map<string, { id: bigint; fileId: string }>,
  ingestionRunId: string
): Promise<string | null> {
  const {
    client, runId, channelTitle, channel,
    destChannelTelegramId, destChannelId,
    throttled, counters, topicCreator, sourceTopicId, accountLog,
  } = ctx;
  const archiveName = archiveSet.parts[0].fileName;
  // ── Early skip: check if this archive set was already ingested ──
  // This avoids re-downloading large archives that were processed in a prior run.
  const alreadyIngested = await packageExistsBySourceMessage(
    channel.id,
    archiveSet.parts[0].id
  );
  if (alreadyIngested) {
    counters.zipsDuplicate++;
    accountLog.debug(
      { fileName: archiveName, sourceMessageId: Number(archiveSet.parts[0].id) },
      "Archive already ingested (by source message), skipping"
    );
    await updateRunActivity(runId, {
      currentActivity: `Skipped ${archiveName} (already ingested)`,
      currentStep: "deduplicating",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
      zipsDuplicate: counters.zipsDuplicate,
    });
    return null;
  }
  // Total size across all parts. Used by the repost check, the size guard,
  // the split decision and the persisted Package record.
  const totalArchiveSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
  // ── Pre-download repost detection ──
  // The source channel admin frequently reposts the same file at new message
  // IDs. packageExistsBySourceMessage misses these (different msgId), so we
  // historically downloaded the file just to discover via hash that it's a
  // duplicate — wasting hours of bandwidth per run.
  //
  // Match by (sourceChannelId, fileName, totalSize). The totalSize comparison
  // makes this very strong — name-and-size collision between unrelated files
  // is rare in practice. If it ever happens, the new file is treated as a
  // duplicate; the user can remove the existing Package via the UI to force
  // a re-ingestion.
  const reposted = await findRepostedPackage(
    channel.id,
    archiveName,
    totalArchiveSize
  );
  if (reposted) {
    counters.zipsDuplicate++;
    // Backfill topic context onto the existing Package when we encounter the
    // same file in a more specific topic. If the existing Package was created
    // from "General" or a non-forum scan and we now see the file in a named
    // topic (e.g., "Artisan Guild January 2022"), update the Package so the
    // user gets richer metadata. We only update when the current scan is in
    // a specific topic AND the existing topic differs.
    const currentTopicName = ctx.topicCreator; // == topic.name for forum scans
    const currentTopicId = ctx.sourceTopicId;
    const isCurrentSpecific = currentTopicName !== null && currentTopicName !== "General";
    const existingTopicDiffers = reposted.sourceTopicId !== currentTopicId;
    if (isCurrentSpecific && currentTopicId !== null && existingTopicDiffers) {
      try {
        await updatePackageTopicContext(reposted.id, currentTopicId, currentTopicName);
        accountLog.info(
          {
            fileName: archiveName,
            packageId: reposted.id,
            existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null,
            newTopicId: Number(currentTopicId),
            newTopicName: currentTopicName,
          },
          "Updated existing Package with more specific topic context"
        );
      } catch (updErr) {
        // The backfill is cosmetic; the repost skip below must still happen.
        accountLog.warn({ err: updErr, packageId: reposted.id }, "Failed to update Package topic context (non-fatal)");
      }
    }
    accountLog.info(
      {
        fileName: archiveName,
        sourceMessageId: Number(archiveSet.parts[0].id),
        existingPackageId: reposted.id,
        existingDestMessageId: reposted.destMessageId ? Number(reposted.destMessageId) : null,
        existingTopicId: reposted.sourceTopicId ? Number(reposted.sourceTopicId) : null,
        currentTopicId: currentTopicId ? Number(currentTopicId) : null,
        currentTopicName,
        totalSize: Number(totalArchiveSize),
      },
      "Skipping repost — same fileName + size already uploaded in this channel"
    );
    await updateRunActivity(runId, {
      currentActivity: `Skipped ${archiveName} (repost of already-uploaded file)`,
      currentStep: "deduplicating",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
      zipsDuplicate: counters.zipsDuplicate,
    });
    return null;
  }
  // ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
  const maxSizeBytes = BigInt(config.maxZipSizeMB) * 1024n * 1024n;
  if (totalArchiveSize > maxSizeBytes) {
    accountLog.warn(
      {
        fileName: archiveName,
        totalSizeMB: Number(totalArchiveSize / (1024n * 1024n)),
        maxSizeMB: config.maxZipSizeMB,
      },
      "Archive exceeds max size limit, skipping"
    );
    await updateRunActivity(runId, {
      currentActivity: `Skipped ${archiveName} (exceeds ${config.maxZipSizeMB}MB limit)`,
      currentStep: "skipping",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
    });
    // Recorded as a SkippedPackage so the oversize file is visible to the user.
    await upsertSkippedPackage({
      fileName: archiveName,
      fileSize: totalArchiveSize,
      reason: "SIZE_LIMIT",
      sourceChannelId: channel.id,
      sourceMessageId: archiveSet.parts[0].id,
      sourceTopicId: ctx.sourceTopicId,
      isMultipart: archiveSet.isMultipart,
      partCount: archiveSet.parts.length,
      accountId: ctx.accountId,
    });
    return null;
  }
  const tempPaths: string[] = [];
  let splitPaths: string[] = [];
  // Per-set subdirectory so uploaded files keep their original filenames
  const setDir = path.join(config.tempDir, `${ingestionRunId}_${archiveSet.parts[0].id}`);
  await mkdir(setDir, { recursive: true });
  try {
    // ── Downloading ──
    for (let partIdx = 0; partIdx < archiveSet.parts.length; partIdx++) {
      const part = archiveSet.parts[partIdx];
      const tempPath = path.join(setDir, part.fileName);
      const partLabel = archiveSet.parts.length > 1
        ? ` (part ${partIdx + 1}/${archiveSet.parts.length})`
        : "";
      await updateRunActivity(runId, {
        currentActivity: `Downloading ${part.fileName}${partLabel}`,
        currentStep: "downloading",
        currentChannel: channelTitle,
        currentFile: part.fileName,
        currentFileNum: setIdx + 1,
        totalFiles: totalSets,
        downloadedBytes: 0n,
        totalBytes: part.fileSize,
        downloadPercent: 0,
        messagesScanned: counters.messagesScanned,
      });
      accountLog.info(
        {
          fileName: part.fileName,
          fileSize: Number(part.fileSize),
          part: partIdx + 1,
          totalParts: archiveSet.parts.length,
        },
        "Downloading archive part"
      );
      await downloadFile(
        client,
        part.fileId,
        tempPath,
        part.fileSize,
        part.fileName,
        (progress: DownloadProgress) => {
          throttled.update({
            // Separator keeps the percentage from running into the file
            // name / part label (previously rendered as "foo.zip42%").
            currentActivity: `Downloading ${part.fileName}${partLabel} — ${progress.percent}%`,
            currentStep: "downloading",
            currentChannel: channelTitle,
            currentFile: part.fileName,
            currentFileNum: setIdx + 1,
            totalFiles: totalSets,
            downloadedBytes: BigInt(progress.downloadedBytes),
            totalBytes: BigInt(progress.totalBytes),
            downloadPercent: progress.percent,
          });
        }
      );
      await throttled.flush();
      // Only tracked after a completed download; the finally block below
      // removes setDir recursively, which also covers a partial file.
      tempPaths.push(tempPath);
    }
    // ── Hashing ──
    await updateRunActivity(runId, {
      currentActivity: `Computing hash for ${archiveName}`,
      currentStep: "hashing",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
      downloadedBytes: null,
      totalBytes: null,
      downloadPercent: null,
    });
    const contentHash = await hashParts(tempPaths);
    // ── Deduplicating ──
    await updateRunActivity(runId, {
      currentActivity: `Checking if ${archiveName} is a duplicate`,
      currentStep: "deduplicating",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
    });
    const exists = await packageExistsByHash(contentHash);
    if (exists) {
      counters.zipsDuplicate++;
      accountLog.debug({ contentHash }, "Duplicate archive, skipping");
      await updateRunActivity(runId, {
        currentActivity: `Skipped ${archiveName} (duplicate)`,
        currentStep: "deduplicating",
        currentChannel: channelTitle,
        currentFile: archiveName,
        currentFileNum: setIdx + 1,
        totalFiles: totalSets,
        zipsDuplicate: counters.zipsDuplicate,
      });
      return null;
    }
    // ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
    const hashLockAcquired = await tryAcquireHashLock(contentHash);
    if (!hashLockAcquired) {
      counters.zipsDuplicate++;
      accountLog.info(
        { fileName: archiveName, hash: contentHash.slice(0, 16) },
        "Hash lock held by another worker — skipping concurrent duplicate"
      );
      return null;
    }
    let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
    let creator: string | null = null;
    const tags: string[] = [];
    let stub: { id: string } | null = null;
    // Everything up to stub creation runs under the hash lock; the finally
    // below releases it on every exit path (return, throw, or fallthrough).
    try {
      // Re-check after acquiring lock: another worker may have finished between
      // the first check above and this point.
      const existsAfterLock = await packageExistsByHash(contentHash);
      if (existsAfterLock) {
        counters.zipsDuplicate++;
        accountLog.debug(
          { fileName: archiveName, hash: contentHash.slice(0, 16) },
          "Duplicate detected after acquiring hash lock — skipping"
        );
        return null;
      }
      // ── Reading metadata ──
      await updateRunActivity(runId, {
        currentActivity: `Reading file list from ${archiveName}`,
        currentStep: "reading_metadata",
        currentChannel: channelTitle,
        currentFile: archiveName,
        currentFileNum: setIdx + 1,
        totalFiles: totalSets,
      });
      try {
        if (archiveSet.type === "ZIP") {
          entries = await readZipCentralDirectory(tempPaths);
        } else if (archiveSet.type === "RAR") {
          entries = await readRarContents(tempPaths[0]);
        } else if (archiveSet.type === "7Z") {
          entries = await read7zContents(tempPaths[0]);
        } else if (archiveSet.type === "DOCUMENT") {
          // Standalone documents (PDF, STL, etc.) — no extraction,
          // record the file itself as the single entry
          const part = archiveSet.parts[0];
          const ext = part.fileName.match(/\.([^.]+)$/)?.[1] ?? null;
          entries = [{
            path: part.fileName,
            fileName: part.fileName,
            extension: ext,
            compressedSize: part.fileSize,
            uncompressedSize: part.fileSize,
            crc32: null,
          }];
        }
      } catch (err) {
        // An unreadable file list is not fatal: the archive is still
        // uploaded and indexed, just with an empty entries array.
        accountLog.warn({ err, baseName: archiveSet.baseName }, "Failed to read archive metadata, ingesting without file list");
      }
      // ── Splitting / Repacking (if needed) ──
      let uploadPaths = [...tempPaths];
      const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
      const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
      if (hasOversizedPart) {
        // Full repack: concatenate all parts → single file → re-split into uniform 2GB chunks
        await updateRunActivity(runId, {
          currentActivity: `Repacking ${archiveName} (parts >2GB, concatenating + re-splitting)`,
          currentStep: "splitting",
          currentChannel: channelTitle,
          currentFile: archiveName,
          currentFileNum: setIdx + 1,
          totalFiles: totalSets,
        });
        const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
        await concatenateFiles(tempPaths, concatPath);
        splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
        uploadPaths = splitPaths;
        // Clean up the concat intermediate file
        await unlink(concatPath).catch(() => {});
      } else if (!archiveSet.isMultipart && totalArchiveSize > MAX_UPLOAD_SIZE) {
        // Single file >2GB: split directly
        await updateRunActivity(runId, {
          currentActivity: `Splitting ${archiveName} for upload (>2GB)`,
          currentStep: "splitting",
          currentChannel: channelTitle,
          currentFile: archiveName,
          currentFileNum: setIdx + 1,
          totalFiles: totalSets,
        });
        splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
        uploadPaths = splitPaths;
      }
      // ── Hash verification after split ──
      // If we split/repacked, verify the split parts hash matches the original
      if (splitPaths.length > 0) {
        const splitHash = await hashParts(splitPaths);
        if (splitHash !== contentHash) {
          accountLog.error(
            { fileName: archiveName, originalHash: contentHash, splitHash, parts: splitPaths.length },
            "Hash mismatch after split — file may be corrupted"
          );
          // Record notification for visibility
          try {
            await db.systemNotification.create({
              data: {
                type: "HASH_MISMATCH",
                severity: "ERROR",
                title: `Hash mismatch after splitting ${archiveName}`,
                message: `Expected ${contentHash.slice(0, 16)}… but got ${splitHash.slice(0, 16)}… after splitting into ${splitPaths.length} parts`,
                context: {
                  fileName: archiveName,
                  originalHash: contentHash,
                  splitHash,
                  partCount: splitPaths.length,
                  sourceChannelId: channel.id,
                },
              },
            });
          } catch {
            // Best-effort notification
          }
          // Never upload parts that do not reassemble to the original bytes.
          throw new Error(`Hash mismatch after split for ${archiveName}: expected ${contentHash}, got ${splitHash}`);
        }
        accountLog.debug(
          { fileName: archiveName, hash: contentHash.slice(0, 16), parts: splitPaths.length },
          "Split hash verified — matches original"
        );
      }
      // ── Uploading ──
      // Check if a prior run already uploaded this file (orphaned upload scenario:
      // file reached Telegram but DB write failed or worker crashed before indexing)
      const existingUpload = await getUploadedPackageByHash(contentHash);
      let destResult: { messageId: bigint; messageIds: bigint[] };
      if (existingUpload && existingUpload.destMessageId) {
        accountLog.info(
          { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
          "Reusing existing upload (file already on destination channel)"
        );
        destResult = {
          messageId: existingUpload.destMessageId,
          messageIds: existingUpload.destMessageIds?.length
            ? (existingUpload.destMessageIds as bigint[])
            : [existingUpload.destMessageId],
        };
      } else {
        const uploadLabel = uploadPaths.length > 1
          ? ` (${uploadPaths.length} parts)`
          : "";
        await updateRunActivity(runId, {
          currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
          currentStep: "uploading",
          currentChannel: channelTitle,
          currentFile: archiveName,
          currentFileNum: setIdx + 1,
          totalFiles: totalSets,
        });
        destResult = await uploadToChannel(
          client,
          destChannelTelegramId,
          uploadPaths
        );
      }
      // ── Post-upload integrity check ──
      // Verify the files on disk still match before we index. Split uploads
      // are excluded: their hash was already verified against the original
      // above, and re-hashing them here only produced a value that was
      // discarded (a full extra read of a multi-GB archive).
      if (splitPaths.length === 0 && uploadPaths.length > 0 && !existingUpload) {
        try {
          const postUploadHash = await hashParts(uploadPaths);
          if (postUploadHash !== contentHash) {
            accountLog.error(
              { fileName: archiveName, originalHash: contentHash, postUploadHash },
              "Hash changed between hashing and upload — possible disk corruption"
            );
            await db.systemNotification.create({
              data: {
                type: "HASH_MISMATCH",
                severity: "ERROR",
                title: `Post-upload hash mismatch: ${archiveName}`,
                message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}`,
                context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
              },
            });
          }
        } catch {
          // Best-effort — don't fail the ingestion
        }
      }
      // ── Phase 1: Stub record — persisted immediately after upload ──
      await deleteOrphanedPackageByHash(contentHash);
      // Creator precedence: topic name, then file name, then channel title.
      creator =
        topicCreator ??
        extractCreatorFromFileName(archiveName) ??
        extractCreatorFromChannelTitle(channelTitle) ??
        null;
      if (channel.category) {
        tags.push(channel.category);
      }
      stub = await createPackageStub({
        contentHash,
        fileName: archiveName,
        fileSize: totalArchiveSize,
        archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
        sourceChannelId: channel.id,
        sourceMessageId: archiveSet.parts[0].id,
        sourceTopicId,
        destChannelId,
        destMessageId: destResult.messageId,
        destMessageIds: destResult.messageIds,
        isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
        partCount: uploadPaths.length,
        ingestionRunId,
        creator,
        tags,
      });
      counters.zipsIngested++;
      // The set is now ingested; drop any SkippedPackage left by earlier failures.
      await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
    } finally {
      await releaseHashLock(contentHash);
    }
    if (!stub) return null;
    // ── Preview thumbnail ──
    // (moved here from before stub creation — lock is released, preview doesn't need it)
    //
    // Preview acquisition is strictly best-effort from here on. The Package
    // stub already exists, so a throw would report the set as failed while a
    // retry short-circuits on packageExistsBySourceMessage — the file list
    // below would then never be saved. Log and continue without a preview.
    let previewData: Buffer | null = null;
    let previewMsgId: bigint | null = null;
    const matchedPhoto = previewMatches.get(archiveSet.baseName);
    if (matchedPhoto) {
      await updateRunActivity(runId, {
        currentActivity: `Downloading preview image for ${archiveName}`,
        currentStep: "preview",
        currentChannel: channelTitle,
        currentFile: archiveName,
        currentFileNum: setIdx + 1,
        totalFiles: totalSets,
      });
      try {
        previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
      } catch (previewErr) {
        accountLog.warn(
          { err: previewErr, fileName: archiveName },
          "Failed to download preview photo (non-fatal)"
        );
      }
      if (previewData) {
        previewMsgId = matchedPhoto.id;
      }
    }
    // ── Fallback: extract preview image from inside the archive ──
    if (!previewData && entries.length > 0 && archiveSet.type !== "DOCUMENT") {
      const previewEntry = pickPreviewFile(entries);
      if (previewEntry) {
        accountLog.debug(
          { fileName: archiveName, previewFile: previewEntry.path },
          "Attempting to extract preview image from archive"
        );
        const archiveTypeForExtract = archiveSet.type === "7Z" ? "SEVEN_Z" as const : archiveSet.type as "ZIP" | "RAR";
        try {
          previewData = await extractPreviewImage(
            tempPaths[0],
            archiveTypeForExtract,
            previewEntry.path
          );
        } catch (extractErr) {
          accountLog.warn(
            { err: extractErr, fileName: archiveName, previewFile: previewEntry.path },
            "Failed to extract preview image from archive (non-fatal)"
          );
        }
      }
    }
    // ── Phase 2: Update stub with file entries and preview ──
    await updateRunActivity(runId, {
      currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
      currentStep: "indexing",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
    });
    await updatePackageWithMetadata(stub.id, {
      files: entries,
      previewData,
      previewMsgId,
    });
    await updateRunActivity(runId, {
      currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
      currentStep: "complete",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
      zipsIngested: counters.zipsIngested,
    });
    accountLog.info(
      { fileName: archiveName, contentHash, fileCount: entries.length, creator },
      "Archive ingested"
    );
    return stub.id;
  } finally {
    // ALWAYS delete temp files and the set directory
    await deleteFiles([...tempPaths, ...splitPaths]);
    await rm(setDir, { recursive: true, force: true }).catch(() => {});
  }
}
/**
 * Best-effort removal of the given files, one after another.
 *
 * A failed unlink (file already deleted, or never created) is ignored so
 * that cleanup never masks the error that triggered it.
 */
async function deleteFiles(paths: string[]): Promise<void> {
  for (const filePath of paths) {
    // Swallow per-file failures and keep going with the remaining paths.
    await unlink(filePath).catch(() => undefined);
  }
}
/**
 * Clean up any leftover temp files/directories from previous runs.
 *
 * Removes every entry directly under `config.tempDir` (recursively, forced).
 * Never throws: a missing temp directory is expected on first start and is
 * ignored silently, while any other failure to list the directory (e.g. a
 * permission problem) is logged as a warning instead of being hidden.
 */
export async function cleanupTempDir(): Promise<void> {
  let entries: string[];
  try {
    entries = await readdir(config.tempDir);
  } catch (err) {
    // Node system errors carry a string `code`; read it without assuming
    // the thrown value's shape.
    const code =
      typeof err === "object" && err !== null && "code" in err
        ? (err as { code?: unknown }).code
        : undefined;
    if (code !== "ENOENT") {
      // Directory exists but could not be read — surface it, since later
      // downloads into the same directory are likely to fail too.
      log.warn({ err, tempDir: config.tempDir }, "Failed to list temp directory for cleanup");
    }
    // ENOENT: directory might not exist yet — nothing to clean.
    return;
  }
  for (const entry of entries) {
    // Per-entry removal stays best-effort; one stubborn entry must not
    // block cleanup of the rest.
    await rm(path.join(config.tempDir, entry), { recursive: true, force: true }).catch(() => {});
  }
  if (entries.length > 0) {
    log.info({ count: entries.length }, "Cleaned up stale temp files");
  }
}