18 Commits

Author SHA1 Message Date
59038889ae fix: prevent pool exhaustion that caused 4-hour duplicate check stall
The pg pool had max=5 connections shared between Prisma operations and
advisory locks. With 2 account locks held permanently and hash locks
from timed-out (but still running) background work, pool.connect()
would block forever — causing the Turnbase.7z stall.

- Increase pool max from 5 to 15 for headroom
- Add 30s connectionTimeoutMillis so pool.connect() throws instead of
  hanging forever when the pool is exhausted
- On startup, terminate zombie PostgreSQL sessions from previous worker
  instances that hold stale advisory locks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:39:00 +02:00
77c26adb31 perf: set watermarks even when no archives found to prevent re-scanning
Previously, channels/topics with no new archives never had their
watermark updated. This meant every cycle re-scanned all messages from
scratch just to discover nothing new — especially costly for the
1079-topic Model Printing Emporium forum.

- Add maxScannedMessageId to ChannelScanResult (highest msg ID seen)
- Set channel watermark to scan boundary when no archives are found
- Set topic watermark to scan boundary when no archives are found
- Fall back to scan watermark when archive processing doesn't advance it

After one full cycle, subsequent cycles will skip already-scanned
messages via the early-exit boundary check, dramatically reducing
TDLib API calls on channels with mostly non-archive content.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 20:37:42 +02:00
35cce3151c perf: early-exit channel scan when all messages are below watermark
searchChatMessages returns newest-first. Once the oldest message on a
page is at or below the lastProcessedMessageId boundary, all remaining
pages are even older. Stop scanning immediately instead of reading every
message in the channel.

This was already implemented for topic scans but missing from channel
scans. On a test run, total messages scanned dropped from 3805 to 1615
(57% reduction) for an account with no new archives.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 19:58:30 +02:00
d6c82ede1e fix: auto-recover from TDLib upload stalls by recreating client
When TDLib's event stream degrades, uploads complete (bytes sent) but
confirmations never arrive. Previously the worker retried 3x with the
same broken client, wasting 60+ min per archive and holding the mutex.

- Add UploadStallError class to distinguish stalls from other failures
- Reduce stall detection timeout from 5min to 3min (faster detection)
- Recreate TDLib client after consecutive upload stalls instead of
  retrying on the same degraded connection
- Add forceReleaseMutex() to prevent cascade failures when one account
  blocks others via stuck mutex after cycle timeout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-04 18:02:42 +02:00
7e48131f67 fix: clear timeout on race settlement to prevent orphaned timers
2026-05-02 23:44:18 +02:00
a79cb4749b fix: use per-account mutex keys in fetch/extract listeners, add cycle timeout and error logging 2026-05-02 23:40:37 +02:00
e9017fc518 feat: parallel account ingestion via per-key TDLib mutex 2026-05-02 23:31:02 +02:00
4f59d19ac2 feat: apply per-account Premium 4GB upload limit to bypass repacking 2026-05-02 23:28:00 +02:00
579276ee2d fix: widen hash lock try/finally to prevent lock leak on error paths 2026-05-02 23:24:08 +02:00
b48cc510a4 feat: add two-phase DB write and hash advisory lock to prevent double-uploads 2026-05-02 23:13:55 +02:00
614c8e5b74 feat: add createPackageStub and updatePackageWithMetadata for two-phase DB write 2026-05-02 23:06:17 +02:00
3019c23f70 feat: add per-content-hash advisory lock to prevent concurrent duplicate uploads
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 23:04:43 +02:00
436a576085 feat: detect and persist Telegram Premium status after authentication
After TDLib login completes, calls getMe() to detect isPremium, persists
it to DB via updateAccountPremiumStatus, and returns { client, isPremium }
from createTdlibClient. All callers updated to destructure accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 23:02:46 +02:00
f454303352 feat: add isPremium field to TelegramAccount
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:58:53 +02:00
e29bd79d66 chore: ignore .worktrees directory 2026-05-02 22:54:56 +02:00
61e61d0085 docs: add worker improvements implementation plan
7-task plan covering double-upload fix (hash lock + two-phase write),
parallel account ingestion (per-key mutex), and Premium 4GB upload
limit with automatic detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:47:52 +02:00
925d916a3c Merge branch 'main' of https://github.com/xCyanGrizzly/DragonsStash 2026-05-02 22:38:32 +02:00
27bacaf24c docs: add worker improvements design spec
Covers double-upload fix (two-phase DB write + hash advisory lock),
parallel account processing (remove TDLib mutex), and per-account
Premium 4GB upload limit with automatic is_premium detection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 22:35:27 +02:00
22 changed files with 1855 additions and 219 deletions

.gitignore

@@ -54,3 +54,4 @@ src/generated
 # temp files
 nul
 tmpclaude-*
+.worktrees/

File diff suppressed because it is too large


@@ -0,0 +1,184 @@
# Worker Improvements Design
**Date:** 2026-05-02
**Status:** Approved
**Scope:** Dragon's Stash Telegram ingestion worker
## Problem Statement
Three issues to address:
1. **Double-uploads**: The same archive occasionally appears twice in the destination Telegram channel. Root causes: (a) the worker crashes between `uploadToChannel()` confirming success and `createPackageWithFiles()` writing to the DB — no DB record means `recoverIncompleteUploads()` can't detect the orphaned Telegram message, and the next cycle re-uploads; (b) two accounts scanning the same source channel can both pass the hash dedup check before either creates a DB record, racing to upload the same file.
2. **Sequential account processing**: Both Telegram accounts are processed one after another via `withTdlibMutex`, even though TDLib fully supports multiple concurrent clients in the same process (each with separate `databaseDirectory` and `filesDirectory`). This halves throughput unnecessarily.
3. **Premium upload limit not used**: The Premium account can upload up to 4 GB per file, but `MAX_UPLOAD_SIZE` is hardcoded at ~1,950 MB. This causes unnecessary file splitting and expensive repack operations for files that could upload directly.
## Solution Overview
Three targeted changes, no architectural overhaul:
1. Two-phase DB write + hash advisory lock (fixes double-uploads)
2. Remove TDLib mutex from the scheduler loop (enables parallel accounts)
3. Per-account `maxUploadSize` from `getMe().is_premium` (enables 4 GB for Premium)
---
## Section 1: Double-Upload Fix
### 1a. Two-Phase DB Write
**Current flow:**
```
uploadToChannel() → preview download → metadata extraction → createPackageWithFiles()
```
If the worker crashes anywhere between upload confirmation and `createPackageWithFiles()`, no DB record exists. `recoverIncompleteUploads()` only checks packages with an existing `destMessageId` in the DB — it cannot find an orphaned Telegram message with no corresponding row.
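For concreteness, a sketch of the shape of that check (assumed for illustration; the actual `recoverIncompleteUploads()` body is not part of this changeset):
```typescript
// Assumed shape: only packages that already have a destMessageId can be
// verified against Telegram; a crash before the DB write leaves nothing to find.
const candidates = await db.package.findMany({
  where: { destMessageId: { not: null } },
  select: { id: true, destChannelId: true, destMessageId: true },
});
```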
**New flow:**
```
uploadToChannel()
→ createPackageStub() ← minimal record, destMessageId set immediately
→ preview download
→ metadata extraction
→ updatePackageWithMetadata() ← adds file list, preview, creator, tags
```
`createPackageStub()` writes: `contentHash`, `fileName`, `fileSize`, `archiveType`, `sourceChannelId`, `sourceMessageId`, `destChannelId`, `destMessageId`, `isMultipart`, `partCount`, `ingestionRunId`. File list and preview are left empty.
If the worker crashes after the stub is written:
- `recoverIncompleteUploads()` finds the record (has `destMessageId`), verifies the Telegram message exists, keeps it.
- Next cycle: `packageExistsByHash()` returns true → skips re-upload.
- The stub has `fileCount = 0` and no file listing. The UI shows "metadata pending" rather than failing silently.
Stubs with `fileCount = 0` are valid deliverable packages (the bot can still send the file). Backfilling metadata on stubs is out of scope for this change — the crash case is rare and the stub is functional.
### 1b. Hash Advisory Lock
**The race (two accounts, shared source channel):**
```
Worker A: packageExistsByHash(X) → false (no record yet)
Worker B: packageExistsByHash(X) → false (no record yet)
Worker A: uploads file → destMessageId_A
Worker B: uploads file → destMessageId_B ← duplicate Telegram message
Worker A: createPackageStub() → succeeds (contentHash @unique satisfied)
Worker B: createPackageStub() → fails unique constraint on contentHash
```
Result: two Telegram messages, one DB record. Worker B's upload is wasted.
**Fix:** Before calling `uploadToChannel()`, acquire a PostgreSQL session advisory lock keyed on the content hash:
```sql
SELECT pg_try_advisory_lock(hash_bigint)
```
Where `hash_bigint` is the first 8 bytes of the SHA-256 content hash interpreted as a signed bigint.
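A minimal sketch of that derivation, assuming a hex-encoded digest (the helper name is illustrative; as the commit history shows, the shipped implementation in `worker/src/db/locks.ts` instead reuses the existing 32-bit `hashToLockId` helper):
```typescript
// Illustrative only: take the first 8 bytes (16 hex chars) of the SHA-256
// digest and re-interpret them as a signed 64-bit value, which is the range
// PostgreSQL's pg_try_advisory_lock(bigint) expects.
function hashToAdvisoryKey(contentHashHex: string): bigint {
  const first8Bytes = "0x" + contentHashHex.slice(0, 16);
  return BigInt.asIntN(64, BigInt(first8Bytes));
}
```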
- `pg_try_advisory_lock` is non-blocking. If another worker holds the lock (same file, shared channel), return `false` → treat as duplicate, skip.
- After acquiring the lock, **re-run `packageExistsByHash()`** before uploading. This catches the case where another worker finished and released the lock between the first check and this one — without the re-check, the current worker would proceed to re-upload.
- The lock is session-scoped: released automatically on DB session end. No manual cleanup needed on crash.
- The lock is released explicitly after `createPackageStub()` completes (or on any error path).
**Implementation location:** New helper `tryAcquireHashLock(contentHash)` / `releaseHashLock(contentHash)` in `worker/src/db/locks.ts`, reusing the existing DB client pattern.
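Putting 1a and 1b together, a condensed sketch of the intended call order (the `withHashLock` wrapper is illustrative; the real sequence is inlined in `worker.ts`):
```typescript
import { tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
import { packageExistsByHash } from "./db/queries.js";

// Run the upload + stub write at most once per content hash across workers.
async function withHashLock(
  contentHash: string,
  uploadAndWriteStub: () => Promise<void>
): Promise<void> {
  if (!(await tryAcquireHashLock(contentHash))) {
    return; // lock held: another worker is already handling this archive
  }
  try {
    // Re-check under the lock: another worker may have uploaded and released
    // between the caller's first dedup check and this acquisition.
    if (await packageExistsByHash(contentHash)) return;
    await uploadAndWriteStub(); // uploadToChannel() then createPackageStub()
  } finally {
    await releaseHashLock(contentHash); // runs on success and on every error path
  }
}
```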
---
## Section 2: Parallel Account Processing
### Current Constraint
`withTdlibMutex` in `scheduler.ts` serializes all TDLib operations across accounts. This was a conservative guard, but TDLib explicitly supports multiple concurrent clients in the same process provided each has its own `databaseDirectory` and `filesDirectory`.
The codebase already satisfies this requirement:
```typescript
// worker/src/tdlib/client.ts
const dbPath = path.join(config.tdlibStateDir, account.id);
const client = createClient({
  databaseDirectory: dbPath,
  filesDirectory: path.join(dbPath, "files"),
});
```
Each account gets `<TDLIB_STATE_DIR>/<account.id>/` — fully isolated.
### Change
Replace the sequential `for` loop in `scheduler.ts` with `Promise.allSettled()`:
```typescript
// Before
for (const account of accounts) {
  await withTdlibMutex(`ingest:${account.phone}`, () => runWorkerForAccount(account));
}
// After
await Promise.allSettled(accounts.map((account) => runWorkerForAccount(account)));
```
The per-account PostgreSQL advisory lock in `db/locks.ts` already prevents any account from being processed twice simultaneously. `Promise.allSettled()` ensures one account's failure doesn't abort the other.
The `withTdlibMutex` wrapper can be removed from the ingest path entirely. The auth path (`authenticateAccount`) should also be run in parallel but may remain guarded if TDLib auth flows have ordering dependencies — verify during implementation.
**No Docker Compose changes needed.** Both accounts run in the same container.
### Speed Limit Notifications
TDLib fires `updateSpeedLimitNotification` when an account's upload or download speed is throttled (non-Premium accounts). Log this event at `warn` level in the client update handler so the throttling is visible in logs; no action is taken beyond logging.
---
## Section 3: Per-Account Premium Upload Limit
### Premium Detection
After successful authentication, call `getMe()` and read `is_premium: bool` from the returned `user` object. Store this on `TelegramAccount.isPremium` (new boolean field, default `false`, updated on each successful auth).
```typescript
const me = await client.invoke({ _: 'getMe' }) as { is_premium?: boolean };
await updateAccountPremiumStatus(account.id, me.is_premium ?? false);
```
### Upload Size Limits
| Account type | `maxUploadSize` | Effect |
|---|---|---|
| Premium | 3,950 MB | Parts ≤ 3.95 GB upload as-is; repack only for parts >3.95 GB (extremely rare) |
| Non-Premium | 1,950 MB | Current behavior unchanged |
Pass `maxUploadSize` into `processOneArchiveSet()` as a parameter (currently hardcoded as `MAX_UPLOAD_SIZE` at `worker.ts:1023` and in `archive/split.ts`).
The `hasOversizedPart` check and `byteLevelSplit` call both use this value, so the repack step is effectively eliminated for Premium accounts in practice — no separate "skip repack" flag needed.
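As a sketch, the per-account cap and its hand-off to splitting might look like this (`maxUploadSizeFor` is illustrative; the non-Premium value really comes from `config.maxPartSizeMB`):
```typescript
const MiB = 1024n * 1024n;

// Illustrative helper: choose the part-size cap per the table above.
function maxUploadSizeFor(isPremium: boolean): bigint {
  return isPremium ? 3950n * MiB : 1950n * MiB;
}

// Inside the per-account pipeline, thread the cap into splitting:
// const parts = await byteLevelSplit(archivePath, maxUploadSizeFor(isPremium));
```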
### Migration
```prisma
model TelegramAccount {
  // ... existing fields
  isPremium Boolean @default(false)
}
```
One migration, one new query `updateAccountPremiumStatus(accountId, isPremium)`.
---
## Files to Change
| File | Change |
|---|---|
| `prisma/schema.prisma` | Add `isPremium Boolean @default(false)` to `TelegramAccount` |
| `worker/src/db/queries.ts` | Add `updateAccountPremiumStatus()`, `createPackageStub()`, `updatePackageWithMetadata()` |
| `worker/src/db/locks.ts` | Add `tryAcquireHashLock()`, `releaseHashLock()` |
| `worker/src/tdlib/client.ts` | Call `getMe()` after auth, return `isPremium` from `createTdlibClient()` |
| `worker/src/worker.ts` | Two-phase write, hash lock acquire/release, pass `maxUploadSize` per account |
| `worker/src/archive/split.ts` | Accept `maxPartSize` parameter instead of hardcoded constant |
| `worker/src/scheduler.ts` | Replace sequential loop with `Promise.allSettled()`, remove `withTdlibMutex` from ingest path |
---
## What Is Explicitly Out of Scope
- Backfilling metadata on stub records (rare crash case, functional without it)
- Download pre-fetching / pipeline parallelism within one account
- Two separate worker containers (single container is sufficient)
- Bot or app changes (worker-only)


@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "telegram_accounts" ADD COLUMN "isPremium" BOOLEAN NOT NULL DEFAULT false;


@@ -406,6 +406,7 @@ model TelegramAccount {
   isActive   Boolean   @default(true)
   authState  AuthState @default(PENDING)
   authCode   String?
+  isPremium  Boolean   @default(false)
   lastSeenAt DateTime?
   createdAt  DateTime  @default(now())
   updatedAt  DateTime  @updatedAt


@@ -18,20 +18,22 @@ const log = childLogger("split");
 const MAX_PART_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
 /**
- * Split a file into ≤2GB parts using byte-level splitting.
- * Returns paths to the split parts. If the file is already ≤2GB, returns the original path.
+ * Split a file into parts using byte-level splitting.
+ * Returns paths to the split parts. If the file fits in one part, returns the original path.
+ * Pass maxPartSize to override the global default (e.g., 3950 MiB for Premium accounts).
  */
-export async function byteLevelSplit(filePath: string): Promise<string[]> {
+export async function byteLevelSplit(filePath: string, maxPartSize?: bigint): Promise<string[]> {
+  const effectiveMax = maxPartSize ?? MAX_PART_SIZE;
   const stats = await stat(filePath);
   const fileSize = BigInt(stats.size);
-  if (fileSize <= MAX_PART_SIZE) {
+  if (fileSize <= effectiveMax) {
     return [filePath];
   }
   const dir = path.dirname(filePath);
   const baseName = path.basename(filePath);
-  const partSize = Number(MAX_PART_SIZE);
+  const partSize = Number(effectiveMax);
   const totalParts = Math.ceil(Number(fileSize) / partSize);
   const parts: string[] = [];


@@ -5,7 +5,14 @@ import { config } from "../util/config.js";
 const pool = new pg.Pool({
   connectionString: config.databaseUrl,
-  max: 5,
+  // Pool needs headroom for: 2 account advisory locks (held for entire cycle),
+  // up to 2 concurrent hash locks, plus Prisma operations from both accounts.
+  // Previously max=5 caused pool exhaustion and indefinite hangs.
+  max: 15,
+  // Prevent pool.connect() from blocking forever when pool is exhausted.
+  // Throws an error after 30s so the operation can fail and retry instead of
+  // silently hanging for hours (as happened with the Turnbase.7z stall).
+  connectionTimeoutMillis: 30_000,
 });
 const adapter = new PrismaPg(pool);


@@ -79,3 +79,66 @@ export async function releaseLock(accountId: string): Promise<void> {
     client.release();
   }
 }
+
+/**
+ * Derive a lock ID for a content hash. Prefixes with "hash:" so the resulting
+ * 32-bit integer does not collide with account advisory lock IDs.
+ */
+function contentHashToLockId(contentHash: string): number {
+  return hashToLockId(`hash:${contentHash}`);
+}
+
+/**
+ * Acquire a per-content-hash advisory lock before uploading.
+ * Prevents two concurrent workers from uploading the same archive
+ * when both scan a shared source channel.
+ *
+ * Returns true if acquired (proceed with upload).
+ * Returns false if already held (another worker is handling this archive — skip).
+ *
+ * MUST be released via releaseHashLock() after createPackageStub() completes,
+ * including on all error paths (use try/finally).
+ */
+export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
+  const lockId = contentHashToLockId(contentHash);
+  const client = await pool.connect();
+  try {
+    const result = await client.query<{ pg_try_advisory_lock: boolean }>(
+      "SELECT pg_try_advisory_lock($1)",
+      [lockId]
+    );
+    const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
+    if (acquired) {
+      heldConnections.set(`hash:${contentHash}`, client);
+      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock acquired");
+      return true;
+    } else {
+      client.release();
+      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock held by another worker — skipping");
+      return false;
+    }
+  } catch (err) {
+    client.release();
+    throw err;
+  }
+}
+
+/**
+ * Release the per-content-hash advisory lock.
+ * Call after createPackageStub() completes (or on any error path).
+ */
+export async function releaseHashLock(contentHash: string): Promise<void> {
+  const lockId = contentHashToLockId(contentHash);
+  const client = heldConnections.get(`hash:${contentHash}`);
+  if (!client) {
+    log.warn({ hash: contentHash.slice(0, 16) }, "No held connection for hash lock release");
+    return;
+  }
+  try {
+    await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
+    log.debug({ hash: contentHash.slice(0, 16) }, "Hash lock released");
+  } finally {
+    heldConnections.delete(`hash:${contentHash}`);
+    client.release();
+  }
+}


@@ -74,6 +74,105 @@ export async function getUploadedPackageByHash(contentHash: string) {
   });
 }
+
+export interface CreatePackageStubInput {
+  contentHash: string;
+  fileName: string;
+  fileSize: bigint;
+  archiveType: ArchiveType;
+  sourceChannelId: string;
+  sourceMessageId: bigint;
+  sourceTopicId?: bigint | null;
+  destChannelId: string;
+  destMessageId: bigint;
+  destMessageIds: bigint[];
+  isMultipart: boolean;
+  partCount: number;
+  ingestionRunId: string;
+  creator?: string | null;
+  tags?: string[];
+}
+
+/**
+ * Write a minimal Package record immediately after Telegram confirms the upload.
+ * Call this before preview/metadata extraction so recoverIncompleteUploads() can
+ * detect and verify the package if the worker crashes mid-metadata.
+ *
+ * Follow with updatePackageWithMetadata() once file entries and preview are ready.
+ */
+export async function createPackageStub(
+  input: CreatePackageStubInput
+): Promise<{ id: string }> {
+  const pkg = await db.package.create({
+    data: {
+      contentHash: input.contentHash,
+      fileName: input.fileName,
+      fileSize: input.fileSize,
+      archiveType: input.archiveType,
+      sourceChannelId: input.sourceChannelId,
+      sourceMessageId: input.sourceMessageId,
+      sourceTopicId: input.sourceTopicId ?? undefined,
+      destChannelId: input.destChannelId,
+      destMessageId: input.destMessageId,
+      destMessageIds: input.destMessageIds,
+      isMultipart: input.isMultipart,
+      partCount: input.partCount,
+      fileCount: 0,
+      ingestionRunId: input.ingestionRunId,
+      creator: input.creator ?? undefined,
+      tags: input.tags?.length ? input.tags : undefined,
+    },
+    select: { id: true },
+  });
+  try {
+    await db.$queryRawUnsafe(
+      `SELECT pg_notify('new_package', $1)`,
+      JSON.stringify({
+        packageId: pkg.id,
+        fileName: input.fileName,
+        creator: input.creator ?? null,
+        tags: input.tags ?? [],
+      })
+    );
+  } catch {
+    // Best-effort
+  }
+  return pkg;
+}
+
+/**
+ * Update a stub Package with file entries and preview after metadata extraction.
+ * Called as Phase 2 of the two-phase write after createPackageStub().
+ */
+export async function updatePackageWithMetadata(
+  packageId: string,
+  input: {
+    files: {
+      path: string;
+      fileName: string;
+      extension: string | null;
+      compressedSize: bigint;
+      uncompressedSize: bigint;
+      crc32: string | null;
+    }[];
+    previewData?: Buffer | null;
+    previewMsgId?: bigint | null;
+  }
+): Promise<void> {
+  await db.package.update({
+    where: { id: packageId },
+    data: {
+      fileCount: input.files.length,
+      previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
+      previewMsgId: input.previewMsgId ?? undefined,
+      files: {
+        create: input.files,
+      },
+    },
+  });
+}
+
 /**
  * Check if a package already exists for a given source message ID
  * AND was successfully uploaded to the destination (destMessageId is set).
@@ -308,6 +407,16 @@ export async function updateAccountAuthState(
   });
 }
+
+export async function updateAccountPremiumStatus(
+  accountId: string,
+  isPremium: boolean
+): Promise<void> {
+  await db.telegramAccount.update({
+    where: { id: accountId },
+    data: { isPremium },
+  });
+}
+
 export async function getAccountAuthCode(accountId: string) {
   const account = await db.telegramAccount.findUnique({
     where: { id: accountId },


@@ -101,16 +101,14 @@ export async function processExtractRequest(requestId: string): Promise<void> {
   try {
     await mkdir(tempDir, { recursive: true });
-    // Wrap the entire TDLib session in the mutex so no other TDLib
-    // operation can run concurrently (TDLib is single-session).
-    await withTdlibMutex("extract", async () => {
     const accounts = await getActiveAccounts();
     if (accounts.length === 0) {
       throw new Error("No authenticated Telegram accounts available");
     }
     const account = accounts[0];
-    const client = await createTdlibClient({ id: account.id, phone: account.phone });
+    await withTdlibMutex(account.phone, "extract", async () => {
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
       try {
         // Load chat list so TDLib can find the dest channel


@@ -14,6 +14,7 @@ import {
   getGlobalSetting,
   setGlobalSetting,
   getActiveAccounts,
+  getChannelFetchRequest,
   upsertChannel,
   ensureAccountChannelLink,
   updateFetchRequestStatus,
@@ -133,7 +134,9 @@ let fetchQueue: Promise<void> = Promise.resolve();
 function handleChannelFetch(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("fetch-channels", () =>
+      const request = await getChannelFetchRequest(requestId);
+      const key = request?.account?.phone ?? "global";
+      await withTdlibMutex(key, "fetch-channels", () =>
         processFetchRequest(requestId)
       );
     } catch (err) {
@@ -147,22 +150,20 @@ function handleChannelFetch(requestId: string): void {
 function handleGenerateInvite(channelId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("generate-invite", async () => {
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to generate invite link");
+        return;
+      }
+      const account = accounts[0];
+      await withTdlibMutex(account.phone, "generate-invite", async () => {
         const destChannel = await getGlobalDestinationChannel();
         if (!destChannel || destChannel.id !== channelId) {
           log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
           return;
         }
-        // Use the first available authenticated account to generate the link
-        const accounts = await getActiveAccounts();
-        if (accounts.length === 0) {
-          log.warn("No authenticated accounts to generate invite link");
-          return;
-        }
-        const account = accounts[0];
-        const client = await createTdlibClient({ id: account.id, phone: account.phone });
+        const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
         try {
           const link = await generateInviteLink(client, destChannel.telegramId);
@@ -187,7 +188,13 @@ function handleCreateDestination(payload: string): void {
     const parsed = JSON.parse(payload) as { requestId: string; title: string };
     requestId = parsed.requestId;
-    await withTdlibMutex("create-destination", async () => {
+    const accounts = await getActiveAccounts();
+    if (accounts.length === 0) {
+      throw new Error("No authenticated accounts available to create the group");
+    }
+    const account = accounts[0];
+    await withTdlibMutex(account.phone, "create-destination", async () => {
       const { db } = await import("./db/client.js");
       // Mark the request as in-progress
@@ -196,14 +203,7 @@ function handleCreateDestination(payload: string): void {
         data: { status: "IN_PROGRESS" },
       });
-      // Use the first available authenticated account
-      const accounts = await getActiveAccounts();
-      if (accounts.length === 0) {
-        throw new Error("No authenticated accounts available to create the group");
-      }
-      const account = accounts[0];
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
       try {
         // Create the supergroup via TDLib
@@ -328,16 +328,16 @@ function handleJoinChannel(payload: string): void {
     const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
     requestId = parsed.requestId;
-    await withTdlibMutex("join-channel", async () => {
-      await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
     const accounts = await getActiveAccounts();
     const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
     if (!account) {
       throw new Error("No authenticated accounts available");
     }
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+    await withTdlibMutex(account.phone, "join-channel", async () => {
+      await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
       try {
         const linkInfo = parseTelegramInput(parsed.input);
@@ -507,7 +507,12 @@ function handleIngestionTrigger(): void {
 function handleRebuildPackages(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("rebuild-packages", () =>
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to rebuild packages");
+        return;
+      }
+      await withTdlibMutex(accounts[0].phone, "rebuild-packages", () =>
         rebuildPackageDatabase(requestId)
       );
     } catch (err) {


@@ -27,6 +27,33 @@ async function main(): Promise<void> {
   await cleanupTempDir();
   await markStaleRunsAsFailed();
+
+  // Release any advisory locks orphaned by a previous worker instance.
+  // When Docker kills a container, PostgreSQL may keep the session alive
+  // (zombie connections), holding advisory locks that block the new worker.
+  try {
+    const result = await pool.query(`
+      SELECT pid, state, left(query, 80) as query, age(clock_timestamp(), state_change) as idle_time
+      FROM pg_stat_activity
+      WHERE datname = current_database()
+        AND pid != pg_backend_pid()
+        AND state = 'idle'
+        AND query LIKE '%pg_try_advisory_lock%'
+        AND state_change < clock_timestamp() - interval '5 minutes'
+    `);
+    for (const row of result.rows) {
+      log.warn(
+        { pid: row.pid, idleTime: row.idle_time, query: row.query },
+        "Terminating stale advisory lock session from previous worker"
+      );
+      await pool.query("SELECT pg_terminate_backend($1)", [row.pid]);
+    }
+    if (result.rows.length > 0) {
+      log.info({ terminated: result.rows.length }, "Cleaned up stale advisory lock sessions");
+    }
+  } catch (err) {
+    log.warn({ err }, "Failed to clean up stale advisory locks (non-fatal)");
+  }
+
   // Verify destination messages exist for all "uploaded" packages.
   // Resets any packages whose dest message is missing so they get re-processed.
   await recoverIncompleteUploads();


@@ -49,7 +49,7 @@ export async function processManualUpload(uploadId: string): Promise<void> {
   const account = accounts[0];
   if (!account) throw new Error("No authenticated Telegram account available");
-  const client = await createTdlibClient({ id: account.id, phone: account.phone });
+  const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
   try {
     const packageIds: string[] = [];


@@ -63,7 +63,7 @@ export async function rebuildPackageDatabase(
   }
   const account = accounts[0];
-  const client = await createTdlibClient({
+  const { client } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });


@@ -63,7 +63,7 @@ export async function recoverIncompleteUploads(): Promise<void> {
     let client: Client | undefined;
     try {
-      client = await createTdlibClient({ id: account.id, phone: account.phone });
+      ({ client } = await createTdlibClient({ id: account.id, phone: account.phone }));
       // Load the chat list so TDLib can resolve chat IDs
       try {


@@ -1,6 +1,6 @@
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { withTdlibMutex } from "./util/mutex.js";
+import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
 import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
 import { runWorkerForAccount, authenticateAccount } from "./worker.js";
 import { runIntegrityAudit } from "./audit.js";
@@ -24,8 +24,8 @@ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "
  * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
  * 2. Process all active AUTHENTICATED accounts for ingestion
  *
- * All TDLib operations are wrapped in the mutex to ensure only one client
- * runs at a time (also shared with the fetch listener for on-demand requests).
+ * Each account's TDLib operations are wrapped in a per-key mutex so different
+ * accounts run concurrently while the same account is still serialized.
  *
  * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
  * Once the timeout elapses, no new accounts will be started but any in-progress
@@ -55,7 +55,7 @@ async function runCycle(): Promise<void> {
log.warn("Cycle timeout reached during authentication phase, stopping"); log.warn("Cycle timeout reached during authentication phase, stopping");
break; break;
} }
await withTdlibMutex(`auth:${account.phone}`, () => await withTdlibMutex(account.phone, `auth:${account.phone}`, () =>
authenticateAccount(account) authenticateAccount(account)
); );
} }
@@ -71,17 +71,38 @@ async function runCycle(): Promise<void> {
   log.info({ accountCount: accounts.length }, "Processing accounts");
-  for (const account of accounts) {
-    if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
-      log.warn(
-        { elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
-        "Cycle timeout reached, skipping remaining accounts"
-      );
-      break;
-    }
-    await withTdlibMutex(`ingest:${account.phone}`, () =>
-      runWorkerForAccount(account)
-    );
-  }
+  const results = await Promise.allSettled(
+    accounts.map((account) => {
+      let timer: ReturnType<typeof setTimeout>;
+      return Promise.race([
+        withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
+          runWorkerForAccount(account)
+        ),
+        new Promise<never>((_, reject) => {
+          timer = setTimeout(
+            () => reject(new Error(`Account ${account.phone} ingestion timed out after ${CYCLE_TIMEOUT_MS / 60_000}min`)),
+            CYCLE_TIMEOUT_MS
+          );
+        }),
+      ]).finally(() => clearTimeout(timer));
+    })
+  );
+  for (let i = 0; i < results.length; i++) {
+    if (results[i].status === "rejected") {
+      const reason = (results[i] as PromiseRejectedResult).reason;
+      log.error(
+        { phone: accounts[i].phone, err: reason },
+        "Account ingestion failed"
+      );
+      // If the cycle timed out, force-release the mutex so the next cycle
+      // (or other operations like fetch-channels) can proceed immediately
+      // instead of waiting 30 minutes for the mutex timeout.
+      const errMsg = reason instanceof Error ? reason.message : String(reason);
+      if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
+        forceReleaseMutex(accounts[i].phone);
+      }
+    }
+  }
   log.info(


@@ -6,6 +6,7 @@ import { childLogger } from "../util/logger.js";
 import {
   updateAccountAuthState,
   getAccountAuthCode,
+  updateAccountPremiumStatus,
 } from "../db/queries.js";
 const log = childLogger("tdlib-client");
@@ -27,7 +28,7 @@ interface AccountConfig {
  */
 export async function createTdlibClient(
   account: AccountConfig
-): Promise<Client> {
+): Promise<{ client: Client; isPremium: boolean }> {
   const dbPath = path.join(config.tdlibStateDir, account.id);
   const client = createClient({
@@ -78,7 +79,30 @@ export async function createTdlibClient(
await updateAccountAuthState(account.id, "AUTHENTICATED"); await updateAccountAuthState(account.id, "AUTHENTICATED");
log.info({ accountId: account.id }, "TDLib client authenticated"); log.info({ accountId: account.id }, "TDLib client authenticated");
return client;
let isPremium = false;
try {
const me = await client.invoke({ _: "getMe" }) as { is_premium?: boolean };
isPremium = me.is_premium ?? false;
await updateAccountPremiumStatus(account.id, isPremium);
log.info({ accountId: account.id, isPremium }, "Account Premium status detected");
} catch (err) {
log.warn({ err, accountId: account.id }, "Could not detect Premium status, defaulting to false");
}
client.on("update", (update: unknown) => {
const u = update as { _?: string; is_upload?: boolean };
if (u?._ === "updateSpeedLimitNotification") {
log.warn(
{ accountId: account.id, isUpload: u.is_upload },
u.is_upload
? "Upload speed limited by Telegram (account is not Premium)"
: "Download speed limited by Telegram (account is not Premium)"
);
}
});
return { client, isPremium };
} catch (err) { } catch (err) {
log.error({ err, accountId: account.id }, "TDLib authentication failed"); log.error({ err, accountId: account.id }, "TDLib authentication failed");
await updateAccountAuthState(account.id, "EXPIRED"); await updateAccountAuthState(account.id, "EXPIRED");


@@ -79,6 +79,8 @@ export interface ChannelScanResult {
   archives: TelegramMessage[];
   photos: TelegramPhoto[];
   totalScanned: number;
+  /** Highest message ID seen during scan (for watermark, even when no archives found). */
+  maxScannedMessageId: bigint | null;
 }
 export type ScanProgressCallback = (messagesScanned: number) => void;
@@ -158,6 +160,7 @@ export async function getChannelMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
   // Open the chat so TDLib can access it
   try {
@@ -204,6 +207,12 @@ export async function getChannelMessages(
     totalScanned += result.messages.length;
+
+    // Track highest message ID (first message in batch = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
     for (const msg of result.messages) {
       // Check for archive documents
       const doc = msg.content?.document;
@@ -246,6 +255,11 @@ export async function getChannelMessages(
     fromMessageId = result.messages[result.messages.length - 1].id;
     if (result.messages.length < Math.min(limit, 100)) break;
+
+    // Early exit: searchChatMessages returns newest-first. Once the oldest
+    // message on this page is at or below the boundary, all remaining pages
+    // are even older — no new messages exist, stop scanning immediately.
+    if (boundary && fromMessageId <= boundary) break;
     await sleep(config.apiDelayMs);
   }
 }
@@ -266,6 +280,7 @@ export async function getChannelMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }


@@ -178,6 +178,7 @@ export async function getTopicMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
   let currentFromId = 0;
   let totalScanned = 0;
@@ -239,6 +240,12 @@ export async function getTopicMessages(
     totalScanned += result.messages.length;
+
+    // Track highest message ID (first message = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
     for (const msg of result.messages) {
       // Check for archive documents
       const doc = msg.content?.document;
@@ -302,6 +309,7 @@ export async function getTopicMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }


@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
const log = childLogger("upload"); const log = childLogger("upload");
/**
* Custom error class to distinguish upload stalls from other errors.
* When consecutive stalls occur, the caller can use this signal to
* recreate the TDLib client (whose event stream may have degraded).
*/
export class UploadStallError extends Error {
constructor(message: string) {
super(message);
this.name = "UploadStallError";
}
}
export interface UploadResult { export interface UploadResult {
messageId: bigint; messageId: bigint;
messageIds: bigint[]; messageIds: bigint[];
@@ -109,7 +121,8 @@ async function sendWithRetry(
       // Stall or timeout — retry with a cooldown
       const errMsg = err instanceof Error ? err.message : "";
-      if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) {
+      if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
+        if (!isLastAttempt) {
           log.warn(
             { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
             "Upload stalled/timed out — retrying"
@@ -117,6 +130,13 @@ async function sendWithRetry(
           await sleep(10_000);
           continue;
         }
+        // All stall retries exhausted — throw UploadStallError so the caller
+        // knows the TDLib client's event stream is likely degraded and can
+        // recreate the client before continuing.
+        throw new UploadStallError(
+          `Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
+        );
+      }
       throw err;
     }
@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
       }
     }, timeoutMs);
-    // Stall detection: no progress for 5 minutes after upload started → reject
-    const STALL_TIMEOUT_MS = 5 * 60_000;
+    // Stall detection: no progress for 3 minutes after upload started → reject
+    // (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
+    // a 3min silence strongly indicates a degraded TDLib event stream)
+    const STALL_TIMEOUT_MS = 3 * 60_000;
     const stallChecker = setInterval(() => {
       if (settled || !uploadStarted) return;
       const stallMs = Date.now() - lastProgressTime;


@@ -2,39 +2,66 @@ import { childLogger } from "./logger.js";
const log = childLogger("mutex"); const log = childLogger("mutex");
let locked = false;
let holder = "";
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000; const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
const locks = new Map<string, boolean>();
const holders = new Map<string, string>();
const queues = new Map<
string,
Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
>();
/** /**
* Ensures only one TDLib client runs at a time across the entire worker process. * Force-release a stuck mutex.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this * This should only be called when the holder is known to be stuck (e.g. after
* before creating any TDLib client. * a cycle timeout). It releases the lock and lets the next queued waiter proceed.
*/
export function forceReleaseMutex(key: string): void {
if (!locks.has(key)) return;
const holder = holders.get(key);
log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
locks.delete(key);
holders.delete(key);
const next = queues.get(key)?.shift();
if (next) {
log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
next.resolve();
} else {
queues.delete(key);
log.info({ key }, "TDLib mutex force-released (no waiters)");
}
}
/**
* Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
* Different keys run concurrently — this allows two accounts to ingest in parallel
* while still preventing concurrent use of the same account's TDLib state dir.
* *
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs. * key: the account phone number for account-specific ops (auth, ingest),
* or 'global' for ops that don't belong to a specific account.
* label: human-readable name for logging.
*/ */
export async function withTdlibMutex<T>( export async function withTdlibMutex<T>(
key: string,
label: string, label: string,
fn: () => Promise<T> fn: () => Promise<T>
): Promise<T> { ): Promise<T> {
if (locked) { if (locks.get(key)) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex");
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => { const timer = setTimeout(() => {
const idx = queue.indexOf(entry); const q = queues.get(key) ?? [];
const idx = q.indexOf(entry);
if (idx !== -1) { if (idx !== -1) {
queue.splice(idx, 1); q.splice(idx, 1);
reject(new Error( reject(
new Error(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` + `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
`(waiting: ${label}, holder: ${holder})` `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})`
)); )
);
} }
}, MUTEX_WAIT_TIMEOUT_MS); }, MUTEX_WAIT_TIMEOUT_MS);
@@ -46,25 +73,28 @@ export async function withTdlibMutex<T>(
         reject,
         label,
       };
-      queue.push(entry);
+      if (!queues.has(key)) queues.set(key, []);
+      queues.get(key)!.push(entry);
     });
   }
-  locked = true;
-  holder = label;
-  log.debug({ label }, "TDLib mutex acquired");
+  locks.set(key, true);
+  holders.set(key, label);
+  log.debug({ key, label }, "TDLib mutex acquired");
   try {
     return await fn();
   } finally {
-    locked = false;
-    holder = "";
-    const next = queue.shift();
+    locks.delete(key);
+    holders.delete(key);
+    const next = queues.get(key)?.shift();
     if (next) {
-      log.debug({ next: next.label }, "TDLib mutex releasing to next waiter");
+      log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter");
       next.resolve();
     } else {
-      log.debug({ label }, "TDLib mutex released");
+      queues.delete(key);
+      log.debug({ key, label }, "TDLib mutex released");
     }
   }
 }


@@ -2,13 +2,14 @@ import path from "path";
 import { unlink, readdir, mkdir, rm } from "fs/promises";
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { tryAcquireLock, releaseLock } from "./db/locks.js";
+import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
 import {
   getSourceChannelMappings,
   getGlobalDestinationChannel,
   packageExistsByHash,
   packageExistsBySourceMessage,
-  createPackageWithFiles,
+  createPackageStub,
+  updatePackageWithMetadata,
   createIngestionRun,
   completeIngestionRun,
   failIngestionRun,
@@ -46,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
 import { readRarContents } from "./archive/rar-reader.js";
 import { read7zContents } from "./archive/sevenz-reader.js";
 import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
-import { uploadToChannel } from "./upload/channel.js";
+import { uploadToChannel, UploadStallError } from "./upload/channel.js";
 import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
 import { db } from "./db/client.js";
 import type { TelegramAccount, TelegramChannel } from "@prisma/client";
@@ -73,10 +74,10 @@ export async function authenticateAccount(
   let client: Client | undefined;
   try {
-    client = await createTdlibClient({
+    client = (await createTdlibClient({
       id: account.id,
       phone: account.phone,
-    });
+    })).client;
     aLog.info("Authentication successful");
     // Auto-fetch channels and create a fetch request result
@@ -131,7 +132,7 @@ export async function processFetchRequest(requestId: string): Promise<void> {
await updateFetchRequestStatus(requestId, "IN_PROGRESS"); await updateFetchRequestStatus(requestId, "IN_PROGRESS");
aLog.info({ accountId: request.accountId }, "Processing fetch request"); aLog.info({ accountId: request.accountId }, "Processing fetch request");
const client = await createTdlibClient({ const { client } = await createTdlibClient({
id: request.account.id, id: request.account.id,
phone: request.account.phone, phone: request.account.phone,
}); });
@@ -285,6 +286,7 @@ interface PipelineContext {
   client: Client;
   runId: string;
   accountId: string;
+  accountPhone: string;
   channelTitle: string;
   channel: TelegramChannel;
   destChannelTelegramId: bigint;
@@ -301,6 +303,9 @@ interface PipelineContext {
   /** Forum topic ID (null for non-forum). */
   sourceTopicId: bigint | null;
   accountLog: ReturnType<typeof childLogger>;
+  maxUploadSize: bigint;
+  /** How many consecutive upload stalls have occurred (resets on success). */
+  consecutiveStalls: number;
 }
 /**
@@ -336,10 +341,14 @@ export async function runWorkerForAccount(
currentStep: "connecting", currentStep: "connecting",
}); });
const client = await createTdlibClient({ // Use let so the client can be replaced on TDLib recreation after stalls
let { client, isPremium } = await createTdlibClient({
id: account.id, id: account.id,
phone: account.phone, phone: account.phone,
}); });
const maxUploadSize = isPremium
? 3950n * 1024n * 1024n
: BigInt(config.maxPartSizeMB) * 1024n * 1024n;
// Load all chats into TDLib's local cache using loadChats (the recommended API). // Load all chats into TDLib's local cache using loadChats (the recommended API).
// Without this, getChat/searchChatMessages fail with "Chat not found". // Without this, getChat/searchChatMessages fail with "Chat not found".
@@ -443,6 +452,7 @@ export async function runWorkerForAccount(
     client,
     runId: activeRunId,
     accountId: account.id,
+    accountPhone: account.phone,
     channelTitle: channel.title,
     channel,
     destChannelTelegramId: destChannel.telegramId,
@@ -452,6 +462,8 @@ export async function runWorkerForAccount(
     topicCreator: null,
     sourceTopicId: null,
     accountLog,
+    maxUploadSize,
+    consecutiveStalls: 0,
   };
   if (forum) {
@@ -526,6 +538,15 @@ export async function runWorkerForAccount(
           { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
           "No new archives in topic"
         );
+        // Still advance topic watermark so we don't re-scan these messages next cycle
+        if (scanResult.maxScannedMessageId) {
+          await upsertTopicProgress(
+            mapping.id,
+            topic.topicId,
+            topic.name,
+            scanResult.maxScannedMessageId
+          );
+        }
         continue;
       }
@@ -540,14 +561,17 @@ export async function runWorkerForAccount(
       pipelineCtx.channelTitle = `${channel.title} ${topic.name}`;
       const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
-      // Only advance progress to the highest successfully processed message
-      if (maxProcessedId) {
+      // Sync client back in case it was recreated during upload stall recovery
+      client = pipelineCtx.client;
+      // Advance progress: use archive watermark if available, fall back to scan watermark
+      const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+      if (topicWatermark) {
         await upsertTopicProgress(
           mapping.id,
           topic.topicId,
           topic.name,
-          maxProcessedId
+          topicWatermark
         );
       }
     } catch (topicErr) {
@@ -597,6 +621,11 @@ export async function runWorkerForAccount(
       if (scanResult.archives.length === 0) {
         accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
+        // Still advance watermark to highest scanned message so we don't
+        // re-scan these messages next cycle
+        if (scanResult.maxScannedMessageId) {
+          await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
+        }
         continue;
       }
@@ -611,10 +640,13 @@ export async function runWorkerForAccount(
       pipelineCtx.channelTitle = channel.title;
       const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
-      // Only advance progress to the highest successfully processed message
-      if (maxProcessedId) {
-        await updateLastProcessedMessage(mapping.id, maxProcessedId);
+      // Sync client back in case it was recreated during upload stall recovery
+      client = pipelineCtx.client;
+      // Advance progress: use archive watermark if available, fall back to scan watermark
+      const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+      if (channelWatermark) {
+        await updateLastProcessedMessage(mapping.id, channelWatermark);
       }
     }
   } catch (channelErr) {
@@ -754,12 +786,68 @@ async function processArchiveSets(
       if (setMaxId > (maxProcessedId ?? 0n)) {
         maxProcessedId = setMaxId;
       }
+      // Reset stall counter on any successful upload
+      ctx.consecutiveStalls = 0;
     } catch (setErr) {
       // If a set fails, do NOT advance the watermark past it
       accountLog.warn(
         { err: setErr, baseName: archiveSets[setIdx].baseName },
         "Archive set failed, watermark will not advance past this set"
       );
+
+      // ── TDLib client recreation on repeated upload stalls ──
+      // When the TDLib event stream degrades, uploads complete (bytes sent)
+      // but confirmations never arrive. Retrying with the same broken client
+      // is futile. Recreate the client to get a fresh connection.
+      if (setErr instanceof UploadStallError) {
+        ctx.consecutiveStalls++;
+        accountLog.warn(
+          { consecutiveStalls: ctx.consecutiveStalls },
+          "Upload stall detected — TDLib event stream may be degraded"
+        );
+        // After 1 stalled set (= 3 failed retry attempts already), recreate the client
+        if (ctx.consecutiveStalls >= 1) {
+          accountLog.info("Recreating TDLib client after consecutive upload stalls");
+          try {
+            await closeTdlibClient(ctx.client);
+          } catch (closeErr) {
+            accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
+          }
+          try {
+            const { client: newClient } = await createTdlibClient({
+              id: ctx.accountId,
+              phone: ctx.accountPhone,
+            });
+            ctx.client = newClient;
+            // Reload chats so the new client can access channels
+            try {
+              for (let page = 0; page < 500; page++) {
+                await newClient.invoke({
+                  _: "loadChats",
+                  chat_list: { _: "chatListMain" },
+                  limit: 100,
+                });
+              }
+            } catch {
+              // 404 = all loaded (expected)
+            }
+            ctx.consecutiveStalls = 0;
+            accountLog.info("TDLib client recreated successfully — continuing ingestion");
+          } catch (recreateErr) {
+            accountLog.error(
+              { err: recreateErr },
+              "Failed to recreate TDLib client — aborting remaining uploads"
+            );
+            break;
+          }
+        }
+      }
+
       // Record the failure for visibility in the UI
       try {
         const archiveSet = archiveSets[setIdx];
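`UploadStallError` is referenced above but defined elsewhere in the commit. A minimal sketch of the detection side, assuming the upload code races a TDLib confirmation promise against a timer and clears the timer on settlement so no orphaned timeout lingers:

// Hedged sketch; the repo's actual upload code is not part of this diff.
class UploadStallError extends Error {
  constructor(fileName: string, timeoutMs: number) {
    super(`Upload of ${fileName} stalled: no confirmation within ${timeoutMs}ms`);
    this.name = "UploadStallError";
  }
}

const STALL_TIMEOUT_MS = 3 * 60 * 1000; // 3-minute detection window

async function awaitUploadConfirmation(
  confirmation: Promise<void>, // resolves when TDLib confirms the message
  fileName: string
): Promise<void> {
  let timer: NodeJS.Timeout | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new UploadStallError(fileName, STALL_TIMEOUT_MS)),
      STALL_TIMEOUT_MS
    );
  });
  try {
    await Promise.race([confirmation, timeout]);
  } finally {
    clearTimeout(timer); // always clear, win or lose, to avoid orphaned timers
  }
}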
@@ -1027,6 +1115,35 @@ async function processOneArchiveSet(
       return null;
     }
+
+    // ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
+    const hashLockAcquired = await tryAcquireHashLock(contentHash);
+    if (!hashLockAcquired) {
+      counters.zipsDuplicate++;
+      accountLog.info(
+        { fileName: archiveName, hash: contentHash.slice(0, 16) },
+        "Hash lock held by another worker — skipping concurrent duplicate"
+      );
+      return null;
+    }
+
+    let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
+    let creator: string | null = null;
+    const tags: string[] = [];
+    let stub: { id: string } | null = null;
+    try {
+      // Re-check after acquiring lock: another worker may have finished between
+      // the first check above and this point.
+      const existsAfterLock = await packageExistsByHash(contentHash);
+      if (existsAfterLock) {
+        counters.zipsDuplicate++;
+        accountLog.debug(
+          { fileName: archiveName, hash: contentHash.slice(0, 16) },
+          "Duplicate detected after acquiring hash lock — skipping"
+        );
+        return null;
+      }
+
     // ── Reading metadata ──
     await updateRunActivity(runId, {
       currentActivity: `Reading file list from ${archiveName}`,
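`tryAcquireHashLock` and `releaseHashLock` are called here but defined elsewhere. Assuming they wrap PostgreSQL session-level advisory locks keyed on the content hash, the pair could look like the sketch below; the pool settings, key derivation, and bookkeeping map are illustrative assumptions, not the repo's actual code.

// Hedged sketch of hash locks over pg session-level advisory locks.
import { Pool, PoolClient } from "pg";

const pool = new Pool({ max: 15, connectionTimeoutMillis: 30_000 });
const heldLocks = new Map<string, PoolClient>();

// Derive a signed 64-bit key from the hex content hash: advisory locks
// take a bigint, so fold the first 16 hex chars into the int64 range.
function lockKey(contentHash: string): bigint {
  return BigInt.asIntN(64, BigInt(`0x${contentHash.slice(0, 16)}`));
}

async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
  const client = await pool.connect();
  const { rows } = await client.query(
    "SELECT pg_try_advisory_lock($1) AS locked",
    [lockKey(contentHash).toString()]
  );
  if (!rows[0].locked) {
    client.release(); // another worker holds it; don't pin a connection
    return false;
  }
  // Session-level lock: this connection must stay checked out until release.
  heldLocks.set(contentHash, client);
  return true;
}

async function releaseHashLock(contentHash: string): Promise<void> {
  const client = heldLocks.get(contentHash);
  if (!client) return;
  heldLocks.delete(contentHash);
  await client.query("SELECT pg_advisory_unlock($1)", [
    lockKey(contentHash).toString(),
  ]);
  client.release();
}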
@@ -1037,7 +1154,6 @@ async function processOneArchiveSet(
       totalFiles: totalSets,
     });
-    let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
     try {
       if (archiveSet.type === "ZIP") {
         entries = await readZipCentralDirectory(tempPaths);
@@ -1069,7 +1185,7 @@ async function processOneArchiveSet(
       (sum, p) => sum + p.fileSize,
       0n
     );
-    const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
+    const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
     const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);

     if (hasOversizedPart) {
@@ -1084,7 +1200,7 @@ async function processOneArchiveSet(
       });
       const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
       await concatenateFiles(tempPaths, concatPath);
-      splitPaths = await byteLevelSplit(concatPath);
+      splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
       uploadPaths = splitPaths;
       // Clean up the concat intermediate file
       await unlink(concatPath).catch(() => {});
@@ -1098,7 +1214,7 @@ async function processOneArchiveSet(
         currentFileNum: setIdx + 1,
         totalFiles: totalSets,
       });
-      splitPaths = await byteLevelSplit(tempPaths[0]);
+      splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
       uploadPaths = splitPaths;
     }
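`byteLevelSplit` now receives the size cap as a parameter instead of reading config directly, which is what lets the premium-aware `ctx.maxUploadSize` flow through. Its implementation is not in this diff; a minimal sketch, assuming it slices the input into sequential part files of at most `maxBytes` each (the `.part-NNN` naming is invented for illustration):

// Hedged sketch of a byte-level file splitter.
import { createReadStream, createWriteStream } from "node:fs";
import { stat } from "node:fs/promises";
import { pipeline } from "node:stream/promises";

async function byteLevelSplit(
  filePath: string,
  maxBytes: bigint
): Promise<string[]> {
  const { size } = await stat(filePath);
  const chunk = Number(maxBytes); // safe: caps of a few GB fit in a double
  const parts: string[] = [];
  for (let offset = 0, idx = 0; offset < size; offset += chunk, idx++) {
    const partPath = `${filePath}.part-${String(idx + 1).padStart(3, "0")}`;
    // Stream the byte range [offset, end] into the part file;
    // createReadStream's 'end' option is inclusive.
    const end = Math.min(offset + chunk, size) - 1;
    await pipeline(
      createReadStream(filePath, { start: offset, end }),
      createWriteStream(partPath)
    );
    parts.push(partPath);
  }
  return parts;
}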
@@ -1204,7 +1320,47 @@ async function processOneArchiveSet(
       }
     }
+
+      // ── Phase 1: Stub record — persisted immediately after upload ──
+      await deleteOrphanedPackageByHash(contentHash);
+      creator =
+        topicCreator ??
+        extractCreatorFromFileName(archiveName) ??
+        extractCreatorFromChannelTitle(channelTitle) ??
+        null;
+      if (channel.category) {
+        tags.push(channel.category);
+      }
+      stub = await createPackageStub({
+        contentHash,
+        fileName: archiveName,
+        fileSize: totalSize,
+        archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
+        sourceChannelId: channel.id,
+        sourceMessageId: archiveSet.parts[0].id,
+        sourceTopicId,
+        destChannelId,
+        destMessageId: destResult.messageId,
+        destMessageIds: destResult.messageIds,
+        isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
+        partCount: uploadPaths.length,
+        ingestionRunId,
+        creator,
+        tags,
+      });
+      counters.zipsIngested++;
+      await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
+    } finally {
+      await releaseHashLock(contentHash);
+    }
+
+    if (!stub) return null;
+
     // ── Preview thumbnail ──
+    // (moved here from before stub creation — lock is released, preview doesn't need it)
     let previewData: Buffer | null = null;
     let previewMsgId: bigint | null = null;
     const matchedPhoto = previewMatches.get(archiveSet.baseName);
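The two-phase split apparently exists so that a crash during indexing or preview download cannot leave an uploaded archive without a database record: Phase 1 persists a stub as soon as the destination upload is confirmed, and Phase 2 back-fills the bulky metadata. The helper signatures are not shown in this diff; an assumed, illustrative shape:

// Assumed helper shapes, for illustration only; field lists mirror the
// call sites above, but the real persistence layer is not in this diff.
interface PackageStubInput {
  contentHash: string;
  fileName: string;
  fileSize: bigint;
  destMessageId: bigint; // proof the upload landed; written in Phase 1
  // ...remaining source/dest identifiers as passed above
}

interface PackageMetadataPatch {
  files: unknown[];      // bulky per-file index, deferred to Phase 2
  previewData: Buffer | null;
  previewMsgId: bigint | null;
}

declare function createPackageStub(
  input: PackageStubInput
): Promise<{ id: string }>;
declare function updatePackageWithMetadata(
  id: string,
  patch: PackageMetadataPatch
): Promise<void>;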
@@ -1218,8 +1374,6 @@ async function processOneArchiveSet(
         totalFiles: totalSets,
       });
       previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
-      // Only set previewMsgId if we actually got the image data —
-      // otherwise the UI thinks there's a preview but the API returns 404
       if (previewData) {
         previewMsgId = matchedPhoto.id;
       }
@@ -1242,13 +1396,7 @@ async function processOneArchiveSet(
       }
     }
-    // ── Resolve creator: topic name > filename extraction > channel title > null ──
-    const creator = topicCreator
-      ?? extractCreatorFromFileName(archiveName)
-      ?? extractCreatorFromChannelTitle(channelTitle)
-      ?? null;
-
-    // ── Indexing ──
+    // ── Phase 2: Update stub with file entries and preview ──
     await updateRunActivity(runId, {
       currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
       currentStep: "indexing",
@@ -1258,43 +1406,12 @@ async function processOneArchiveSet(
       totalFiles: totalSets,
     });
-    // Clean up any orphaned record (same hash but no dest upload) before creating
-    await deleteOrphanedPackageByHash(contentHash);
-
-    // Auto-inherit source channel category as initial tag
-    const tags: string[] = [];
-    if (channel.category) {
-      tags.push(channel.category);
-    }
-
-    const pkg = await createPackageWithFiles({
-      contentHash,
-      fileName: archiveName,
-      fileSize: totalSize,
-      archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
-      sourceChannelId: channel.id,
-      sourceMessageId: archiveSet.parts[0].id,
-      sourceTopicId,
-      destChannelId,
-      destMessageId: destResult.messageId,
-      destMessageIds: destResult.messageIds,
-      isMultipart:
-        archiveSet.parts.length > 1 || uploadPaths.length > 1,
-      partCount: uploadPaths.length,
-      ingestionRunId,
-      creator,
-      tags,
+    await updatePackageWithMetadata(stub.id, {
+      files: entries,
       previewData,
       previewMsgId,
-      sourceCaption: archiveSet.parts[0].caption ?? null,
-      replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
-      files: entries,
     });
-    counters.zipsIngested++;
-
-    // Clean up any prior skip record for this archive
-    await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);

     await updateRunActivity(runId, {
       currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
       currentStep: "complete",
@@ -1310,7 +1427,7 @@ async function processOneArchiveSet(
       "Archive ingested"
     );
-    return pkg.id;
+    return stub.id;
   } finally {
     // ALWAYS delete temp files and the set directory
     await deleteFiles([...tempPaths, ...splitPaths]);