14 Commits

Author SHA1 Message Date
7e48131f67 fix: clear timeout on race settlement to prevent orphaned timers
2026-05-02 23:44:18 +02:00
a79cb4749b fix: use per-account mutex keys in fetch/extract listeners, add cycle timeout and error logging 2026-05-02 23:40:37 +02:00
e9017fc518 feat: parallel account ingestion via per-key TDLib mutex 2026-05-02 23:31:02 +02:00
4f59d19ac2 feat: apply per-account Premium 4GB upload limit to bypass repacking 2026-05-02 23:28:00 +02:00
579276ee2d fix: widen hash lock try/finally to prevent lock leak on error paths 2026-05-02 23:24:08 +02:00
b48cc510a4 feat: add two-phase DB write and hash advisory lock to prevent double-uploads 2026-05-02 23:13:55 +02:00
614c8e5b74 feat: add createPackageStub and updatePackageWithMetadata for two-phase DB write 2026-05-02 23:06:17 +02:00
3019c23f70 feat: add per-content-hash advisory lock to prevent concurrent duplicate uploads
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 23:04:43 +02:00
436a576085 feat: detect and persist Telegram Premium status after authentication
After TDLib login completes, calls getMe() to detect isPremium, persists
it to DB via updateAccountPremiumStatus, and returns { client, isPremium }
from createTdlibClient. All callers updated to destructure accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 23:02:46 +02:00
f454303352 feat: add isPremium field to TelegramAccount
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:58:53 +02:00
e29bd79d66 chore: ignore .worktrees directory 2026-05-02 22:54:56 +02:00
61e61d0085 docs: add worker improvements implementation plan
7-task plan covering double-upload fix (hash lock + two-phase write),
parallel account ingestion (per-key mutex), and Premium 4GB upload
limit with automatic detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:47:52 +02:00
925d916a3c Merge branch 'main' of https://github.com/xCyanGrizzly/DragonsStash 2026-05-02 22:38:32 +02:00
27bacaf24c docs: add worker improvements design spec
Covers double-upload fix (two-phase DB write + hash advisory lock),
parallel account processing (remove TDLib mutex), and per-account
Premium 4GB upload limit with automatic is_premium detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:35:27 +02:00
17 changed files with 1646 additions and 202 deletions

.gitignore

@@ -54,3 +54,4 @@ src/generated
 # temp files
 nul
 tmpclaude-*
+.worktrees/

File diff suppressed because it is too large


@@ -0,0 +1,184 @@
# Worker Improvements Design
**Date:** 2026-05-02
**Status:** Approved
**Scope:** Dragon's Stash Telegram ingestion worker
## Problem Statement
Three issues to address:
1. **Double-uploads**: The same archive occasionally appears twice in the destination Telegram channel. Root causes: (a) the worker crashes between `uploadToChannel()` confirming success and `createPackageWithFiles()` writing to the DB — no DB record means `recoverIncompleteUploads()` can't detect the orphaned Telegram message, and the next cycle re-uploads; (b) two accounts scanning the same source channel can both pass the hash dedup check before either creates a DB record, racing to upload the same file.
2. **Sequential account processing**: Both Telegram accounts are processed one after another via `withTdlibMutex`, even though TDLib fully supports multiple concurrent clients in the same process (each with separate `databaseDirectory` and `filesDirectory`). This halves throughput unnecessarily.
3. **Premium upload limit not used**: The Premium account can upload up to 4 GB per file, but `MAX_UPLOAD_SIZE` is hardcoded at ~1,950 MB. This causes unnecessary file splitting and expensive repack operations for files that could upload directly.
## Solution Overview
Three targeted changes, no architectural overhaul:
1. Two-phase DB write + hash advisory lock (fixes double-uploads)
2. Remove TDLib mutex from the scheduler loop (enables parallel accounts)
3. Per-account `maxUploadSize` from `getMe().is_premium` (enables 4 GB for Premium)
---
## Section 1: Double-Upload Fix
### 1a. Two-Phase DB Write
**Current flow:**
```
uploadToChannel() → preview download → metadata extraction → createPackageWithFiles()
```
If the worker crashes anywhere between upload confirmation and `createPackageWithFiles()`, no DB record exists. `recoverIncompleteUploads()` only checks packages with an existing `destMessageId` in the DB — it cannot find an orphaned Telegram message with no corresponding row.
**New flow:**
```
uploadToChannel()
→ createPackageStub() ← minimal record, destMessageId set immediately
→ preview download
→ metadata extraction
→ updatePackageWithMetadata() ← adds file list, preview, creator, tags
```
`createPackageStub()` writes: `contentHash`, `fileName`, `fileSize`, `archiveType`, `sourceChannelId`, `sourceMessageId`, `destChannelId`, `destMessageId`, `isMultipart`, `partCount`, `ingestionRunId`. File list and preview are left empty.
If the worker crashes after the stub is written:
- `recoverIncompleteUploads()` finds the record (has `destMessageId`), verifies the Telegram message exists, keeps it.
- Next cycle: `packageExistsByHash()` returns true → skips re-upload.
- The stub has `fileCount = 0` and no file listing. The UI shows "metadata pending" rather than failing silently.
Stubs with `fileCount = 0` are valid deliverable packages (the bot can still send the file). Backfilling metadata on stubs is out of scope for this change — the crash case is rare and the stub is functional.
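The crash-safety argument can be checked with in-memory stand-ins. This is a minimal simulation, not the repo's Prisma helpers; the `Map`-backed store and the simplified signatures are illustrative only:

```typescript
// Minimal simulation of the two-phase write (illustrative types, not the
// repo's Prisma models). Phase 1 writes a stub carrying destMessageId right
// after Telegram confirms the upload; phase 2 enriches it with metadata.
type PackageRow = { contentHash: string; destMessageId: bigint; fileCount: number };
const db = new Map<string, PackageRow>(); // keyed by contentHash

function createPackageStub(contentHash: string, destMessageId: bigint): void {
  // Phase 1: minimal record, written immediately after upload confirmation.
  db.set(contentHash, { contentHash, destMessageId, fileCount: 0 });
}

function updatePackageWithMetadata(contentHash: string, fileCount: number): void {
  // Phase 2: enrich with extracted metadata. Safe to never run if we crashed.
  const row = db.get(contentHash);
  if (row) row.fileCount = fileCount;
}

function packageExistsByHash(contentHash: string): boolean {
  return db.has(contentHash);
}

// Upload confirmed, stub written, then a simulated crash before phase 2:
createPackageStub("abc123", 42n);
// ...crash here: metadata extraction never runs...
// Next cycle: the dedup check sees the stub and skips re-upload.
console.log(packageExistsByHash("abc123")); // true
```

The key property: as soon as phase 1 commits, the dedup check returns true, so a crash anywhere after that point can no longer cause a second upload.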
### 1b. Hash Advisory Lock
**The race (two accounts, shared source channel):**
```
Worker A: packageExistsByHash(X) → false (no record yet)
Worker B: packageExistsByHash(X) → false (no record yet)
Worker A: uploads file → destMessageId_A
Worker B: uploads file → destMessageId_B ← duplicate Telegram message
Worker A: createPackageStub() → succeeds (contentHash @unique satisfied)
Worker B: createPackageStub() → fails unique constraint on contentHash
```
Result: two Telegram messages, one DB record. Worker B's upload is wasted.
**Fix:** Before calling `uploadToChannel()`, acquire a PostgreSQL session advisory lock keyed on the content hash:
```sql
SELECT pg_try_advisory_lock(hash_bigint)
```
Where `hash_bigint` is the first 8 bytes of the SHA-256 content hash interpreted as a signed bigint.
- `pg_try_advisory_lock` is non-blocking. If another worker holds the lock (same file, shared channel), return `false` → treat as duplicate, skip.
- After acquiring the lock, **re-run `packageExistsByHash()`** before uploading. This catches the case where another worker finished and released the lock between the first check and this one — without the re-check, the current worker would proceed to re-upload.
- The lock is session-scoped: released automatically on DB session end. No manual cleanup needed on crash.
- The lock is released explicitly after `createPackageStub()` completes (or on any error path).
**Implementation location:** New helper `tryAcquireHashLock(contentHash)` / `releaseHashLock(contentHash)` in `worker/src/db/locks.ts`, reusing the existing DB client pattern.
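The lock-key derivation described above (first 8 bytes of the SHA-256 hash as a signed bigint) can be sketched as follows. The helper name is illustrative, not necessarily what the repo uses:

```typescript
import { Buffer } from "node:buffer";

// Sketch of the lock-key derivation: take a hex-encoded SHA-256 content hash,
// read its first 8 bytes as a signed 64-bit integer. This matches the bigint
// parameter of pg_try_advisory_lock.
function contentHashToLockKey(contentHash: string): bigint {
  // 16 hex characters = 8 bytes.
  const firstEightBytes = Buffer.from(contentHash.slice(0, 16), "hex");
  return firstEightBytes.readBigInt64BE(0); // signed, big-endian
}
```

Because the read is signed, roughly half of all hashes map to negative lock IDs; PostgreSQL accepts the full bigint range, so no extra masking is needed.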
---
## Section 2: Parallel Account Processing
### Current Constraint
`withTdlibMutex` in `scheduler.ts` serializes all TDLib operations across accounts. This was a conservative guard, but TDLib explicitly supports multiple concurrent clients in the same process provided each has its own `databaseDirectory` and `filesDirectory`.
The codebase already satisfies this requirement:
```typescript
// worker/src/tdlib/client.ts
const dbPath = path.join(config.tdlibStateDir, account.id);
const client = createClient({
databaseDirectory: dbPath,
filesDirectory: path.join(dbPath, "files"),
});
```
Each account gets `<TDLIB_STATE_DIR>/<account.id>/` — fully isolated.
### Change
Replace the sequential `for` loop in `scheduler.ts` with `Promise.allSettled()`:
```typescript
// Before
for (const account of accounts) {
await withTdlibMutex(`ingest:${account.phone}`, () => runWorkerForAccount(account));
}
// After
await Promise.allSettled(accounts.map((account) => runWorkerForAccount(account)));
```
The per-account PostgreSQL advisory lock in `db/locks.ts` already prevents any account from being processed twice simultaneously. `Promise.allSettled()` ensures one account's failure doesn't abort the other.
The `withTdlibMutex` wrapper can be removed from the ingest path entirely. The auth path (`authenticateAccount`) should also be run in parallel but may remain guarded if TDLib auth flows have ordering dependencies — verify during implementation.
**No Docker Compose changes needed.** Both accounts run in the same container.
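The failure-isolation claim can be verified with a tiny self-contained demo (placeholder phone keys, not repo code):

```typescript
// Demonstrates that Promise.allSettled isolates failures: one account's
// rejection does not abort the other account's ingestion.
async function demo(): Promise<string[]> {
  const accounts = ["+100", "+200"]; // placeholder account keys
  const results = await Promise.allSettled(
    accounts.map(async (phone) => {
      if (phone === "+100") throw new Error(`ingest failed for ${phone}`);
      return `ok:${phone}`;
    })
  );
  // allSettled itself never rejects; each entry reports its own outcome.
  return results.map((r) => (r.status === "fulfilled" ? r.value : "failed"));
}

demo().then((out) => console.log(out)); // [ 'failed', 'ok:+200' ]
```

With a plain `Promise.all`, the first rejection would have discarded the second account's result; `allSettled` is what makes per-account error logging possible in the scheduler.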
### Speed Limit Notifications
TDLib fires `updateSpeedLimitNotification` when an account's upload or download speed is throttled (non-Premium accounts). Log this event at `warn` level in the client update handler so the throttling is visible in logs; no automated response is needed.
---
## Section 3: Per-Account Premium Upload Limit
### Premium Detection
After successful authentication, call `getMe()` and read `is_premium: bool` from the returned `user` object. Store this on `TelegramAccount.isPremium` (new boolean field, default `false`, updated on each successful auth).
```typescript
const me = await client.invoke({ _: 'getMe' }) as { is_premium?: boolean };
await updateAccountPremiumStatus(account.id, me.is_premium ?? false);
```
### Upload Size Limits
| Account type | `maxUploadSize` | Effect |
|---|---|---|
| Premium | 3,950 MB | Parts ≤ 3.95 GB upload as-is; repack only for parts >3.95 GB (extremely rare) |
| Non-Premium | 1,950 MB | Current behavior unchanged |
Pass `maxUploadSize` into `processOneArchiveSet()` as a parameter (currently hardcoded as `MAX_UPLOAD_SIZE` at `worker.ts:1023` and in `archive/split.ts`).
The `hasOversizedPart` check and `byteLevelSplit` call both use this value, so the repack step is effectively eliminated for Premium accounts in practice — no separate "skip repack" flag needed.
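The per-account selection can be sketched as below. The constant names are assumptions (the table above only fixes the values); the chosen value would then be threaded into `processOneArchiveSet()` and `byteLevelSplit()` as described:

```typescript
// Sketch of per-account upload-limit selection. BigInt avoids floating-point
// precision issues at multi-GB sizes.
const MIB = 1024n * 1024n;
const PREMIUM_MAX_UPLOAD = 3950n * MIB;     // stays under Telegram Premium's 4 GB cap
const NON_PREMIUM_MAX_UPLOAD = 1950n * MIB; // current default, under the 2 GB cap

function maxUploadSizeFor(isPremium: boolean): bigint {
  return isPremium ? PREMIUM_MAX_UPLOAD : NON_PREMIUM_MAX_UPLOAD;
}
```

Keeping a margin below the hard caps (3,950 MB rather than 4,096 MB) leaves room for protocol overhead without risking a rejected upload.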
### Migration
```prisma
model TelegramAccount {
// ... existing fields
isPremium Boolean @default(false)
}
```
One migration, one new query `updateAccountPremiumStatus(accountId, isPremium)`.
---
## Files to Change
| File | Change |
|---|---|
| `prisma/schema.prisma` | Add `isPremium Boolean @default(false)` to `TelegramAccount` |
| `worker/src/db/queries.ts` | Add `updateAccountPremiumStatus()`, `createPackageStub()`, `updatePackageWithMetadata()` |
| `worker/src/db/locks.ts` | Add `tryAcquireHashLock()`, `releaseHashLock()` |
| `worker/src/tdlib/client.ts` | Call `getMe()` after auth, return `isPremium` from `createTdlibClient()` |
| `worker/src/worker.ts` | Two-phase write, hash lock acquire/release, pass `maxUploadSize` per account |
| `worker/src/archive/split.ts` | Accept `maxPartSize` parameter instead of hardcoded constant |
| `worker/src/scheduler.ts` | Replace sequential loop with `Promise.allSettled()`, remove `withTdlibMutex` from ingest path |
---
## What Is Explicitly Out of Scope
- Backfilling metadata on stub records (rare crash case, functional without it)
- Download pre-fetching / pipeline parallelism within one account
- Two separate worker containers (single container is sufficient)
- Bot or app changes (worker-only)


@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "telegram_accounts" ADD COLUMN "isPremium" BOOLEAN NOT NULL DEFAULT false;


@@ -406,6 +406,7 @@ model TelegramAccount {
   isActive Boolean @default(true)
   authState AuthState @default(PENDING)
   authCode String?
+  isPremium Boolean @default(false)
   lastSeenAt DateTime?
   createdAt DateTime @default(now())
   updatedAt DateTime @updatedAt


@@ -18,20 +18,22 @@ const log = childLogger("split");
 const MAX_PART_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
 
 /**
- * Split a file into ≤2GB parts using byte-level splitting.
- * Returns paths to the split parts. If the file is already ≤2GB, returns the original path.
+ * Split a file into parts using byte-level splitting.
+ * Returns paths to the split parts. If the file fits in one part, returns the original path.
+ * Pass maxPartSize to override the global default (e.g., 3950 MiB for Premium accounts).
  */
-export async function byteLevelSplit(filePath: string): Promise<string[]> {
+export async function byteLevelSplit(filePath: string, maxPartSize?: bigint): Promise<string[]> {
+  const effectiveMax = maxPartSize ?? MAX_PART_SIZE;
   const stats = await stat(filePath);
   const fileSize = BigInt(stats.size);
-  if (fileSize <= MAX_PART_SIZE) {
+  if (fileSize <= effectiveMax) {
     return [filePath];
   }
 
   const dir = path.dirname(filePath);
   const baseName = path.basename(filePath);
-  const partSize = Number(MAX_PART_SIZE);
+  const partSize = Number(effectiveMax);
   const totalParts = Math.ceil(Number(fileSize) / partSize);
   const parts: string[] = [];


@@ -79,3 +79,66 @@ export async function releaseLock(accountId: string): Promise<void> {
     client.release();
   }
 }
/**
* Derive a lock ID for a content hash. Prefixes with "hash:" so the resulting
* 32-bit integer does not collide with account advisory lock IDs.
*/
function contentHashToLockId(contentHash: string): number {
return hashToLockId(`hash:${contentHash}`);
}
/**
* Acquire a per-content-hash advisory lock before uploading.
* Prevents two concurrent workers from uploading the same archive
* when both scan a shared source channel.
*
* Returns true if acquired (proceed with upload).
* Returns false if already held (another worker is handling this archive — skip).
*
* MUST be released via releaseHashLock() after createPackageStub() completes,
* including on all error paths (use try/finally).
*/
export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
const lockId = contentHashToLockId(contentHash);
const client = await pool.connect();
try {
const result = await client.query<{ pg_try_advisory_lock: boolean }>(
"SELECT pg_try_advisory_lock($1)",
[lockId]
);
const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
if (acquired) {
heldConnections.set(`hash:${contentHash}`, client);
log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock acquired");
return true;
} else {
client.release();
log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock held by another worker — skipping");
return false;
}
} catch (err) {
client.release();
throw err;
}
}
/**
* Release the per-content-hash advisory lock.
* Call after createPackageStub() completes (or on any error path).
*/
export async function releaseHashLock(contentHash: string): Promise<void> {
const lockId = contentHashToLockId(contentHash);
const client = heldConnections.get(`hash:${contentHash}`);
if (!client) {
log.warn({ hash: contentHash.slice(0, 16) }, "No held connection for hash lock release");
return;
}
try {
await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
log.debug({ hash: contentHash.slice(0, 16) }, "Hash lock released");
} finally {
heldConnections.delete(`hash:${contentHash}`);
client.release();
}
}


@@ -74,6 +74,105 @@ export async function getUploadedPackageByHash(contentHash: string) {
   });
 }
export interface CreatePackageStubInput {
contentHash: string;
fileName: string;
fileSize: bigint;
archiveType: ArchiveType;
sourceChannelId: string;
sourceMessageId: bigint;
sourceTopicId?: bigint | null;
destChannelId: string;
destMessageId: bigint;
destMessageIds: bigint[];
isMultipart: boolean;
partCount: number;
ingestionRunId: string;
creator?: string | null;
tags?: string[];
}
/**
* Write a minimal Package record immediately after Telegram confirms the upload.
* Call this before preview/metadata extraction so recoverIncompleteUploads() can
* detect and verify the package if the worker crashes mid-metadata.
*
* Follow with updatePackageWithMetadata() once file entries and preview are ready.
*/
export async function createPackageStub(
input: CreatePackageStubInput
): Promise<{ id: string }> {
const pkg = await db.package.create({
data: {
contentHash: input.contentHash,
fileName: input.fileName,
fileSize: input.fileSize,
archiveType: input.archiveType,
sourceChannelId: input.sourceChannelId,
sourceMessageId: input.sourceMessageId,
sourceTopicId: input.sourceTopicId ?? undefined,
destChannelId: input.destChannelId,
destMessageId: input.destMessageId,
destMessageIds: input.destMessageIds,
isMultipart: input.isMultipart,
partCount: input.partCount,
fileCount: 0,
ingestionRunId: input.ingestionRunId,
creator: input.creator ?? undefined,
tags: input.tags?.length ? input.tags : undefined,
},
select: { id: true },
});
try {
await db.$queryRawUnsafe(
`SELECT pg_notify('new_package', $1)`,
JSON.stringify({
packageId: pkg.id,
fileName: input.fileName,
creator: input.creator ?? null,
tags: input.tags ?? [],
})
);
} catch {
// Best-effort
}
return pkg;
}
/**
* Update a stub Package with file entries and preview after metadata extraction.
* Called as Phase 2 of the two-phase write after createPackageStub().
*/
export async function updatePackageWithMetadata(
packageId: string,
input: {
files: {
path: string;
fileName: string;
extension: string | null;
compressedSize: bigint;
uncompressedSize: bigint;
crc32: string | null;
}[];
previewData?: Buffer | null;
previewMsgId?: bigint | null;
}
): Promise<void> {
await db.package.update({
where: { id: packageId },
data: {
fileCount: input.files.length,
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
previewMsgId: input.previewMsgId ?? undefined,
files: {
create: input.files,
},
},
});
}
 /**
  * Check if a package already exists for a given source message ID
  * AND was successfully uploaded to the destination (destMessageId is set).
@@ -308,6 +407,16 @@ export async function updateAccountAuthState(
   });
 }
export async function updateAccountPremiumStatus(
accountId: string,
isPremium: boolean
): Promise<void> {
await db.telegramAccount.update({
where: { id: accountId },
data: { isPremium },
});
}
 export async function getAccountAuthCode(accountId: string) {
   const account = await db.telegramAccount.findUnique({
     where: { id: accountId },


@@ -101,16 +101,14 @@ export async function processExtractRequest(requestId: string): Promise<void> {
   try {
     await mkdir(tempDir, { recursive: true });
 
-    // Wrap the entire TDLib session in the mutex so no other TDLib
-    // operation can run concurrently (TDLib is single-session).
-    await withTdlibMutex("extract", async () => {
-      const accounts = await getActiveAccounts();
-      if (accounts.length === 0) {
-        throw new Error("No authenticated Telegram accounts available");
-      }
-      const account = accounts[0];
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+    const accounts = await getActiveAccounts();
+    if (accounts.length === 0) {
+      throw new Error("No authenticated Telegram accounts available");
+    }
+    const account = accounts[0];
+
+    await withTdlibMutex(account.phone, "extract", async () => {
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
       try {
         // Load chat list so TDLib can find the dest channel


@@ -14,6 +14,7 @@ import {
   getGlobalSetting,
   setGlobalSetting,
   getActiveAccounts,
+  getChannelFetchRequest,
   upsertChannel,
   ensureAccountChannelLink,
   updateFetchRequestStatus,
@@ -133,7 +134,9 @@ let fetchQueue: Promise<void> = Promise.resolve();
 function handleChannelFetch(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("fetch-channels", () =>
+      const request = await getChannelFetchRequest(requestId);
+      const key = request?.account?.phone ?? "global";
+      await withTdlibMutex(key, "fetch-channels", () =>
         processFetchRequest(requestId)
       );
     } catch (err) {
@@ -147,22 +150,20 @@ function handleChannelFetch(requestId: string): void {
 function handleGenerateInvite(channelId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("generate-invite", async () => {
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to generate invite link");
+        return;
+      }
+      const account = accounts[0];
+      await withTdlibMutex(account.phone, "generate-invite", async () => {
         const destChannel = await getGlobalDestinationChannel();
         if (!destChannel || destChannel.id !== channelId) {
           log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
           return;
         }
 
-        // Use the first available authenticated account to generate the link
-        const accounts = await getActiveAccounts();
-        if (accounts.length === 0) {
-          log.warn("No authenticated accounts to generate invite link");
-          return;
-        }
-        const account = accounts[0];
-        const client = await createTdlibClient({ id: account.id, phone: account.phone });
+        const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
         try {
           const link = await generateInviteLink(client, destChannel.telegramId);
@@ -187,7 +188,13 @@ function handleCreateDestination(payload: string): void {
       const parsed = JSON.parse(payload) as { requestId: string; title: string };
       requestId = parsed.requestId;
 
-      await withTdlibMutex("create-destination", async () => {
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        throw new Error("No authenticated accounts available to create the group");
+      }
+      const account = accounts[0];
+      await withTdlibMutex(account.phone, "create-destination", async () => {
         const { db } = await import("./db/client.js");
 
         // Mark the request as in-progress
@@ -196,14 +203,7 @@ function handleCreateDestination(payload: string): void {
           data: { status: "IN_PROGRESS" },
         });
 
-        // Use the first available authenticated account
-        const accounts = await getActiveAccounts();
-        if (accounts.length === 0) {
-          throw new Error("No authenticated accounts available to create the group");
-        }
-        const account = accounts[0];
-        const client = await createTdlibClient({ id: account.id, phone: account.phone });
+        const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
         try {
           // Create the supergroup via TDLib
@@ -328,16 +328,16 @@ function handleJoinChannel(payload: string): void {
       const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
       requestId = parsed.requestId;
 
-      await withTdlibMutex("join-channel", async () => {
+      const accounts = await getActiveAccounts();
+      const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
+      if (!account) {
+        throw new Error("No authenticated accounts available");
+      }
+      await withTdlibMutex(account.phone, "join-channel", async () => {
         await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
 
-        const accounts = await getActiveAccounts();
-        const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
-        if (!account) {
-          throw new Error("No authenticated accounts available");
-        }
-        const client = await createTdlibClient({ id: account.id, phone: account.phone });
+        const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
         try {
           const linkInfo = parseTelegramInput(parsed.input);
@@ -507,7 +507,12 @@ function handleIngestionTrigger(): void {
 function handleRebuildPackages(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("rebuild-packages", () =>
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to rebuild packages");
+        return;
+      }
+      await withTdlibMutex(accounts[0].phone, "rebuild-packages", () =>
         rebuildPackageDatabase(requestId)
       );
     } catch (err) {


@@ -49,7 +49,7 @@ export async function processManualUpload(uploadId: string): Promise<void> {
   const account = accounts[0];
   if (!account) throw new Error("No authenticated Telegram account available");
 
-  const client = await createTdlibClient({ id: account.id, phone: account.phone });
+  const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
   try {
     const packageIds: string[] = [];


@@ -63,7 +63,7 @@ export async function rebuildPackageDatabase(
   }
 
   const account = accounts[0];
-  const client = await createTdlibClient({
+  const { client } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });


@@ -63,7 +63,7 @@ export async function recoverIncompleteUploads(): Promise<void> {
   let client: Client | undefined;
   try {
-    client = await createTdlibClient({ id: account.id, phone: account.phone });
+    ({ client } = await createTdlibClient({ id: account.id, phone: account.phone }));
 
     // Load the chat list so TDLib can resolve chat IDs
     try {


@@ -24,8 +24,8 @@ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "
  * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
  * 2. Process all active AUTHENTICATED accounts for ingestion
  *
- * All TDLib operations are wrapped in the mutex to ensure only one client
- * runs at a time (also shared with the fetch listener for on-demand requests).
+ * Each account's TDLib operations are wrapped in a per-key mutex so different
+ * accounts run concurrently while the same account is still serialized.
  *
  * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
  * Once the timeout elapses, no new accounts will be started but any in-progress
@@ -55,7 +55,7 @@ async function runCycle(): Promise<void> {
       log.warn("Cycle timeout reached during authentication phase, stopping");
       break;
     }
-    await withTdlibMutex(`auth:${account.phone}`, () =>
+    await withTdlibMutex(account.phone, `auth:${account.phone}`, () =>
       authenticateAccount(account)
     );
   }
@@ -71,17 +71,30 @@ async function runCycle(): Promise<void> {
   log.info({ accountCount: accounts.length }, "Processing accounts");
 
-  for (const account of accounts) {
-    if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
-      log.warn(
-        { elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
-        "Cycle timeout reached, skipping remaining accounts"
-      );
-      break;
-    }
-    await withTdlibMutex(`ingest:${account.phone}`, () =>
-      runWorkerForAccount(account)
-    );
-  }
+  const results = await Promise.allSettled(
+    accounts.map((account) => {
+      let timer: ReturnType<typeof setTimeout>;
+      return Promise.race([
+        withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
+          runWorkerForAccount(account)
+        ),
+        new Promise<never>((_, reject) => {
+          timer = setTimeout(
+            () => reject(new Error(`Account ${account.phone} ingestion timed out after ${CYCLE_TIMEOUT_MS / 60_000}min`)),
+            CYCLE_TIMEOUT_MS
+          );
+        }),
+      ]).finally(() => clearTimeout(timer));
+    })
+  );
+
+  for (let i = 0; i < results.length; i++) {
+    if (results[i].status === "rejected") {
+      log.error(
+        { phone: accounts[i].phone, err: (results[i] as PromiseRejectedResult).reason },
+        "Account ingestion failed"
+      );
+    }
+  }
 
   log.info(


@@ -6,6 +6,7 @@ import { childLogger } from "../util/logger.js";
 import {
   updateAccountAuthState,
   getAccountAuthCode,
+  updateAccountPremiumStatus,
 } from "../db/queries.js";
 
 const log = childLogger("tdlib-client");
@@ -27,7 +28,7 @@ interface AccountConfig {
  */
 export async function createTdlibClient(
   account: AccountConfig
-): Promise<Client> {
+): Promise<{ client: Client; isPremium: boolean }> {
   const dbPath = path.join(config.tdlibStateDir, account.id);
   const client = createClient({
@@ -78,7 +79,30 @@ export async function createTdlibClient(
     await updateAccountAuthState(account.id, "AUTHENTICATED");
     log.info({ accountId: account.id }, "TDLib client authenticated");
 
-    return client;
+    let isPremium = false;
+    try {
+      const me = await client.invoke({ _: "getMe" }) as { is_premium?: boolean };
+      isPremium = me.is_premium ?? false;
+      await updateAccountPremiumStatus(account.id, isPremium);
+      log.info({ accountId: account.id, isPremium }, "Account Premium status detected");
+    } catch (err) {
+      log.warn({ err, accountId: account.id }, "Could not detect Premium status, defaulting to false");
+    }
+
+    client.on("update", (update: unknown) => {
+      const u = update as { _?: string; is_upload?: boolean };
+      if (u?._ === "updateSpeedLimitNotification") {
+        log.warn(
+          { accountId: account.id, isUpload: u.is_upload },
+          u.is_upload
+            ? "Upload speed limited by Telegram (account is not Premium)"
+            : "Download speed limited by Telegram (account is not Premium)"
+        );
+      }
+    });
+
+    return { client, isPremium };
   } catch (err) {
     log.error({ err, accountId: account.id }, "TDLib authentication failed");
     await updateAccountAuthState(account.id, "EXPIRED");


@@ -2,39 +2,43 @@ import { childLogger } from "./logger.js";

 const log = childLogger("mutex");

-let locked = false;
-let holder = "";
-const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];

 /**
  * Maximum time to wait for the TDLib mutex (ms).
  * If the mutex is not available within this time, the operation is rejected.
  * Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
  */
 const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;

+const locks = new Map<string, boolean>();
+const holders = new Map<string, string>();
+const queues = new Map<
+  string,
+  Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
+>();

 /**
- * Ensures only one TDLib client runs at a time across the entire worker process.
- * Both the scheduler (auth, ingestion) and the fetch listener acquire this
- * before creating any TDLib client.
+ * Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
+ * Different keys run concurrently — this allows two accounts to ingest in parallel
+ * while still preventing concurrent use of the same account's TDLib state dir.
  *
- * Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
+ * key: the account phone number for account-specific ops (auth, ingest),
+ *      or 'global' for ops that don't belong to a specific account.
+ * label: human-readable name for logging.
  */
 export async function withTdlibMutex<T>(
+  key: string,
   label: string,
   fn: () => Promise<T>
 ): Promise<T> {
-  if (locked) {
-    log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
+  if (locks.get(key)) {
+    log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex");
     await new Promise<void>((resolve, reject) => {
       const timer = setTimeout(() => {
-        const idx = queue.indexOf(entry);
+        const q = queues.get(key) ?? [];
+        const idx = q.indexOf(entry);
         if (idx !== -1) {
-          queue.splice(idx, 1);
-          reject(new Error(
-            `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
-            `(waiting: ${label}, holder: ${holder})`
-          ));
+          q.splice(idx, 1);
+          reject(
+            new Error(
+              `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
+                `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})`
+            )
+          );
         }
       }, MUTEX_WAIT_TIMEOUT_MS);
@@ -46,25 +50,28 @@ export async function withTdlibMutex<T>(
         reject,
         label,
       };
-      queue.push(entry);
+      if (!queues.has(key)) queues.set(key, []);
+      queues.get(key)!.push(entry);
     });
   }

-  locked = true;
-  holder = label;
-  log.debug({ label }, "TDLib mutex acquired");
+  locks.set(key, true);
+  holders.set(key, label);
+  log.debug({ key, label }, "TDLib mutex acquired");
   try {
     return await fn();
   } finally {
-    locked = false;
-    holder = "";
-    const next = queue.shift();
+    locks.delete(key);
+    holders.delete(key);
+    const next = queues.get(key)?.shift();
     if (next) {
-      log.debug({ next: next.label }, "TDLib mutex releasing to next waiter");
+      log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter");
       next.resolve();
     } else {
-      log.debug({ label }, "TDLib mutex released");
+      queues.delete(key);
+      log.debug({ key, label }, "TDLib mutex released");
     }
   }
 }
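The per-key behaviour can be demonstrated with a much smaller promise-chaining variant. This is a sketch, not the queue-based implementation above (it has no wait timeout or logging): calls with the same key serialize, while calls with different keys interleave freely.

```typescript
// Simplified keyed mutex: each key owns a promise chain, and new work is
// appended to that key's tail. Different keys have independent tails.
const tails = new Map<string, Promise<unknown>>();

function withKeyMutex<T>(key: string, fn: () => Promise<T>): Promise<T> {
  const tail = tails.get(key) ?? Promise.resolve();
  const next = tail.then(fn, fn); // run after the previous holder, even if it rejected
  tails.set(key, next.catch(() => undefined)); // keep the chain alive on rejection
  return next;
}

const order: string[] = [];
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function demo(): Promise<string[]> {
  await Promise.all([
    withKeyMutex("acct-A", async () => { order.push("A1-start"); await sleep(30); order.push("A1-end"); }),
    withKeyMutex("acct-A", async () => { order.push("A2-start"); }),
    withKeyMutex("acct-B", async () => { order.push("B1-start"); }),
  ]);
  return order;
}
```

Here "acct-B" starts while "acct-A"'s first holder is still sleeping, but "acct-A"'s second caller only runs after the first finishes, which is the property the per-account keys rely on.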


@@ -2,13 +2,14 @@ import path from "path";
 import { unlink, readdir, mkdir, rm } from "fs/promises";
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { tryAcquireLock, releaseLock } from "./db/locks.js";
+import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
 import {
   getSourceChannelMappings,
   getGlobalDestinationChannel,
   packageExistsByHash,
   packageExistsBySourceMessage,
-  createPackageWithFiles,
+  createPackageStub,
+  updatePackageWithMetadata,
   createIngestionRun,
   completeIngestionRun,
   failIngestionRun,
@@ -73,10 +74,10 @@ export async function authenticateAccount(
   let client: Client | undefined;
   try {
-    client = await createTdlibClient({
+    client = (await createTdlibClient({
       id: account.id,
       phone: account.phone,
-    });
+    })).client;
     aLog.info("Authentication successful");

     // Auto-fetch channels and create a fetch request result
@@ -131,7 +132,7 @@ export async function processFetchRequest(requestId: string): Promise<void> {
   await updateFetchRequestStatus(requestId, "IN_PROGRESS");
   aLog.info({ accountId: request.accountId }, "Processing fetch request");

-  const client = await createTdlibClient({
+  const { client } = await createTdlibClient({
     id: request.account.id,
     phone: request.account.phone,
   });
@@ -301,6 +302,7 @@ interface PipelineContext {
   /** Forum topic ID (null for non-forum). */
   sourceTopicId: bigint | null;
   accountLog: ReturnType<typeof childLogger>;
+  maxUploadSize: bigint;
 }

 /**
@@ -336,10 +338,13 @@ export async function runWorkerForAccount(
     currentStep: "connecting",
   });

-  const client = await createTdlibClient({
+  const { client, isPremium } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });
+  const maxUploadSize = isPremium
+    ? 3950n * 1024n * 1024n
+    : BigInt(config.maxPartSizeMB) * 1024n * 1024n;

   // Load all chats into TDLib's local cache using loadChats (the recommended API).
   // Without this, getChat/searchChatMessages fail with "Chat not found".
@@ -452,6 +457,7 @@ export async function runWorkerForAccount(
     topicCreator: null,
     sourceTopicId: null,
     accountLog,
+    maxUploadSize,
   };

   if (forum) {
@@ -1027,6 +1033,35 @@ async function processOneArchiveSet(
     return null;
   }

+  // ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
+  const hashLockAcquired = await tryAcquireHashLock(contentHash);
+  if (!hashLockAcquired) {
+    counters.zipsDuplicate++;
+    accountLog.info(
+      { fileName: archiveName, hash: contentHash.slice(0, 16) },
+      "Hash lock held by another worker — skipping concurrent duplicate"
+    );
+    return null;
+  }
+
+  let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
+  let creator: string | null = null;
+  const tags: string[] = [];
+  let stub: { id: string } | null = null;
+
+  try {
+    // Re-check after acquiring lock: another worker may have finished between
+    // the first check above and this point.
+    const existsAfterLock = await packageExistsByHash(contentHash);
+    if (existsAfterLock) {
+      counters.zipsDuplicate++;
+      accountLog.debug(
+        { fileName: archiveName, hash: contentHash.slice(0, 16) },
+        "Duplicate detected after acquiring hash lock — skipping"
+      );
+      return null;
+    }
+
   // ── Reading metadata ──
   await updateRunActivity(runId, {
     currentActivity: `Reading file list from ${archiveName}`,
@@ -1037,7 +1072,6 @@ async function processOneArchiveSet(
     totalFiles: totalSets,
   });

-  let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
   try {
     if (archiveSet.type === "ZIP") {
       entries = await readZipCentralDirectory(tempPaths);
@@ -1069,7 +1103,7 @@ async function processOneArchiveSet(
     (sum, p) => sum + p.fileSize,
     0n
   );
-  const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
+  const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
   const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);

   if (hasOversizedPart) {
@@ -1084,7 +1118,7 @@ async function processOneArchiveSet(
     });
     const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
     await concatenateFiles(tempPaths, concatPath);
-    splitPaths = await byteLevelSplit(concatPath);
+    splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
     uploadPaths = splitPaths;
     // Clean up the concat intermediate file
     await unlink(concatPath).catch(() => {});
@@ -1098,7 +1132,7 @@ async function processOneArchiveSet(
       currentFileNum: setIdx + 1,
       totalFiles: totalSets,
     });
-    splitPaths = await byteLevelSplit(tempPaths[0]);
+    splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
     uploadPaths = splitPaths;
   }
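The `byteLevelSplit` calls above now take the per-account cap as a second argument. A sketch of the boundary arithmetic, using a hypothetical `planSplit` helper (the real `byteLevelSplit` streams file bytes to disk; only the chunk plan is shown here):

```typescript
// Compute (offset, length) chunk boundaries for a file of totalSize bytes,
// with no chunk exceeding maxPart. Hypothetical helper for illustration.
function planSplit(totalSize: bigint, maxPart: bigint): { offset: bigint; length: bigint }[] {
  const parts: { offset: bigint; length: bigint }[] = [];
  for (let off = 0n; off < totalSize; off += maxPart) {
    const remaining = totalSize - off;
    parts.push({ offset: off, length: remaining < maxPart ? remaining : maxPart });
  }
  return parts;
}

// Premium cap from the diff: 3950 MiB, just under Telegram's 4 GB Premium
// limit; the non-Premium cap comes from config.maxPartSizeMB instead.
const PREMIUM_CAP = 3950n * 1024n * 1024n;
```

A file exactly at the cap stays a single part; anything larger is split, with only the final part short.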
@@ -1139,72 +1173,112 @@ async function processOneArchiveSet(
     );
   }

   // ── Uploading ──
   // Check if a prior run already uploaded this file (orphaned upload scenario:
   // file reached Telegram but DB write failed or worker crashed before indexing)
   const existingUpload = await getUploadedPackageByHash(contentHash);
   let destResult: { messageId: bigint; messageIds: bigint[] };

   if (existingUpload && existingUpload.destMessageId) {
     accountLog.info(
       { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
       "Reusing existing upload (file already on destination channel)"
     );
     destResult = {
       messageId: existingUpload.destMessageId,
       messageIds: existingUpload.destMessageIds?.length
         ? (existingUpload.destMessageIds as bigint[])
         : [existingUpload.destMessageId],
     };
   } else {
     const uploadLabel = uploadPaths.length > 1
       ? ` (${uploadPaths.length} parts)`
       : "";
     await updateRunActivity(runId, {
       currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
       currentStep: "uploading",
       currentChannel: channelTitle,
       currentFile: archiveName,
       currentFileNum: setIdx + 1,
       totalFiles: totalSets,
     });
     destResult = await uploadToChannel(
       client,
       destChannelTelegramId,
       uploadPaths
     );
   }

   // ── Post-upload integrity check ──
   // Verify the files on disk still match before we index
   if (uploadPaths.length > 0 && !existingUpload) {
     try {
       const postUploadHash = await hashParts(uploadPaths);
       if (splitPaths.length > 0) {
         // Split files — hash should match the split hash (already verified above)
         // No additional check needed since we verified split hash = original hash
       } else if (postUploadHash !== contentHash) {
         accountLog.error(
           { fileName: archiveName, originalHash: contentHash, postUploadHash },
           "Hash changed between hashing and upload — possible disk corruption"
         );
         await db.systemNotification.create({
           data: {
             type: "HASH_MISMATCH",
             severity: "ERROR",
             title: `Post-upload hash mismatch: ${archiveName}`,
             message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}`,
             context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
           },
         });
       }
     } catch {
       // Best-effort — don't fail the ingestion
     }
   }

+  // ── Phase 1: Stub record — persisted immediately after upload ──
+  await deleteOrphanedPackageByHash(contentHash);
+  creator =
+    topicCreator ??
+    extractCreatorFromFileName(archiveName) ??
+    extractCreatorFromChannelTitle(channelTitle) ??
+    null;
+  if (channel.category) {
+    tags.push(channel.category);
+  }
+  stub = await createPackageStub({
+    contentHash,
+    fileName: archiveName,
+    fileSize: totalSize,
+    archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
+    sourceChannelId: channel.id,
+    sourceMessageId: archiveSet.parts[0].id,
+    sourceTopicId,
+    destChannelId,
+    destMessageId: destResult.messageId,
+    destMessageIds: destResult.messageIds,
+    isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
+    partCount: uploadPaths.length,
+    ingestionRunId,
+    creator,
+    tags,
+  });
+  counters.zipsIngested++;
+  await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
+  } finally {
+    await releaseHashLock(contentHash);
+  }
+
+  if (!stub) return null;

   // ── Preview thumbnail ──
+  // (moved here from before stub creation — lock is released, preview doesn't need it)
   let previewData: Buffer | null = null;
   let previewMsgId: bigint | null = null;
   const matchedPhoto = previewMatches.get(archiveSet.baseName);
@@ -1218,8 +1292,6 @@ async function processOneArchiveSet(
       totalFiles: totalSets,
     });
     previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
-    // Only set previewMsgId if we actually got the image data —
-    // otherwise the UI thinks there's a preview but the API returns 404
     if (previewData) {
       previewMsgId = matchedPhoto.id;
     }
@@ -1242,13 +1314,7 @@ async function processOneArchiveSet(
     }
   }

-  // ── Resolve creator: topic name > filename extraction > channel title > null ──
-  const creator = topicCreator
-    ?? extractCreatorFromFileName(archiveName)
-    ?? extractCreatorFromChannelTitle(channelTitle)
-    ?? null;

-  // ── Indexing ──
+  // ── Phase 2: Update stub with file entries and preview ──
   await updateRunActivity(runId, {
     currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
     currentStep: "indexing",
@@ -1258,43 +1324,12 @@ async function processOneArchiveSet(
     totalFiles: totalSets,
   });

-  // Clean up any orphaned record (same hash but no dest upload) before creating
-  await deleteOrphanedPackageByHash(contentHash);

-  // Auto-inherit source channel category as initial tag
-  const tags: string[] = [];
-  if (channel.category) {
-    tags.push(channel.category);
-  }

-  const pkg = await createPackageWithFiles({
-    contentHash,
-    fileName: archiveName,
-    fileSize: totalSize,
-    archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
-    sourceChannelId: channel.id,
-    sourceMessageId: archiveSet.parts[0].id,
-    sourceTopicId,
-    destChannelId,
-    destMessageId: destResult.messageId,
-    destMessageIds: destResult.messageIds,
-    isMultipart:
-      archiveSet.parts.length > 1 || uploadPaths.length > 1,
-    partCount: uploadPaths.length,
-    ingestionRunId,
-    creator,
-    tags,
+  await updatePackageWithMetadata(stub.id, {
+    files: entries,
     previewData,
     previewMsgId,
-    sourceCaption: archiveSet.parts[0].caption ?? null,
-    replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
-    files: entries,
   });

-  counters.zipsIngested++;
-  // Clean up any prior skip record for this archive
-  await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);

   await updateRunActivity(runId, {
     currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
     currentStep: "complete",
@@ -1310,7 +1345,7 @@ async function processOneArchiveSet(
     "Archive ingested"
   );

-  return pkg.id;
+  return stub.id;
 } finally {
   // ALWAYS delete temp files and the set directory
   await deleteFiles([...tempPaths, ...splitPaths]);
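Taken together, the hash lock and two-phase write close the double-upload race. The shape of the control flow can be sketched with in-memory stand-ins for the `db/locks.js` helpers (an assumption: the real `tryAcquireHashLock`/`releaseHashLock` are DB-backed and work across processes, which a `Set` does not model):

```typescript
// In-memory stand-ins for the DB-backed hash lock and package table.
// The race structure is the same: acquire, re-check, upload + stub, release.
const heldHashes = new Set<string>();
const packagesByHash = new Set<string>();

const tryAcquireHashLock = async (hash: string) =>
  heldHashes.has(hash) ? false : (heldHashes.add(hash), true);
const releaseHashLock = async (hash: string) => { heldHashes.delete(hash); };
const packageExistsByHash = async (hash: string) => packagesByHash.has(hash);

async function ingest(hash: string): Promise<"ingested" | "skipped"> {
  if (!(await tryAcquireHashLock(hash))) return "skipped"; // another worker owns it
  try {
    if (await packageExistsByHash(hash)) return "skipped"; // re-check under the lock
    await new Promise((r) => setTimeout(r, 10)); // simulate download + upload
    packagesByHash.add(hash); // phase 1: stub persisted while the lock is held
    return "ingested";
  } finally {
    await releaseHashLock(hash); // wide finally: no lock leak on error paths
  }
}
```

Two workers racing on the same content hash resolve to exactly one ingestion: the loser either fails the lock acquisition or hits the re-check, and the `finally` guarantees the lock is released even if upload or stub creation throws.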