mirror of https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-10 22:01:16 +00:00

Compare commits: be4daf950b ... main (18 commits)

Commits: 59038889ae, 77c26adb31, 35cce3151c, d6c82ede1e, 7e48131f67, a79cb4749b, e9017fc518, 4f59d19ac2, 579276ee2d, b48cc510a4, 614c8e5b74, 3019c23f70, 436a576085, f454303352, e29bd79d66, 61e61d0085, 925d916a3c, 27bacaf24c
.gitignore (vendored, 1 addition)

@@ -54,3 +54,4 @@ src/generated
 # temp files
 nul
 tmpclaude-*
+.worktrees/
docs/superpowers/plans/2026-05-02-worker-improvements.md (new file, 1000 lines)

File diff suppressed because it is too large.
docs/superpowers/specs/2026-05-02-worker-improvements-design.md (new file, 184 lines)

@@ -0,0 +1,184 @@
# Worker Improvements Design

**Date:** 2026-05-02
**Status:** Approved
**Scope:** Dragon's Stash Telegram ingestion worker

## Problem Statement

Three issues to address:

1. **Double-uploads**: The same archive occasionally appears twice in the destination Telegram channel. Root causes: (a) the worker crashes between `uploadToChannel()` confirming success and `createPackageWithFiles()` writing to the DB — no DB record means `recoverIncompleteUploads()` can't detect the orphaned Telegram message, and the next cycle re-uploads; (b) two accounts scanning the same source channel can both pass the hash dedup check before either creates a DB record, racing to upload the same file.

2. **Sequential account processing**: Both Telegram accounts are processed one after another via `withTdlibMutex`, even though TDLib fully supports multiple concurrent clients in the same process (each with separate `databaseDirectory` and `filesDirectory`). This halves throughput unnecessarily.

3. **Premium upload limit not used**: The Premium account can upload up to 4 GB per file, but `MAX_UPLOAD_SIZE` is hardcoded at ~1,950 MB. This causes unnecessary file splitting and expensive repack operations for files that could upload directly.

## Solution Overview

Three targeted changes, no architectural overhaul:

1. Two-phase DB write + hash advisory lock (fixes double-uploads)
2. Remove TDLib mutex from the scheduler loop (enables parallel accounts)
3. Per-account `maxUploadSize` from `getMe().is_premium` (enables 4 GB for Premium)

---

## Section 1: Double-Upload Fix

### 1a. Two-Phase DB Write

**Current flow:**

```
uploadToChannel() → preview download → metadata extraction → createPackageWithFiles()
```

If the worker crashes anywhere between upload confirmation and `createPackageWithFiles()`, no DB record exists. `recoverIncompleteUploads()` only checks packages with an existing `destMessageId` in the DB — it cannot find an orphaned Telegram message with no corresponding row.

**New flow:**

```
uploadToChannel()
  → createPackageStub()           ← minimal record, destMessageId set immediately
  → preview download
  → metadata extraction
  → updatePackageWithMetadata()   ← adds file list, preview, creator, tags
```

`createPackageStub()` writes: `contentHash`, `fileName`, `fileSize`, `archiveType`, `sourceChannelId`, `sourceMessageId`, `destChannelId`, `destMessageId`, `isMultipart`, `partCount`, `ingestionRunId`. File list and preview are left empty.

If the worker crashes after the stub is written:

- `recoverIncompleteUploads()` finds the record (has `destMessageId`), verifies the Telegram message exists, keeps it.
- Next cycle: `packageExistsByHash()` returns true → skips re-upload.
- The stub has `fileCount = 0` and no file listing. The UI shows "metadata pending" rather than failing silently.

Stubs with `fileCount = 0` are valid deliverable packages (the bot can still send the file). Backfilling metadata on stubs is out of scope for this change — the crash case is rare and the stub is functional.
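To make the ordering concrete, here is a minimal TypeScript sketch of the two-phase write. Only `uploadToChannel()`, `createPackageStub()`, and `updatePackageWithMetadata()` are names from this design; the surrounding function, its argument shape, and the `listArchiveContents` helper are illustrative assumptions, not the actual worker pipeline.

```typescript
// Sketch only: schematic wiring of the two-phase write, not the real worker.ts code.
async function ingestArchive(a: {
  localPath: string;
  fileName: string;
  fileSize: bigint;
  contentHash: string;
  destChannelId: string;
}): Promise<void> {
  // Telegram confirms the upload and returns the destination message ID.
  const upload = await uploadToChannel(a.localPath, a.destChannelId);

  // Phase 1: persist the minimal stub immediately, so a crash anywhere below
  // still leaves a row that recoverIncompleteUploads() and packageExistsByHash() can see.
  const { id } = await createPackageStub({
    contentHash: a.contentHash,
    fileName: a.fileName,
    fileSize: a.fileSize,
    destChannelId: a.destChannelId,
    destMessageId: upload.messageId,
    // ...remaining stub fields from the list above
  });

  // Phase 2: slow, crash-prone metadata work runs only after the stub exists.
  const files = await listArchiveContents(a.localPath); // assumed helper for this sketch
  await updatePackageWithMetadata(id, { files });
}
```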
### 1b. Hash Advisory Lock

**The race (two accounts, shared source channel):**

```
Worker A: packageExistsByHash(X) → false (no record yet)
Worker B: packageExistsByHash(X) → false (no record yet)
Worker A: uploads file → destMessageId_A
Worker B: uploads file → destMessageId_B   ← duplicate Telegram message
Worker A: createPackageStub() → succeeds (contentHash @unique satisfied)
Worker B: createPackageStub() → fails unique constraint on contentHash
```

Result: two Telegram messages, one DB record. Worker B's upload is wasted.

**Fix:** Before calling `uploadToChannel()`, acquire a PostgreSQL session advisory lock keyed on the content hash:

```sql
SELECT pg_try_advisory_lock(hash_bigint)
```

Where `hash_bigint` is the first 8 bytes of the SHA-256 content hash interpreted as a signed bigint.
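As an illustration of that derivation (one possible mapping; the eventual `locks.ts` helper instead reuses `hashToLockId` on a `"hash:"`-prefixed string, producing a 32-bit key):

```typescript
// Sketch: read the first 8 bytes of a SHA-256 hex digest as a signed 64-bit
// integer, the key type PostgreSQL advisory locks expect.
function hashToAdvisoryKey(contentHashHex: string): bigint {
  const firstEightBytes = Buffer.from(contentHashHex.slice(0, 16), "hex");
  return firstEightBytes.readBigInt64BE(0); // signed, matching pg's bigint
}
```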
- `pg_try_advisory_lock` is non-blocking. If another worker holds the lock (same file, shared channel), return `false` → treat as duplicate, skip.
- After acquiring the lock, **re-run `packageExistsByHash()`** before uploading. This catches the case where another worker finished and released the lock between the first check and this one — without the re-check, the current worker would proceed to re-upload.
- The lock is session-scoped: released automatically on DB session end. No manual cleanup needed on crash.
- The lock is released explicitly after `createPackageStub()` completes (or on any error path).

**Implementation location:** New helpers `tryAcquireHashLock(contentHash)` / `releaseHashLock(contentHash)` in `worker/src/db/locks.ts`, reusing the existing DB client pattern.
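A sketch of the intended call-site ordering (the helper names are from this design; the surrounding control flow and argument shapes are illustrative):

```typescript
// Sketch only: how the worker might order lock, re-check, upload, stub, release.
async function uploadIfNotDuplicate(a: { contentHash: string; localPath: string; destChannelId: string }) {
  // Non-blocking: if another worker already holds the lock, treat as duplicate and skip.
  if (!(await tryAcquireHashLock(a.contentHash))) return;

  try {
    // Re-check after acquiring the lock: the other worker may have finished
    // (and released the lock) between the first dedup check and this point.
    if (await packageExistsByHash(a.contentHash)) return;

    const upload = await uploadToChannel(a.localPath, a.destChannelId);
    await createPackageStub({ contentHash: a.contentHash, destMessageId: upload.messageId /* ... */ });
  } finally {
    // Released on success and on every error path.
    await releaseHashLock(a.contentHash);
  }
}
```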
---

## Section 2: Parallel Account Processing

### Current Constraint

`withTdlibMutex` in `scheduler.ts` serializes all TDLib operations across accounts. This was a conservative guard, but TDLib explicitly supports multiple concurrent clients in the same process provided each has its own `databaseDirectory` and `filesDirectory`.

The codebase already satisfies this requirement:

```typescript
// worker/src/tdlib/client.ts
const dbPath = path.join(config.tdlibStateDir, account.id);
const client = createClient({
  databaseDirectory: dbPath,
  filesDirectory: path.join(dbPath, "files"),
});
```

Each account gets `<TDLIB_STATE_DIR>/<account.id>/` — fully isolated.

### Change

Replace the sequential `for` loop in `scheduler.ts` with `Promise.allSettled()`:

```typescript
// Before
for (const account of accounts) {
  await withTdlibMutex(`ingest:${account.phone}`, () => runWorkerForAccount(account));
}

// After
await Promise.allSettled(accounts.map((account) => runWorkerForAccount(account)));
```

The per-account PostgreSQL advisory lock in `db/locks.ts` already prevents any account from being processed twice simultaneously. `Promise.allSettled()` ensures one account's failure doesn't abort the other.

The `withTdlibMutex` wrapper can be removed from the ingest path entirely. The auth path (`authenticateAccount`) should also be run in parallel but may remain guarded if TDLib auth flows have ordering dependencies — verify during implementation.

**No Docker Compose changes needed.** Both accounts run in the same container.
### Speed Limit Notifications

TDLib fires `updateSpeedLimitNotification` when an account's upload or download speed is throttled (non-Premium accounts). Log this event at `warn` level in the client update handler so it is visible in the logs, even though it is not directly actionable.
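A minimal sketch of that handler, mirroring the shape used later in `worker/src/tdlib/client.ts` (`client`, `account`, and `log` are the variables already in scope there):

```typescript
// Sketch: warn-level logging of TDLib speed-limit notifications.
client.on("update", (update: unknown) => {
  const u = update as { _?: string; is_upload?: boolean };
  if (u?._ === "updateSpeedLimitNotification") {
    log.warn(
      { accountId: account.id, isUpload: u.is_upload },
      u.is_upload
        ? "Upload speed limited by Telegram (account is not Premium)"
        : "Download speed limited by Telegram (account is not Premium)"
    );
  }
});
```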
---

## Section 3: Per-Account Premium Upload Limit

### Premium Detection

After successful authentication, call `getMe()` and read `is_premium: bool` from the returned `user` object. Store this on `TelegramAccount.isPremium` (new boolean field, default `false`, updated on each successful auth).

```typescript
const me = await client.invoke({ _: 'getMe' }) as { is_premium?: boolean };
await updateAccountPremiumStatus(account.id, me.is_premium ?? false);
```

### Upload Size Limits

| Account type | `maxUploadSize` | Effect |
|---|---|---|
| Premium | 3,950 MB | Parts ≤ 3.95 GB upload as-is; repack only for parts > 3.95 GB (extremely rare) |
| Non-Premium | 1,950 MB | Current behavior unchanged |

Pass `maxUploadSize` into `processOneArchiveSet()` as a parameter (currently hardcoded as `MAX_UPLOAD_SIZE` at `worker.ts:1023` and in `archive/split.ts`).

The `hasOversizedPart` check and `byteLevelSplit` call both use this value, so the repack step is effectively eliminated for Premium accounts in practice — no separate "skip repack" flag needed.
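For illustration, the per-account ceiling can be computed right after client creation; the 3,950 MB constant and the non-Premium fallback to `config.maxPartSizeMB` mirror the accompanying `worker.ts` change:

```typescript
// Sketch: pick the per-account upload ceiling once Premium status is known.
const { client, isPremium } = await createTdlibClient({ id: account.id, phone: account.phone });

const maxUploadSize = isPremium
  ? 3950n * 1024n * 1024n                          // ~3.95 GB for Premium accounts
  : BigInt(config.maxPartSizeMB) * 1024n * 1024n;  // existing non-Premium default (~1,950 MB)

// Threaded through the pipeline in place of the hardcoded MAX_UPLOAD_SIZE,
// e.g. byteLevelSplit(filePath, maxUploadSize).
```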
### Migration

```prisma
model TelegramAccount {
  // ... existing fields
  isPremium Boolean @default(false)
}
```

One migration, one new query `updateAccountPremiumStatus(accountId, isPremium)`.

---

## Files to Change

| File | Change |
|---|---|
| `prisma/schema.prisma` | Add `isPremium Boolean @default(false)` to `TelegramAccount` |
| `worker/src/db/queries.ts` | Add `updateAccountPremiumStatus()`, `createPackageStub()`, `updatePackageWithMetadata()` |
| `worker/src/db/locks.ts` | Add `tryAcquireHashLock()`, `releaseHashLock()` |
| `worker/src/tdlib/client.ts` | Call `getMe()` after auth, return `isPremium` from `createTdlibClient()` |
| `worker/src/worker.ts` | Two-phase write, hash lock acquire/release, pass `maxUploadSize` per account |
| `worker/src/archive/split.ts` | Accept `maxPartSize` parameter instead of hardcoded constant |
| `worker/src/scheduler.ts` | Replace sequential loop with `Promise.allSettled()`, remove `withTdlibMutex` from ingest path |

---

## What Is Explicitly Out of Scope

- Backfilling metadata on stub records (rare crash case, functional without it)
- Download pre-fetching / pipeline parallelism within one account
- Two separate worker containers (single container is sufficient)
- Bot or app changes (worker-only)

@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "telegram_accounts" ADD COLUMN "isPremium" BOOLEAN NOT NULL DEFAULT false;

@@ -406,6 +406,7 @@ model TelegramAccount {
   isActive Boolean @default(true)
   authState AuthState @default(PENDING)
   authCode String?
+  isPremium Boolean @default(false)
   lastSeenAt DateTime?
   createdAt DateTime @default(now())
   updatedAt DateTime @updatedAt

@@ -18,20 +18,22 @@ const log = childLogger("split");
 const MAX_PART_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
 
 /**
- * Split a file into ≤2GB parts using byte-level splitting.
- * Returns paths to the split parts. If the file is already ≤2GB, returns the original path.
+ * Split a file into parts using byte-level splitting.
+ * Returns paths to the split parts. If the file fits in one part, returns the original path.
+ * Pass maxPartSize to override the global default (e.g., 3950 MiB for Premium accounts).
  */
-export async function byteLevelSplit(filePath: string): Promise<string[]> {
+export async function byteLevelSplit(filePath: string, maxPartSize?: bigint): Promise<string[]> {
+  const effectiveMax = maxPartSize ?? MAX_PART_SIZE;
   const stats = await stat(filePath);
   const fileSize = BigInt(stats.size);
 
-  if (fileSize <= MAX_PART_SIZE) {
+  if (fileSize <= effectiveMax) {
     return [filePath];
   }
 
   const dir = path.dirname(filePath);
   const baseName = path.basename(filePath);
-  const partSize = Number(MAX_PART_SIZE);
+  const partSize = Number(effectiveMax);
   const totalParts = Math.ceil(Number(fileSize) / partSize);
   const parts: string[] = [];
 

@@ -5,7 +5,14 @@ import { config } from "../util/config.js";
 
 const pool = new pg.Pool({
   connectionString: config.databaseUrl,
-  max: 5,
+  // Pool needs headroom for: 2 account advisory locks (held for entire cycle),
+  // up to 2 concurrent hash locks, plus Prisma operations from both accounts.
+  // Previously max=5 caused pool exhaustion and indefinite hangs.
+  max: 15,
+  // Prevent pool.connect() from blocking forever when pool is exhausted.
+  // Throws an error after 30s so the operation can fail and retry instead of
+  // silently hanging for hours (as happened with the Turnbase.7z stall).
+  connectionTimeoutMillis: 30_000,
 });
 
 const adapter = new PrismaPg(pool);

@@ -79,3 +79,66 @@ export async function releaseLock(accountId: string): Promise<void> {
     client.release();
   }
 }
+
+/**
+ * Derive a lock ID for a content hash. Prefixes with "hash:" so the resulting
+ * 32-bit integer does not collide with account advisory lock IDs.
+ */
+function contentHashToLockId(contentHash: string): number {
+  return hashToLockId(`hash:${contentHash}`);
+}
+
+/**
+ * Acquire a per-content-hash advisory lock before uploading.
+ * Prevents two concurrent workers from uploading the same archive
+ * when both scan a shared source channel.
+ *
+ * Returns true if acquired (proceed with upload).
+ * Returns false if already held (another worker is handling this archive — skip).
+ *
+ * MUST be released via releaseHashLock() after createPackageStub() completes,
+ * including on all error paths (use try/finally).
+ */
+export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
+  const lockId = contentHashToLockId(contentHash);
+  const client = await pool.connect();
+  try {
+    const result = await client.query<{ pg_try_advisory_lock: boolean }>(
+      "SELECT pg_try_advisory_lock($1)",
+      [lockId]
+    );
+    const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
+    if (acquired) {
+      heldConnections.set(`hash:${contentHash}`, client);
+      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock acquired");
+      return true;
+    } else {
+      client.release();
+      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock held by another worker — skipping");
+      return false;
+    }
+  } catch (err) {
+    client.release();
+    throw err;
+  }
+}
+
+/**
+ * Release the per-content-hash advisory lock.
+ * Call after createPackageStub() completes (or on any error path).
+ */
+export async function releaseHashLock(contentHash: string): Promise<void> {
+  const lockId = contentHashToLockId(contentHash);
+  const client = heldConnections.get(`hash:${contentHash}`);
+  if (!client) {
+    log.warn({ hash: contentHash.slice(0, 16) }, "No held connection for hash lock release");
+    return;
+  }
+  try {
+    await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
+    log.debug({ hash: contentHash.slice(0, 16) }, "Hash lock released");
+  } finally {
+    heldConnections.delete(`hash:${contentHash}`);
+    client.release();
+  }
+}

@@ -74,6 +74,105 @@ export async function getUploadedPackageByHash(contentHash: string) {
   });
 }
 
+export interface CreatePackageStubInput {
+  contentHash: string;
+  fileName: string;
+  fileSize: bigint;
+  archiveType: ArchiveType;
+  sourceChannelId: string;
+  sourceMessageId: bigint;
+  sourceTopicId?: bigint | null;
+  destChannelId: string;
+  destMessageId: bigint;
+  destMessageIds: bigint[];
+  isMultipart: boolean;
+  partCount: number;
+  ingestionRunId: string;
+  creator?: string | null;
+  tags?: string[];
+}
+
+/**
+ * Write a minimal Package record immediately after Telegram confirms the upload.
+ * Call this before preview/metadata extraction so recoverIncompleteUploads() can
+ * detect and verify the package if the worker crashes mid-metadata.
+ *
+ * Follow with updatePackageWithMetadata() once file entries and preview are ready.
+ */
+export async function createPackageStub(
+  input: CreatePackageStubInput
+): Promise<{ id: string }> {
+  const pkg = await db.package.create({
+    data: {
+      contentHash: input.contentHash,
+      fileName: input.fileName,
+      fileSize: input.fileSize,
+      archiveType: input.archiveType,
+      sourceChannelId: input.sourceChannelId,
+      sourceMessageId: input.sourceMessageId,
+      sourceTopicId: input.sourceTopicId ?? undefined,
+      destChannelId: input.destChannelId,
+      destMessageId: input.destMessageId,
+      destMessageIds: input.destMessageIds,
+      isMultipart: input.isMultipart,
+      partCount: input.partCount,
+      fileCount: 0,
+      ingestionRunId: input.ingestionRunId,
+      creator: input.creator ?? undefined,
+      tags: input.tags?.length ? input.tags : undefined,
+    },
+    select: { id: true },
+  });
+
+  try {
+    await db.$queryRawUnsafe(
+      `SELECT pg_notify('new_package', $1)`,
+      JSON.stringify({
+        packageId: pkg.id,
+        fileName: input.fileName,
+        creator: input.creator ?? null,
+        tags: input.tags ?? [],
+      })
+    );
+  } catch {
+    // Best-effort
+  }
+
+  return pkg;
+}
+
+/**
+ * Update a stub Package with file entries and preview after metadata extraction.
+ * Called as Phase 2 of the two-phase write after createPackageStub().
+ */
+export async function updatePackageWithMetadata(
+  packageId: string,
+  input: {
+    files: {
+      path: string;
+      fileName: string;
+      extension: string | null;
+      compressedSize: bigint;
+      uncompressedSize: bigint;
+      crc32: string | null;
+    }[];
+    previewData?: Buffer | null;
+    previewMsgId?: bigint | null;
+  }
+): Promise<void> {
+  await db.package.update({
+    where: { id: packageId },
+    data: {
+      fileCount: input.files.length,
+      previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
+      previewMsgId: input.previewMsgId ?? undefined,
+      files: {
+        create: input.files,
+      },
+    },
+  });
+}
+
 /**
  * Check if a package already exists for a given source message ID
  * AND was successfully uploaded to the destination (destMessageId is set).

@@ -308,6 +407,16 @@ export async function updateAccountAuthState(
   });
 }
 
+export async function updateAccountPremiumStatus(
+  accountId: string,
+  isPremium: boolean
+): Promise<void> {
+  await db.telegramAccount.update({
+    where: { id: accountId },
+    data: { isPremium },
+  });
+}
+
 export async function getAccountAuthCode(accountId: string) {
   const account = await db.telegramAccount.findUnique({
     where: { id: accountId },

@@ -101,16 +101,14 @@ export async function processExtractRequest(requestId: string): Promise<void> {
   try {
     await mkdir(tempDir, { recursive: true });
 
-    // Wrap the entire TDLib session in the mutex so no other TDLib
-    // operation can run concurrently (TDLib is single-session).
-    await withTdlibMutex("extract", async () => {
-      const accounts = await getActiveAccounts();
-      if (accounts.length === 0) {
-        throw new Error("No authenticated Telegram accounts available");
-      }
-
-      const account = accounts[0];
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+    const accounts = await getActiveAccounts();
+    if (accounts.length === 0) {
+      throw new Error("No authenticated Telegram accounts available");
+    }
+    const account = accounts[0];
+
+    await withTdlibMutex(account.phone, "extract", async () => {
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
 
       try {
         // Load chat list so TDLib can find the dest channel

@@ -14,6 +14,7 @@ import {
   getGlobalSetting,
   setGlobalSetting,
   getActiveAccounts,
+  getChannelFetchRequest,
   upsertChannel,
   ensureAccountChannelLink,
   updateFetchRequestStatus,

@@ -133,7 +134,9 @@ let fetchQueue: Promise<void> = Promise.resolve();
 function handleChannelFetch(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("fetch-channels", () =>
+      const request = await getChannelFetchRequest(requestId);
+      const key = request?.account?.phone ?? "global";
+      await withTdlibMutex(key, "fetch-channels", () =>
         processFetchRequest(requestId)
       );
     } catch (err) {

@@ -147,22 +150,20 @@ function handleChannelFetch(requestId: string): void {
 function handleGenerateInvite(channelId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("generate-invite", async () => {
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to generate invite link");
+        return;
+      }
+      const account = accounts[0];
+      await withTdlibMutex(account.phone, "generate-invite", async () => {
         const destChannel = await getGlobalDestinationChannel();
         if (!destChannel || destChannel.id !== channelId) {
           log.warn({ channelId }, "Destination channel mismatch, skipping invite generation");
           return;
         }
 
-        // Use the first available authenticated account to generate the link
-        const accounts = await getActiveAccounts();
-        if (accounts.length === 0) {
-          log.warn("No authenticated accounts to generate invite link");
-          return;
-        }
-
-        const account = accounts[0];
-        const client = await createTdlibClient({ id: account.id, phone: account.phone });
+        const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
 
         try {
           const link = await generateInviteLink(client, destChannel.telegramId);

@@ -187,7 +188,13 @@ function handleCreateDestination(payload: string): void {
     const parsed = JSON.parse(payload) as { requestId: string; title: string };
     requestId = parsed.requestId;
 
-    await withTdlibMutex("create-destination", async () => {
+    const accounts = await getActiveAccounts();
+    if (accounts.length === 0) {
+      throw new Error("No authenticated accounts available to create the group");
+    }
+    const account = accounts[0];
+
+    await withTdlibMutex(account.phone, "create-destination", async () => {
       const { db } = await import("./db/client.js");
 
       // Mark the request as in-progress

@@ -196,14 +203,7 @@ function handleCreateDestination(payload: string): void {
         data: { status: "IN_PROGRESS" },
       });
 
-      // Use the first available authenticated account
-      const accounts = await getActiveAccounts();
-      if (accounts.length === 0) {
-        throw new Error("No authenticated accounts available to create the group");
-      }
-
-      const account = accounts[0];
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
 
       try {
         // Create the supergroup via TDLib

@@ -328,16 +328,16 @@ function handleJoinChannel(payload: string): void {
     const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
     requestId = parsed.requestId;
 
-    await withTdlibMutex("join-channel", async () => {
+    const accounts = await getActiveAccounts();
+    const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
+    if (!account) {
+      throw new Error("No authenticated accounts available");
+    }
+
+    await withTdlibMutex(account.phone, "join-channel", async () => {
       await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
 
-      const accounts = await getActiveAccounts();
-      const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
-      if (!account) {
-        throw new Error("No authenticated accounts available");
-      }
-
-      const client = await createTdlibClient({ id: account.id, phone: account.phone });
+      const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
 
       try {
         const linkInfo = parseTelegramInput(parsed.input);

@@ -507,7 +507,12 @@ function handleIngestionTrigger(): void {
 function handleRebuildPackages(requestId: string): void {
   fetchQueue = fetchQueue.then(async () => {
     try {
-      await withTdlibMutex("rebuild-packages", () =>
+      const accounts = await getActiveAccounts();
+      if (accounts.length === 0) {
+        log.warn("No authenticated accounts to rebuild packages");
+        return;
+      }
+      await withTdlibMutex(accounts[0].phone, "rebuild-packages", () =>
         rebuildPackageDatabase(requestId)
       );
     } catch (err) {

@@ -27,6 +27,33 @@ async function main(): Promise<void> {
   await cleanupTempDir();
   await markStaleRunsAsFailed();
 
+  // Release any advisory locks orphaned by a previous worker instance.
+  // When Docker kills a container, PostgreSQL may keep the session alive
+  // (zombie connections), holding advisory locks that block the new worker.
+  try {
+    const result = await pool.query(`
+      SELECT pid, state, left(query, 80) as query, age(clock_timestamp(), state_change) as idle_time
+      FROM pg_stat_activity
+      WHERE datname = current_database()
+        AND pid != pg_backend_pid()
+        AND state = 'idle'
+        AND query LIKE '%pg_try_advisory_lock%'
+        AND state_change < clock_timestamp() - interval '5 minutes'
+    `);
+    for (const row of result.rows) {
+      log.warn(
+        { pid: row.pid, idleTime: row.idle_time, query: row.query },
+        "Terminating stale advisory lock session from previous worker"
+      );
+      await pool.query("SELECT pg_terminate_backend($1)", [row.pid]);
+    }
+    if (result.rows.length > 0) {
+      log.info({ terminated: result.rows.length }, "Cleaned up stale advisory lock sessions");
+    }
+  } catch (err) {
+    log.warn({ err }, "Failed to clean up stale advisory locks (non-fatal)");
+  }
+
   // Verify destination messages exist for all "uploaded" packages.
   // Resets any packages whose dest message is missing so they get re-processed.
   await recoverIncompleteUploads();

@@ -49,7 +49,7 @@ export async function processManualUpload(uploadId: string): Promise<void> {
   const account = accounts[0];
   if (!account) throw new Error("No authenticated Telegram account available");
 
-  const client = await createTdlibClient({ id: account.id, phone: account.phone });
+  const { client } = await createTdlibClient({ id: account.id, phone: account.phone });
 
   try {
     const packageIds: string[] = [];

@@ -63,7 +63,7 @@ export async function rebuildPackageDatabase(
   }
 
   const account = accounts[0];
-  const client = await createTdlibClient({
+  const { client } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });

@@ -63,7 +63,7 @@ export async function recoverIncompleteUploads(): Promise<void> {
     let client: Client | undefined;
 
     try {
-      client = await createTdlibClient({ id: account.id, phone: account.phone });
+      ({ client } = await createTdlibClient({ id: account.id, phone: account.phone }));
 
       // Load the chat list so TDLib can resolve chat IDs
       try {

@@ -1,6 +1,6 @@
 import { config } from "./util/config.js";
 import { childLogger } from "./util/logger.js";
-import { withTdlibMutex } from "./util/mutex.js";
+import { withTdlibMutex, forceReleaseMutex } from "./util/mutex.js";
 import { getActiveAccounts, getPendingAccounts } from "./db/queries.js";
 import { runWorkerForAccount, authenticateAccount } from "./worker.js";
 import { runIntegrityAudit } from "./audit.js";

@@ -24,8 +24,8 @@ const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "
  * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
  * 2. Process all active AUTHENTICATED accounts for ingestion
  *
- * All TDLib operations are wrapped in the mutex to ensure only one client
- * runs at a time (also shared with the fetch listener for on-demand requests).
+ * Each account's TDLib operations are wrapped in a per-key mutex so different
+ * accounts run concurrently while the same account is still serialized.
  *
  * The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
  * Once the timeout elapses, no new accounts will be started but any in-progress

@@ -55,7 +55,7 @@ async function runCycle(): Promise<void> {
         log.warn("Cycle timeout reached during authentication phase, stopping");
         break;
       }
-      await withTdlibMutex(`auth:${account.phone}`, () =>
+      await withTdlibMutex(account.phone, `auth:${account.phone}`, () =>
         authenticateAccount(account)
       );
     }

@@ -71,17 +71,38 @@ async function runCycle(): Promise<void> {
 
   log.info({ accountCount: accounts.length }, "Processing accounts");
 
-  for (const account of accounts) {
-    if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
-      log.warn(
-        { elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
-        "Cycle timeout reached, skipping remaining accounts"
-      );
-      break;
-    }
-    await withTdlibMutex(`ingest:${account.phone}`, () =>
-      runWorkerForAccount(account)
-    );
-  }
+  const results = await Promise.allSettled(
+    accounts.map((account) => {
+      let timer: ReturnType<typeof setTimeout>;
+      return Promise.race([
+        withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
+          runWorkerForAccount(account)
+        ),
+        new Promise<never>((_, reject) => {
+          timer = setTimeout(
+            () => reject(new Error(`Account ${account.phone} ingestion timed out after ${CYCLE_TIMEOUT_MS / 60_000}min`)),
+            CYCLE_TIMEOUT_MS
+          );
+        }),
+      ]).finally(() => clearTimeout(timer));
+    })
+  );
+
+  for (let i = 0; i < results.length; i++) {
+    if (results[i].status === "rejected") {
+      const reason = (results[i] as PromiseRejectedResult).reason;
+      log.error(
+        { phone: accounts[i].phone, err: reason },
+        "Account ingestion failed"
+      );
+      // If the cycle timed out, force-release the mutex so the next cycle
+      // (or other operations like fetch-channels) can proceed immediately
+      // instead of waiting 30 minutes for the mutex timeout.
+      const errMsg = reason instanceof Error ? reason.message : String(reason);
+      if (errMsg.includes("timed out") || errMsg.includes("mutex wait timeout")) {
+        forceReleaseMutex(accounts[i].phone);
+      }
+    }
+  }
 
   log.info(

@@ -6,6 +6,7 @@ import { childLogger } from "../util/logger.js";
 import {
   updateAccountAuthState,
   getAccountAuthCode,
+  updateAccountPremiumStatus,
 } from "../db/queries.js";
 
 const log = childLogger("tdlib-client");

@@ -27,7 +28,7 @@ interface AccountConfig {
  */
 export async function createTdlibClient(
   account: AccountConfig
-): Promise<Client> {
+): Promise<{ client: Client; isPremium: boolean }> {
   const dbPath = path.join(config.tdlibStateDir, account.id);
 
   const client = createClient({

@@ -78,7 +79,30 @@ export async function createTdlibClient(
 
     await updateAccountAuthState(account.id, "AUTHENTICATED");
     log.info({ accountId: account.id }, "TDLib client authenticated");
-    return client;
+
+    let isPremium = false;
+    try {
+      const me = await client.invoke({ _: "getMe" }) as { is_premium?: boolean };
+      isPremium = me.is_premium ?? false;
+      await updateAccountPremiumStatus(account.id, isPremium);
+      log.info({ accountId: account.id, isPremium }, "Account Premium status detected");
+    } catch (err) {
+      log.warn({ err, accountId: account.id }, "Could not detect Premium status, defaulting to false");
+    }
+
+    client.on("update", (update: unknown) => {
+      const u = update as { _?: string; is_upload?: boolean };
+      if (u?._ === "updateSpeedLimitNotification") {
+        log.warn(
+          { accountId: account.id, isUpload: u.is_upload },
+          u.is_upload
+            ? "Upload speed limited by Telegram (account is not Premium)"
+            : "Download speed limited by Telegram (account is not Premium)"
+        );
+      }
+    });
+
+    return { client, isPremium };
   } catch (err) {
     log.error({ err, accountId: account.id }, "TDLib authentication failed");
     await updateAccountAuthState(account.id, "EXPIRED");

@@ -79,6 +79,8 @@ export interface ChannelScanResult {
   archives: TelegramMessage[];
   photos: TelegramPhoto[];
   totalScanned: number;
+  /** Highest message ID seen during scan (for watermark, even when no archives found). */
+  maxScannedMessageId: bigint | null;
 }
 
 export type ScanProgressCallback = (messagesScanned: number) => void;

@@ -158,6 +160,7 @@ export async function getChannelMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
 
   // Open the chat so TDLib can access it
   try {

@@ -204,6 +207,12 @@ export async function getChannelMessages(
 
     totalScanned += result.messages.length;
 
+    // Track highest message ID (first message in batch = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
+
     for (const msg of result.messages) {
       // Check for archive documents
       const doc = msg.content?.document;

@@ -246,6 +255,11 @@ export async function getChannelMessages(
       fromMessageId = result.messages[result.messages.length - 1].id;
       if (result.messages.length < Math.min(limit, 100)) break;
 
+      // Early exit: searchChatMessages returns newest-first. Once the oldest
+      // message on this page is at or below the boundary, all remaining pages
+      // are even older — no new messages exist, stop scanning immediately.
+      if (boundary && fromMessageId <= boundary) break;
+
       await sleep(config.apiDelayMs);
     }
   }

@@ -266,6 +280,7 @@ export async function getChannelMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }
 

@@ -178,6 +178,7 @@ export async function getTopicMessages(
   const archives: TelegramMessage[] = [];
   const photos: TelegramPhoto[] = [];
   const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
+  let maxScannedMessageId: bigint | null = null;
 
   let currentFromId = 0;
   let totalScanned = 0;

@@ -239,6 +240,12 @@ export async function getTopicMessages(
 
     totalScanned += result.messages.length;
 
+    // Track highest message ID (first message = newest, since results are newest-first)
+    const batchMaxId = BigInt(result.messages[0].id);
+    if (maxScannedMessageId === null || batchMaxId > maxScannedMessageId) {
+      maxScannedMessageId = batchMaxId;
+    }
+
     for (const msg of result.messages) {
       // Check for archive documents
       const doc = msg.content?.document;

@@ -302,6 +309,7 @@ export async function getTopicMessages(
     archives: archives.reverse(),
     photos: photos.reverse(),
     totalScanned,
+    maxScannedMessageId,
   };
 }
 

@@ -7,6 +7,18 @@ import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
 
 const log = childLogger("upload");
 
+/**
+ * Custom error class to distinguish upload stalls from other errors.
+ * When consecutive stalls occur, the caller can use this signal to
+ * recreate the TDLib client (whose event stream may have degraded).
+ */
+export class UploadStallError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = "UploadStallError";
+  }
+}
+
 export interface UploadResult {
   messageId: bigint;
   messageIds: bigint[];

@@ -109,13 +121,21 @@ async function sendWithRetry(
 
       // Stall or timeout — retry with a cooldown
       const errMsg = err instanceof Error ? err.message : "";
-      if ((errMsg.includes("stalled") || errMsg.includes("timed out")) && !isLastAttempt) {
-        log.warn(
-          { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
-          "Upload stalled/timed out — retrying"
-        );
-        await sleep(10_000);
-        continue;
+      if (errMsg.includes("stalled") || errMsg.includes("timed out")) {
+        if (!isLastAttempt) {
+          log.warn(
+            { fileName, attempt: attempt + 1, maxRetries: MAX_UPLOAD_RETRIES },
+            "Upload stalled/timed out — retrying"
+          );
+          await sleep(10_000);
+          continue;
+        }
+        // All stall retries exhausted — throw UploadStallError so the caller
+        // knows the TDLib client's event stream is likely degraded and can
+        // recreate the client before continuing.
+        throw new UploadStallError(
+          `Upload stalled after ${MAX_UPLOAD_RETRIES} retries for ${fileName}`
+        );
       }
 
       throw err;

@@ -166,8 +186,10 @@ async function sendAndWaitForUpload(
       }
     }, timeoutMs);
 
-    // Stall detection: no progress for 5 minutes after upload started → reject
-    const STALL_TIMEOUT_MS = 5 * 60_000;
+    // Stall detection: no progress for 3 minutes after upload started → reject
+    // (reduced from 5min — once data is fully sent, confirmation should arrive quickly;
+    // a 3min silence strongly indicates a degraded TDLib event stream)
+    const STALL_TIMEOUT_MS = 3 * 60_000;
     const stallChecker = setInterval(() => {
       if (settled || !uploadStarted) return;
       const stallMs = Date.now() - lastProgressTime;

@@ -2,39 +2,66 @@ import { childLogger } from "./logger.js";
 
 const log = childLogger("mutex");
 
-let locked = false;
-let holder = "";
-const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
-
-/**
- * Maximum time to wait for the TDLib mutex (ms).
- * If the mutex is not available within this time, the operation is rejected.
- * Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
- */
 const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
 
+const locks = new Map<string, boolean>();
+const holders = new Map<string, string>();
+const queues = new Map<
+  string,
+  Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
+>();
+
 /**
- * Ensures only one TDLib client runs at a time across the entire worker process.
- * Both the scheduler (auth, ingestion) and the fetch listener acquire this
- * before creating any TDLib client.
+ * Force-release a stuck mutex.
+ * This should only be called when the holder is known to be stuck (e.g. after
+ * a cycle timeout). It releases the lock and lets the next queued waiter proceed.
+ */
+export function forceReleaseMutex(key: string): void {
+  if (!locks.has(key)) return;
+
+  const holder = holders.get(key);
+  log.warn({ key, holder }, "Force-releasing stuck TDLib mutex");
+
+  locks.delete(key);
+  holders.delete(key);
+  const next = queues.get(key)?.shift();
+  if (next) {
+    log.info({ key, next: next.label }, "TDLib mutex force-released to next waiter");
+    next.resolve();
+  } else {
+    queues.delete(key);
+    log.info({ key }, "TDLib mutex force-released (no waiters)");
+  }
+}
+
+/**
+ * Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
+ * Different keys run concurrently — this allows two accounts to ingest in parallel
+ * while still preventing concurrent use of the same account's TDLib state dir.
  *
- * Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
+ * key: the account phone number for account-specific ops (auth, ingest),
+ * or 'global' for ops that don't belong to a specific account.
+ * label: human-readable name for logging.
  */
 export async function withTdlibMutex<T>(
+  key: string,
   label: string,
   fn: () => Promise<T>
 ): Promise<T> {
-  if (locked) {
-    log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
+  if (locks.get(key)) {
+    log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex");
     await new Promise<void>((resolve, reject) => {
       const timer = setTimeout(() => {
-        const idx = queue.indexOf(entry);
+        const q = queues.get(key) ?? [];
+        const idx = q.indexOf(entry);
         if (idx !== -1) {
-          queue.splice(idx, 1);
-          reject(new Error(
-            `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
-            `(waiting: ${label}, holder: ${holder})`
-          ));
+          q.splice(idx, 1);
+          reject(
+            new Error(
+              `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
+              `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})`
+            )
+          );
         }
       }, MUTEX_WAIT_TIMEOUT_MS);
|
|||||||
reject,
|
reject,
|
||||||
label,
|
label,
|
||||||
};
|
};
|
||||||
queue.push(entry);
|
|
||||||
|
if (!queues.has(key)) queues.set(key, []);
|
||||||
|
queues.get(key)!.push(entry);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
locked = true;
|
locks.set(key, true);
|
||||||
holder = label;
|
holders.set(key, label);
|
||||||
log.debug({ label }, "TDLib mutex acquired");
|
log.debug({ key, label }, "TDLib mutex acquired");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await fn();
|
return await fn();
|
||||||
} finally {
|
} finally {
|
||||||
locked = false;
|
locks.delete(key);
|
||||||
holder = "";
|
holders.delete(key);
|
||||||
const next = queue.shift();
|
const next = queues.get(key)?.shift();
|
||||||
if (next) {
|
if (next) {
|
||||||
log.debug({ next: next.label }, "TDLib mutex releasing to next waiter");
|
log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter");
|
||||||
next.resolve();
|
next.resolve();
|
||||||
} else {
|
} else {
|
||||||
log.debug({ label }, "TDLib mutex released");
|
queues.delete(key);
|
||||||
|
log.debug({ key, label }, "TDLib mutex released");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,14 @@ import path from "path";
|
|||||||
import { unlink, readdir, mkdir, rm } from "fs/promises";
|
import { unlink, readdir, mkdir, rm } from "fs/promises";
|
||||||
import { config } from "./util/config.js";
|
import { config } from "./util/config.js";
|
||||||
import { childLogger } from "./util/logger.js";
|
import { childLogger } from "./util/logger.js";
|
||||||
import { tryAcquireLock, releaseLock } from "./db/locks.js";
|
import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
|
||||||
import {
|
import {
|
||||||
getSourceChannelMappings,
|
getSourceChannelMappings,
|
||||||
getGlobalDestinationChannel,
|
getGlobalDestinationChannel,
|
||||||
packageExistsByHash,
|
packageExistsByHash,
|
||||||
packageExistsBySourceMessage,
|
packageExistsBySourceMessage,
|
||||||
createPackageWithFiles,
|
createPackageStub,
|
||||||
|
updatePackageWithMetadata,
|
||||||
createIngestionRun,
|
createIngestionRun,
|
||||||
completeIngestionRun,
|
completeIngestionRun,
|
||||||
failIngestionRun,
|
failIngestionRun,
|
||||||
@@ -46,7 +47,7 @@ import { readZipCentralDirectory } from "./archive/zip-reader.js";
 import { readRarContents } from "./archive/rar-reader.js";
 import { read7zContents } from "./archive/sevenz-reader.js";
 import { byteLevelSplit, concatenateFiles } from "./archive/split.js";
-import { uploadToChannel } from "./upload/channel.js";
+import { uploadToChannel, UploadStallError } from "./upload/channel.js";
 import { processAlbumGroups, processRuleBasedGroups, processTimeWindowGroups, processPatternGroups, processCreatorGroups, processZipPathGroups, processReplyChainGroups, processCaptionGroups, detectGroupingConflicts, type IndexedPackageRef } from "./grouping.js";
 import { db } from "./db/client.js";
 import type { TelegramAccount, TelegramChannel } from "@prisma/client";
@@ -73,10 +74,10 @@ export async function authenticateAccount(
 
   let client: Client | undefined;
   try {
-    client = await createTdlibClient({
+    client = (await createTdlibClient({
       id: account.id,
       phone: account.phone,
-    });
+    })).client;
     aLog.info("Authentication successful");
 
     // Auto-fetch channels and create a fetch request result
@@ -131,7 +132,7 @@ export async function processFetchRequest(requestId: string): Promise<void> {
   await updateFetchRequestStatus(requestId, "IN_PROGRESS");
   aLog.info({ accountId: request.accountId }, "Processing fetch request");
 
-  const client = await createTdlibClient({
+  const { client } = await createTdlibClient({
     id: request.account.id,
     phone: request.account.phone,
   });
@@ -285,6 +286,7 @@ interface PipelineContext {
   client: Client;
   runId: string;
   accountId: string;
+  accountPhone: string;
   channelTitle: string;
   channel: TelegramChannel;
   destChannelTelegramId: bigint;
@@ -301,6 +303,9 @@ interface PipelineContext {
   /** Forum topic ID (null for non-forum). */
   sourceTopicId: bigint | null;
   accountLog: ReturnType<typeof childLogger>;
+  maxUploadSize: bigint;
+  /** How many consecutive upload stalls have occurred (resets on success). */
+  consecutiveStalls: number;
 }
 
 /**
@@ -336,10 +341,14 @@ export async function runWorkerForAccount(
     currentStep: "connecting",
   });
 
-  const client = await createTdlibClient({
+  // Use let so the client can be replaced on TDLib recreation after stalls
+  let { client, isPremium } = await createTdlibClient({
     id: account.id,
     phone: account.phone,
   });
+  const maxUploadSize = isPremium
+    ? 3950n * 1024n * 1024n
+    : BigInt(config.maxPartSizeMB) * 1024n * 1024n;
 
   // Load all chats into TDLib's local cache using loadChats (the recommended API).
   // Without this, getChat/searchChatMessages fail with "Chat not found".
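A quick illustration of how the per-account limit changes the split decision. The 3950 MB constant is taken from the diff above; the 1950 MB fallback mirrors the previous non-Premium ceiling, and the 3 GB file size is just an example value:

```
// Illustrative only: how the split decision flips with the per-account limit.
const MiB = 1024n * 1024n;
const fileSize = 3n * 1024n * MiB;       // example: a 3 GB archive part

const premiumLimit = 3950n * MiB;        // Premium account, under Telegram's 4 GB cap
const standardLimit = 1950n * MiB;       // non-Premium fallback (config.maxPartSizeMB)

const needsSplit = (limit: bigint) => fileSize > limit;

console.log(needsSplit(standardLimit));  // true  -> byteLevelSplit + multi-part upload
console.log(needsSplit(premiumLimit));   // false -> single direct upload, no repack
```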
@@ -443,6 +452,7 @@ export async function runWorkerForAccount(
     client,
     runId: activeRunId,
     accountId: account.id,
+    accountPhone: account.phone,
     channelTitle: channel.title,
     channel,
     destChannelTelegramId: destChannel.telegramId,
@@ -452,6 +462,8 @@ export async function runWorkerForAccount(
     topicCreator: null,
     sourceTopicId: null,
     accountLog,
+    maxUploadSize,
+    consecutiveStalls: 0,
   };
 
   if (forum) {
@@ -526,6 +538,15 @@ export async function runWorkerForAccount(
           { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
           "No new archives in topic"
         );
+        // Still advance topic watermark so we don't re-scan these messages next cycle
+        if (scanResult.maxScannedMessageId) {
+          await upsertTopicProgress(
+            mapping.id,
+            topic.topicId,
+            topic.name,
+            scanResult.maxScannedMessageId
+          );
+        }
         continue;
       }
 
@@ -540,14 +561,17 @@ export async function runWorkerForAccount(
       pipelineCtx.channelTitle = `${channel.title} › ${topic.name}`;
 
       const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, progress?.lastProcessedMessageId);
+      // Sync client back in case it was recreated during upload stall recovery
+      client = pipelineCtx.client;
 
-      // Only advance progress to the highest successfully processed message
-      if (maxProcessedId) {
+      // Advance progress: use archive watermark if available, fall back to scan watermark
+      const topicWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+      if (topicWatermark) {
         await upsertTopicProgress(
           mapping.id,
           topic.topicId,
           topic.name,
-          maxProcessedId
+          topicWatermark
         );
       }
     } catch (topicErr) {
@@ -597,6 +621,11 @@ export async function runWorkerForAccount(
 
       if (scanResult.archives.length === 0) {
         accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
+        // Still advance watermark to highest scanned message so we don't
+        // re-scan these messages next cycle
+        if (scanResult.maxScannedMessageId) {
+          await updateLastProcessedMessage(mapping.id, scanResult.maxScannedMessageId);
+        }
         continue;
       }
 
@@ -611,10 +640,13 @@ export async function runWorkerForAccount(
       pipelineCtx.channelTitle = channel.title;
 
       const maxProcessedId = await processArchiveSets(pipelineCtx, scanResult, run.id, mapping.lastProcessedMessageId);
+      // Sync client back in case it was recreated during upload stall recovery
+      client = pipelineCtx.client;
 
-      // Only advance progress to the highest successfully processed message
-      if (maxProcessedId) {
-        await updateLastProcessedMessage(mapping.id, maxProcessedId);
+      // Advance progress: use archive watermark if available, fall back to scan watermark
+      const channelWatermark = maxProcessedId ?? scanResult.maxScannedMessageId;
+      if (channelWatermark) {
+        await updateLastProcessedMessage(mapping.id, channelWatermark);
       }
     }
   } catch (channelErr) {
@@ -754,12 +786,68 @@ async function processArchiveSets(
       if (setMaxId > (maxProcessedId ?? 0n)) {
         maxProcessedId = setMaxId;
       }
+
+      // Reset stall counter on any successful upload
+      ctx.consecutiveStalls = 0;
     } catch (setErr) {
       // If a set fails, do NOT advance the watermark past it
       accountLog.warn(
         { err: setErr, baseName: archiveSets[setIdx].baseName },
         "Archive set failed, watermark will not advance past this set"
       );
+
+      // ── TDLib client recreation on repeated upload stalls ──
+      // When the TDLib event stream degrades, uploads complete (bytes sent)
+      // but confirmations never arrive. Retrying with the same broken client
+      // is futile. Recreate the client to get a fresh connection.
+      if (setErr instanceof UploadStallError) {
+        ctx.consecutiveStalls++;
+        accountLog.warn(
+          { consecutiveStalls: ctx.consecutiveStalls },
+          "Upload stall detected — TDLib event stream may be degraded"
+        );
+
+        // After 1 stalled set (= 3 failed retry attempts already), recreate the client
+        if (ctx.consecutiveStalls >= 1) {
+          accountLog.info("Recreating TDLib client after consecutive upload stalls");
+          try {
+            await closeTdlibClient(ctx.client);
+          } catch (closeErr) {
+            accountLog.warn({ err: closeErr }, "Error closing stale TDLib client");
+          }
+
+          try {
+            const { client: newClient } = await createTdlibClient({
+              id: ctx.accountId,
+              phone: ctx.accountPhone,
+            });
+            ctx.client = newClient;
+
+            // Reload chats so the new client can access channels
+            try {
+              for (let page = 0; page < 500; page++) {
+                await newClient.invoke({
+                  _: "loadChats",
+                  chat_list: { _: "chatListMain" },
+                  limit: 100,
+                });
+              }
+            } catch {
+              // 404 = all loaded (expected)
+            }
+
+            ctx.consecutiveStalls = 0;
+            accountLog.info("TDLib client recreated successfully — continuing ingestion");
+          } catch (recreateErr) {
+            accountLog.error(
+              { err: recreateErr },
+              "Failed to recreate TDLib client — aborting remaining uploads"
+            );
+            break;
+          }
+        }
+      }
+
       // Record the failure for visibility in the UI
       try {
         const archiveSet = archiveSets[setIdx];
@@ -1027,6 +1115,35 @@ async function processOneArchiveSet(
       return null;
     }
 
+    // ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
+    const hashLockAcquired = await tryAcquireHashLock(contentHash);
+    if (!hashLockAcquired) {
+      counters.zipsDuplicate++;
+      accountLog.info(
+        { fileName: archiveName, hash: contentHash.slice(0, 16) },
+        "Hash lock held by another worker — skipping concurrent duplicate"
+      );
+      return null;
+    }
+
+    let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
+    let creator: string | null = null;
+    const tags: string[] = [];
+    let stub: { id: string } | null = null;
+
+    try {
+      // Re-check after acquiring lock: another worker may have finished between
+      // the first check above and this point.
+      const existsAfterLock = await packageExistsByHash(contentHash);
+      if (existsAfterLock) {
+        counters.zipsDuplicate++;
+        accountLog.debug(
+          { fileName: archiveName, hash: contentHash.slice(0, 16) },
+          "Duplicate detected after acquiring hash lock — skipping"
+        );
+        return null;
+      }
+
     // ── Reading metadata ──
     await updateRunActivity(runId, {
       currentActivity: `Reading file list from ${archiveName}`,
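The `tryAcquireHashLock` and `releaseHashLock` helpers imported earlier live in `./db/locks.js`, whose diff is not shown in this section. Below is a hypothetical sketch of how they could be built on Postgres advisory locks through Prisma's `$queryRaw`; only the function names come from the imports, everything else (the key hashing, the SQL) is an assumption:

```
import { createHash } from "crypto";
import { db } from "./client.js";

// Map a content hash to a signed 64-bit advisory-lock key.
function advisoryKey(contentHash: string): bigint {
  return createHash("sha256").update(contentHash).digest().readBigInt64BE(0);
}

export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
  const key = advisoryKey(contentHash);
  const rows = await db.$queryRaw<{ locked: boolean }[]>`
    SELECT pg_try_advisory_lock(${key}) AS locked
  `;
  return rows[0]?.locked ?? false;
}

export async function releaseHashLock(contentHash: string): Promise<void> {
  const key = advisoryKey(contentHash);
  await db.$queryRaw`SELECT pg_advisory_unlock(${key})`;
}
```

One caveat with this sketch: session-level advisory locks belong to the connection that took them, so with a pooled Prisma client the acquire and release must run on the same connection (or switch to transaction-scoped `pg_try_advisory_xact_lock`).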
@@ -1037,7 +1154,6 @@ async function processOneArchiveSet(
       totalFiles: totalSets,
     });
 
-    let entries: { path: string; fileName: string; extension: string | null; compressedSize: bigint; uncompressedSize: bigint; crc32: string | null }[] = [];
     try {
       if (archiveSet.type === "ZIP") {
         entries = await readZipCentralDirectory(tempPaths);
@@ -1069,7 +1185,7 @@ async function processOneArchiveSet(
       (sum, p) => sum + p.fileSize,
       0n
     );
-    const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
+    const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
     const hasOversizedPart = archiveSet.parts.some((p) => p.fileSize > MAX_UPLOAD_SIZE);
 
     if (hasOversizedPart) {
@@ -1084,7 +1200,7 @@ async function processOneArchiveSet(
       });
       const concatPath = path.join(setDir, `${archiveSet.baseName}.concat`);
       await concatenateFiles(tempPaths, concatPath);
-      splitPaths = await byteLevelSplit(concatPath);
+      splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
       uploadPaths = splitPaths;
       // Clean up the concat intermediate file
       await unlink(concatPath).catch(() => {});
@@ -1098,7 +1214,7 @@ async function processOneArchiveSet(
         currentFileNum: setIdx + 1,
         totalFiles: totalSets,
       });
-      splitPaths = await byteLevelSplit(tempPaths[0]);
+      splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
       uploadPaths = splitPaths;
     }
 
@@ -1139,72 +1255,112 @@ async function processOneArchiveSet(
     );
     }
 
     // ── Uploading ──
     // Check if a prior run already uploaded this file (orphaned upload scenario:
     // file reached Telegram but DB write failed or worker crashed before indexing)
     const existingUpload = await getUploadedPackageByHash(contentHash);
     let destResult: { messageId: bigint; messageIds: bigint[] };
 
     if (existingUpload && existingUpload.destMessageId) {
       accountLog.info(
         { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
         "Reusing existing upload (file already on destination channel)"
       );
       destResult = {
         messageId: existingUpload.destMessageId,
         messageIds: existingUpload.destMessageIds?.length
           ? (existingUpload.destMessageIds as bigint[])
           : [existingUpload.destMessageId],
       };
     } else {
       const uploadLabel = uploadPaths.length > 1
         ? ` (${uploadPaths.length} parts)`
         : "";
       await updateRunActivity(runId, {
         currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
         currentStep: "uploading",
         currentChannel: channelTitle,
         currentFile: archiveName,
         currentFileNum: setIdx + 1,
         totalFiles: totalSets,
-      });
-
-      destResult = await uploadToChannel(
-        client,
-        destChannelTelegramId,
-        uploadPaths
-      );
-    }
-
-    // ── Post-upload integrity check ──
-    // Verify the files on disk still match before we index
-    if (uploadPaths.length > 0 && !existingUpload) {
-      try {
-        const postUploadHash = await hashParts(uploadPaths);
-        if (splitPaths.length > 0) {
-          // Split files — hash should match the split hash (already verified above)
-          // No additional check needed since we verified split hash = original hash
-        } else if (postUploadHash !== contentHash) {
-          accountLog.error(
-            { fileName: archiveName, originalHash: contentHash, postUploadHash },
-            "Hash changed between hashing and upload — possible disk corruption"
-          );
-          await db.systemNotification.create({
-            data: {
-              type: "HASH_MISMATCH",
-              severity: "ERROR",
-              title: `Post-upload hash mismatch: ${archiveName}`,
-              message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`,
-              context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
-            },
-          });
-        }
-      } catch {
-        // Best-effort — don't fail the ingestion
-      }
-    }
+      });
+
+      destResult = await uploadToChannel(
+        client,
+        destChannelTelegramId,
+        uploadPaths
+      );
+    }
+
+    // ── Post-upload integrity check ──
+    // Verify the files on disk still match before we index
+    if (uploadPaths.length > 0 && !existingUpload) {
+      try {
+        const postUploadHash = await hashParts(uploadPaths);
+        if (splitPaths.length > 0) {
+          // Split files — hash should match the split hash (already verified above)
+          // No additional check needed since we verified split hash = original hash
+        } else if (postUploadHash !== contentHash) {
+          accountLog.error(
+            { fileName: archiveName, originalHash: contentHash, postUploadHash },
+            "Hash changed between hashing and upload — possible disk corruption"
+          );
+          await db.systemNotification.create({
+            data: {
+              type: "HASH_MISMATCH",
+              severity: "ERROR",
+              title: `Post-upload hash mismatch: ${archiveName}`,
+              message: `Hash changed between download and upload. Original: ${contentHash.slice(0, 16)}…, post-upload: ${postUploadHash.slice(0, 16)}…`,
+              context: { fileName: archiveName, originalHash: contentHash, postUploadHash, sourceChannelId: channel.id },
+            },
+          });
+        }
+      } catch {
+        // Best-effort — don't fail the ingestion
+      }
+    }
+
+    // ── Phase 1: Stub record — persisted immediately after upload ──
+    await deleteOrphanedPackageByHash(contentHash);
+
+    creator =
+      topicCreator ??
+      extractCreatorFromFileName(archiveName) ??
+      extractCreatorFromChannelTitle(channelTitle) ??
+      null;
+
+    if (channel.category) {
+      tags.push(channel.category);
+    }
+
+    stub = await createPackageStub({
+      contentHash,
+      fileName: archiveName,
+      fileSize: totalSize,
+      archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
+      sourceChannelId: channel.id,
+      sourceMessageId: archiveSet.parts[0].id,
+      sourceTopicId,
+      destChannelId,
+      destMessageId: destResult.messageId,
+      destMessageIds: destResult.messageIds,
+      isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
+      partCount: uploadPaths.length,
+      ingestionRunId,
+      creator,
+      tags,
+    });
+
+    counters.zipsIngested++;
+    await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
+    } finally {
+      await releaseHashLock(contentHash);
+    }
+
+    if (!stub) return null;
 
     // ── Preview thumbnail ──
+    // (moved here from before stub creation — lock is released, preview doesn't need it)
     let previewData: Buffer | null = null;
     let previewMsgId: bigint | null = null;
     const matchedPhoto = previewMatches.get(archiveSet.baseName);
@@ -1218,8 +1374,6 @@ async function processOneArchiveSet(
         totalFiles: totalSets,
       });
       previewData = await downloadPhotoThumbnail(client, matchedPhoto.fileId);
-      // Only set previewMsgId if we actually got the image data —
-      // otherwise the UI thinks there's a preview but the API returns 404
       if (previewData) {
         previewMsgId = matchedPhoto.id;
       }
@@ -1242,13 +1396,7 @@ async function processOneArchiveSet(
       }
     }
 
-    // ── Resolve creator: topic name > filename extraction > channel title > null ──
-    const creator = topicCreator
-      ?? extractCreatorFromFileName(archiveName)
-      ?? extractCreatorFromChannelTitle(channelTitle)
-      ?? null;
-
-    // ── Indexing ──
+    // ── Phase 2: Update stub with file entries and preview ──
     await updateRunActivity(runId, {
       currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
       currentStep: "indexing",
@@ -1258,43 +1406,12 @@ async function processOneArchiveSet(
       totalFiles: totalSets,
     });
 
-    // Clean up any orphaned record (same hash but no dest upload) before creating
-    await deleteOrphanedPackageByHash(contentHash);
-
-    // Auto-inherit source channel category as initial tag
-    const tags: string[] = [];
-    if (channel.category) {
-      tags.push(channel.category);
-    }
-
-    const pkg = await createPackageWithFiles({
-      contentHash,
-      fileName: archiveName,
-      fileSize: totalSize,
-      archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
-      sourceChannelId: channel.id,
-      sourceMessageId: archiveSet.parts[0].id,
-      sourceTopicId,
-      destChannelId,
-      destMessageId: destResult.messageId,
-      destMessageIds: destResult.messageIds,
-      isMultipart:
-        archiveSet.parts.length > 1 || uploadPaths.length > 1,
-      partCount: uploadPaths.length,
-      ingestionRunId,
-      creator,
-      tags,
+    await updatePackageWithMetadata(stub.id, {
+      files: entries,
       previewData,
       previewMsgId,
-      sourceCaption: archiveSet.parts[0].caption ?? null,
-      replyToMessageId: archiveSet.parts[0].replyToMessageId ?? null,
-      files: entries,
     });
 
-    counters.zipsIngested++;
-    // Clean up any prior skip record for this archive
-    await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
-
     await updateRunActivity(runId, {
       currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
       currentStep: "complete",
@@ -1310,7 +1427,7 @@ async function processOneArchiveSet(
       "Archive ingested"
     );
 
-    return pkg.id;
+    return stub.id;
   } finally {
     // ALWAYS delete temp files and the set directory
     await deleteFiles([...tempPaths, ...splitPaths]);