add TG integration

This commit is contained in:
xCyanGrizzly
2026-03-02 11:57:17 +01:00
parent b427193d17
commit 4d0df6b1a4
35 changed files with 4436 additions and 242 deletions

View File

@@ -57,17 +57,19 @@ export function groupArchiveSets(messages: TelegramMessage[]): ArchiveSet[] {
// Check if any single entry is the "final part" of a legacy split
const allEntries = [...multipartEntries, ...singleEntries];
// Check time span — skip if parts span too long
const dates = allEntries.map((e) => e.msg.date.getTime());
const span = Math.max(...dates) - Math.min(...dates);
const maxSpanMs = config.multipartTimeoutHours * 60 * 60 * 1000;
// Check time span — skip if parts span too long (0 = no limit)
if (config.multipartTimeoutHours > 0) {
const dates = allEntries.map((e) => e.msg.date.getTime());
const span = Math.max(...dates) - Math.min(...dates);
const maxSpanMs = config.multipartTimeoutHours * 60 * 60 * 1000;
if (span > maxSpanMs) {
log.warn(
{ baseName, format, span: span / 3600000 },
"Multipart set spans too long, skipping"
);
continue;
if (span > maxSpanMs) {
log.warn(
{ baseName, format, span: span / 3600000 },
"Multipart set spans too long, skipping"
);
continue;
}
}
// Sort by part number (singles get a very high number so they come last — they're the final part)

View File

@@ -46,3 +46,35 @@ export async function byteLevelSplit(filePath: string): Promise<string[]> {
log.info({ filePath, parts: parts.length }, "File split complete");
return parts;
}
/**
 * Concatenate multiple files into a single output file by streaming
 * each input sequentially. Used for repacking multipart archives
 * that have oversized parts (>2GB) before re-splitting.
 *
 * @param inputPaths - Part files to concatenate, in order.
 * @param outputPath - Destination file path (created or truncated).
 * @throws Propagates any stream error; the output stream is destroyed
 *         on failure so no file descriptor is leaked.
 */
export async function concatenateFiles(
  inputPaths: string[],
  outputPath: string
): Promise<void> {
  const out = createWriteStream(outputPath);
  try {
    for (let i = 0; i < inputPaths.length; i++) {
      log.info(
        { part: i + 1, total: inputPaths.length, file: path.basename(inputPaths[i]) },
        "Concatenating part"
      );
      // end: false keeps the output stream open between parts
      await pipeline(createReadStream(inputPaths[i]), out, { end: false });
    }
    // Close the output stream and wait for the final flush.
    await new Promise<void>((resolve, reject) => {
      out.on("error", reject); // attach BEFORE end() so flush errors reject
      out.end(() => resolve());
    });
  } catch (err) {
    // Avoid an fd leak if a part fails mid-stream.
    out.destroy();
    throw err;
  }
  const stats = await stat(outputPath);
  log.info(
    { outputPath, totalBytes: stats.size, parts: inputPaths.length },
    "Concatenation complete"
  );
}

View File

@@ -1,5 +1,7 @@
import yauzl from "yauzl";
import { open as fsOpen, stat as fsStat } from "fs/promises";
import path from "path";
import { Readable } from "stream";
import { childLogger } from "../util/logger.js";
const log = childLogger("zip-reader");
@@ -15,20 +17,28 @@ export interface FileEntry {
/**
* Read the central directory of a ZIP file without extracting any contents.
* For multipart ZIPs, pass the paths sorted by part order.
* We attempt to read from the last part first (central directory is at the end).
* For multipart ZIPs (.zip.001, .zip.002 etc.), uses a custom random-access
* reader that spans all parts seamlessly so yauzl can find the central
* directory at the end of the combined data.
*/
export async function readZipCentralDirectory(
filePaths: string[]
): Promise<FileEntry[]> {
// The central directory lives at the end of the last file
const targetFile = filePaths[filePaths.length - 1];
if (filePaths.length === 1) {
return readSingleZip(filePaths[0]);
}
return new Promise((resolve, reject) => {
// Multipart: use a spanning random-access reader
return readMultipartZip(filePaths);
}
/** Read a single (non-split) ZIP file. */
function readSingleZip(targetFile: string): Promise<FileEntry[]> {
return new Promise((resolve) => {
yauzl.open(targetFile, { lazyEntries: true, autoClose: true }, (err, zipFile) => {
if (err) {
log.warn({ err, file: targetFile }, "Failed to open ZIP for reading");
resolve([]); // Fallback: return empty on error
resolve([]);
return;
}
@@ -36,13 +46,12 @@ export async function readZipCentralDirectory(
zipFile.readEntry();
zipFile.on("entry", (entry: yauzl.Entry) => {
// Skip directories
if (!entry.fileName.endsWith("/")) {
const ext = path.extname(entry.fileName).toLowerCase();
entries.push({
path: entry.fileName,
fileName: path.basename(entry.fileName),
extension: ext ? ext.slice(1) : null, // Remove leading dot
extension: ext ? ext.slice(1) : null,
compressedSize: BigInt(entry.compressedSize),
uncompressedSize: BigInt(entry.uncompressedSize),
crc32: entry.crc32 !== 0 ? entry.crc32.toString(16).padStart(8, "0") : null,
@@ -54,8 +63,144 @@ export async function readZipCentralDirectory(
zipFile.on("end", () => resolve(entries));
zipFile.on("error", (error) => {
log.warn({ error, file: targetFile }, "Error reading ZIP entries");
resolve(entries); // Return whatever we got
resolve(entries);
});
});
});
}
/**
 * Read a multipart split ZIP using yauzl's RandomAccessReader API.
 * A virtual "file" spanning every part lets yauzl seek freely across
 * the whole archive to locate the central directory at its end.
 */
async function readMultipartZip(filePaths: string[]): Promise<FileEntry[]> {
  // Collect the byte size of each part up front.
  const partSizes: number[] = [];
  for (const partPath of filePaths) {
    partSizes.push((await fsStat(partPath)).size);
  }
  const totalSize = partSizes.reduce((sum, size) => sum + size, 0);
  log.debug(
    { parts: filePaths.length, totalSize },
    "Reading multipart ZIP via spanning reader"
  );
  return new Promise((resolve) => {
    const reader = createMultiPartReader(filePaths, partSizes);
    const openOptions = { lazyEntries: true, autoClose: true };
    yauzl.fromRandomAccessReader(reader, totalSize, openOptions, (err, zipFile) => {
      if (err) {
        log.warn({ err }, "Failed to open multipart ZIP for reading");
        reader.close(() => {});
        resolve([]);
        return;
      }
      const collected: FileEntry[] = [];
      zipFile.on("entry", (entry: yauzl.Entry) => {
        // Directory entries end with "/" and carry no file data.
        if (!entry.fileName.endsWith("/")) {
          const ext = path.extname(entry.fileName).toLowerCase();
          collected.push({
            path: entry.fileName,
            fileName: path.basename(entry.fileName),
            extension: ext ? ext.slice(1) : null,
            compressedSize: BigInt(entry.compressedSize),
            uncompressedSize: BigInt(entry.uncompressedSize),
            crc32: entry.crc32 !== 0 ? entry.crc32.toString(16).padStart(8, "0") : null,
          });
        }
        zipFile.readEntry();
      });
      zipFile.on("end", () => {
        log.info({ entries: collected.length }, "Multipart ZIP entries read");
        resolve(collected);
      });
      zipFile.on("error", (error) => {
        log.warn({ error }, "Error reading multipart ZIP entries");
        resolve(collected); // best-effort: return whatever was read
      });
      // Kick off lazy entry iteration.
      zipFile.readEntry();
    });
  });
}
/**
 * Create a yauzl RandomAccessReader that reads across multiple split part files.
 * Maps a global offset to the correct part file and local offset.
 *
 * yauzl.RandomAccessReader's constructor and prototype are defined at runtime
 * (not as a TS class), so we instantiate via `new` on the untyped constructor
 * and override `_readStreamForRange` on the instance.
 */
function createMultiPartReader(
  filePaths: string[],
  partSizes: number[]
): yauzl.RandomAccessReader {
  // Build cumulative offset table: partOffsets[i] = global start of part i
  const partOffsets: number[] = [];
  let offset = 0;
  for (const size of partSizes) {
    partOffsets.push(offset);
    offset += size;
  }
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const reader = new (yauzl.RandomAccessReader as any)() as yauzl.RandomAccessReader;
  // yauzl calls _readStreamForRange(start, end) to read bytes [start, end)
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  (reader as any)._readStreamForRange = function (start: number, end: number): Readable {
    const readable = new Readable({ read() {} });
    readRange(start, end, readable).catch((err) => {
      readable.destroy(err);
    });
    return readable;
  };
  async function readRange(start: number, end: number, readable: Readable): Promise<void> {
    let remaining = end - start;
    let globalOffset = start;
    while (remaining > 0) {
      // Find which part this global offset falls in
      let partIdx = partOffsets.length - 1;
      for (let i = 0; i < partOffsets.length; i++) {
        if (i + 1 < partOffsets.length && globalOffset < partOffsets[i + 1]) {
          partIdx = i;
          break;
        }
      }
      const localOffset = globalOffset - partOffsets[partIdx];
      const partRemaining = partSizes[partIdx] - localOffset;
      const toRead = Math.min(remaining, partRemaining);
      const fh = await fsOpen(filePaths[partIdx], "r");
      try {
        const buf = Buffer.alloc(toRead);
        const { bytesRead } = await fh.read(buf, 0, toRead, localOffset);
        if (bytesRead === 0) {
          // Part is shorter than its stat'd size (file truncated or changed
          // on disk). Without this guard the loop would spin forever.
          throw new Error(
            `Unexpected EOF in ${filePaths[partIdx]} at offset ${localOffset}`
          );
        }
        readable.push(buf.subarray(0, bytesRead));
        remaining -= bytesRead;
        globalOffset += bytesRead;
      } finally {
        await fh.close();
      }
    }
    readable.push(null); // Signal end of stream
  }
  return reader;
}

View File

@@ -1,5 +1,5 @@
import { db } from "./client.js";
import type { ArchiveType } from "@prisma/client";
import type { ArchiveType, FetchStatus } from "@prisma/client";
export async function getActiveAccounts() {
return db.telegramAccount.findMany({
@@ -7,6 +7,17 @@ export async function getActiveAccounts() {
});
}
/** Accounts that are enabled but still waiting on Telegram authentication. */
export async function getPendingAccounts() {
  const pendingFilter = { authState: "PENDING" as const, isActive: true };
  return db.telegramAccount.findMany({ where: pendingFilter });
}
/** True when at least one Telegram channel row exists in the database. */
export async function hasAnyChannels(): Promise<boolean> {
  return (await db.telegramChannel.count()) > 0;
}
export async function getSourceChannelMappings(accountId: string) {
return db.accountChannelMap.findMany({
where: {
@@ -18,26 +29,66 @@ export async function getSourceChannelMappings(accountId: string) {
});
}
export async function getDestinationChannel(accountId: string) {
const mapping = await db.accountChannelMap.findFirst({
where: {
accountId,
role: "WRITER",
channel: { type: "DESTINATION", isActive: true },
},
include: { channel: true },
// ── Global destination channel ──
/**
 * Resolve the globally-configured destination channel. Returns null when the
 * "destination_channel_id" setting is absent or the referenced channel is
 * missing or inactive.
 */
export async function getGlobalDestinationChannel() {
  const setting = await db.globalSetting.findUnique({
    where: { key: "destination_channel_id" },
  });
  if (setting === null) {
    return null;
  }
  return db.telegramChannel.findFirst({
    where: { isActive: true, type: "DESTINATION", id: setting.value },
  });
}
/** Fetch a single global setting value; null when the key is unset. */
export async function getGlobalSetting(key: string): Promise<string | null> {
  const row = await db.globalSetting.findUnique({ where: { key } });
  if (row === null) {
    return null;
  }
  return row.value;
}
export async function setGlobalSetting(key: string, value: string) {
return db.globalSetting.upsert({
where: { key },
create: { key, value },
update: { value },
});
return mapping?.channel ?? null;
}
export async function packageExistsByHash(contentHash: string) {
const pkg = await db.package.findUnique({
where: { contentHash },
const pkg = await db.package.findFirst({
where: { contentHash, destMessageId: { not: null } },
select: { id: true },
});
return pkg !== null;
}
/**
 * Check if a package already exists for a given source message ID
 * AND was successfully uploaded to the destination (destMessageId is set).
 * Used as an early skip before downloading.
 */
export async function packageExistsBySourceMessage(
  sourceChannelId: string,
  sourceMessageId: bigint
): Promise<boolean> {
  const existing = await db.package.findFirst({
    select: { id: true },
    where: {
      sourceChannelId,
      sourceMessageId,
      destMessageId: { not: null },
    },
  });
  return existing !== null;
}
/**
 * Delete orphaned Package rows that have the same content hash but never
 * completed the upload (destMessageId is null). Called before creating a
 * new complete record to avoid unique constraint violations.
 */
export async function deleteOrphanedPackageByHash(contentHash: string): Promise<void> {
  const orphanFilter = { contentHash, destMessageId: null };
  await db.package.deleteMany({ where: orphanFilter });
}
export interface CreatePackageInput {
contentHash: string;
fileName: string;
@@ -228,6 +279,57 @@ export async function getAccountAuthCode(accountId: string) {
return account;
}
// ── Channel sync (auto-discovery from Telegram) ──
/** Input for upsertChannel: the fields synced from a discovered Telegram chat. */
export interface UpsertChannelInput {
  /** Telegram's numeric chat id (unique key for the upsert). */
  telegramId: bigint;
  /** Current chat title; refreshed on every upsert. */
  title: string;
  /** Role of the channel in the pipeline (read from vs. uploaded to). */
  type: "SOURCE" | "DESTINATION";
  /** True when the chat has forum topics enabled; refreshed on every upsert. */
  isForum: boolean;
}
/**
 * Upsert a channel by telegramId. Returns the channel record.
 * If it already exists, update title and forum status.
 */
export async function upsertChannel(input: UpsertChannelInput) {
  const { telegramId, title, type, isForum } = input;
  return db.telegramChannel.upsert({
    where: { telegramId },
    create: { telegramId, title, type, isForum },
    update: { title, isForum },
  });
}
/**
 * Link an account to a channel if not already linked.
 * Idempotent: a duplicate link surfaces as a unique-constraint violation,
 * which is swallowed (returns null in that case).
 *
 * @returns The created AccountChannelMap row, or null if already linked.
 */
export async function ensureAccountChannelLink(
  accountId: string,
  channelId: string,
  role: "READER" | "WRITER"
) {
  try {
    return await db.accountChannelMap.create({
      data: { accountId, channelId, role },
    });
  } catch (err: unknown) {
    // Prisma reports unique-constraint violations with the stable error
    // code P2002; checking it is more robust than matching the
    // human-readable message. Keep the message check as a fallback.
    const code = (err as { code?: string }).code;
    if (
      code === "P2002" ||
      (err instanceof Error && err.message.includes("Unique constraint"))
    ) {
      return null;
    }
    throw err;
  }
}
// ── Forum / Topic progress ──
export async function setChannelForum(channelId: string, isForum: boolean) {
@@ -268,3 +370,50 @@ export async function upsertTopicProgress(
},
});
}
// ── Channel fetch requests (DB-mediated communication with web app) ──
/** Load a fetch request by id, including its owning account. */
export async function getChannelFetchRequest(requestId: string) {
  return db.channelFetchRequest.findUnique({
    include: { account: true },
    where: { id: requestId },
  });
}
/**
 * Update a fetch request's status, optionally attaching a serialized result
 * or an error message. Omitted extras leave those columns untouched.
 */
export async function updateFetchRequestStatus(
  requestId: string,
  status: FetchStatus,
  extra?: { resultJson?: string; error?: string }
) {
  const data = {
    status,
    resultJson: extra?.resultJson,
    error: extra?.error,
  };
  return db.channelFetchRequest.update({ where: { id: requestId }, data });
}
/** Telegram ids (as strings) of every channel linked to the given account. */
export async function getAccountLinkedChannelIds(accountId: string): Promise<Set<string>> {
  const links = await db.accountChannelMap.findMany({
    where: { accountId },
    select: { channel: { select: { telegramId: true } } },
  });
  const ids = new Set<string>();
  for (const link of links) {
    ids.add(link.channel.telegramId.toString());
  }
  return ids;
}
/** Map of telegramId (string) → internal channel id for every known channel. */
export async function getExistingChannelsByTelegramId(): Promise<Map<string, string>> {
  const channels = await db.telegramChannel.findMany({
    select: { id: true, telegramId: true },
  });
  return new Map(
    channels.map((ch): [string, string] => [ch.telegramId.toString(), ch.id])
  );
}
/** Fetch one Telegram account row by its primary key, or null. */
export async function getAccountById(accountId: string) {
  const where = { id: accountId };
  return db.telegramAccount.findUnique({ where });
}

View File

@@ -0,0 +1,206 @@
import type pg from "pg";
import { pool } from "./db/client.js";
import { childLogger } from "./util/logger.js";
import { withTdlibMutex } from "./util/mutex.js";
import { processFetchRequest } from "./worker.js";
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import {
getGlobalDestinationChannel,
getGlobalSetting,
setGlobalSetting,
getActiveAccounts,
upsertChannel,
ensureAccountChannelLink,
} from "./db/queries.js";
const log = childLogger("fetch-listener");
let pgClient: pg.PoolClient | null = null;
/**
 * Start listening for pg_notify signals from the web app.
 *
 * Channels:
 * - `channel_fetch` — payload = requestId → fetch channels for an account
 * - `generate_invite` — payload = channelId → generate invite link for destination
 * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
 */
export async function startFetchListener(): Promise<void> {
  pgClient = await pool.connect();
  await pgClient.query("LISTEN channel_fetch");
  await pgClient.query("LISTEN generate_invite");
  await pgClient.query("LISTEN create_destination");
  // A dedicated LISTEN connection can error out (e.g. server restart).
  // Without an 'error' handler, node-postgres emits an uncaught 'error'
  // event which crashes the process — log it so the failure is visible.
  pgClient.on("error", (err) => {
    log.error({ err }, "Fetch listener connection error");
  });
  pgClient.on("notification", (msg) => {
    if (!msg.payload) return;
    if (msg.channel === "channel_fetch") {
      handleChannelFetch(msg.payload);
    } else if (msg.channel === "generate_invite") {
      handleGenerateInvite(msg.payload);
    } else if (msg.channel === "create_destination") {
      handleCreateDestination(msg.payload);
    }
  });
  log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
}
export function stopFetchListener(): void {
if (pgClient) {
pgClient.release();
pgClient = null;
}
log.info("Fetch listener stopped");
}
// ── Channel fetch handler ──
// Chain promises to ensure sequential execution
let fetchQueue: Promise<void> = Promise.resolve();
function handleChannelFetch(requestId: string): void {
  const task = async (): Promise<void> => {
    try {
      await withTdlibMutex("fetch-channels", () => processFetchRequest(requestId));
    } catch (err) {
      log.error({ err, requestId }, "Failed to process fetch request");
    }
  };
  fetchQueue = fetchQueue.then(task);
}
// ── Invite link generation handler ──
// Queued on fetchQueue so TDLib work never overlaps other handlers; errors
// are logged and never rejected into the chain (the queue must stay alive).
function handleGenerateInvite(channelId: string): void {
  fetchQueue = fetchQueue.then(async () => {
    try {
      await withTdlibMutex("generate-invite", async () => {
        // Only act when the request targets the currently-configured destination.
        const destChannel = await getGlobalDestinationChannel();
        if (!destChannel || destChannel.id !== channelId) {
          log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
          return;
        }
        // Use the first available authenticated account to generate the link
        const accounts = await getActiveAccounts();
        if (accounts.length === 0) {
          log.warn("No authenticated accounts to generate invite link");
          return;
        }
        const account = accounts[0];
        const client = await createTdlibClient({ id: account.id, phone: account.phone });
        try {
          const link = await generateInviteLink(client, destChannel.telegramId);
          // Persist the link as a global setting so other components can read it.
          await setGlobalSetting("destination_invite_link", link);
          log.info({ link }, "Invite link generated and saved");
        } finally {
          // Always tear down the TDLib client, even on failure.
          await closeTdlibClient(client);
        }
      });
    } catch (err) {
      log.error({ err, channelId }, "Failed to generate invite link");
    }
  });
}
// ── Create destination supergroup handler ──
// Queued on fetchQueue for sequential TDLib access. Full flow:
// parse payload → mark request IN_PROGRESS → create supergroup via TDLib →
// upsert DESTINATION channel → set global destination + invite link →
// link all accounts as WRITER → mark request COMPLETED (or FAILED on error).
function handleCreateDestination(payload: string): void {
  fetchQueue = fetchQueue.then(async () => {
    // Captured outside the try so the catch block can mark the request FAILED
    // even when the failure happens after parsing.
    let requestId: string | undefined;
    try {
      const parsed = JSON.parse(payload) as { requestId: string; title: string };
      requestId = parsed.requestId;
      await withTdlibMutex("create-destination", async () => {
        const { db } = await import("./db/client.js");
        // Mark the request as in-progress
        await db.channelFetchRequest.update({
          where: { id: parsed.requestId },
          data: { status: "IN_PROGRESS" },
        });
        // Use the first available authenticated account
        const accounts = await getActiveAccounts();
        if (accounts.length === 0) {
          throw new Error("No authenticated accounts available to create the group");
        }
        const account = accounts[0];
        const client = await createTdlibClient({ id: account.id, phone: account.phone });
        try {
          // Create the supergroup via TDLib
          const result = await createSupergroup(client, parsed.title);
          log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
          // Upsert it as a DESTINATION channel in the DB
          const channel = await upsertChannel({
            telegramId: result.chatId,
            title: result.title,
            type: "DESTINATION",
            isForum: false,
          });
          // Set as global destination
          await setGlobalSetting("destination_channel_id", channel.id);
          // Generate an invite link
          const link = await generateInviteLink(client, result.chatId);
          await setGlobalSetting("destination_invite_link", link);
          log.info({ link }, "Invite link generated for new destination");
          // Link all authenticated accounts as WRITER
          for (const acc of accounts) {
            try {
              await ensureAccountChannelLink(acc.id, channel.id, "WRITER");
            } catch {
              // Already linked
            }
          }
          // Mark fetch request as completed with the channel info
          await db.channelFetchRequest.update({
            where: { id: parsed.requestId },
            data: {
              status: "COMPLETED",
              resultJson: JSON.stringify({
                channelId: channel.id,
                telegramId: result.chatId.toString(),
                title: result.title,
                inviteLink: link,
              }),
            },
          });
          log.info(
            { channelId: channel.id, telegramId: result.chatId.toString() },
            "Destination channel created and configured"
          );
        } finally {
          // Always close the TDLib client, success or failure.
          await closeTdlibClient(client);
        }
      });
    } catch (err) {
      log.error({ err, payload }, "Failed to create destination channel");
      // If we got far enough to know the request id, record the failure
      // so the web app can surface it; otherwise there is nothing to update.
      if (requestId) {
        try {
          const { db } = await import("./db/client.js");
          await db.channelFetchRequest.update({
            where: { id: requestId },
            data: {
              status: "FAILED",
              error: err instanceof Error ? err.message : String(err),
            },
          });
        } catch {
          // Best-effort
        }
      }
    }
  });
}

View File

@@ -4,6 +4,7 @@ import { logger } from "./util/logger.js";
import { markStaleRunsAsFailed } from "./db/queries.js";
import { cleanupTempDir } from "./worker.js";
import { startScheduler, stopScheduler } from "./scheduler.js";
import { startFetchListener, stopFetchListener } from "./fetch-listener.js";
import { db, pool } from "./db/client.js";
const log = logger.child({ module: "main" });
@@ -20,6 +21,9 @@ async function main(): Promise<void> {
await cleanupTempDir();
await markStaleRunsAsFailed();
// Start the fetch listener (pg_notify for on-demand channel fetching)
await startFetchListener();
// Start the scheduler
await startScheduler();
}
@@ -28,6 +32,7 @@ async function main(): Promise<void> {
function shutdown(signal: string): void {
log.info({ signal }, "Shutdown signal received");
stopScheduler();
stopFetchListener();
// Close DB connections
Promise.all([db.$disconnect(), pool.end()])

View File

@@ -1,15 +1,22 @@
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { getActiveAccounts } from "./db/queries.js";
import { runWorkerForAccount } from "./worker.js";
import { withTdlibMutex } from "./util/mutex.js";
import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
import { runWorkerForAccount, authenticateAccount } from "./worker.js";
const log = childLogger("scheduler");
let running = false;
let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0;
/**
* Run one ingestion cycle: process all active, authenticated accounts sequentially.
* Run one ingestion cycle:
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
* 2. Process all active AUTHENTICATED accounts for ingestion
*
* All TDLib operations are wrapped in the mutex to ensure only one client
* runs at a time (also shared with the fetch listener for on-demand requests).
*/
async function runCycle(): Promise<void> {
if (running) {
@@ -18,20 +25,38 @@ async function runCycle(): Promise<void> {
}
running = true;
log.info("Starting ingestion cycle");
cycleCount++;
log.info({ cycle: cycleCount }, "Starting ingestion cycle");
try {
// ── Phase 1: Authenticate pending accounts ──
const pendingAccounts = await getPendingAccounts();
if (pendingAccounts.length > 0) {
log.info(
{ count: pendingAccounts.length },
"Found pending accounts, starting authentication"
);
for (const account of pendingAccounts) {
await withTdlibMutex(`auth:${account.phone}`, () =>
authenticateAccount(account)
);
}
}
// ── Phase 2: Ingest for authenticated accounts ──
const accounts = await getActiveAccounts();
if (accounts.length === 0) {
log.info("No active authenticated accounts, nothing to do");
log.info("No active authenticated accounts, nothing to ingest");
return;
}
log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) {
await runWorkerForAccount(account);
await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account)
);
}
log.info("Ingestion cycle complete");

162
worker/src/tdlib/chats.ts Normal file
View File

@@ -0,0 +1,162 @@
import type { Client } from "tdl";
import { childLogger } from "../util/logger.js";
import { config } from "../util/config.js";
const log = childLogger("chats");
/** Summary of a chat discovered from the account's main chat list. */
export interface TelegramChatInfo {
  /** TDLib chat id. */
  chatId: bigint;
  /** Chat title; getAccountChats falls back to "Chat <id>" when absent. */
  title: string;
  /** Coarse classification derived from TDLib's chat type. */
  type: "channel" | "supergroup" | "group" | "private" | "other";
  /** True when the supergroup has forum topics enabled. */
  isForum: boolean;
  // NOTE(review): memberCount is not populated by getAccountChats in this
  // file — confirm which producers set it before relying on it.
  memberCount?: number;
}
/**
 * Fetch all chats the account is a member of.
 * Loads the main chat list via TDLib's getChats (which returns the full
 * list for chatListMain on current TDLib), then getChat / getSupergroup
 * for per-chat details.
 * Filters to channels and supergroups only (groups/privates are not useful
 * for ingestion).
 *
 * @param client - An authenticated TDLib client.
 * @returns Channels/supergroups the account can see, unordered.
 */
export async function getAccountChats(
  client: Client
): Promise<TelegramChatInfo[]> {
  const chats: TelegramChatInfo[] = [];
  // Single call: chatListMain returns all chat ids at once in newer TDLib
  // versions, so no pagination loop (or offset bookkeeping) is needed.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const result = (await client.invoke({
    _: "getChats",
    chat_list: { _: "chatListMain" },
    limit: 100,
  })) as { chat_ids: number[] };
  const chatIds = result.chat_ids ?? [];
  for (const chatId of chatIds) {
    try {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const chat = (await client.invoke({
        _: "getChat",
        chat_id: chatId,
      })) as any;
      const chatType = chat.type?._;
      let type: TelegramChatInfo["type"] = "other";
      let isForum = false;
      if (chatType === "chatTypeSupergroup") {
        // Get supergroup details to check if it's a channel or group
        try {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const sg = (await client.invoke({
            _: "getSupergroup",
            supergroup_id: chat.type.supergroup_id,
          })) as any;
          type = sg.is_channel ? "channel" : "supergroup";
          isForum = sg.is_forum ?? false;
        } catch {
          // Details unavailable — assume a plain supergroup.
          type = "supergroup";
        }
      } else if (chatType === "chatTypeBasicGroup") {
        type = "group";
      } else if (chatType === "chatTypePrivate" || chatType === "chatTypeSecret") {
        type = "private";
      }
      // Only include channels and supergroups
      if (type === "channel" || type === "supergroup") {
        chats.push({
          chatId: BigInt(chatId),
          title: chat.title ?? `Chat ${chatId}`,
          type,
          isForum,
        });
      }
    } catch (err) {
      log.warn({ chatId, err }, "Failed to get chat details, skipping");
    }
  }
  // Preserve the original post-batch rate-limit pause.
  await sleep(config.apiDelayMs);
  log.info(
    { total: chats.length },
    "Fetched channels/supergroups from Telegram"
  );
  return chats;
}
/**
 * Generate an invite link for a chat. The account must be an admin or have
 * invite link permissions.
 */
export async function generateInviteLink(
  client: Client,
  chatId: bigint
): Promise<string> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const response = (await client.invoke({
    _: "createChatInviteLink",
    chat_id: Number(chatId),
    name: "DragonsStash Auto-Join",
    creates_join_request: false,
  })) as any;
  const link: string = response.invite_link;
  log.info({ chatId: chatId.toString(), link }, "Generated invite link");
  return link;
}
/**
 * Create a new supergroup (private group) via TDLib.
 * Returns the chat ID and title.
 */
export async function createSupergroup(
  client: Client,
  title: string
): Promise<{ chatId: bigint; title: string }> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const created = (await client.invoke({
    _: "createNewSupergroupChat",
    title,
    is_forum: false,
    is_channel: false,
    description: "DragonsStash archive destination — all accounts write here",
  })) as any;
  const chatId = BigInt(created.id);
  log.info({ chatId: chatId.toString(), title }, "Created new supergroup");
  const finalTitle: string = created.title ?? title;
  return { chatId, title: finalTitle };
}
/**
 * Join a chat using an invite link.
 */
export async function joinChatByInviteLink(
  client: Client,
  inviteLink: string
): Promise<void> {
  const request = { _: "joinChatByInviteLink", invite_link: inviteLink };
  await client.invoke(request);
  log.info({ inviteLink }, "Joined chat by invite link");
}
/** Promise-based delay helper used for API rate limiting. */
async function sleep(ms: number): Promise<void> {
  await new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

View File

@@ -1,5 +1,5 @@
import type { Client } from "tdl";
import { readFile, rename, stat } from "fs/promises";
import { readFile, rename, copyFile, unlink, stat } from "fs/promises";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js";
@@ -69,19 +69,26 @@ export interface ChannelScanResult {
}
/**
* Fetch messages from a channel since a given message ID.
* Fetch messages from a channel, stopping once we've scanned past the
* last-processed boundary (with one page of lookback for multipart safety).
* Collects both archive attachments AND photo messages (for preview matching).
* Returns messages in chronological order (oldest first).
*
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*/
export async function getChannelMessages(
client: Client,
chatId: bigint,
fromMessageId?: bigint | null,
lastProcessedMessageId?: bigint | null,
limit = 100
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
let currentFromId = fromMessageId ? Number(fromMessageId) : 0;
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
@@ -114,8 +121,6 @@ export async function getChannelMessages(
const photo = msg.content?.photo;
const caption = msg.content?.caption?.text ?? "";
if (photo?.sizes && photo.sizes.length > 0) {
// Pick the smallest size for thumbnail (type "s" or "m")
// TDLib photo sizes are ordered from smallest to largest
const smallest = photo.sizes[0];
photos.push({
id: BigInt(msg.id),
@@ -128,13 +133,22 @@ export async function getChannelMessages(
}
currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break;
// Rate limit delay
await sleep(config.apiDelayMs);
}
// Return in chronological order (oldest first)
log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
"Channel scan complete"
);
// Reverse to chronological order (oldest first) so worker processes old→new
return {
archives: archives.reverse(),
photos: photos.reverse(),
@@ -380,8 +394,23 @@ async function verifyAndMove(
"File verified and complete"
);
// Move from TDLib's cache to our temp directory
await rename(localPath, destPath);
// Move from TDLib's cache to our temp directory.
// Use rename first (fast, same filesystem), fall back to copy+delete
// when source and destination are on different filesystems (EXDEV).
try {
await rename(localPath, destPath);
} catch (err: unknown) {
if ((err as NodeJS.ErrnoException).code === "EXDEV") {
log.debug(
{ fileId, fileName },
"Cross-device rename — falling back to copy + unlink"
);
await copyFile(localPath, destPath);
await unlink(localPath);
} else {
throw err;
}
}
}
function sleep(ms: number): Promise<void> {

View File

@@ -125,29 +125,43 @@ export async function getForumTopicList(
}
/**
* Fetch messages from a specific forum topic (thread).
* Uses getMessageThreadHistory to scan within a topic.
* Fetch messages from a specific forum topic (thread), stopping once
* we've scanned past the last-processed boundary (with one page of lookback).
* Uses searchChatMessages with message_thread_id to scan within a topic.
*
* Returns messages in chronological order (oldest first).
*
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*/
export async function getTopicMessages(
client: Client,
chatId: bigint,
topicId: bigint,
fromMessageId?: bigint | null,
lastProcessedMessageId?: bigint | null,
limit = 100
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
let currentFromId = fromMessageId ? Number(fromMessageId) : 0;
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "getMessageThreadHistory",
_: "searchChatMessages",
chat_id: Number(chatId),
message_id: Number(topicId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
})) as {
messages?: {
id: number;
@@ -206,11 +220,21 @@ export async function getTopicMessages(
}
currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break;
await sleep(config.apiDelayMs);
}
log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
"Topic scan complete"
);
// Reverse to chronological order (oldest first) so worker processes old→new
return {
archives: archives.reverse(),
photos: photos.reverse(),

View File

@@ -1,3 +1,5 @@
import path from "path";
import { stat } from "fs/promises";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
@@ -11,7 +13,13 @@ export interface UploadResult {
/**
* Upload one or more files to a destination Telegram channel.
* For multipart archives, each file is sent as a separate message.
* Returns the message ID of the first uploaded message.
* Returns the **final** (server-assigned) message ID of the first uploaded message.
*
* IMPORTANT: `sendMessage` returns a *temporary* message immediately.
* The actual file upload happens asynchronously in TDLib. We listen for
* `updateMessageSendSucceeded` to get the real server-side message ID and
* to make sure the upload is fully committed before we clean up temp files
* or close the TDLib client (which would cancel pending uploads).
*/
export async function uploadToChannel(
client: Client,
@@ -26,31 +34,24 @@ export async function uploadToChannel(
const fileCaption =
i === 0 && caption ? caption : undefined;
log.debug(
{ chatId: Number(chatId), filePath, part: i + 1, total: filePaths.length },
const fileName = path.basename(filePath);
let fileSizeMB = 0;
try {
const s = await stat(filePath);
fileSizeMB = Math.round(s.size / (1024 * 1024));
} catch {
// Non-critical
}
log.info(
{ chatId: Number(chatId), fileName, sizeMB: fileSizeMB, part: i + 1, total: filePaths.length },
"Uploading file to channel"
);
const result = (await client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: fileCaption
? {
_: "formattedText",
text: fileCaption,
}
: undefined,
},
})) as { id: number };
const serverMsgId = await sendAndWaitForUpload(client, chatId, filePath, fileCaption, fileName, fileSizeMB);
if (i === 0) {
firstMessageId = BigInt(result.id);
firstMessageId = serverMsgId;
}
// Rate limit delay between uploads
@@ -65,12 +66,133 @@ export async function uploadToChannel(
log.info(
{ chatId: Number(chatId), messageId: Number(firstMessageId), files: filePaths.length },
"Upload complete"
"All uploads confirmed by Telegram"
);
return { messageId: firstMessageId };
}
/**
 * Send a single file message and wait for Telegram to confirm the upload.
 * Returns the final server-assigned message ID.
 *
 * `sendMessage` returns a *temporary* message immediately; the real upload
 * happens asynchronously inside TDLib and is confirmed later via
 * `updateMessageSendSucceeded` (or `updateMessageSendFailed`).
 *
 * FIX: the update listener is registered BEFORE `sendMessage` is invoked.
 * For small files the confirmation can arrive before the invoke promise
 * resolves; if we subscribed only afterwards (as the previous version did),
 * that confirmation was lost and the wait ran into the timeout. Send-result
 * updates that arrive before we know the temporary message ID are buffered
 * and replayed once the ID is available.
 *
 * @param client      TDLib client used for the send and for update events
 * @param chatId      destination chat ID
 * @param filePath    local path of the file to upload
 * @param caption     optional caption for the message
 * @param fileName    display name used in log lines
 * @param fileSizeMB  size in MB, used only to scale the timeout
 */
async function sendAndWaitForUpload(
  client: Client,
  chatId: bigint,
  filePath: string,
  caption: string | undefined,
  fileName: string,
  fileSizeMB: number
): Promise<bigint> {
  return new Promise<bigint>((resolve, reject) => {
    let settled = false;
    let lastLoggedPercent = 0;
    // Temporary message ID; null until the sendMessage invoke resolves.
    let tempMsgId: number | null = null;
    // Send-result updates that raced ahead of the invoke response.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const earlyResults: any[] = [];

    // Timeout: 10 minutes per GB, minimum 10 minutes
    const timeoutMs = Math.max(
      10 * 60_000,
      (fileSizeMB / 1024) * 10 * 60_000
    );
    const timer = setTimeout(() => {
      if (!settled) {
        settled = true;
        cleanup();
        reject(
          new Error(
            `Upload timed out after ${Math.round(timeoutMs / 60_000)}min for ${fileName}`
          )
        );
      }
    }, timeoutMs);

    // Resolve/reject from a matched send-result update.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const settleFromResult = (update: any) => {
      if (settled) return;
      settled = true;
      cleanup();
      if (update._ === "updateMessageSendSucceeded") {
        const finalId = BigInt(update.message.id);
        log.info(
          { fileName, tempMsgId, finalMsgId: Number(finalId) },
          "Upload confirmed by Telegram"
        );
        resolve(finalId);
      } else {
        const errorMsg = update.error?.message ?? "Unknown upload error";
        reject(new Error(`Upload failed for ${fileName}: ${errorMsg}`));
      }
    };

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const handleUpdate = (update: any) => {
      // Track upload progress via updateFile events (logged in 20% steps)
      if (update?._ === "updateFile") {
        const file = update.file;
        if (file?.remote?.is_uploading_active && file.expected_size > 0) {
          const uploaded = file.remote.uploaded_size ?? 0;
          const total = file.expected_size;
          const percent = Math.round((uploaded / total) * 100);
          if (percent >= lastLoggedPercent + 20) {
            lastLoggedPercent = percent - (percent % 20);
            log.info(
              { fileName, uploaded, total, percent: `${percent}%` },
              "Upload progress"
            );
          }
        }
      }

      // The money events: upload succeeded/failed for our temporary message
      if (
        update?._ === "updateMessageSendSucceeded" ||
        update?._ === "updateMessageSendFailed"
      ) {
        if (tempMsgId === null) {
          // We don't know our temp ID yet — buffer and replay later.
          earlyResults.push(update);
        } else if (update.old_message_id === tempMsgId) {
          settleFromResult(update);
        }
      }
    };

    const cleanup = () => {
      clearTimeout(timer);
      client.off("update", handleUpdate);
    };

    // Subscribe BEFORE sending so a fast confirmation cannot slip past us.
    client.on("update", handleUpdate);

    // Send the message — this returns a temporary message immediately
    client
      .invoke({
        _: "sendMessage",
        chat_id: Number(chatId),
        input_message_content: {
          _: "inputMessageDocument",
          document: {
            _: "inputFileLocal",
            path: filePath,
          },
          caption: caption
            ? {
                _: "formattedText",
                text: caption,
              }
            : undefined,
        },
      })
      .then((tempMsg) => {
        tempMsgId = (tempMsg as { id: number }).id;
        log.debug(
          { fileName, tempMsgId },
          "Message queued, waiting for upload confirmation"
        );
        // Replay any send-result updates that arrived before the invoke resolved.
        for (const u of earlyResults) {
          if (u.old_message_id === tempMsgId) settleFromResult(u);
        }
        earlyResults.length = 0;
      })
      .catch((err: unknown) => {
        // The send request itself failed (e.g. invalid chat) — fail fast.
        if (!settled) {
          settled = true;
          cleanup();
          reject(err instanceof Error ? err : new Error(String(err)));
        }
      });
  });
}
// Promise-based delay helper: resolves (with no value) after `ms` milliseconds.
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}

View File

@@ -9,8 +9,8 @@ export const config = {
telegramApiHash: process.env.TELEGRAM_API_HASH ?? "",
/** Maximum jitter added to scheduler interval (in minutes) */
jitterMinutes: 5,
/** Maximum time between multipart archive parts (in hours) */
multipartTimeoutHours: 24,
/** Maximum time span for multipart archive parts (in hours). 0 = no limit. */
multipartTimeoutHours: parseInt(process.env.MULTIPART_TIMEOUT_HOURS ?? "0", 10),
/** Delay between Telegram API calls (in ms) to avoid rate limits */
apiDelayMs: 1000,
/** Max retries for rate-limited requests */

40
worker/src/util/mutex.ts Normal file
View File

@@ -0,0 +1,40 @@
import { childLogger } from "./logger.js";
const log = childLogger("mutex");
// Module-level mutex state. `locked` is the lock flag, `holder` is a
// human-readable label of the current owner (for log lines only), and
// `queue` holds pending waiters in FIFO order.
let locked = false;
let holder = "";
const queue: Array<{ resolve: () => void; label: string }> = [];
/**
 * Ensures only one TDLib client runs at a time across the entire worker process.
 * Both the scheduler (auth, ingestion) and the fetch listener acquire this
 * before creating any TDLib client.
 *
 * FIX: the lock is handed off *directly* to the next queued waiter instead of
 * being cleared first. The previous version set `locked = false` and then
 * resolved the waiter; any fresh caller whose task ran before the waiter's
 * continuation would observe `locked === false` and barge in, so two TDLib
 * clients could run concurrently — exactly what this mutex must prevent.
 *
 * @param label human-readable owner label used in log lines
 * @param fn    critical section; the lock is held until its promise settles
 * @returns     whatever `fn` resolves to (rejections propagate; lock is released)
 */
export async function withTdlibMutex<T>(
  label: string,
  fn: () => Promise<T>
): Promise<T> {
  if (locked) {
    log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
    // When this promise resolves, the releasing holder has already
    // transferred ownership to us (locked stays true throughout).
    await new Promise<void>((resolve) => queue.push({ resolve, label }));
  }
  locked = true;
  holder = label;
  log.debug({ label }, "TDLib mutex acquired");
  try {
    return await fn();
  } finally {
    const next = queue.shift();
    if (next) {
      // Direct handoff: do NOT clear `locked`, so no third party can
      // acquire between our release and the waiter's continuation.
      holder = next.label;
      log.debug({ next: next.label }, "TDLib mutex releasing to next waiter");
      next.resolve();
    } else {
      locked = false;
      holder = "";
      log.debug({ label }, "TDLib mutex released");
    }
  }
}

View File

@@ -1,12 +1,13 @@
import path from "path";
import { unlink, readdir } from "fs/promises";
import { unlink, readdir, mkdir, rm } from "fs/promises";
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { tryAcquireLock, releaseLock } from "./db/locks.js";
import {
getSourceChannelMappings,
getDestinationChannel,
getGlobalDestinationChannel,
packageExistsByHash,
packageExistsBySourceMessage,
createPackageWithFiles,
createIngestionRun,
completeIngestionRun,
@@ -16,9 +17,19 @@ import {
setChannelForum,
getTopicProgress,
upsertTopicProgress,
upsertChannel,
ensureAccountChannelLink,
getGlobalSetting,
getChannelFetchRequest,
updateFetchRequestStatus,
getAccountLinkedChannelIds,
getExistingChannelsByTelegramId,
getAccountById,
deleteOrphanedPackageByHash,
} from "./db/queries.js";
import type { ActivityUpdate } from "./db/queries.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { getAccountChats, joinChatByInviteLink } from "./tdlib/chats.js";
import { getChannelMessages, downloadFile, downloadPhotoThumbnail } from "./tdlib/download.js";
import type { DownloadProgress, ChannelScanResult } from "./tdlib/download.js";
import { isChatForum, getForumTopicList, getTopicMessages } from "./tdlib/topics.js";
@@ -29,13 +40,203 @@ import { extractCreatorFromFileName } from "./archive/creator.js";
import { hashParts } from "./archive/hash.js";
import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { byteLevelSplit } from "./archive/split.js";
import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
import { uploadToChannel } from "./upload/channel.js";
import type { TelegramAccount, TelegramChannel } from "@prisma/client";
import type { Client } from "tdl";
const log = childLogger("worker");
/**
 * Drive the authentication flow for a PENDING account.
 *
 * Creating the TDLib client triggers the login: TDLib sends an SMS code to
 * the phone number and the client.login() callbacks move authState to
 * AWAITING_CODE. Once the admin enters the code via the UI, pollForAuthCode
 * picks it up and completes the login.
 *
 * On success this also:
 * 1. Fetches the account's channels and stores them as a ChannelFetchRequest
 *    (so the admin can select sources in the UI)
 * 2. Auto-joins the destination group when an invite link is configured,
 *    linking the account as WRITER to the destination channel
 *
 * Errors are logged, never thrown; the client is always closed.
 */
export async function authenticateAccount(
  account: TelegramAccount
): Promise<void> {
  const aLog = childLogger("auth", { accountId: account.id, phone: account.phone });
  aLog.info("Starting authentication flow");

  let client: Client | undefined;
  try {
    client = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });
    aLog.info("Authentication successful");

    // Make the channel picker available in the UI right away.
    aLog.info("Fetching channels from Telegram...");
    await createAutoFetchRequest(client, account.id, aLog);

    // Without an invite link there is nothing more to do.
    const inviteLink = await getGlobalSetting("destination_invite_link");
    if (!inviteLink) return;

    aLog.info("Attempting to join destination group via invite link...");
    try {
      await joinChatByInviteLink(client, inviteLink);
      // Link this account as WRITER to the destination channel
      const destChannel = await getGlobalDestinationChannel();
      if (destChannel) {
        await ensureAccountChannelLink(account.id, destChannel.id, "WRITER");
        aLog.info({ destChannel: destChannel.title }, "Joined destination group and linked as WRITER");
      }
    } catch (err) {
      // May already be a member — that's fine
      aLog.warn({ err }, "Could not join destination group (may already be a member)");
      // Still try to link as WRITER
      const destChannel = await getGlobalDestinationChannel();
      if (destChannel) {
        await ensureAccountChannelLink(account.id, destChannel.id, "WRITER");
      }
    }
  } catch (err) {
    aLog.error({ err }, "Authentication failed");
  } finally {
    if (client) await closeTdlibClient(client);
  }
}
/**
* Process a ChannelFetchRequest: fetch channels from Telegram,
* enrich with DB state, and write the result JSON.
* Called by the fetch listener (pg_notify) and by authenticateAccount.
*/
export async function processFetchRequest(requestId: string): Promise<void> {
const aLog = childLogger("fetch-request", { requestId });
const request = await getChannelFetchRequest(requestId);
if (!request || request.status !== "PENDING") {
aLog.warn("Fetch request not found or not pending, skipping");
return;
}
await updateFetchRequestStatus(requestId, "IN_PROGRESS");
aLog.info({ accountId: request.accountId }, "Processing fetch request");
const client = await createTdlibClient({
id: request.account.id,
phone: request.account.phone,
});
try {
const chats = await getAccountChats(client);
// Enrich with DB state
const linkedTelegramIds = await getAccountLinkedChannelIds(request.accountId);
const existingChannels = await getExistingChannelsByTelegramId();
const enrichedChats = chats.map((chat) => {
const telegramIdStr = chat.chatId.toString();
return {
chatId: telegramIdStr,
title: chat.title,
type: chat.type,
isForum: chat.isForum,
memberCount: chat.memberCount ?? null,
alreadyLinked: linkedTelegramIds.has(telegramIdStr),
existingChannelId: existingChannels.get(telegramIdStr) ?? null,
};
});
// Also upsert channel metadata while we have the data
for (const chat of chats) {
try {
await upsertChannel({
telegramId: chat.chatId,
title: chat.title,
type: "SOURCE",
isForum: chat.isForum,
});
} catch {
// Non-critical — metadata sync can fail silently
}
}
await updateFetchRequestStatus(requestId, "COMPLETED", {
resultJson: JSON.stringify(enrichedChats),
});
aLog.info(
{ total: chats.length, linked: [...linkedTelegramIds].length },
"Fetch request completed"
);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
aLog.error({ err }, "Fetch request failed");
await updateFetchRequestStatus(requestId, "FAILED", { error: message });
} finally {
await closeTdlibClient(client);
}
}
/**
 * Post-authentication helper: builds the account's channel list and stores
 * it as an already-COMPLETED ChannelFetchRequest, so the UI can show the
 * channel picker immediately without issuing a separate fetch.
 */
async function createAutoFetchRequest(
  client: Client,
  accountId: string,
  aLog: ReturnType<typeof childLogger>
): Promise<void> {
  const chats = await getAccountChats(client);
  const linkedTelegramIds = await getAccountLinkedChannelIds(accountId);
  const existingChannels = await getExistingChannelsByTelegramId();

  // Enrich each chat with its DB state (already linked? channel row exists?)
  const enrichedChats = [];
  for (const chat of chats) {
    const idStr = chat.chatId.toString();
    enrichedChats.push({
      chatId: idStr,
      title: chat.title,
      type: chat.type,
      isForum: chat.isForum,
      memberCount: chat.memberCount ?? null,
      alreadyLinked: linkedTelegramIds.has(idStr),
      existingChannelId: existingChannels.get(idStr) ?? null,
    });
  }

  // Upsert channel metadata
  for (const chat of chats) {
    try {
      await upsertChannel({
        telegramId: chat.chatId,
        title: chat.title,
        type: "SOURCE",
        isForum: chat.isForum,
      });
    } catch {
      // Non-critical
    }
  }

  // Create the fetch request record with the result already filled in
  const { db } = await import("./db/client.js");
  await db.channelFetchRequest.create({
    data: {
      accountId,
      status: "COMPLETED",
      resultJson: JSON.stringify(enrichedChats),
    },
  });

  aLog.info(
    { total: chats.length },
    "Auto-fetch request created with channel list"
  );
}
/**
* Throttle DB writes for download progress to avoid hammering the DB.
* Only writes if at least 2 seconds have passed since the last write.
@@ -140,17 +341,18 @@ export async function runWorkerForAccount(
};
try {
// 4. Get assigned source channels and destination
// 4. Get assigned source channels and global destination
const channelMappings = await getSourceChannelMappings(account.id);
const destChannel = await getDestinationChannel(account.id);
const destChannel = await getGlobalDestinationChannel();
if (!destChannel) {
throw new Error("No active destination channel configured");
throw new Error("No global destination channel configured — set one in the admin UI");
}
for (const mapping of channelMappings) {
const channel = mapping.channel;
try {
// ── Check if channel is a forum ──
const forum = await isChatForum(client, channel.telegramId);
if (forum !== channel.isForum) {
@@ -198,61 +400,63 @@ export async function runWorkerForAccount(
);
for (const topic of topics) {
const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId
);
await updateRunActivity(activeRunId, {
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
currentStep: "scanning",
currentChannel: `${channel.title} ${topic.name}`,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
});
const scanResult = await getTopicMessages(
client,
channel.telegramId,
topic.topicId,
progress?.lastProcessedMessageId
);
if (scanResult.archives.length === 0) {
accountLog.debug(
{ channelId: channel.id, topic: topic.name },
"No new archives in topic"
try {
const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId
);
continue;
}
accountLog.info(
{ topic: topic.name, archives: scanResult.archives.length, photos: scanResult.photos.length },
"Found messages in topic"
);
await updateRunActivity(activeRunId, {
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
currentStep: "scanning",
currentChannel: `${channel.title} ${topic.name}`,
currentFile: null,
currentFileNum: null,
totalFiles: null,
downloadedBytes: null,
totalBytes: null,
downloadPercent: null,
});
// Process archives with topic creator
pipelineCtx.topicCreator = topic.name;
pipelineCtx.sourceTopicId = topic.topicId;
pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
await processArchiveSets(pipelineCtx, scanResult, run.id);
// Update topic progress
const allMsgIds = [
...scanResult.archives.map((m) => m.id),
...scanResult.photos.map((p) => p.id),
];
if (allMsgIds.length > 0) {
const maxId = allMsgIds.reduce((a, b) => (a > b ? a : b));
await upsertTopicProgress(
mapping.id,
const scanResult = await getTopicMessages(
client,
channel.telegramId,
topic.topicId,
topic.name,
maxId
progress?.lastProcessedMessageId
);
if (scanResult.archives.length === 0) {
accountLog.debug(
{ channelId: channel.id, topic: topic.name },
"No new archives in topic"
);
continue;
}
accountLog.info(
{ topic: topic.name, archives: scanResult.archives.length, photos: scanResult.photos.length },
"Found messages in topic"
);
// Process archives with topic creator
pipelineCtx.topicCreator = topic.name;
pipelineCtx.sourceTopicId = topic.topicId;
pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
// Only advance progress to the highest successfully processed message
if (maxProcessedId) {
await upsertTopicProgress(
mapping.id,
topic.topicId,
topic.name,
maxProcessedId
);
}
} catch (topicErr) {
accountLog.warn(
{ err: topicErr, channelId: channel.id, topic: topic.name, topicId: topic.topicId.toString() },
"Failed to process topic, skipping"
);
}
}
@@ -296,18 +500,19 @@ export async function runWorkerForAccount(
pipelineCtx.sourceTopicId = null;
pipelineCtx.channelTitle = channel.title;
await processArchiveSets(pipelineCtx, scanResult, run.id);
const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
// Update last processed message
const allMsgIds = [
...scanResult.archives.map((m) => m.id),
...scanResult.photos.map((p) => p.id),
];
if (allMsgIds.length > 0) {
const maxId = allMsgIds.reduce((a, b) => (a > b ? a : b));
await updateLastProcessedMessage(mapping.id, maxId);
// Only advance progress to the highest successfully processed message
if (maxProcessedId) {
await updateLastProcessedMessage(mapping.id, maxProcessedId);
}
}
} catch (channelErr) {
accountLog.warn(
{ err: channelErr, channelId: channel.id, title: channel.title },
"Failed to process channel, skipping to next"
);
}
}
// ── Done ──
@@ -332,16 +537,37 @@ export async function runWorkerForAccount(
/**
* Process a scan result through the archive pipeline:
* group → download → hash → dedup → metadata → split → upload → preview → index.
*
* Returns the highest message ID that was successfully processed (ingested or
* confirmed duplicate). The caller should only advance the progress boundary
* to this value — never to the max of all scanned messages.
*/
async function processArchiveSets(
ctx: PipelineContext,
scanResult: ChannelScanResult,
ingestionRunId: string
): Promise<void> {
ingestionRunId: string,
lastProcessedMessageId?: bigint | null
): Promise<bigint | null> {
const { client, runId, channelTitle, channel, throttled, counters, accountLog } = ctx;
// Group into archive sets
const archiveSets = groupArchiveSets(scanResult.archives);
let archiveSets = groupArchiveSets(scanResult.archives);
// Filter out sets where ALL parts are at or below the boundary (already processed)
if (lastProcessedMessageId) {
const totalBefore = archiveSets.length;
archiveSets = archiveSets.filter((set) =>
set.parts.some((p) => p.id > lastProcessedMessageId)
);
const filtered = totalBefore - archiveSets.length;
if (filtered > 0) {
accountLog.info(
{ filtered, remaining: archiveSets.length },
"Filtered out already-processed archive sets"
);
}
}
counters.zipsFound += archiveSets.length;
// Match preview photos to archive sets
@@ -369,16 +595,38 @@ async function processArchiveSets(
zipsFound: counters.zipsFound,
});
// Track the highest message ID that was successfully processed
let maxProcessedId: bigint | null = null;
for (let setIdx = 0; setIdx < archiveSets.length; setIdx++) {
await processOneArchiveSet(
ctx,
archiveSets[setIdx],
setIdx,
archiveSets.length,
previewMatches,
ingestionRunId
);
try {
await processOneArchiveSet(
ctx,
archiveSets[setIdx],
setIdx,
archiveSets.length,
previewMatches,
ingestionRunId
);
// Set completed (ingested or confirmed duplicate) — advance watermark
const setMaxId = archiveSets[setIdx].parts.reduce(
(max, p) => (p.id > max ? p.id : max),
0n
);
if (setMaxId > (maxProcessedId ?? 0n)) {
maxProcessedId = setMaxId;
}
} catch (setErr) {
// If a set fails, do NOT advance the watermark past it
accountLog.warn(
{ err: setErr, baseName: archiveSets[setIdx].baseName },
"Archive set failed, watermark will not advance past this set"
);
}
}
return maxProcessedId;
}
/**
@@ -400,17 +648,43 @@ async function processOneArchiveSet(
counters.messagesScanned += archiveSet.parts.length;
const archiveName = archiveSet.parts[0].fileName;
// ── Early skip: check if this archive set was already ingested ──
// This avoids re-downloading large archives that were processed in a prior run.
const alreadyIngested = await packageExistsBySourceMessage(
channel.id,
archiveSet.parts[0].id
);
if (alreadyIngested) {
counters.zipsDuplicate++;
accountLog.debug(
{ fileName: archiveName, sourceMessageId: Number(archiveSet.parts[0].id) },
"Archive already ingested (by source message), skipping"
);
await updateRunActivity(runId, {
currentActivity: `Skipped ${archiveName} (already ingested)`,
currentStep: "deduplicating",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
zipsDuplicate: counters.zipsDuplicate,
});
return;
}
const tempPaths: string[] = [];
let splitPaths: string[] = [];
// Per-set subdirectory so uploaded files keep their original filenames
const setDir = path.join(config.tempDir, `${ingestionRunId}_${archiveSet.parts[0].id}`);
await mkdir(setDir, { recursive: true });
try {
// ── Downloading ──
for (let partIdx = 0; partIdx < archiveSet.parts.length; partIdx++) {
const part = archiveSet.parts[partIdx];
const tempPath = path.join(
config.tempDir,
`${ingestionRunId}_${part.id}_${part.fileName}`
);
const tempPath = path.join(setDir, part.fileName);
const partLabel = archiveSet.parts.length > 1
? ` (part ${partIdx + 1}/${archiveSet.parts.length})`
@@ -526,14 +800,33 @@ async function processOneArchiveSet(
accountLog.warn({ err, baseName: archiveSet.baseName }, "Failed to read archive metadata, ingesting without file list");
}
// ── Splitting (if needed) ──
let uploadPaths = tempPaths;
// ── Splitting / Repacking (if needed) ──
let uploadPaths = [...tempPaths];
const totalSize = archiveSet.parts.reduce(
(sum, p) => sum + p.fileSize,
0n
);
const MAX_UPLOAD_SIZE = 2n * 1024n * 1024n * 1024n;
const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
if (!archiveSet.isMultipart && totalSize > 2n * 1024n * 1024n * 1024n) {
if (hasOversizedPart) {
// Full repack: concatenate all parts → single file → re-split into uniform 2GB chunks
await updateRunActivity(runId, {
currentActivity: `Repacking ${archiveName} (parts >2GB, concatenating + re-splitting)`,
currentStep: "splitting",
currentChannel: channelTitle,
currentFile: archiveName,
currentFileNum: setIdx + 1,
totalFiles: totalSets,
});
const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
await concatenateFiles(tempPaths, concatPath);
splitPaths = await byteLevelSplit(concatPath);
uploadPaths = splitPaths;
// Clean up the concat intermediate file
await unlink(concatPath).catch(() => {});
} else if (!archiveSet.isMultipart && totalSize > MAX_UPLOAD_SIZE) {
// Single file >2GB: split directly
await updateRunActivity(runId, {
currentActivity: `Splitting ${archiveName} for upload (>2GB)`,
currentStep: "splitting",
@@ -595,6 +888,9 @@ async function processOneArchiveSet(
totalFiles: totalSets,
});
// Clean up any orphaned record (same hash but no dest upload) before creating
await deleteOrphanedPackageByHash(contentHash);
await createPackageWithFiles({
contentHash,
fileName: archiveName,
@@ -632,8 +928,9 @@ async function processOneArchiveSet(
"Archive ingested"
);
} finally {
// ALWAYS delete temp files
// ALWAYS delete temp files and the set directory
await deleteFiles([...tempPaths, ...splitPaths]);
await rm(setDir, { recursive: true, force: true }).catch(() => {});
}
}
@@ -648,16 +945,16 @@ async function deleteFiles(paths: string[]): Promise<void> {
}
/**
* Clean up any leftover temp files from previous runs.
* Clean up any leftover temp files/directories from previous runs.
*/
export async function cleanupTempDir(): Promise<void> {
try {
const files = await readdir(config.tempDir);
for (const file of files) {
await unlink(path.join(config.tempDir, file)).catch(() => {});
const entries = await readdir(config.tempDir);
for (const entry of entries) {
await rm(path.join(config.tempDir, entry), { recursive: true, force: true }).catch(() => {});
}
if (files.length > 0) {
log.info({ count: files.length }, "Cleaned up stale temp files");
if (entries.length > 0) {
log.info({ count: entries.length }, "Cleaned up stale temp files");
}
} catch {
// Directory might not exist yet