Files
dragonsstash/worker/src/worker.ts
xCyanGrizzly d99a506b10 fix: cap watermark below failed sets so failures retry next cycle
Previously the channel/topic watermark could advance past failed
archive sets in two ways:

1. A later successful set raised maxProcessedId past a failed earlier
   set within the same scan.
2. scanResult.maxScannedMessageId was used as fallback even when
   archives in the scan had failed (added in 77c26ad to prevent
   re-scanning empty channels).

Both paths buried failed archives below the watermark on the next
cycle — they sat permanently in SkippedPackage with no auto-recovery.

Now processArchiveSets returns the lowest failed source message ID
alongside the highest processed one. The caller caps the watermark at
(minFailedId - 1n) so the next scan re-includes the failed messages
and processOneArchiveSet retries them. Successful sets above the
failure boundary are not re-uploaded — packageExistsBySourceMessage
early-skips them on the second pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:46:26 +02:00

1497 lines
52 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import path from "path";
import { unlink, readdir, mkdir, rm } from "fs/promises";
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
import {
getSourceChannelMappings,
getGlobalDestinationChannel,
packageExistsByHash,
packageExistsBySourceMessage,
createPackageStub,
updatePackageWithMetadata,
createIngestionRun,
completeIngestionRun,
failIngestionRun,
updateLastProcessedMessage,
updateRunActivity,
setChannelForum,
getTopicProgress,
upsertTopicProgress,
upsertChannel,
ensureAccountChannelLink,
getGlobalSetting,
getChannelFetchRequest,
updateFetchRequestStatus,
getAccountLinkedChannelIds,
getExistingChannelsByTelegramId,
getAccountById,
deleteOrphanedPackageByHash,
getUploadedPackageByHash,
upsertSkippedPackage,
deleteSkippedPackage,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
import { matchPreviewToArchive } from "./preview/match.js";
import { pickPreviewFile, extractPreviewImage } from "./preview/extract.js";
import { groupArchiveSets } from "./archive/multipart.js";
import type { ArchiveSet } from "./archive/multipart.js";
import { extractCreatorFromFileName, extractCreatorFromChannelTitle } from "./archive/creator.js";
import { hashParts } from "./archive/hash.js";
import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel, UploadStallError } from "./upload/channel.js";
import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
import { db } from "./db/client.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
import type { Client } from "tdl";
// Module-level logger obtained from childLogger with the "worker" label.
const log = childLogger("worker");
/**
 * Drive the login flow for an account by opening a TDLib client for it.
 *
 * Once createTdlibClient resolves the account is treated as authenticated and
 * two follow-up steps run:
 * 1. The account's chat list is fetched and stored as a completed
 *    ChannelFetchRequest (via createAutoFetchRequest).
 * 2. If a "destination_invite_link" global setting exists, the account tries
 *    to join that chat and is linked to the global destination channel as
 *    WRITER. The link is attempted even when joining throws.
 *
 * Any error is logged and swallowed; the TDLib client is always closed.
 */
export async function authenticateAccount(
  account: TelegramAccount
): Promise<void> {
  const aLog = childLogger("auth", { accountId: account.id, phone: account.phone });
  aLog.info("Starting authentication flow");

  // Links this account to the destination channel (if one is configured).
  // `announce` controls whether the success line is logged.
  const linkAsWriter = async (announce: boolean): Promise<void> => {
    const destination = await getGlobalDestinationChannel();
    if (!destination) {
      return;
    }
    await ensureAccountChannelLink(account.id, destination.id, "WRITER");
    if (announce) {
      aLog.info({ destChannel: destination.title }, "Joined destination group and linked as WRITER");
    }
  };

  let tdClient: Client | undefined;
  try {
    const created = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });
    tdClient = created.client;
    aLog.info("Authentication successful");

    // Publish the channel list so the picker has data right away.
    aLog.info("Fetching channels from Telegram...");
    await createAutoFetchRequest(tdClient, account.id, aLog);

    // Nothing more to do unless an invite link has been configured.
    const inviteLink = await getGlobalSetting("destination_invite_link");
    if (!inviteLink) {
      return;
    }

    aLog.info("Attempting to join destination group via invite link...");
    try {
      await joinChatByInviteLink(tdClient, inviteLink);
      await linkAsWriter(true);
    } catch (err) {
      // Joining can fail simply because the account is already a member.
      aLog.warn({ err }, "Could not join destination group (may already be a member)");
      // The WRITER link is still worth attempting.
      await linkAsWriter(false);
    }
  } catch (err) {
    aLog.error({ err }, "Authentication failed");
  } finally {
    if (tdClient) {
      await closeTdlibClient(tdClient);
    }
  }
}
/**
 * Process a ChannelFetchRequest: fetch channels from Telegram,
 * enrich with DB state, and write the result JSON.
 * Called by the fetch listener (pg_notify) and by authenticateAccount.
 *
 * Requests that are missing or not PENDING are skipped. Once the request has
 * been moved to IN_PROGRESS, every failure — including a failure to create
 * the TDLib client — marks it FAILED with the error message, so a request is
 * never left stuck in IN_PROGRESS.
 *
 * @param requestId ID of the ChannelFetchRequest row to process.
 */
export async function processFetchRequest(requestId: string): Promise<void> {
  const aLog = childLogger("fetch-request", { requestId });
  const request = await getChannelFetchRequest(requestId);
  if (!request || request.status !== "PENDING") {
    aLog.warn("Fetch request not found or not pending, skipping");
    return;
  }
  await updateFetchRequestStatus(requestId, "IN_PROGRESS");
  aLog.info({ accountId: request.accountId }, "Processing fetch request");

  type TdlibSession = Awaited<ReturnType<typeof createTdlibClient>>;
  let session: TdlibSession | undefined;
  try {
    // Created inside the try so a connection/login failure is reported as
    // FAILED instead of escaping with the request still IN_PROGRESS.
    session = await createTdlibClient({
      id: request.account.id,
      phone: request.account.phone,
    });
    const { client } = session;

    const chats = await getAccountChats(client);

    // Enrich with DB state
    const linkedTelegramIds = await getAccountLinkedChannelIds(request.accountId);
    const existingChannels = await getExistingChannelsByTelegramId();
    const enrichedChats = chats.map((chat) => {
      const telegramIdStr = chat.chatId.toString();
      return {
        chatId: telegramIdStr,
        title: chat.title,
        type: chat.type,
        isForum: chat.isForum,
        memberCount: chat.memberCount ?? null,
        alreadyLinked: linkedTelegramIds.has(telegramIdStr),
        existingChannelId: existingChannels.get(telegramIdStr) ?? null,
      };
    });

    // Also upsert channel metadata while we have the data
    for (const chat of chats) {
      try {
        await upsertChannel({
          telegramId: chat.chatId,
          title: chat.title,
          type: "SOURCE",
          isForum: chat.isForum,
        });
      } catch (upsertErr) {
        // Non-critical — metadata sync is best-effort, but leave a trace.
        aLog.debug(
          { err: upsertErr, chatId: chat.chatId.toString() },
          "Channel metadata upsert failed"
        );
      }
    }

    await updateFetchRequestStatus(requestId, "COMPLETED", {
      resultJson: JSON.stringify(enrichedChats),
    });
    aLog.info(
      { total: chats.length, linked: [...linkedTelegramIds].length },
      "Fetch request completed"
    );
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    aLog.error({ err }, "Fetch request failed");
    await updateFetchRequestStatus(requestId, "FAILED", { error: message });
  } finally {
    // Only close a client that was actually created.
    if (session) {
      await closeTdlibClient(session.client);
    }
  }
}
/**
 * Post-authentication helper: stores the account's chat list as an
 * already-COMPLETED ChannelFetchRequest so the channel picker has data
 * without a separate fetch round-trip.
 *
 * Each chat is also upserted as a SOURCE channel; upsert failures are ignored.
 *
 * @param client    TDLib client to list chats with.
 * @param accountId Account the fetch request is created for.
 * @param aLog      Logger used for the completion message.
 */
async function createAutoFetchRequest(
  client: Client,
  accountId: string,
  aLog: ReturnType<typeof childLogger>
): Promise<void> {
  const accountChats = await getAccountChats(client);
  const linkedIds = await getAccountLinkedChannelIds(accountId);
  const knownChannels = await getExistingChannelsByTelegramId();

  // One picker row per chat, annotated with what the DB already knows.
  const pickerRows = accountChats.map(({ chatId, title, type, isForum, memberCount }) => {
    const idKey = chatId.toString();
    return {
      chatId: idKey,
      title,
      type,
      isForum,
      memberCount: memberCount ?? null,
      alreadyLinked: linkedIds.has(idKey),
      existingChannelId: knownChannels.get(idKey) ?? null,
    };
  });

  // Keep channel metadata in sync; a failed upsert must not block the request.
  for (const { chatId, title, isForum } of accountChats) {
    try {
      await upsertChannel({ telegramId: chatId, title, type: "SOURCE", isForum });
    } catch {
      // Non-critical
    }
  }

  // Persist the request with its result already filled in.
  const dbModule = await import("./db/client.js");
  await dbModule.db.channelFetchRequest.create({
    data: {
      accountId,
      status: "COMPLETED",
      resultJson: JSON.stringify(pickerRows),
    },
  });

  aLog.info(
    { total: accountChats.length },
    "Auto-fetch request created with channel list"
  );
}
/**
 * Throttle DB writes for run activity (e.g. download progress) to avoid
 * hammering the DB.
 *
 * `update` keeps only the most recent activity. It is written immediately
 * when at least `minIntervalMs` has passed since the last write; otherwise a
 * single trailing timer is armed so the latest value is written once the
 * interval has elapsed. `flush` writes any pending value right away.
 *
 * Write errors are swallowed: activity reporting is best-effort.
 *
 * @param runId         Ingestion run whose activity row is updated.
 * @param minIntervalMs Minimum spacing between writes (default 2000 ms).
 * @returns `update` (fire-and-forget) and `flush` (awaitable).
 */
function createThrottledActivityUpdater(runId: string, minIntervalMs = 2000) {
  let lastWriteTime = 0;
  let pendingUpdate: ActivityUpdate | null = null;
  let flushTimer: ReturnType<typeof setTimeout> | null = null;

  // Cancel the trailing timer AND forget its handle. Leaving a stale handle
  // behind would make later throttled updates believe a timer is still armed,
  // so their value would never be flushed automatically.
  const cancelTimer = (): void => {
    if (flushTimer !== null) {
      clearTimeout(flushTimer);
      flushTimer = null;
    }
  };

  const flush = async (): Promise<void> => {
    // Any write supersedes the scheduled trailing write.
    cancelTimer();
    if (pendingUpdate === null) {
      return;
    }
    const update = pendingUpdate;
    pendingUpdate = null;
    lastWriteTime = Date.now();
    await updateRunActivity(runId, update).catch(() => {});
  };

  return {
    update: (activity: ActivityUpdate): void => {
      pendingUpdate = activity;
      const elapsed = Date.now() - lastWriteTime;
      if (elapsed >= minIntervalMs) {
        // Errors are already swallowed inside flush, so this cannot reject.
        void flush();
      } else if (flushTimer === null) {
        flushTimer = setTimeout(() => {
          flushTimer = null;
          void flush();
        }, minIntervalMs - elapsed);
      }
    },
    flush,
  };
}
/**
 * Shared context passed to the archive processing pipeline.
 *
 * One instance is built per source channel in runWorkerForAccount and is
 * mutated as the run progresses (topic fields per forum topic, `client`
 * after a TDLib client recreation, `consecutiveStalls` on upload stalls).
 */
interface PipelineContext {
/** TDLib client in use; processArchiveSets replaces it after recreating the client. */
client: Client;
/** ID of the ingestion run, used for activity updates. */
runId: string;
/** Account ID; also passed to createTdlibClient when the client is recreated. */
accountId: string;
/** Account phone; also passed to createTdlibClient when the client is recreated. */
accountPhone: string;
/** Display title used in activity messages; forum scans append the topic name. */
channelTitle: string;
/** Source channel record currently being processed. */
channel: TelegramChannel;
/** Telegram ID of the global destination channel. */
destChannelTelegramId: bigint;
/** Database ID of the global destination channel. */
destChannelId: string;
/** Throttled activity writer shared by the whole run. */
throttled: ReturnType<typeof createThrottledActivityUpdater>;
/** Running totals for the run; the same object is shared across channels. */
counters: {
messagesScanned: number;
zipsFound: number;
zipsDuplicate: number;
zipsIngested: number;
};
/** Creator from forum topic name (null for non-forum). */
topicCreator: string | null;
/** Forum topic ID (null for non-forum). */
sourceTopicId: bigint | null;
/** Logger created for this account's run. */
accountLog: ReturnType<typeof childLogger>;
/** Upload size limit in bytes (3950 MiB when the session is premium, else config.maxPartSizeMB MiB). */
maxUploadSize: bigint;
/** How many consecutive upload stalls have occurred (resets on success). */
consecutiveStalls: number;
}
/**
 * Decide how far a channel/topic watermark may advance after a pipeline pass.
 *
 * Prefers the highest processed message ID and falls back to the highest
 * scanned one. If that candidate reaches or passes the lowest failed message
 * ID, it is capped to one below it so the failed set is re-scanned next cycle.
 *
 * @returns The watermark to persist, or null when there is nothing to persist.
 */
function capWatermarkBelowFailures(
  maxProcessedId: bigint | null,
  maxScannedMessageId: bigint | null,
  minFailedId: bigint | null
): bigint | null {
  const candidate = maxProcessedId ?? maxScannedMessageId;
  if (candidate !== null && minFailedId !== null && candidate >= minFailedId) {
    return minFailedId - 1n;
  }
  return candidate;
}

/**
 * Run a full ingestion cycle for a single Telegram account.
 * Every step writes live activity to the DB so the admin UI can display it.
 *
 * Flow: acquire the per-account lock → create an ingestion run → connect to
 * TDLib and preload chat lists → for every mapped source channel scan for new
 * messages (per topic for forums) and feed them through processArchiveSets →
 * persist the capped watermark → complete (or fail) the run → release the lock.
 *
 * A failure in one topic or channel is logged and the loop moves on; only
 * errors outside those loops fail the whole run. The TDLib client is closed
 * on every path once it has been created.
 *
 * @param account Account whose linked source channels are ingested.
 */
export async function runWorkerForAccount(
  account: TelegramAccount
): Promise<void> {
  const accountLog = childLogger("worker", { accountId: account.id, phone: account.phone });

  // 1. Acquire advisory lock
  const acquired = await tryAcquireLock(account.id);
  if (!acquired) {
    accountLog.info("Account already locked, skipping");
    return;
  }

  let runId: string | undefined;
  try {
    // 2. Create ingestion run
    const run = await createIngestionRun(account.id);
    runId = run.id;
    const activeRunId = runId;
    accountLog.info({ runId }, "Ingestion run started");
    const throttled = createThrottledActivityUpdater(activeRunId);

    // 3. Initialize TDLib client
    await updateRunActivity(activeRunId, {
      currentActivity: "Connecting to Telegram",
      currentStep: "connecting",
    });
    const session = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });
    // `client` is reassigned when the pipeline recreates TDLib after stalls.
    let client = session.client;
    const isPremium = session.isPremium;

    const counters = {
      messagesScanned: 0,
      zipsFound: 0,
      zipsDuplicate: 0,
      zipsIngested: 0,
    };

    // Everything after client creation lives inside this try so the client is
    // closed even if setup (size limit, chat preload) throws.
    try {
      const maxUploadSize = isPremium
        ? 3950n * 1024n * 1024n
        : BigInt(config.maxPartSizeMB) * 1024n * 1024n;

      // Load all chats into TDLib's local cache using loadChats (the recommended API).
      // Without this, getChat/searchChatMessages fail with "Chat not found".
      // loadChats returns a 404 when all chats have been loaded — that's the stop signal.
      // Load from main, archive, AND chat folders to cover all chat types.
      {
        // Discover chat folders first
        const folderLists: { _: "chatListFolder"; chat_folder_id: number }[] = [];
        try {
          const folders = await client.invoke({ _: "getChatFolders" }) as {
            chat_folders?: { id: number }[];
          };
          if (folders.chat_folders) {
            for (const f of folders.chat_folders) {
              folderLists.push({ _: "chatListFolder", chat_folder_id: f.id });
            }
          }
        } catch {
          // getChatFolders may not be available in older TDLib versions
        }
        const chatLists: Record<string, unknown>[] = [
          { _: "chatListMain" },
          { _: "chatListArchive" },
          ...folderLists,
        ];
        for (const chatList of chatLists) {
          try {
            for (let page = 0; page < 500; page++) {
              await client.invoke({
                _: "loadChats",
                chat_list: chatList,
                limit: 100,
              });
              // loadChats returns ok — keep going until 404
            }
          } catch {
            // 404 = all chats loaded (expected), or unsupported list type
          }
        }
      }

      // Runs the archive pipeline and ALWAYS syncs `client` back from the
      // context. The pipeline may recreate the TDLib client and then still
      // throw (e.g. during grouping); without the finally the outer variable
      // would keep pointing at the closed client and the new one would leak.
      const runPipeline = async (
        ctx: PipelineContext,
        scanResult: Parameters<typeof processArchiveSets>[1],
        boundary: Parameters<typeof processArchiveSets>[3]
      ) => {
        try {
          return await processArchiveSets(ctx, scanResult, run.id, boundary);
        } finally {
          client = ctx.client;
        }
      };

      // 4. Get assigned source channels and global destination
      const channelMappings = await getSourceChannelMappings(account.id);
      const destChannel = await getGlobalDestinationChannel();
      if (!destChannel) {
        throw new Error("No global destination channel configured — set one in the admin UI");
      }
      const totalChannels = channelMappings.length;
      if (totalChannels === 0) {
        accountLog.info("No active source channels linked to this account — nothing to ingest");
      }

      for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
        const mapping = channelMappings[chIdx];
        const channel = mapping.channel;
        const channelLabel = totalChannels > 1
          ? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
          : channel.title;
        try {
          // ── Ensure TDLib knows about this chat ──
          // getChats may not have loaded all channels (pagination, archive folder, etc.)
          // so we explicitly load each channel before scanning.
          try {
            await client.invoke({
              _: "getChat",
              chat_id: Number(channel.telegramId),
            });
          } catch (chatErr) {
            accountLog.warn(
              { err: chatErr, channelId: channel.id, title: channel.title, telegramId: channel.telegramId.toString() },
              "TDLib does not know about this chat — it may not be accessible to this account. Skipping."
            );
            continue;
          }

          // ── Check if channel is a forum ──
          const forum = await isChatForum(client, channel.telegramId);
          if (forum !== channel.isForum) {
            await setChannelForum(channel.id, forum);
            accountLog.info(
              { channelId: channel.id, title: channel.title, isForum: forum },
              "Updated channel forum status"
            );
          }

          const pipelineCtx: PipelineContext = {
            client,
            runId: activeRunId,
            accountId: account.id,
            accountPhone: account.phone,
            channelTitle: channel.title,
            channel,
            destChannelTelegramId: destChannel.telegramId,
            destChannelId: destChannel.id,
            throttled,
            counters,
            topicCreator: null,
            sourceTopicId: null,
            accountLog,
            maxUploadSize,
            consecutiveStalls: 0,
          };

          if (forum) {
            // ── Forum channel: scan per-topic ──
            await updateRunActivity(activeRunId, {
              currentActivity: `Enumerating topics in "${channelLabel}"`,
              currentStep: "scanning",
              currentChannel: channelLabel,
              currentFile: null,
              currentFileNum: null,
              totalFiles: null,
              downloadedBytes: null,
              totalBytes: null,
              downloadPercent: null,
              messagesScanned: counters.messagesScanned,
            });
            const topics = await getForumTopicList(client, channel.telegramId);
            const topicProgressList = await getTopicProgress(mapping.id);
            accountLog.info(
              { channelId: channel.id, title: channel.title, topicCount: topics.length },
              "Scanning forum channel by topic"
            );

            for (let tIdx = 0; tIdx < topics.length; tIdx++) {
              const topic = topics[tIdx];
              try {
                const progress = topicProgressList.find(
                  (tp) => tp.topicId === topic.topicId
                );
                const topicLabel = `${channel.title} ${topic.name}`;
                const topicProgress = topics.length > 1
                  ? ` (topic ${tIdx + 1}/${topics.length})`
                  : "";
                await updateRunActivity(activeRunId, {
                  currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
                  currentStep: "scanning",
                  currentChannel: channelLabel,
                  currentFile: null,
                  currentFileNum: null,
                  totalFiles: null,
                  downloadedBytes: null,
                  totalBytes: null,
                  downloadPercent: null,
                  messagesScanned: counters.messagesScanned,
                });
                const scanResult = await getTopicMessages(
                  client,
                  channel.telegramId,
                  topic.topicId,
                  progress?.lastProcessedMessageId,
                  100,
                  (scanned) => {
                    throttled.update({
                      // " — " separator matches the non-forum progress message;
                      // without it the count ran straight into the label.
                      currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
                      currentStep: "scanning",
                      currentChannel: channelLabel,
                      messagesScanned: counters.messagesScanned + scanned,
                    });
                  }
                );
                // Add scanned messages to global counter
                counters.messagesScanned += scanResult.totalScanned;

                if (scanResult.archives.length === 0) {
                  accountLog.info(
                    { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
                    "No new archives in topic"
                  );
                  // Still advance topic watermark so we don't re-scan these messages next cycle
                  if (scanResult.maxScannedMessageId) {
                    await upsertTopicProgress(
                      mapping.id,
                      topic.topicId,
                      topic.name,
                      scanResult.maxScannedMessageId
                    );
                  }
                  continue;
                }

                accountLog.info(
                  { topic: topic.name, archives: scanResult.archives.length, photos: scanResult.photos.length },
                  "Found messages in topic"
                );

                // Process archives with topic creator
                pipelineCtx.topicCreator = topic.name;
                pipelineCtx.sourceTopicId = topic.topicId;
                pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
                const { maxProcessedId, minFailedId } = await runPipeline(
                  pipelineCtx,
                  scanResult,
                  progress?.lastProcessedMessageId
                );

                // Advance progress: prefer archive watermark, fall back to scan watermark.
                // Capped at one less than the lowest failed message ID so failed sets stay
                // above the next scan boundary and get retried on the next cycle.
                const topicWatermark = capWatermarkBelowFailures(
                  maxProcessedId,
                  scanResult.maxScannedMessageId,
                  minFailedId
                );
                if (topicWatermark !== null) {
                  await upsertTopicProgress(
                    mapping.id,
                    topic.topicId,
                    topic.name,
                    topicWatermark
                  );
                }
              } catch (topicErr) {
                accountLog.warn(
                  { err: topicErr, channelId: channel.id, topic: topic.name, topicId: topic.topicId.toString() },
                  "Failed to process topic, skipping"
                );
              }
            }
          } else {
            // ── Non-forum channel: flat scan (existing behavior) ──
            await updateRunActivity(activeRunId, {
              currentActivity: `Scanning "${channelLabel}" for new archives`,
              currentStep: "scanning",
              currentChannel: channelLabel,
              currentFile: null,
              currentFileNum: null,
              totalFiles: null,
              downloadedBytes: null,
              totalBytes: null,
              downloadPercent: null,
              messagesScanned: counters.messagesScanned,
            });
            accountLog.info(
              { channelId: channel.id, title: channel.title },
              "Processing source channel"
            );
            const scanResult = await getChannelMessages(
              client,
              channel.telegramId,
              mapping.lastProcessedMessageId,
              100,
              (scanned) => {
                throttled.update({
                  currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
                  currentStep: "scanning",
                  currentChannel: channelLabel,
                  messagesScanned: counters.messagesScanned + scanned,
                });
              }
            );
            // Add scanned messages to global counter
            counters.messagesScanned += scanResult.totalScanned;

            if (scanResult.archives.length === 0) {
              accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
              // Still advance watermark to highest scanned message so we don't
              // re-scan these messages next cycle
              if (scanResult.maxScannedMessageId) {
                await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
              }
              continue;
            }

            accountLog.info(
              { archives: scanResult.archives.length, photos: scanResult.photos.length },
              "Found messages in channel"
            );

            // For non-forum, creator comes from filename (set to null, resolved per-archive)
            pipelineCtx.topicCreator = null;
            pipelineCtx.sourceTopicId = null;
            pipelineCtx.channelTitle = channel.title;
            const { maxProcessedId, minFailedId } = await runPipeline(
              pipelineCtx,
              scanResult,
              mapping.lastProcessedMessageId
            );

            // Advance progress: prefer archive watermark, fall back to scan watermark.
            // Capped at one less than the lowest failed message ID so failed sets stay
            // above the next scan boundary and get retried on the next cycle.
            const channelWatermark = capWatermarkBelowFailures(
              maxProcessedId,
              scanResult.maxScannedMessageId,
              minFailedId
            );
            if (channelWatermark !== null) {
              await updateLastProcessedMessage(mapping.id, channelWatermark);
            }
          }
        } catch (channelErr) {
          accountLog.warn(
            { err: channelErr, channelId: channel.id, title: channel.title },
            "Failed to process channel, skipping to next"
          );
        }
      }

      // ── Done ──
      await throttled.flush();
      await completeIngestionRun(activeRunId, counters);
      accountLog.info({ counters }, "Ingestion run completed");
    } finally {
      await throttled.flush();
      await closeTdlibClient(client);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    accountLog.error({ err }, "Ingestion run failed");
    if (runId) {
      await failIngestionRun(runId, message).catch((e) =>
        accountLog.error({ e }, "Failed to mark run as failed")
      );
    }
  } finally {
    await releaseLock(account.id);
  }
}
/**
 * Map a failure message onto the SkipReason shown as a badge in the UI.
 *
 * Matching is a case-insensitive substring search. Upload-related keywords
 * win over extraction-related ones, and anything unmatched is reported as a
 * download failure.
 *
 * @param errMsg Error message of the failed archive set.
 * @returns The inferred skip reason.
 */
function inferSkipReason(errMsg: string): "DOWNLOAD_FAILED" | "UPLOAD_FAILED" | "EXTRACT_FAILED" {
  const haystack = errMsg.toLowerCase();
  const mentionsAny = (needles: readonly string[]): boolean =>
    needles.some((needle) => haystack.includes(needle));

  // Checked first: rate-limit and send errors count as upload problems.
  if (mentionsAny(["upload", "too many requests", "retry after", "send"])) {
    return "UPLOAD_FAILED";
  }
  if (mentionsAny(["extract", "metadata", "central directory", "archive"])) {
    return "EXTRACT_FAILED";
  }
  return "DOWNLOAD_FAILED";
}
/**
 * Process a scan result through the archive pipeline:
 * group → download → hash → dedup → metadata → split → upload → preview → index.
 *
 * Sets are handled one by one via processOneArchiveSet. A set that throws is
 * logged, recorded as a skipped package plus a system notification
 * (best-effort), and does not stop the remaining sets — unless the TDLib
 * client cannot be recreated after an upload stall, in which case the loop
 * aborts.
 *
 * @param ctx                    Shared pipeline context; `ctx.client` may be
 *                               replaced and `ctx.consecutiveStalls` updated.
 * @param scanResult             Archives and photos found by the scan.
 * @param ingestionRunId         Run ID forwarded to processOneArchiveSet.
 * @param lastProcessedMessageId Current watermark; sets whose parts are all at
 *                               or below it are filtered out.
 * @returns `maxProcessedId` — highest part ID among sets that completed
 *          without throwing (null if none); `minFailedId` — lowest part ID
 *          among sets that failed or were left unprocessed by an abort (null
 *          if none). The caller caps the watermark below `minFailedId` so
 *          those sets are retried on the next cycle.
 */
async function processArchiveSets(
  ctx: PipelineContext,
  scanResult: ChannelScanResult,
  ingestionRunId: string,
  lastProcessedMessageId?: bigint | null
): Promise<{ maxProcessedId: bigint | null; minFailedId: bigint | null }> {
  const { runId, channelTitle, channel, counters, accountLog } = ctx;

  // Lowest message ID among the parts of one archive set.
  const lowestPartId = (parts: readonly { id: bigint }[]): bigint =>
    parts.reduce((min, p) => (p.id < min ? p.id : min), parts[0].id);

  // Group into archive sets
  let archiveSets = groupArchiveSets(scanResult.archives);

  // Filter out sets where ALL parts are at or below the boundary (already processed)
  if (lastProcessedMessageId) {
    const totalBefore = archiveSets.length;
    archiveSets = archiveSets.filter((set) =>
      set.parts.some((p) => p.id > lastProcessedMessageId)
    );
    const filtered = totalBefore - archiveSets.length;
    if (filtered > 0) {
      accountLog.info(
        { filtered, remaining: archiveSets.length },
        "Filtered out already-processed archive sets"
      );
    }
  }
  counters.zipsFound += archiveSets.length;

  // Match preview photos to archive sets
  const previewMatches = matchPreviewToArchive(
    scanResult.photos,
    archiveSets.map((s) => ({
      baseName: s.baseName,
      firstMessageId: s.parts[0].id,
      firstMessageDate: s.parts[0].date,
    }))
  );
  if (previewMatches.size > 0) {
    accountLog.info(
      { matched: previewMatches.size, total: archiveSets.length },
      "Matched preview photos to archives"
    );
  }

  await updateRunActivity(runId, {
    currentActivity: `Found ${archiveSets.length} archive(s) in "${channelTitle}"`,
    currentStep: "scanning",
    currentChannel: channelTitle,
    totalFiles: archiveSets.length,
    zipsFound: counters.zipsFound,
    messagesScanned: counters.messagesScanned,
  });

  // Track the highest message ID that was successfully processed and the
  // lowest message ID of any failed set. The caller uses minFailedId to cap
  // the watermark so failures get retried on the next cycle.
  let maxProcessedId: bigint | null = null;
  let minFailedId: bigint | null = null;
  const indexedPackageRefs: IndexedPackageRef[] = [];

  for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
    const currentSet = archiveSets[setIdx];
    try {
      const packageId = await processOneArchiveSet(
        ctx,
        currentSet,
        setIdx,
        archiveSets.length,
        previewMatches,
        ingestionRunId
      );
      if (packageId) {
        const firstPart = currentSet.parts[0];
        indexedPackageRefs.push({
          packageId,
          sourceMessageId: firstPart.id,
          mediaAlbumId: firstPart.mediaAlbumId,
        });
      }

      // Set completed (ingested or confirmed duplicate) — advance watermark
      const setMaxId = currentSet.parts.reduce(
        (max, p) => (p.id > max ? p.id : max),
        0n
      );
      if (setMaxId > (maxProcessedId ?? 0n)) {
        maxProcessedId = setMaxId;
      }

      // Reset stall counter once a set completes without throwing
      ctx.consecutiveStalls = 0;
    } catch (setErr) {
      // If a set fails, do NOT advance the watermark past it
      accountLog.warn(
        { err: setErr, baseName: currentSet.baseName },
        "Archive set failed, watermark will not advance past this set"
      );

      // Record the lowest part ID of this set as a failure boundary so the
      // caller can cap the watermark below it and the next scan re-picks it up.
      const failedMinId = lowestPartId(currentSet.parts);
      if (minFailedId === null || failedMinId < minFailedId) {
        minFailedId = failedMinId;
      }

      // Record the failure for visibility in the UI. Done BEFORE the stall
      // recovery below so the record is written even when recovery aborts
      // the loop.
      const errMsg = setErr instanceof Error ? setErr.message : String(setErr);
      const reason = inferSkipReason(errMsg);
      try {
        const totalSize = currentSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
        await upsertSkippedPackage({
          fileName: currentSet.parts[0].fileName,
          fileSize: totalSize,
          reason,
          errorMessage: errMsg,
          sourceChannelId: ctx.channel.id,
          sourceMessageId: currentSet.parts[0].id,
          sourceTopicId: ctx.sourceTopicId,
          isMultipart: currentSet.isMultipart,
          partCount: currentSet.parts.length,
          accountId: ctx.accountId,
        });
        // Also create a persistent notification
        await db.systemNotification.create({
          data: {
            type: reason === "UPLOAD_FAILED" ? "UPLOAD_FAILED" : "DOWNLOAD_FAILED",
            severity: "WARNING",
            title: `Failed to process ${currentSet.parts[0].fileName}`,
            message: errMsg,
            context: {
              fileName: currentSet.parts[0].fileName,
              sourceChannelId: ctx.channel.id,
              sourceMessageId: Number(currentSet.parts[0].id),
              channelTitle: ctx.channelTitle,
              reason,
            },
          },
        });
      } catch (recordErr) {
        // Best-effort — don't fail the run if skip recording fails
        accountLog.debug(
          { err: recordErr },
          "Could not record skipped package or notification"
        );
      }

      // ── TDLib client recreation on repeated upload stalls ──
      // When the TDLib event stream degrades, uploads complete (bytes sent)
      // but confirmations never arrive. Retrying with the same broken client
      // is futile. Recreate the client to get a fresh connection.
      if (setErr instanceof UploadStallError) {
        ctx.consecutiveStalls++;
        accountLog.warn(
          { consecutiveStalls: ctx.consecutiveStalls },
          "Upload stall detected — TDLib event stream may be degraded"
        );
        // After 1 stalled set (= 3 failed retry attempts already), recreate the client
        if (ctx.consecutiveStalls >= 1) {
          accountLog.info("Recreating TDLib client after consecutive upload stalls");
          try {
            await closeTdlibClient(ctx.client);
          } catch (closeErr) {
            accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
          }
          try {
            const { client: newClient } = await createTdlibClient({
              id: ctx.accountId,
              phone: ctx.accountPhone,
            });
            ctx.client = newClient;
            // Reload chats so the new client can access channels.
            // NOTE(review): only chatListMain is reloaded here, whereas the
            // initial connect also loads the archive and folder lists —
            // confirm that is sufficient for archived source channels.
            try {
              for (let page = 0; page < 500; page++) {
                await newClient.invoke({
                  _: "loadChats",
                  chat_list: { _: "chatListMain" },
                  limit: 100,
                });
              }
            } catch {
              // 404 = all loaded (expected)
            }
            ctx.consecutiveStalls = 0;
            accountLog.info("TDLib client recreated successfully — continuing ingestion");
          } catch (recreateErr) {
            accountLog.error(
              { err: recreateErr },
              "Failed to recreate TDLib client — aborting remaining uploads"
            );
            // Sets we never got to must stay above the watermark as well,
            // regardless of the order groupArchiveSets returned them in.
            for (const pendingSet of archiveSets.slice(setIdx + 1)) {
              const pendingMinId = lowestPartId(pendingSet.parts);
              if (minFailedId === null || pendingMinId < minFailedId) {
                minFailedId = pendingMinId;
              }
            }
            break;
          }
        }
      }
    }
  }

  // Post-processing: group packages by Telegram album ID
  if (indexedPackageRefs.length > 0) {
    await processAlbumGroups(
      ctx.client,
      channel.id,
      indexedPackageRefs,
      scanResult.photos
    );

    // Auto-grouping passes (gated by per-channel flag)
    const channelRecord = await db.telegramChannel.findUnique({
      where: { id: channel.id },
      select: { autoGroupEnabled: true },
    });
    if (channelRecord?.autoGroupEnabled !== false) {
      // Learned rule-based grouping (from manual overrides)
      await processRuleBasedGroups(channel.id, indexedPackageRefs);
      // Time-window grouping for remaining ungrouped packages
      await processTimeWindowGroups(channel.id, indexedPackageRefs);
      // Pattern-based grouping (date patterns, project slugs)
      await processPatternGroups(channel.id, indexedPackageRefs);
      // Creator-based grouping (3+ files from same creator)
      await processCreatorGroups(channel.id, indexedPackageRefs);
      // ZIP path prefix grouping (shared root folder inside archives)
      await processZipPathGroups(channel.id, indexedPackageRefs);
      // Reply chain grouping (messages replying to same root)
      await processReplyChainGroups(channel.id, indexedPackageRefs);
      // Caption fuzzy match grouping
      await processCaptionGroups(channel.id, indexedPackageRefs);
    }

    // Check for potential grouping conflicts
    await detectGroupingConflicts(channel.id, indexedPackageRefs);
  }

  return { maxProcessedId, minFailedId };
}
/**
* Process a single archive set through the full pipeline.
*/
async function processOneArchiveSet(
ctx: PipelineContext,
archiveSet: ArchiveSet,
setIdx: number,
totalSets: number,
previewMatches: Map<string, { id: bigint; fileId: string }>,
ingestionRunId: string
): Promise<string | null> {
const {
client, runId, channelTitle, channel,
destChannelTelegramId, destChannelId,
throttled, counters, topicCreator, sourceTopicId, accountLog,
} = ctx;
const archiveName = archiveSet.parts[0].fileName;
// ── Early skip: check if this archive set was already ingested ──
// This avoids re-downloading large archives that were processed in a prior run.
const alreadyIngested = await packageExistsBySourceMessage(
channel.id,
archiveSet.parts[0].id
);
if (alreadyIngested) {
counters.zipsDuplicate++;
accountLog.debug(
{ fileName: archiveName, sourceMessageId: Number(archiveSet.parts[0].id) },
"Archive already ingested (by source message), skipping"
);
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (already ingested)`,
currentStep: "deduplicating",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return null;
}
// ── Size guard: skip archives that exceed WORKER_MAX_ZIP_SIZE_MB ──
const totalArchiveSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n);
const maxSizeBytes = BigInt(config.maxZipSizeMB) * 1024n * 1024n;
if (totalArchiveSize > maxSizeBytes) {
accountLog.warn(
{
fileName: archiveName,
totalSizeMB: Number(totalArchiveSize / (1024n * 1024n)),
maxSizeMB: config.maxZipSizeMB,
},
"Archive exceeds max size limit, skipping"
);
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (exceeds ${config.maxZipSizeMB}MB limit)`,
currentStep: "skipping",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
await upsertSkippedPackage({
fileName: archiveName,
fileSize: totalArchiveSize,
reason: "SIZE_LIMIT",
sourceChannelId: channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId: ctx.sourceTopicId,
isMultipart: archiveSet.isMultipart,
partCount: archiveSet.parts.length,
accountId: ctx.accountId,
});
return null;
}
const tempPaths: string[] = [];
let splitPaths: string[] = [];
// Per-set subdirectory so uploaded files keep their original filenames
const setDir = path.join(config.tempDir, `${ingestionRunId}_${archiveSet.parts[0].id}`);
await mkdir(setDir, { recursive: true });
try {
// ── Downloading ──
for (let partIdx = 0; partIdx < archiveSet.parts.length; partIdx++) {
const part = archiveSet.parts[partIdx];
const tempPath = path.join(setDir, part.fileName);
const partLabel = archiveSet.parts.length > 1
? ` (part ${partIdx + 1}/${archiveSet.parts.length})`
: "";
await updateRunActivity(runId, {
currentActivity: `Downloading ${part.fileName}${partLabel}`,
currentStep: "downloading",
currentChannel: channelTitle,
currentFile: part.fileName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
downloadedBytes: 0n,
totalBytes: part.fileSize,
downloadPercent: 0,
messagesScanned: counters.messagesScanned,
});
accountLog.info(
{
fileName: part.fileName,
fileSize: Number(part.fileSize),
part: partIdx + 1,
totalParts: archiveSet.parts.length,
},
"Downloading archive part"
);
await downloadFile(
client,
part.fileId,
tempPath,
part.fileSize,
part.fileName,
(progress: DownloadProgress) => {
throttled.update({
currentActivity: `Downloading ${part.fileName}${partLabel}${progress.percent}%`,
currentStep: "downloading",
currentChannel: channelTitle,
currentFile: part.fileName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
downloadedBytes: BigInt(progress.downloadedBytes),
totalBytes: BigInt(progress.totalBytes),
downloadPercent: progress.percent,
});
}
);
await throttled.flush();
tempPaths.push(tempPath);
}
// ── Hashing ──
await updateRunActivity(runId, {
currentActivity: `Computing hash for ${archiveName}`,
currentStep: "hashing",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
});
const contentHash = await hashParts(tempPaths);
// ── Deduplicating ──
await updateRunActivity(runId, {
currentActivity: `Checking if ${archiveName} is a duplicate`,
currentStep: "deduplicating",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
const exists = await packageExistsByHash(contentHash);
if (exists) {
counters.zipsDuplicate++;
accountLog.debug({ contentHash }, "Duplicate archive, skipping");
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (duplicate)`,
currentStep: "deduplicating",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return null;
}
// ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
const hashLockAcquired = await tryAcquireHashLock(contentHash);
if (!hashLockAcquired) {
counters.zipsDuplicate++;
accountLog.info(
{ fileName: archiveName, hash: contentHash.slice(0, 16) },
"Hash lock held by another worker — skipping concurrent duplicate"
);
return null;
}
let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
let creator: string | null = null;
const tags: string[] = [];
let stub: { id: string } | null = null;
try {
// Re-check after acquiring lock: another worker may have finished between
// the first check above and this point.
const existsAfterLock = await packageExistsByHash(contentHash);
if (existsAfterLock) {
counters.zipsDuplicate++;
accountLog.debug(
{ fileName: archiveName, hash: contentHash.slice(0, 16) },
"Duplicate detected after acquiring hash lock — skipping"
);
return null;
}
// ── Reading metadata ──
await updateRunActivity(runId, {
currentActivity: `Reading file list from ${archiveName}`,
currentStep: "reading_metadata",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
try {
if (archiveSet.type === "ZIP") {
entries = await readZipCentralDirectory(tempPaths);
} else if (archiveSet.type === "RAR") {
entries = await readRarContents(tempPaths[0]);
} else if (archiveSet.type === "7Z") {
entries = await read7zContents(tempPaths[0]);
} else if (archiveSet.type === "DOCUMENT") {
// Standalone documents (PDF, STL, etc.) — no extraction,
// record the file itself as the single entry
const part = archiveSet.parts[0];
const ext = part.fileName.match(/\.([^.]+)$/)?.[1] ?? null;
entries = [{
path: part.fileName,
fileName: part.fileName,
extension: ext,
compressedSize: part.fileSize,
uncompressedSize: part.fileSize,
crc32: null,
}];
}
} catch (err) {
accountLog.warn({ err, baseName: archiveSet.baseName }, "Failed to read archive metadata, ingesting without file list");
}
// ── Splitting / Repacking (if needed) ──
let uploadPaths = [...tempPaths];
const totalSize = archiveSet.parts.reduce(
(sum, p) => sum + p.fileSize,
0n
);
const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
if (hasOversizedPart) {
// Full repack: concatenate all parts → single file → re-split into uniform 2GB chunks
await updateRunActivity(runId, {
currentActivity: `Repacking ${archiveName} (parts >2GB, concatenating + re-splitting)`,
currentStep: "splitting",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
await concatenateFiles(tempPaths, concatPath);
splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
uploadPaths = splitPaths;
// Clean up the concat intermediate file
await unlink(concatPath).catch(() => {});
} else if (!archiveSet.isMultipart && totalSize > MAX_UPLOAD_SIZE) {
// Single file >2GB: split directly
await updateRunActivity(runId, {
currentActivity: `Splitting ${archiveName} for upload (>2GB)`,
currentStep: "splitting",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
uploadPaths = splitPaths;
}
// ── Hash verification after split ──
// If we split/repacked, verify the split parts hash matches the original
if (splitPaths.length > 0) {
const splitHash = await hashParts(splitPaths);
if (splitHash !== contentHash) {
accountLog.error(
{ fileName: archiveName, originalHash: contentHash, splitHash, parts: splitPaths.length },
"Hash mismatch after split — file may be corrupted"
);
// Record notification for visibility
try {
await db.systemNotification.create({
data: {
type: "HASH_MISMATCH",
severity: "ERROR",
title: `Hash mismatch after splitting ${archiveName}`,
message: `Expected ${contentHash.slice(0, 16)}… but got ${splitHash.slice(0, 16)}… after splitting into ${splitPaths.length} parts`,
context: {
fileName: archiveName,
originalHash: contentHash,
splitHash,
partCount: splitPaths.length,
sourceChannelId: channel.id,
},
},
});
} catch {
// Best-effort notification
}
throw new Error(`Hash mismatch after split for ${archiveName}: expected ${contentHash}, got ${splitHash}`);
}
accountLog.debug(
{ fileName: archiveName, hash: contentHash.slice(0, 16), parts: splitPaths.length },
"Split hash verified — matches original"
);
}
// ── Uploading ──
// Check if a prior run already uploaded this file (orphaned upload scenario:
// file reached Telegram but DB write failed or worker crashed before indexing)
const existingUpload = await getUploadedPackageByHash(contentHash);
let destResult: { messageId: bigint; messageIds: bigint[] };
if (existingUpload && existingUpload.destMessageId) {
accountLog.info(
{ fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
"Reusing existing upload (file already on destination channel)"
);
destResult = {
messageId: existingUpload.destMessageId,
messageIds: existingUpload.destMessageIds?.length
? (existingUpload.destMessageIds as bigint[])
: [existingUpload.destMessageId],
};
} else {
const uploadLabel = uploadPaths.length > 1
? ` (${uploadPaths.length} parts)`
: "";
await updateRunActivity(runId, {
currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
currentStep: "uploading",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
destResult = await uploadToChannel(
client,
destChannelTelegramId,
uploadPaths
);
}
// ── Post-upload integrity check ──
// Verify the files on disk still match before we index
if (uploadPaths.length > 0 && !existingUpload) {
try {
const postUploadHash = await hashParts(uploadPaths);
if (splitPaths.length > 0) {
// Split files — hash should match the split hash (already verified above)
// No additional check needed since we verified split hash = original hash
} else if (postUploadHash !== contentHash) {
accountLog.error(
{ fileName: archiveName, originalHash: contentHash, postUploadHash },
"Hash changed between hashing and upload — possible disk corruption"
);
await db.systemNotification.create({
data: {
type: "HASH_MISMATCH",
severity: "ERROR",
title: `Post-upload hash mismatch: ${archiveName}`,
message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}`,
context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
},
});
}
} catch {
// Best-effort — don't fail the ingestion
}
}
// ── Phase 1: Stub record — persisted immediately after upload ──
await deleteOrphanedPackageByHash(contentHash);
creator =
topicCreator ??
extractCreatorFromFileName(archiveName) ??
extractCreatorFromChannelTitle(channelTitle) ??
null;
if (channel.category) {
tags.push(channel.category);
}
stub = await createPackageStub({
contentHash,
fileName: archiveName,
fileSize: totalSize,
archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
sourceChannelId: channel.id,
sourceMessageId: archiveSet.parts[0].id,
sourceTopicId,
destChannelId,
destMessageId: destResult.messageId,
destMessageIds: destResult.messageIds,
isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
partCount: uploadPaths.length,
ingestionRunId,
creator,
tags,
});
counters.zipsIngested++;
await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
} finally {
await releaseHashLock(contentHash);
}
if (!stub) return null;
// ── Preview thumbnail ──
// (moved here from before stub creation — lock is released, preview doesn't need it)
let previewData: Buffer | null = null;
let previewMsgId: bigint | null = null;
const matchedPhoto = previewMatches.get(archiveSet.baseName);
if (matchedPhoto) {
await updateRunActivity(runId, {
currentActivity: `Downloading preview image for ${archiveName}`,
currentStep: "preview",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
if (previewData) {
previewMsgId = matchedPhoto.id;
}
}
// ── Fallback: extract preview image from inside the archive ──
if (!previewData && entries.length > 0 && archiveSet.type !== "DOCUMENT") {
const previewEntry = pickPreviewFile(entries);
if (previewEntry) {
accountLog.debug(
{ fileName: archiveName, previewFile: previewEntry.path },
"Attempting to extract preview image from archive"
);
const archiveTypeForExtract = archiveSet.type === "7Z" ? "SEVEN_Z" as const : archiveSet.type as "ZIP" | "RAR";
previewData = await extractPreviewImage(
tempPaths[0],
archiveTypeForExtract,
previewEntry.path
);
}
}
// ── Phase 2: Update stub with file entries and preview ──
await updateRunActivity(runId, {
currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
currentStep: "indexing",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
await updatePackageWithMetadata(stub.id, {
files: entries,
previewData,
previewMsgId,
});
await updateRunActivity(runId, {
currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
currentStep: "complete",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
zipsIngested: counters.zipsIngested,
});
accountLog.info(
{ fileName: archiveName, contentHash, fileCount: entries.length, creator },
"Archive ingested"
);
return stub.id;
} finally {
// ALWAYS delete temp files and the set directory
await deleteFiles([...tempPaths, ...splitPaths]);
await rm(setDir, { recursive: true, force: true }).catch(() => {});
}
}
/**
 * Best-effort removal of a list of files.
 *
 * Each path is unlinked in order, one at a time. A failed unlink (for
 * example because the file was already removed or was never written) is
 * ignored so that the remaining paths are still attempted.
 *
 * @param paths - File paths to unlink.
 * @returns Resolves once every path has been attempted; never rejects on an
 *   unlink failure.
 */
async function deleteFiles(paths: string[]): Promise<void> {
  for (const filePath of paths) {
    // Swallow the rejection: a missing file is an expected outcome here.
    await unlink(filePath).catch(() => undefined);
  }
}
/**
 * Empty the configured temp directory (`config.tempDir`), intended for
 * leftovers from previous runs.
 *
 * Every entry listed in the directory is removed recursively and forcibly,
 * one after another; a failure to remove an individual entry is ignored.
 * When the directory contained at least one entry, the number of listed
 * entries is logged (this is the listing count, not a count of confirmed
 * deletions). Any other error, such as the directory not existing yet, is
 * silently ignored.
 */
export async function cleanupTempDir(): Promise<void> {
  try {
    const tempRoot = config.tempDir;
    const staleNames = await readdir(tempRoot);

    for (const staleName of staleNames) {
      const stalePath = path.join(tempRoot, staleName);
      try {
        await rm(stalePath, { recursive: true, force: true });
      } catch {
        // Best-effort: keep going with the remaining entries
      }
    }

    const staleCount = staleNames.length;
    if (staleCount === 0) {
      return;
    }
    log.info({ count: staleCount }, "Cleaned up stale temp files");
  } catch {
    // Directory might not exist yet
  }
}