# Worker Improvements Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Fix double-uploads from concurrent accounts and crash-before-write scenarios, enable parallel account ingestion, and apply per-account Telegram Premium upload limits.

**Architecture:** Three independent changes wired together: (1) hash advisory lock + two-phase DB write closes both double-upload races; (2) per-key mutex replaces the global TDLib mutex so different accounts run in parallel while the same account is still serialized; (3) `getMe().is_premium` drives a per-account `maxUploadSize` that overrides the global `MAX_PART_SIZE_MB` default — effectively eliminating repacking for the Premium account.

**Tech Stack:** Node.js, TypeScript, TDLib (`tdl`), PostgreSQL (Prisma ORM + raw `pg` pool for advisory locks), Docker Compose. No test framework — verification is manual via logs, DB inspection, and Telegram channel checks.

---

## File Map

| File | What changes |
|---|---|
| `prisma/schema.prisma` | Add `isPremium Boolean @default(false)` to `TelegramAccount` |
| `worker/src/db/queries.ts` | Add `updateAccountPremiumStatus`, `createPackageStub`, `updatePackageWithMetadata` |
| `worker/src/db/locks.ts` | Add `tryAcquireHashLock`, `releaseHashLock` |
| `worker/src/tdlib/client.ts` | Return `{ client, isPremium }`, detect via `getMe()`, log speed limit events |
| `worker/src/util/mutex.ts` | Convert from single global boolean to per-key map; add `accountKey` param |
| `worker/src/worker.ts` | Two-phase write, hash lock, `maxUploadSize` from `isPremium`; update all `createTdlibClient` call sites |
| `worker/src/archive/split.ts` | Accept optional `maxPartSize` parameter in `byteLevelSplit` |
| `worker/src/scheduler.ts` | Replace sequential loop + `withTdlibMutex` with `Promise.allSettled`; update mutex call signatures |
| `worker/src/fetch-listener.ts` | Update 5 `withTdlibMutex` call sites to new signature |
| `worker/src/extract-listener.ts` | Update 1 `withTdlibMutex` call site to new signature |
| `worker/src/recovery.ts` | Update `createTdlibClient` call site to destructure |

---
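At a glance, the upload path that Tasks 3–5 build toward looks like this. It is a simplified sketch using this plan's function names only; argument lists are elided, and the real `processOneArchiveSet` carries far more pipeline context:

```typescript
// Simplified sketch, not the real processOneArchiveSet: shows only the
// ordering of lock, upload, and the two DB phases from Tasks 3-5.
async function ingestArchive(contentHash: string): Promise<string | null> {
  if (await packageExistsByHash(contentHash)) return null;   // cheap pre-check
  if (!(await tryAcquireHashLock(contentHash))) return null; // another worker owns this hash
  let stub: { id: string } | null = null;
  try {
    if (await packageExistsByHash(contentHash)) return null; // re-check under the lock
    const dest = await uploadToChannel(/* client, channel, paths */);
    stub = await createPackageStub(/* contentHash + dest message ids */); // Phase 1: crash-safe record
  } finally {
    await releaseHashLock(contentHash); // lock covers upload + stub only
  }
  if (!stub) return null;
  await updatePackageWithMetadata(stub.id /* , file entries, preview */); // Phase 2
  return stub.id;
}
```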
## Task 1: Add `isPremium` to `TelegramAccount` schema

**Files:**
- Modify: `prisma/schema.prisma`
- Create: Prisma migration (via CLI)
- Modify: `worker/src/db/queries.ts`

- [ ] **Step 1: Add field to schema**

In `prisma/schema.prisma`, find `model TelegramAccount` and add `isPremium` after `authCode`:

```prisma
model TelegramAccount {
  id          String    @id @default(cuid())
  phone       String    @unique
  displayName String?
  isActive    Boolean   @default(true)
  authState   AuthState @default(PENDING)
  authCode    String?
  isPremium   Boolean   @default(false)
  lastSeenAt  DateTime?
  createdAt   DateTime  @default(now())
  updatedAt   DateTime  @updatedAt

  channelMaps     AccountChannelMap[]
  ingestionRuns   IngestionRun[]
  fetchRequests   ChannelFetchRequest[]
  skippedPackages SkippedPackage[]

  @@index([isActive])
  @@map("telegram_accounts")
}
```

- [ ] **Step 2: Generate migration**

```bash
npx prisma migrate dev --name add_is_premium_to_telegram_account
```

Expected output: `The following migration(s) have been created and applied ... add_is_premium_to_telegram_account`

- [ ] **Step 3: Add `updateAccountPremiumStatus` to `worker/src/db/queries.ts`**

Add after the `updateAccountAuthState` function:

```typescript
export async function updateAccountPremiumStatus(
  accountId: string,
  isPremium: boolean
): Promise<void> {
  await db.telegramAccount.update({
    where: { id: accountId },
    data: { isPremium },
  });
}
```

- [ ] **Step 4: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no errors.

- [ ] **Step 5: Commit**

```bash
git add prisma/schema.prisma prisma/migrations/ worker/src/db/queries.ts
git commit -m "feat: add isPremium field to TelegramAccount"
```

---
## Task 2: Detect Premium status in `createTdlibClient`

**Files:**
- Modify: `worker/src/tdlib/client.ts`
- Modify: `worker/src/worker.ts` (two call sites)
- Modify: `worker/src/recovery.ts` (one call site)

- [ ] **Step 1: Update imports in `client.ts`**

```typescript
import {
  updateAccountAuthState,
  getAccountAuthCode,
  updateAccountPremiumStatus,
} from "../db/queries.js";
```

- [ ] **Step 2: Change return type of `createTdlibClient`**

```typescript
export async function createTdlibClient(
  account: AccountConfig
): Promise<{ client: Client; isPremium: boolean }> {
```

- [ ] **Step 3: Replace `return client` at the end of the try block**

Find `return client;` inside the try block and replace with:

```typescript
await updateAccountAuthState(account.id, "AUTHENTICATED");
log.info({ accountId: account.id }, "TDLib client authenticated");

let isPremium = false;
try {
  const me = await client.invoke({ _: "getMe" }) as { is_premium?: boolean };
  isPremium = me.is_premium ?? false;
  await updateAccountPremiumStatus(account.id, isPremium);
  log.info({ accountId: account.id, isPremium }, "Account Premium status detected");
} catch (err) {
  log.warn({ err, accountId: account.id }, "Could not detect Premium status, defaulting to false");
}

client.on("update", (update: unknown) => {
  const u = update as { _?: string; is_upload?: boolean };
  if (u?._ === "updateSpeedLimitNotification") {
    log.warn(
      { accountId: account.id, isUpload: u.is_upload },
      u.is_upload
        ? "Upload speed limited by Telegram (account is not Premium)"
        : "Download speed limited by Telegram (account is not Premium)"
    );
  }
});

return { client, isPremium };
```
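For reference, `getMe` resolves to a TDLib `user` object. The shape below is illustrative and abridged; only `is_premium` matters here, and older TDLib builds may omit it, which is what the `?? false` fallback covers:

```typescript
// Illustrative, abridged shape of the getMe result (TDLib `user` object).
interface TdUser {
  _: "user";
  id: number;
  first_name: string;
  is_premium?: boolean; // may be absent on older TDLib versions
}
```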
- [ ] **Step 4: Update `authenticateAccount` in `worker/src/worker.ts`**

Find the call in `authenticateAccount`:
```typescript
client = await createTdlibClient({
  id: account.id,
  phone: account.phone,
});
```

Change to:
```typescript
client = (await createTdlibClient({
  id: account.id,
  phone: account.phone,
})).client;
```

- [ ] **Step 5: Update `runWorkerForAccount` in `worker/src/worker.ts`**

Find the call in `runWorkerForAccount`:
```typescript
const client = await createTdlibClient({
  id: account.id,
  phone: account.phone,
});
```

Change to:
```typescript
const { client, isPremium } = await createTdlibClient({
  id: account.id,
  phone: account.phone,
});
```

(`isPremium` is used in Task 6.)

- [ ] **Step 6: Update `worker/src/recovery.ts`**

Find:
```typescript
let client: Client | undefined;

try {
  client = await createTdlibClient({ id: account.id, phone: account.phone });
```

Change to:
```typescript
let client: Client | undefined;

try {
  ({ client } = await createTdlibClient({ id: account.id, phone: account.phone }));
```

- [ ] **Step 7: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no errors.

- [ ] **Step 8: Commit**

```bash
git add worker/src/tdlib/client.ts worker/src/worker.ts worker/src/recovery.ts
git commit -m "feat: detect and persist Telegram Premium status after authentication"
```

---
## Task 3: Add hash advisory lock to `locks.ts`

**Files:**
- Modify: `worker/src/db/locks.ts`

The existing `tryAcquireLock` / `releaseLock` pattern (pool.connect → keep connection → release on unlock) is reused exactly. A `hash:` prefix on the lock key string prevents collision with account lock IDs in the 32-bit hash space.

- [ ] **Step 1: Add hash lock functions after `releaseLock`**

```typescript
/**
 * Derive a lock ID for a content hash. Prefixes with "hash:" so the resulting
 * 32-bit integer does not collide with account advisory lock IDs.
 */
function contentHashToLockId(contentHash: string): number {
  return hashToLockId(`hash:${contentHash}`);
}

/**
 * Acquire a per-content-hash advisory lock before uploading.
 * Prevents two concurrent workers from uploading the same archive
 * when both scan a shared source channel.
 *
 * Returns true if acquired (proceed with upload).
 * Returns false if already held (another worker is handling this archive — skip).
 *
 * MUST be released via releaseHashLock() after createPackageStub() completes,
 * including on all error paths (use try/finally).
 */
export async function tryAcquireHashLock(contentHash: string): Promise<boolean> {
  const lockId = contentHashToLockId(contentHash);
  const client = await pool.connect();
  try {
    const result = await client.query<{ pg_try_advisory_lock: boolean }>(
      "SELECT pg_try_advisory_lock($1)",
      [lockId]
    );
    const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
    if (acquired) {
      heldConnections.set(`hash:${contentHash}`, client);
      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock acquired");
      return true;
    } else {
      client.release();
      log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock held by another worker — skipping");
      return false;
    }
  } catch (err) {
    client.release();
    throw err;
  }
}

/**
 * Release the per-content-hash advisory lock.
 * Call after createPackageStub() completes (or on any error path).
 */
export async function releaseHashLock(contentHash: string): Promise<void> {
  const lockId = contentHashToLockId(contentHash);
  const client = heldConnections.get(`hash:${contentHash}`);
  if (!client) {
    log.warn({ hash: contentHash.slice(0, 16) }, "No held connection for hash lock release");
    return;
  }
  try {
    await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
    log.debug({ hash: contentHash.slice(0, 16) }, "Hash lock released");
  } finally {
    heldConnections.delete(`hash:${contentHash}`);
    client.release();
  }
}
```
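The snippet above leans on `hashToLockId`, `heldConnections`, `pool`, and `log`, which are assumed to already exist in `locks.ts` from the account-lock implementation. If you need a reference point, a hypothetical sketch of the expected shapes (match whatever `locks.ts` actually defines):

```typescript
// Hypothetical sketch only — locks.ts already defines these for account locks.
import { createHash } from "node:crypto";
import type { PoolClient } from "pg";

const heldConnections = new Map<string, PoolClient>();

// Map an arbitrary string to a stable signed 32-bit advisory lock ID.
function hashToLockId(key: string): number {
  return createHash("sha256").update(key).digest().readInt32BE(0);
}
```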
- [ ] **Step 2: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no errors.

- [ ] **Step 3: Commit**

```bash
git add worker/src/db/locks.ts
git commit -m "feat: add per-content-hash advisory lock to prevent concurrent duplicate uploads"
```

---
## Task 4: Add `createPackageStub` and `updatePackageWithMetadata` to `queries.ts`

**Files:**
- Modify: `worker/src/db/queries.ts`

- [ ] **Step 1: Add `createPackageStub` after `getUploadedPackageByHash`**

```typescript
export interface CreatePackageStubInput {
  contentHash: string;
  fileName: string;
  fileSize: bigint;
  archiveType: ArchiveType;
  sourceChannelId: string;
  sourceMessageId: bigint;
  sourceTopicId?: bigint | null;
  destChannelId: string;
  destMessageId: bigint;
  destMessageIds: bigint[];
  isMultipart: boolean;
  partCount: number;
  ingestionRunId: string;
  creator?: string | null;
  tags?: string[];
}

/**
 * Write a minimal Package record immediately after Telegram confirms the upload.
 * Call this before preview/metadata extraction so recoverIncompleteUploads() can
 * detect and verify the package if the worker crashes mid-metadata.
 *
 * Follow with updatePackageWithMetadata() once file entries and preview are ready.
 */
export async function createPackageStub(
  input: CreatePackageStubInput
): Promise<{ id: string; creator: string | null }> {
  const pkg = await db.package.create({
    data: {
      contentHash: input.contentHash,
      fileName: input.fileName,
      fileSize: input.fileSize,
      archiveType: input.archiveType,
      sourceChannelId: input.sourceChannelId,
      sourceMessageId: input.sourceMessageId,
      sourceTopicId: input.sourceTopicId ?? undefined,
      destChannelId: input.destChannelId,
      destMessageId: input.destMessageId,
      destMessageIds: input.destMessageIds,
      isMultipart: input.isMultipart,
      partCount: input.partCount,
      fileCount: 0,
      ingestionRunId: input.ingestionRunId,
      creator: input.creator ?? undefined,
      tags: input.tags?.length ? input.tags : undefined,
    },
    select: { id: true, creator: true },
  });

  try {
    await db.$queryRawUnsafe(
      `SELECT pg_notify('new_package', $1)`,
      JSON.stringify({
        packageId: pkg.id,
        fileName: input.fileName,
        creator: input.creator ?? null,
        tags: input.tags ?? [],
      })
    );
  } catch {
    // Best-effort
  }

  return pkg;
}
```
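For context, the stub phase is what makes a crash recoverable: a package with `fileCount: 0` marks an upload whose metadata phase never ran. A hypothetical sketch of how `recoverIncompleteUploads()` might find such rows — the real detection logic lives in `recovery.ts` and may differ, and this assumes the model has a `createdAt` timestamp:

```typescript
// Hypothetical query — recovery.ts owns the real detection logic.
async function findStalledStubs(olderThanMs: number) {
  return db.package.findMany({
    where: {
      fileCount: 0, // Phase 1 written, Phase 2 (metadata) never completed
      createdAt: { lt: new Date(Date.now() - olderThanMs) },
    },
    select: { id: true, contentHash: true, destMessageId: true },
  });
}
```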
- [ ] **Step 2: Add `updatePackageWithMetadata` after `createPackageStub`**

```typescript
/**
 * Update a stub Package with file entries and preview after metadata extraction.
 * Called as Phase 2 of the two-phase write after createPackageStub().
 */
export async function updatePackageWithMetadata(
  packageId: string,
  input: {
    files: {
      path: string;
      fileName: string;
      extension: string | null;
      compressedSize: bigint;
      uncompressedSize: bigint;
      crc32: string | null;
    }[];
    previewData?: Buffer | null;
    previewMsgId?: bigint | null;
  }
): Promise<void> {
  await db.package.update({
    where: { id: packageId },
    data: {
      fileCount: input.files.length,
      previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
      previewMsgId: input.previewMsgId ?? undefined,
      files: {
        create: input.files,
      },
    },
  });
}
```

- [ ] **Step 3: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no errors.

- [ ] **Step 4: Commit**

```bash
git add worker/src/db/queries.ts
git commit -m "feat: add createPackageStub and updatePackageWithMetadata for two-phase DB write"
```

---
## Task 5: Two-phase write + hash lock in `worker.ts`

**Files:**
- Modify: `worker/src/worker.ts`

This task rewires the upload + indexing section of `processOneArchiveSet`. No other function changes.

- [ ] **Step 1: Update imports from `./db/queries.js`**

Remove `createPackageWithFiles` and add the new functions:

```typescript
import {
  getSourceChannelMappings,
  getGlobalDestinationChannel,
  packageExistsByHash,
  packageExistsBySourceMessage,
  createPackageStub,
  updatePackageWithMetadata,
  createIngestionRun,
  completeIngestionRun,
  failIngestionRun,
  updateLastProcessedMessage,
  updateRunActivity,
  setChannelForum,
  getTopicProgress,
  upsertTopicProgress,
  upsertChannel,
  ensureAccountChannelLink,
  getGlobalSetting,
  getChannelFetchRequest,
  updateFetchRequestStatus,
  getAccountLinkedChannelIds,
  getExistingChannelsByTelegramId,
  getAccountById,
  deleteOrphanedPackageByHash,
  getUploadedPackageByHash,
  upsertSkippedPackage,
  deleteSkippedPackage,
} from "./db/queries.js";
```

- [ ] **Step 2: Update imports from `./db/locks.js`**

```typescript
import { tryAcquireLock, releaseLock, tryAcquireHashLock, releaseHashLock } from "./db/locks.js";
```

- [ ] **Step 3: Add `maxUploadSize` to `PipelineContext`**

Find the `PipelineContext` interface and add the field:

```typescript
interface PipelineContext {
  client: Client;
  runId: string;
  channelTitle: string;
  channel: TelegramChannel;
  destChannelTelegramId: bigint;
  destChannelId: string;
  throttled: ThrottledActivity;
  counters: RunCounters;
  topicCreator: string | null;
  sourceTopicId: bigint | null;
  accountLog: ReturnType<typeof childLogger>;
  accountId: string;
  maxUploadSize: bigint;
}
```

- [ ] **Step 4: Add hash lock + re-check after the `packageExistsByHash` skip block**

In `processOneArchiveSet`, find the end of the `packageExistsByHash` block (around line 979, just after `return null;`). Insert after it:

```typescript
// ── Hash lock: prevent concurrent workers racing on shared-channel archives ──
const hashLockAcquired = await tryAcquireHashLock(contentHash);
if (!hashLockAcquired) {
  counters.zipsDuplicate++;
  accountLog.info(
    { fileName: archiveName, hash: contentHash.slice(0, 16) },
    "Hash lock held by another worker — skipping concurrent duplicate"
  );
  return null;
}

// Re-check after acquiring lock: another worker may have finished between
// the first check above and this point.
const existsAfterLock = await packageExistsByHash(contentHash);
if (existsAfterLock) {
  await releaseHashLock(contentHash);
  counters.zipsDuplicate++;
  accountLog.debug(
    { fileName: archiveName, hash: contentHash.slice(0, 16) },
    "Duplicate detected after acquiring hash lock — skipping"
  );
  return null;
}
```
- [ ] **Step 5: Wrap upload + stub creation in try/finally to guarantee lock release**

Find the `// ── Uploading ──` comment. Wrap everything from that comment through the end of the indexing section (the `deleteSkippedPackage` call) in a `try/finally`:

```typescript
let stub: { id: string; creator: string | null } | null = null;
try {
  // ── Uploading ──
  const existingUpload = await getUploadedPackageByHash(contentHash);
  let destResult: { messageId: bigint; messageIds: bigint[] };

  if (existingUpload && existingUpload.destMessageId) {
    accountLog.info(
      { fileName: archiveName, destMessageId: Number(existingUpload.destMessageId) },
      "Reusing existing upload (file already on destination channel)"
    );
    destResult = {
      messageId: existingUpload.destMessageId,
      messageIds: existingUpload.destMessageIds?.length
        ? (existingUpload.destMessageIds as bigint[])
        : [existingUpload.destMessageId],
    };
  } else {
    const uploadLabel = uploadPaths.length > 1 ? ` (${uploadPaths.length} parts)` : "";
    await updateRunActivity(runId, {
      currentActivity: `Uploading ${archiveName} to archive channel${uploadLabel}`,
      currentStep: "uploading",
      currentChannel: channelTitle,
      currentFile: archiveName,
      currentFileNum: setIdx + 1,
      totalFiles: totalSets,
    });
    destResult = await uploadToChannel(client, destChannelTelegramId, uploadPaths);
  }

  // ── Post-upload integrity check ── (keep existing code as-is)
  // ...

  // ── Phase 1: Stub record — persisted before preview/metadata ──
  await deleteOrphanedPackageByHash(contentHash);

  const creator =
    topicCreator ??
    extractCreatorFromFileName(archiveName) ??
    extractCreatorFromChannelTitle(channelTitle) ??
    null;

  const tags: string[] = [];
  if (channel.category) {
    tags.push(channel.category);
  }

  stub = await createPackageStub({
    contentHash,
    fileName: archiveName,
    fileSize: totalSize,
    archiveType: archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
    sourceChannelId: channel.id,
    sourceMessageId: archiveSet.parts[0].id,
    sourceTopicId,
    destChannelId,
    destMessageId: destResult.messageId,
    destMessageIds: destResult.messageIds,
    isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
    partCount: uploadPaths.length,
    ingestionRunId: runId,
    creator,
    tags,
  });

  counters.zipsIngested++;
  await deleteSkippedPackage(channel.id, archiveSet.parts[0].id);
} finally {
  await releaseHashLock(contentHash);
}

if (!stub) return null;
```
- [ ] **Step 6: Replace `createPackageWithFiles` call with `updatePackageWithMetadata`**

Find the old `createPackageWithFiles` call (and surrounding `updateRunActivity` + `accountLog.info`). Replace with:

```typescript
await updateRunActivity(runId, {
  currentActivity: `Saving metadata for ${archiveName} (${entries.length} files)`,
  currentStep: "indexing",
  currentChannel: channelTitle,
  currentFile: archiveName,
  currentFileNum: setIdx + 1,
  totalFiles: totalSets,
});

await updatePackageWithMetadata(stub.id, {
  files: entries,
  previewData,
  previewMsgId,
});

await updateRunActivity(runId, {
  currentActivity: `Ingested ${archiveName} (${entries.length} files indexed)`,
  currentStep: "complete",
  currentChannel: channelTitle,
  currentFile: archiveName,
  currentFileNum: setIdx + 1,
  totalFiles: totalSets,
  zipsIngested: counters.zipsIngested,
});

accountLog.info(
  { fileName: archiveName, contentHash, fileCount: entries.length, creator: stub.creator ?? null },
  "Archive ingested"
);

return stub.id;
```
Note: Remove the old `const tags = []`, `deleteOrphanedPackageByHash`, and `deleteSkippedPackage` calls from after the preview section — they now live inside the `try/finally` above.

- [ ] **Step 7: Remove `creator` and `tags` derivation from after the preview section**

The old code derived `creator` and `tags` between the preview section and `createPackageWithFiles`. Since they now live in the stub creation (Step 5), remove those lines from the old location.

- [ ] **Step 8: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no TypeScript errors.

Manual check: run the worker (`cd worker && npm run dev`), process a test archive, and confirm:
1. Logs show `"Hash lock acquired"` and `"Hash lock released"` around each upload
2. Package is created in DB immediately after upload (check via Prisma Studio — `fileCount` will be 0 briefly, then updated)
3. No double messages appear in the destination Telegram channel

- [ ] **Step 9: Commit**

```bash
git add worker/src/worker.ts
git commit -m "feat: add two-phase DB write and hash advisory lock to prevent double-uploads"
```

---
## Task 6: Per-account upload size limit via `isPremium`

**Files:**
- Modify: `worker/src/archive/split.ts`
- Modify: `worker/src/worker.ts`

- [ ] **Step 1: Add optional `maxPartSize` parameter to `byteLevelSplit`**

In `worker/src/archive/split.ts`, change the signature of `byteLevelSplit`:

```typescript
export async function byteLevelSplit(
  filePath: string,
  maxPartSize?: bigint
): Promise<string[]> {
  const effectiveMax = maxPartSize ?? MAX_PART_SIZE;
  const stats = await stat(filePath);
  const fileSize = BigInt(stats.size);

  if (fileSize <= effectiveMax) {
    return [filePath];
  }

  const dir = path.dirname(filePath);
  const baseName = path.basename(filePath);
  const partSize = Number(effectiveMax);
  const totalParts = Math.ceil(Number(fileSize) / partSize);
  const parts: string[] = [];

  log.info({ filePath, fileSize: Number(fileSize), totalParts }, "Splitting file");

  for (let i = 0; i < totalParts; i++) {
    const partNum = String(i + 1).padStart(3, "0");
    const partPath = path.join(dir, `${baseName}.${partNum}`);
    const start = i * partSize;
    const end = Math.min(start + partSize - 1, Number(fileSize) - 1);

    await pipeline(
      createReadStream(filePath, { start, end }),
      createWriteStream(partPath)
    );

    parts.push(partPath);
  }

  log.info({ filePath, parts: parts.length }, "File split complete");
  return parts;
}
```
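To make the split arithmetic concrete, a worked example using BigInt ceiling division; the 2000 MB default shown here is hypothetical, so read the real value from `config.maxPartSizeMB`:

```typescript
// Worked example — 2000 MB is a stand-in for the real config default.
const premiumLimit = 3950n * 1024n * 1024n; // ≈ 3.86 GiB
const defaultLimit = 2000n * 1024n * 1024n; // ≈ 1.95 GiB
const archive = 5n * 1024n * 1024n * 1024n; // a 5 GiB file

// parts = ceil(size / limit), done in BigInt
const parts = (size: bigint, limit: bigint) => (size + limit - 1n) / limit;
console.log(parts(archive, premiumLimit)); // 2n parts under the Premium limit
console.log(parts(archive, defaultLimit)); // 3n parts under the default limit
```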
- [ ] **Step 2: Set `maxUploadSize` in `runWorkerForAccount` from `isPremium`**

In `worker/src/worker.ts`, in `runWorkerForAccount`, add after `const { client, isPremium } = await createTdlibClient(...)`:

```typescript
const maxUploadSize = isPremium
  ? 3950n * 1024n * 1024n
  : BigInt(config.maxPartSizeMB) * 1024n * 1024n;
```

Then include `maxUploadSize` in the `PipelineContext` object passed to `processOneArchiveSet` calls.

- [ ] **Step 3: Use `ctx.maxUploadSize` in `processOneArchiveSet`**

Find:
```typescript
const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
```

Replace with:
```typescript
const MAX_UPLOAD_SIZE = ctx.maxUploadSize;
```

- [ ] **Step 4: Pass `maxUploadSize` to `byteLevelSplit` calls**

Find the two `byteLevelSplit` calls in `processOneArchiveSet`:

```typescript
splitPaths = await byteLevelSplit(concatPath);
```
and
```typescript
splitPaths = await byteLevelSplit(tempPaths[0]);
```

Change both to pass the upload size:

```typescript
splitPaths = await byteLevelSplit(concatPath, ctx.maxUploadSize);
```
and
```typescript
splitPaths = await byteLevelSplit(tempPaths[0], ctx.maxUploadSize);
```

- [ ] **Step 5: Verify**

```bash
cd worker && npx tsc --noEmit
```

Manual check: confirm that a freshly authenticated Premium account logs `isPremium: true` and that the worker logs show `"Account Premium status detected" isPremium=true`. The non-Premium account should log `isPremium: false`. Repack/split will only trigger for Premium if a file part exceeds 3.95 GB.

- [ ] **Step 6: Commit**

```bash
git add worker/src/archive/split.ts worker/src/worker.ts
git commit -m "feat: apply per-account Premium 4GB upload limit to bypass repacking"
```

---
## Task 7: Per-key mutex and parallel account scheduler

**Files:**
- Modify: `worker/src/util/mutex.ts`
- Modify: `worker/src/scheduler.ts`
- Modify: `worker/src/fetch-listener.ts`
- Modify: `worker/src/extract-listener.ts`

**Why per-key:** The global boolean mutex serializes ALL TDLib operations across ALL accounts. Replacing it with a per-key map allows Account A and Account B to run their TDLib clients concurrently (different keys) while still preventing concurrent use of the SAME account's TDLib state dir (same key). See the semantics sketch after Step 1.

- [ ] **Step 1: Rewrite `worker/src/util/mutex.ts`**

Replace the entire file:

```typescript
import { childLogger } from "./logger.js";

const log = childLogger("mutex");

const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;

const locks = new Map<string, boolean>();
const holders = new Map<string, string>();
const queues = new Map<
  string,
  Array<{ resolve: () => void; reject: (err: Error) => void; label: string }>
>();

/**
 * Ensures only one TDLib operation runs at a time FOR THE SAME KEY.
 * Different keys run concurrently — this allows two accounts to ingest in parallel
 * while still preventing concurrent use of the same account's TDLib state dir.
 *
 * key: the account phone number for account-specific ops (auth, ingest),
 * or 'global' for ops that don't belong to a specific account.
 * label: human-readable name for logging.
 */
export async function withTdlibMutex<T>(
  key: string,
  label: string,
  fn: () => Promise<T>
): Promise<T> {
  if (locks.get(key)) {
    log.info({ waiting: label, key, holder: holders.get(key) }, "Waiting for TDLib mutex");
    await new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => {
        const q = queues.get(key) ?? [];
        const idx = q.indexOf(entry);
        if (idx !== -1) {
          q.splice(idx, 1);
          reject(
            new Error(
              `TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
                `(waiting: ${label}, key: ${key}, holder: ${holders.get(key)})`
            )
          );
        }
      }, MUTEX_WAIT_TIMEOUT_MS);

      const entry = {
        resolve: () => {
          clearTimeout(timer);
          resolve();
        },
        reject,
        label,
      };

      if (!queues.has(key)) queues.set(key, []);
      queues.get(key)!.push(entry);
    });
  }

  locks.set(key, true);
  holders.set(key, label);
  log.debug({ key, label }, "TDLib mutex acquired");

  try {
    return await fn();
  } finally {
    const next = queues.get(key)?.shift();
    if (next) {
      // Hand the lock to the next waiter without clearing it: the flag stays
      // set across the microtask gap, so a newcomer cannot acquire between
      // our release and the waiter resuming (the waiter re-records itself
      // as holder when it continues past its await).
      log.debug({ key, next: next.label }, "TDLib mutex releasing to next waiter");
      next.resolve();
    } else {
      locks.delete(key);
      holders.delete(key);
      queues.delete(key);
      log.debug({ key, label }, "TDLib mutex released");
    }
  }
}
```
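A quick sketch of the intended semantics (the phone numbers are placeholders): the same key serializes, different keys interleave.

```typescript
// Same key: B queues behind A. Different key: C overlaps with A.
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

await Promise.all([
  withTdlibMutex("+10000000001", "ingest:A", () => sleep(100)),
  withTdlibMutex("+10000000001", "ingest:B", async () => {
    // runs only after A releases
  }),
  withTdlibMutex("+10000000002", "ingest:C", async () => {
    // runs concurrently with A
  }),
]);
```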
- [ ] **Step 2: Update `withTdlibMutex` calls in `worker/src/scheduler.ts`**

Auth loop — add `account.phone` as key:
```typescript
await withTdlibMutex(account.phone, `auth:${account.phone}`, () =>
  authenticateAccount(account)
);
```

Ingest loop — change to `Promise.allSettled` and add key:
```typescript
await Promise.allSettled(
  accounts.map((account) =>
    withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
      runWorkerForAccount(account)
    )
  )
);
```
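`Promise.allSettled` never rejects, so a failed account is silent unless surfaced explicitly. If per-account failures should show up in the logs, one optional extension (not required by this plan; assumes a `log` instance is in scope in the scheduler):

```typescript
const results = await Promise.allSettled(
  accounts.map((account) =>
    withTdlibMutex(account.phone, `ingest:${account.phone}`, () =>
      runWorkerForAccount(account)
    )
  )
);
// Surface rejections that allSettled would otherwise swallow.
results.forEach((result, i) => {
  if (result.status === "rejected") {
    log.error({ phone: accounts[i].phone, err: result.reason }, "Account ingestion failed");
  }
});
```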
Also remove the cycle timeout check from the account loop — all accounts start simultaneously so there's nothing to gate. The `cycleStart` / `CYCLE_TIMEOUT_MS` logic in the auth loop can stay as-is.

- [ ] **Step 3: Update `withTdlibMutex` calls in `worker/src/fetch-listener.ts`**

All 5 calls use global operations (no specific account phone). Add `'global'` as the first argument to each:

```typescript
// fetch-channels
await withTdlibMutex("global", "fetch-channels", () => ...);

// generate-invite
await withTdlibMutex("global", "generate-invite", async () => { ... });

// create-destination
await withTdlibMutex("global", "create-destination", async () => { ... });

// join-channel
await withTdlibMutex("global", "join-channel", async () => { ... });

// rebuild-packages
await withTdlibMutex("global", "rebuild-packages", () => ...);
```

- [ ] **Step 4: Update `withTdlibMutex` call in `worker/src/extract-listener.ts`**

```typescript
await withTdlibMutex("global", "extract", async () => { ... });
```

- [ ] **Step 5: Verify**

```bash
cd worker && npx tsc --noEmit
```

Expected: no errors.

- [ ] **Step 6: Manual smoke test**

Start the worker with two active accounts:
```bash
cd worker && npm run dev
```

Confirm in logs:
1. Both accounts start ingestion at approximately the same time (timestamps within a second of each other)
2. Each account logs its own `"TDLib mutex acquired"` with its phone number as key
3. `"Ingestion cycle complete"` appears after both accounts finish (not just the first)
4. No `"Waiting for TDLib mutex"` between accounts (they don't block each other)

- [ ] **Step 7: Commit**

```bash
git add worker/src/util/mutex.ts worker/src/scheduler.ts worker/src/fetch-listener.ts worker/src/extract-listener.ts
git commit -m "feat: parallel account ingestion via per-key TDLib mutex"
```

---
## Self-Review Checklist

- [x] **Spec § Double-upload fix (crash):** Covered by Task 5 two-phase write — stub written immediately after `uploadToChannel()` returns.
- [x] **Spec § Double-upload fix (race):** Covered by Task 3 hash lock + Task 5 lock acquisition + re-check before upload.
- [x] **Spec § Re-check after lock acquisition:** Explicitly in Task 5 Step 4 — `existsAfterLock` check after acquiring lock.
- [x] **Spec § Parallel accounts:** Covered by Task 7 per-key mutex + `Promise.allSettled`.
- [x] **Spec § Premium 4GB limit:** Covered by Tasks 1–2 (detection) + Task 6 (application).
- [x] **Spec § Premium effectively eliminates repacking:** `MAX_UPLOAD_SIZE = 3,950 MB` → `hasOversizedPart` never true for normal archives.
- [x] **Spec § Speed limit notification:** Logged at warn level in Task 2 Step 3.
- [x] **Spec § No Docker changes needed:** Confirmed — single container, per-key mutex handles parallelism.
- [x] **`destMessageIds` field:** Included in `createPackageStub` input (Task 4 + Task 5).
- [x] **`deleteSkippedPackage` call:** Moved into `try/finally` block in Task 5 Step 5.
- [x] **`createPackageWithFiles` preserved:** Not deleted from `queries.ts` — kept for any future use.