# Telegram ZIP Ingestion & Indexing — Integration Plan
> **Status:** Planning phase — no implementation code yet
> **Date:** 2026-02-24
> **Base system:** DragonsStash — Next.js 16 / Prisma 7.4 / PostgreSQL 16 / Docker
---
## 1. Architecture Summary
### Current State
DragonsStash is a monolithic Next.js 16 App Router application for 3D printing inventory management. It uses:
- **Prisma 7.4** with `@prisma/adapter-pg` and native `pg.Pool` connection pooling
- **NextAuth.js 5 beta** with JWT strategy (credentials + optional GitHub OAuth)
- **Docker** multi-stage build (`node:20-alpine`), standalone output
- **PostgreSQL 16-alpine** via docker-compose
- **No background job infrastructure** — all mutations are synchronous Server Actions
### Proposed Architecture
```
┌─────────────────────────────────────────────────┐
│ Docker Compose │
│ │
│ ┌──────────────┐ ┌────────────────────────┐ │
│ │ next-app │ │ telegram-worker │ │
│ │ (control │ │ (data plane) │ │
│ │ plane) │ │ │ │
│ │ Port 3000 │ │ - TDLib per account │ │
│ │ │ │ - ZIP processing │ │
│ │ - Admin UI │ │ - Upload to channel │ │
│ │ - API routes│ │ - Metadata indexing │ │
│ │ - Auth │ │ │ │
│ └──────┬───────┘ └───────────┬────────────┘ │
│ │ │ │
│ └──────────┬───────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ PostgreSQL 16 │ │
│ │ (shared state) │ │
│ └─────────────────────┘ │
│ │
│ Volumes: │
│ - postgres_data (persistent) │
│ - tdlib_state (persistent per account) │
│ - tmp_zips (ephemeral, bounded) │
└─────────────────────────────────────────────────┘
```
**Key principle:** The Next.js app is the **control plane** (UI, API, scheduling triggers). The worker container is the **data plane** (TDLib sessions, ZIP download/hash/upload). They communicate exclusively through PostgreSQL.
---
## 2. Proposed Folder Structure
```
DragonsStash/
├── src/ # Existing Next.js app (unchanged)
│ ├── app/
│ │ ├── (app)/
│ │ │ ├── telegram/ # NEW — admin UI pages
│ │ │ │ ├── accounts/ # Manage Telegram accounts
│ │ │ │ │ └── [id]/
│ │ │ │ │ └── auth/ # [Q2] Phone code entry UI for TDLib auth
│ │ │ │ ├── channels/ # Manage source/destination channels
│ │ │ │ ├── packages/ # Browse indexed ZIPs
│ │ │ │ └── ingestion/ # Ingestion run history & status
│ │ │ └── ...existing...
│ │ └── api/
│ │ ├── ...existing...
│ │ ├── zips/ # NEW — ZIP query endpoints
│ │ │ ├── route.ts # GET /api/zips
│ │ │ ├── search/
│ │ │ │ └── route.ts # GET /api/zips/search
│ │ │ └── [id]/
│ │ │ ├── route.ts # GET /api/zips/:id
│ │ │ └── files/
│ │ │ └── route.ts # GET /api/zips/:id/files
│ │ └── ingestion/ # NEW — ingestion control endpoints
│ │ ├── trigger/
│ │ │ └── route.ts # POST /api/ingestion/trigger
│ │ └── status/
│ │ └── route.ts # GET /api/ingestion/status
│ ├── lib/
│ │ ├── ...existing...
│ │ └── telegram/ # NEW — shared types & DB queries
│ │ ├── queries.ts # Prisma queries for telegram models
│ │ └── types.ts # Shared TypeScript types
│ └── schemas/
│ ├── ...existing...
│ └── telegram.ts # NEW — Zod schemas for telegram models
├── worker/ # NEW — separate process, NOT bundled by Next.js
│ ├── Dockerfile # Worker-specific Dockerfile (Debian, not Alpine)
│ ├── package.json # Worker-only dependencies (tdl, node-stream-zip, unrar, etc.)
│ ├── tsconfig.json # Worker TS config (Node target, not bundler)
│ ├── src/
│ │ ├── index.ts # Entry point — spawns per-account workers
│ │ ├── scheduler.ts # Hourly scheduler with jitter
│ │ ├── worker.ts # Single-account worker loop
│ │ ├── tdlib/
│ │ │ ├── client.ts # TDLib client wrapper
│ │ │ └── download.ts # File download logic
│ │ ├── archive/ # Renamed from zip/ — handles ZIP + RAR
│ │ │ ├── hash.ts # Streaming SHA-256 (single + concatenated multipart)
│ │ │ ├── detect.ts # Archive type & multipart detection
│ │ │ ├── zip-reader.ts # ZIP central directory reader (yauzl)
│ │ │ ├── rar-reader.ts # RAR metadata reader (via unrar binary)
│ │ │ ├── multipart.ts # Multipart grouping & concatenation logic
│ │ │ └── split.ts # Byte-level splitting for >2GB re-upload
│ │ ├── upload/
│ │ │ └── channel.ts # Upload to private channel
│ │ ├── db/
│ │ │ ├── client.ts # Prisma client (shared schema)
│ │ │ ├── locks.ts # Advisory lock helpers
│ │ │ └── queries.ts # Worker-specific DB operations
│ │ └── util/
│ │ ├── logger.ts # Structured logging
│ │ └── config.ts # Environment config
│ └── tests/
│ └── ...
├── prisma/
│ ├── schema.prisma # MODIFIED — add telegram models
│ └── migrations/ # NEW migration(s) added
├── docker-compose.yml # MODIFIED — add worker service
├── docker-compose.dev.yml # MODIFIED — add worker service for dev
└── ...existing config files...
```
### Boundary Rules
| Concern | Lives in | Reason |
|---------|----------|--------|
| Telegram admin UI | `src/app/(app)/telegram/` | Part of existing authenticated app |
| API routes for querying ZIPs | `src/app/api/zips/`, `src/app/api/ingestion/` | Served by Next.js, uses existing auth |
| Shared Prisma schema | `prisma/schema.prisma` | Single source of truth for all models |
| Worker process | `worker/` | Separate Node.js process, own Dockerfile, own dependencies |
| TDLib native bindings | `worker/` only | Never in the Next.js bundle |
| ZIP processing | `worker/` only | I/O-heavy, must not block Next.js |
### Why not a monorepo / separate package?
- The project is a single repo today. Adding a `worker/` directory is the lightest change.
- The worker shares the Prisma schema but has its own `package.json` — no dependency contamination.
- No need for turborepo/nx complexity for two processes.
---
## 3. Database Schema Proposal
### New Models
New tables are mapped (via `@@map`) to names chosen to avoid collision with existing models: the account/channel tables carry a `telegram_` prefix, while `account_channel_map`, `packages`, `package_files`, and `ingestion_runs` use distinct unprefixed names. All are added to `prisma/schema.prisma`.
```prisma
// ──────────────────────────────────────────────
// Telegram Accounts
// ──────────────────────────────────────────────
model TelegramAccount {
id String @id @default(cuid())
phone String @unique // Phone number (encrypted at rest recommended)
displayName String? // Friendly label
apiId Int // Telegram API credentials
apiHash String // Telegram API credentials
sessionPath String // Path to TDLib session directory
isActive Boolean @default(true) // Enabled/disabled toggle
authState AuthState @default(PENDING) // [Q2] TDLib auth state for admin UI flow
authCode String? // Temporary: phone code entered via admin UI
lastSeenAt DateTime? // Last successful TDLib connection
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// Relations
channelMaps AccountChannelMap[]
ingestionRuns IngestionRun[]
@@index([isActive])
@@map("telegram_accounts")
}
enum AuthState {
PENDING // Account created, not yet authenticated
AWAITING_CODE // Worker requested code, waiting for admin to enter it
AWAITING_PASSWORD // 2FA password needed
AUTHENTICATED // Session active
EXPIRED // Session expired, needs re-auth
}
// ──────────────────────────────────────────────
// Source & Destination Channels
// ──────────────────────────────────────────────
model TelegramChannel {
id String @id @default(cuid())
telegramId BigInt @unique // Telegram's numeric channel ID
title String // Channel title (display only)
type ChannelType // SOURCE or DESTINATION
isActive Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
// Relations
accountMaps AccountChannelMap[]
packages Package[] // ZIPs sourced from / uploaded to
@@index([type, isActive])
@@map("telegram_channels")
}
enum ChannelType {
SOURCE
DESTINATION
}
// ──────────────────────────────────────────────
// Account ↔ Channel Mapping (many-to-many)
// ──────────────────────────────────────────────
model AccountChannelMap {
id String @id @default(cuid())
accountId String
channelId String
role ChannelRole @default(READER) // READER for source, WRITER for destination
lastProcessedMessageId BigInt? // [Q3] Last Telegram message ID processed for this account+channel
createdAt DateTime @default(now())
account TelegramAccount @relation(fields: [accountId], references: [id], onDelete: Cascade)
channel TelegramChannel @relation(fields: [channelId], references: [id], onDelete: Cascade)
@@unique([accountId, channelId])
@@index([accountId])
@@index([channelId])
@@map("account_channel_map")
}
enum ChannelRole {
READER
WRITER
}
// ──────────────────────────────────────────────
// Packages (indexed archives — ZIP + RAR)
// ──────────────────────────────────────────────
model Package {
id String @id @default(cuid())
contentHash String @unique // SHA-256 of full content (concatenated for multipart)
fileName String // Original filename (first part if multipart)
fileSize BigInt // Total size in bytes (sum of all parts)
archiveType ArchiveType // ZIP or RAR
sourceChannelId String // Channel it was found in
sourceMessageId BigInt // Telegram message ID (first part if multipart)
destChannelId String? // Channel it was re-uploaded to
destMessageId BigInt? // Telegram message ID after upload (first part)
isMultipart Boolean @default(false) // Was this a multipart archive?
partCount Int @default(1) // Number of parts (1 if single)
fileCount Int @default(0) // Number of entries inside archive
indexedAt DateTime @default(now())
createdAt DateTime @default(now())
// Relations
sourceChannel TelegramChannel @relation(fields: [sourceChannelId], references: [id])
files PackageFile[]
ingestionRun IngestionRun? @relation(fields: [ingestionRunId], references: [id])
ingestionRunId String?
@@index([sourceChannelId])
@@index([destChannelId])
@@index([fileName])
@@index([indexedAt])
@@index([archiveType])
// Note: no separate @@index on contentHash — @unique above already creates one
@@map("packages")
}
enum ArchiveType {
ZIP
RAR
}
// ──────────────────────────────────────────────
// Package Files (metadata only — no binary storage)
// ──────────────────────────────────────────────
model PackageFile {
id String @id @default(cuid())
packageId String
path String // Full path inside archive
fileName String // Leaf filename
extension String? // Lowercase file extension
compressedSize BigInt @default(0) // Compressed size (from ZIP central dir or RAR header)
uncompressedSize BigInt @default(0) // Uncompressed size
crc32 String? // CRC-32 (available in both ZIP and RAR)
package Package @relation(fields: [packageId], references: [id], onDelete: Cascade)
@@index([packageId])
@@index([extension])
@@index([fileName])
@@map("package_files")
}
// ──────────────────────────────────────────────
// Ingestion Runs (observability)
// ──────────────────────────────────────────────
model IngestionRun {
id String @id @default(cuid())
accountId String
status IngestionStatus @default(RUNNING)
startedAt DateTime @default(now())
finishedAt DateTime?
messagesScanned Int @default(0)
zipsFound Int @default(0)
zipsDuplicate Int @default(0)
zipsIngested Int @default(0)
errorMessage String?
account TelegramAccount @relation(fields: [accountId], references: [id])
packages Package[]
@@index([accountId])
@@index([status])
@@index([startedAt])
@@map("ingestion_runs")
}
enum IngestionStatus {
RUNNING
COMPLETED
FAILED
CANCELLED
}
```
### Index Strategy
| Table | Index | Purpose |
|-------|-------|---------|
| `packages` | `contentHash` (UNIQUE) | Global deduplication — the core constraint |
| `packages` | `sourceChannelId` | Filter ZIPs by source channel |
| `packages` | `fileName` | Search by filename |
| `packages` | `indexedAt` | Sort by recency |
| `package_files` | `packageId` | Lookup files per package |
| `package_files` | `extension` | Filter by file type (e.g., `.stl`, `.gcode`) |
| `package_files` | `fileName` | Full-text-like search on filenames |
| `ingestion_runs` | `accountId` + `status` | Find running jobs per account |
| `telegram_accounts` | `isActive` | Filter active accounts |
| `telegram_channels` | `type` + `isActive` | Filter active source/destination channels |
### Full-Text Search Consideration
For `GET /api/zips/search?q=`, Prisma's `contains` with `mode: insensitive` is sufficient for moderate data volumes (<100k packages). If search becomes a bottleneck:
- Add a PostgreSQL `GIN` index with `pg_trgm` on `package_files.fileName`
- This can be done later via a raw SQL migration (sketched below) without schema changes
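A minimal sketch of that follow-up step, assuming Prisma's default column mapping for `PackageFile.fileName` (index and function names illustrative):
```typescript
import { PrismaClient } from '@prisma/client'

const prisma = new PrismaClient()

// Hypothetical later migration, run as raw SQL, only if ILIKE search becomes slow
async function addTrigramIndex(): Promise<void> {
  await prisma.$executeRawUnsafe(`CREATE EXTENSION IF NOT EXISTS pg_trgm`)
  await prisma.$executeRawUnsafe(
    `CREATE INDEX IF NOT EXISTS package_files_filename_trgm
       ON package_files USING GIN ("fileName" gin_trgm_ops)`
  )
}
```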
### Migration Approach
1. Create a new Prisma migration: `npx prisma migrate dev --name add_telegram_models`
2. This is purely additive — no existing tables are modified
3. Deploy with existing `docker-entrypoint.sh` which already runs `prisma migrate deploy`
4. No data migration needed — all new tables start empty
---
## 4. Worker Lifecycle Design
### 4.1 Process Model
```
telegram-worker container
├── index.ts (main process)
│ ├── Reads active accounts from DB
│ ├── Starts scheduler
│ └── Handles SIGTERM/SIGINT for graceful shutdown
├── scheduler.ts
│ ├── Runs on configurable interval (default: 60 min)
│ ├── Adds random jitter (0–5 min) to avoid thundering herd
│ └── For each active account → enqueue work
└── worker.ts (per-account execution)
├── Acquires PostgreSQL advisory lock (account-specific)
├── If lock fails → skip (another instance is running)
├── Creates TDLib client for account
├── Iterates source channels
├── For each new message with ZIP attachment:
│ ├── Download ZIP to temp directory
│ ├── Stream SHA-256 hash
│ ├── Check contentHash uniqueness in DB
│ ├── If duplicate → delete temp file, record skip
│ ├── If new:
│ │ ├── Read central directory for metadata
│ │ ├── If >2GB → repack into parts
│ │ ├── Upload to destination channel via TDLib
│ │ ├── Insert Package + PackageFile rows
│ │ └── Delete temp file
│ └── Update ingestion run counters
├── Finalize ingestion run (status = COMPLETED or FAILED)
└── Release advisory lock
```
### 4.2 Advisory Lock Strategy
PostgreSQL advisory locks prevent concurrent ingestion for the same account, even across multiple worker containers (for future horizontal scaling).
```
Lock ID derivation:
lock_id = hash(account.id) → stable 64-bit integer
Acquisition:
SELECT pg_try_advisory_lock($lock_id)
→ Returns true if acquired, false if held by another session
Release:
SELECT pg_advisory_unlock($lock_id)
→ Explicitly released at end of worker run
Crash recovery:
Advisory locks are session-scoped — if the worker process dies,
the DB connection closes and the lock is automatically released.
```
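A minimal sketch of the lock round-trip (helper names illustrative). One caveat worth noting: because advisory locks are session-scoped, acquire and release must run on the same connection. With a pooled client, two `$queryRaw` calls can land on different sessions, so a dedicated `pg` connection is the safer shape:
```typescript
import { createHash } from 'crypto'
import { Client } from 'pg'

// Stable signed 64-bit lock ID derived from the account's cuid
function stableLockId(accountId: string): bigint {
  return createHash('sha256').update(accountId).digest().readBigInt64BE(0)
}

// Runs fn() under the account's advisory lock; returns false if already held.
// `conn` must be a dedicated connection so lock and unlock share one session.
async function withAccountLock(conn: Client, accountId: string, fn: () => Promise<void>): Promise<boolean> {
  const lockId = stableLockId(accountId).toString()
  const { rows } = await conn.query('SELECT pg_try_advisory_lock($1) AS locked', [lockId])
  if (!rows[0].locked) return false // another worker instance holds it
  try {
    await fn()
  } finally {
    await conn.query('SELECT pg_advisory_unlock($1)', [lockId])
  }
  return true
}
```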
### 4.3 Worker Loop Pseudocode
```
async function runWorkerForAccount(accountId: string) {
  const lockId = stableHash(accountId)

  // 1. Acquire lock ($queryRaw returns rows, not a bare boolean; lock and
  //    unlock must share one DB session, see §4.2)
  const [{ locked }] = await db.$queryRaw`SELECT pg_try_advisory_lock(${lockId}) AS locked`
  if (!locked) {
    log.info(`Account ${accountId} already locked, skipping`)
    return
  }

  let client: TdlibClient | undefined
  let run: IngestionRun | undefined
  try {
    // 2. Create ingestion run record
    run = await db.ingestionRun.create({
      data: { accountId, status: 'RUNNING' }
    })

    // 3. Initialize TDLib client (account row carries apiId/apiHash/sessionPath)
    const account = await db.telegramAccount.findUniqueOrThrow({ where: { id: accountId } })
    client = await createTdlibClient(account)

    // 4. Get assigned source channels + the destination channel
    const channels = await getSourceChannels(accountId)
    const destChannel = await getDestinationChannel(accountId)

    for (const channel of channels) {
      // 5. Get messages since last processed message
      const mapping = await getChannelMapping(accountId, channel.id)
      const messages = await getChannelMessages(client, channel.telegramId, mapping.lastProcessedMessageId)

      // 6. Detect archives and group multipart sets
      const archiveSets = groupArchiveSets(messages)
      // archiveSets = [{ type: 'ZIP'|'RAR', parts: [msg, msg, ...], baseName: '...' }, ...]

      for (const archiveSet of archiveSets) {
        run.messagesScanned += archiveSet.parts.length
        run.zipsFound++
        const tempPaths: string[] = []
        let uploadPaths: string[] = []
        try {
          // 7. Download all parts
          for (const part of archiveSet.parts) {
            const tempPath = path.join(TEMP_DIR, `${run.id}_${part.id}_${part.fileName}`)
            await downloadFile(client, part.fileId, tempPath)
            tempPaths.push(tempPath)
          }

          // 8. Concatenated SHA-256 hash (streams all parts in order)
          const contentHash = await hashParts(tempPaths)

          // 9. Deduplicate
          const exists = await db.package.findUnique({ where: { contentHash } })
          if (exists) {
            run.zipsDuplicate++
            continue // temp files deleted in finally
          }

          // 10. Read archive metadata (without extraction)
          let entries: FileEntry[] = []
          if (archiveSet.type === 'ZIP') {
            // Read central directory from last part (or reassembled file)
            entries = await readZipCentralDirectory(tempPaths)
          } else {
            // RAR: unrar l -v on first part auto-discovers other parts
            entries = await readRarContents(tempPaths[0])
          }

          // 11. Prepare upload — byte-level split if single file >2GB
          const totalSize = archiveSet.parts.reduce((sum, p) => sum + p.fileSize, 0n)
          uploadPaths = tempPaths
          if (!archiveSet.isMultipart && totalSize > 2n * 1024n * 1024n * 1024n) {
            uploadPaths = await byteLevelSplit(tempPaths[0])
          }

          // 12. Upload to destination channel
          const destResult = await uploadToChannel(client, destChannel, uploadPaths)

          // 13. Persist metadata
          await db.package.create({
            data: {
              contentHash,
              fileName: archiveSet.parts[0].fileName,
              fileSize: totalSize,
              archiveType: archiveSet.type,
              sourceChannelId: channel.id,
              sourceMessageId: archiveSet.parts[0].id,
              destChannelId: destChannel.id,
              destMessageId: destResult.messageId,
              isMultipart: archiveSet.parts.length > 1 || uploadPaths.length > 1,
              partCount: uploadPaths.length,
              fileCount: entries.length,
              ingestionRunId: run.id,
              files: {
                create: entries.map(e => ({
                  path: e.path,
                  fileName: e.fileName,
                  extension: e.extension,
                  compressedSize: e.compressedSize,
                  uncompressedSize: e.uncompressedSize,
                  crc32: e.crc32,
                }))
              }
            }
          })
          run.zipsIngested++
        } finally {
          // 14. ALWAYS delete all temp files (downloaded parts + any split parts)
          await deleteFiles(...new Set([...tempPaths, ...uploadPaths]))
        }
      }

      // 15. Update last processed message ID
      const lastMsg = messages[messages.length - 1]
      if (lastMsg) {
        await db.accountChannelMap.update({
          where: { id: mapping.id },
          data: { lastProcessedMessageId: lastMsg.id }
        })
      }
    }

    // 16. Finalize run with the accumulated counters
    await db.ingestionRun.update({
      where: { id: run.id },
      data: {
        status: 'COMPLETED',
        finishedAt: new Date(),
        messagesScanned: run.messagesScanned,
        zipsFound: run.zipsFound,
        zipsDuplicate: run.zipsDuplicate,
        zipsIngested: run.zipsIngested,
      }
    })
  } catch (error) {
    // 17. Record failure (only if the run record was created)
    if (run) {
      await db.ingestionRun.update({
        where: { id: run.id },
        data: {
          status: 'FAILED',
          finishedAt: new Date(),
          errorMessage: error instanceof Error ? error.message : String(error)
        }
      })
    }
  } finally {
    // 18. Release lock
    await db.$queryRaw`SELECT pg_advisory_unlock(${lockId})`
    // 19. Destroy TDLib client
    await client?.close()
  }
}
```
### 4.4 Crash Recovery
| Scenario | Recovery |
|----------|----------|
| Worker process crashes mid-ingestion | Advisory lock auto-released on DB disconnect. Next scheduled run picks up. Partial `IngestionRun` with `RUNNING` status is detected on startup and marked `FAILED` (see the sketch after this table). |
| DB connection lost | Worker catches error, marks run as `FAILED`, exits. Scheduler retries on next cycle. |
| TDLib rate-limited (420/429) | Exponential backoff with max 5 retries. If exhausted, marks run as `FAILED` with error message. |
| Temp file left on disk | On worker startup, sweep `TEMP_DIR` and delete all files (no state depends on temp files). |
| Duplicate detection race | `contentHash` UNIQUE constraint is the final guard — `INSERT` will fail with unique violation, which is caught and treated as a duplicate. |
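The startup reconciliation from the table above is a single query; a sketch (function name illustrative):
```typescript
import { PrismaClient } from '@prisma/client'

// Any run still RUNNING at boot must be a crash leftover: the worker
// processes accounts sequentially in one process, so no live run can exist.
export async function failOrphanedRuns(db: PrismaClient): Promise<void> {
  await db.ingestionRun.updateMany({
    where: { status: 'RUNNING' },
    data: { status: 'FAILED', finishedAt: new Date(), errorMessage: 'Worker restarted mid-run' },
  })
}
```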
### 4.5 Scheduler Design
```
┌─────────────────────────────────────────────┐
│ Scheduler (runs in main worker process) │
│ │
│ setInterval(runCycle, INTERVAL_MS) │
│ + random jitter: Math.random() * 5min │
│ │
│ runCycle(): │
│ accounts = db.telegramAccount.findMany({ │
│ where: { isActive: true } │
│ }) │
│ for (account of accounts): │
│ // Sequential, not parallel │
│ await runWorkerForAccount(account.id) │
│ │
│ // Also responds to manual triggers: │
│ // Polls ingestion_trigger table or uses │
│ // PostgreSQL LISTEN/NOTIFY │
└─────────────────────────────────────────────┘
```
**Manual trigger mechanism:** The `POST /api/ingestion/trigger` API route writes a row to a lightweight `ingestion_triggers` table (or uses `pg_notify`). The worker polls this table or listens on a channel.
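If the `pg_notify` variant is chosen, the listening side is small. A sketch, assuming channel name `ingestion_trigger` and an account-ID payload (both conventions illustrative):
```typescript
import { Client } from 'pg'

// node-postgres emits 'notification' events for channels we LISTEN on.
// A dedicated connection is required: LISTEN state is per-session.
export async function listenForTriggers(onTrigger: (accountId: string | null) => void): Promise<void> {
  const client = new Client({ connectionString: process.env.DATABASE_URL })
  await client.connect()
  await client.query('LISTEN ingestion_trigger')
  client.on('notification', (msg) => {
    onTrigger(msg.payload || null) // empty payload = trigger all active accounts
  })
}
```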
---
## 5. Docker Strategy
### 5.1 Recommended Architecture: Separate Containers
**Reason:** TDLib requires Debian/Ubuntu (not Alpine) and native compilation. The Next.js app uses `node:20-alpine`. Mixing them bloats the app image and introduces risk.
### 5.2 Worker Dockerfile
```dockerfile
# worker/Dockerfile
FROM node:20-bookworm-slim AS base
# TDLib system dependencies + unrar for RAR metadata reading
RUN apt-get update && apt-get install -y \
libssl-dev \
zlib1g-dev \
unrar \
&& rm -rf /var/lib/apt/lists/*
# Pre-built TDLib binary (assumes a `tdlib-builder` stage or external image
# defined earlier in the multi-stage build)
COPY --from=tdlib-builder /usr/local/lib/libtdjson.so /usr/local/lib/
RUN ldconfig
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm ci --omit=dev
COPY prisma/ ./prisma/
RUN npx prisma generate
COPY dist/ ./dist/
# Non-root user
RUN addgroup --system worker && adduser --system --ingroup worker worker
USER worker
# Volumes
VOLUME ["/data/tdlib", "/tmp/zips"]
CMD ["node", "dist/index.js"]
```
### 5.3 Updated docker-compose.yml
```yaml
services:
app:
# ...existing config unchanged...
depends_on:
db:
condition: service_healthy
worker:
build:
context: .
dockerfile: worker/Dockerfile
environment:
DATABASE_URL: ${DATABASE_URL}
WORKER_INTERVAL_MINUTES: 60
WORKER_TEMP_DIR: /tmp/zips
TDLIB_STATE_DIR: /data/tdlib
LOG_LEVEL: info
volumes:
- tdlib_state:/data/tdlib # Persistent TDLib sessions
- tmp_zips:/tmp/zips # Ephemeral ZIP processing
depends_on:
db:
condition: service_healthy
restart: unless-stopped
# Resource limits (optional but recommended)
deploy:
resources:
limits:
memory: 1G
reservations:
memory: 256M
db:
# ...existing config unchanged...
volumes:
postgres_data:
tdlib_state:
tmp_zips: # Disk-backed (not tmpfs) — 16GB RAM is not enough for large archives
```
### 5.4 Volume Plan
| Volume | Type | Purpose | Lifecycle |
|--------|------|---------|-----------|
| `postgres_data` | Persistent | Database storage | Permanent |
| `tdlib_state` | Persistent | TDLib session databases (one subdirectory per account) | Permanent — losing this requires re-authentication |
| `tmp_zips` | Disk-backed volume | Temporary archive download/processing | Worker sweeps on startup + deletes after each archive. Not RAM-bound. |
### 5.5 Environment Variable Separation
| Variable | App | Worker | Description |
|----------|-----|--------|-------------|
| `DATABASE_URL` | Yes | Yes | Shared PostgreSQL connection |
| `AUTH_SECRET` | Yes | No | NextAuth session secret |
| `NEXT_PUBLIC_APP_URL` | Yes | No | Public URL |
| `WORKER_INTERVAL_MINUTES` | No | Yes | Scheduler interval |
| `WORKER_TEMP_DIR` | No | Yes | Temp ZIP storage path |
| `TDLIB_STATE_DIR` | No | Yes | TDLib session storage path |
| `WORKER_MAX_ZIP_SIZE_MB` | No | Yes | Max ZIP size before rejecting (default: 4096) |
| `TELEGRAM_API_KEY` | Yes | No | [Q4] Static API key for external app access (MVP) |
| `LOG_LEVEL` | Optional | Yes | Logging verbosity |
---
## 6. Archive Processing Strategy
Supports **ZIP** and **RAR** archives, including multipart variants of both.
### 6.1 Supported Archive Formats
| Format | Single file | Multipart patterns | Metadata reader |
|--------|------------|-------------------|-----------------|
| ZIP | `.zip` | `.zip.001`/`.002`/... or `.z01`/`.z02`/...+`.zip` | `yauzl` — reads central directory without extraction |
| RAR | `.rar` | `.part1.rar`/`.part2.rar`/... or `.rar`+`.r00`/`.r01`/... | `unrar l -v` — lists contents via CLI binary |
### 6.2 Processing Pipeline
```
Messages scanned in source channel
        │
        ▼
Detect archive attachments (.zip, .rar, .z01, .r01, .part1.rar, etc.)
        │
        ▼
Group multipart sets (by filename pattern + message proximity)
  ├── Single-file archive → download one file
  └── Multipart set       → download ALL parts
        │
        ▼
Concatenated SHA-256 hash (stream all parts in order through hasher)
        │
        ▼
Check contentHash against packages
  ├── exists → duplicate: delete all temp files & skip
  └── new archive → continue
        │
        ▼
Reassemble if multipart (concatenate parts into single file)
        │
        ▼
Read archive metadata
  ├── ZIP → yauzl central directory reader (no extraction)
  └── RAR → `unrar l -v <file>` (lists contents without extraction)
        │
        ▼
Prepare for upload
  ├── Total size ≤2GB → upload as-is (single file)
  ├── Total size >2GB → byte-level split into ≤2GB parts
  └── Originally multipart → re-upload original parts as-is
        │
        ▼
Upload to destination channel via TDLib
        │
        ▼
Insert Package + PackageFile rows in single transaction
        │
        ▼
DELETE all temp files immediately (in finally block)
```
### 6.3 Multipart Grouping Logic
Archives split into multiple parts arrive as **separate Telegram messages**. The worker must group them before processing.
**Detection rules:**
```
For a message with filename "pack.zip.003":
→ base = "pack.zip", part = 3, type = ZIP_NUMBERED
For a message with filename "pack.z02":
→ base = "pack", part = 2, type = ZIP_LEGACY (final part is "pack.zip")
For a message with filename "pack.part2.rar":
→ base = "pack", part = 2, type = RAR_PART
For a message with filename "pack.r01":
→ base = "pack", part = 1, type = RAR_LEGACY (final part is "pack.rar")
```
**Grouping strategy:**
1. Scan channel messages and build a map: `base_name → [parts]`
2. A multipart set is complete when parts form a contiguous sequence (1..N)
3. **Timeout:** If parts span >24 hours of messages, treat as incomplete — log warning, skip
4. Incomplete sets are retried on next ingestion run (parts may still be uploading to source)
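A sketch of the detection rules as a parser (regexes illustrative; they would need hardening for case variants and for the `.zip`/`.rar` finals of legacy sets):
```typescript
type PartKind = 'ZIP_NUMBERED' | 'ZIP_LEGACY' | 'RAR_PART' | 'RAR_LEGACY'

function parseArchivePart(name: string): { base: string; part: number; kind: PartKind } | null {
  let m = name.match(/^(.+\.zip)\.(\d{3})$/i)       // pack.zip.003
  if (m) return { base: m[1], part: Number(m[2]), kind: 'ZIP_NUMBERED' }
  m = name.match(/^(.+)\.z(\d{2})$/i)               // pack.z02 (final part is pack.zip)
  if (m) return { base: m[1], part: Number(m[2]), kind: 'ZIP_LEGACY' }
  m = name.match(/^(.+)\.part(\d+)\.rar$/i)         // pack.part2.rar
  if (m) return { base: m[1], part: Number(m[2]), kind: 'RAR_PART' }
  m = name.match(/^(.+)\.r(\d{2})$/i)               // pack.r00 (.rar is part 1, .r00 part 2, ...)
  if (m) return { base: m[1], part: Number(m[2]) + 2, kind: 'RAR_LEGACY' }
  return null // single-file archive, or not a recognized multipart name
}
```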
### 6.4 Concatenated Hashing
For multipart archives, all parts are streamed through a single SHA-256 hasher **in order**:
```typescript
import { createReadStream } from 'fs'
import { createHash } from 'crypto'

async function hashParts(filePaths: string[]): Promise<string> {
  const hash = createHash('sha256')
  for (const filePath of filePaths) {
    // fs read streams are async-iterable; chunks arrive in order, so this is
    // equivalent to hashing the concatenated file without buffering it
    for await (const chunk of createReadStream(filePath)) {
      hash.update(chunk as Buffer)
    }
  }
  return hash.digest('hex')
}
```
- Memory: O(1) — streams 64KB chunks regardless of total size
- For single files, this is equivalent to hashing one file
- Part order is determined by the numeric suffix (sorted ascending)
### 6.5 Metadata Reading
**ZIP (via `yauzl`):**
- Opens the (reassembled) ZIP file
- Iterates central directory entries at the end of the file — **no extraction**
- Collects: `path`, `fileName`, `extension`, `compressedSize`, `uncompressedSize`, `crc32`
- Memory: O(n) where n = number of entries (metadata only, typically <1MB)
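A minimal listing sketch with `yauzl`; the `FileEntry` shape mirrors the `PackageFile` columns, and the helper name is illustrative:
```typescript
import yauzl from 'yauzl'

interface FileEntry {
  path: string
  fileName: string
  extension: string | null
  compressedSize: bigint
  uncompressedSize: bigint
  crc32: string | null
}

function readZipEntries(zipPath: string): Promise<FileEntry[]> {
  return new Promise((resolve, reject) => {
    yauzl.open(zipPath, { lazyEntries: true }, (err, zipfile) => {
      if (err || !zipfile) return reject(err)
      const entries: FileEntry[] = []
      zipfile.on('entry', (entry) => {
        if (!entry.fileName.endsWith('/')) { // skip directory entries
          const leaf = entry.fileName.split('/').pop()!
          const dot = leaf.lastIndexOf('.')
          entries.push({
            path: entry.fileName,
            fileName: leaf,
            extension: dot > 0 ? leaf.slice(dot + 1).toLowerCase() : null,
            compressedSize: BigInt(entry.compressedSize),
            uncompressedSize: BigInt(entry.uncompressedSize),
            crc32: entry.crc32.toString(16).padStart(8, '0'),
          })
        }
        zipfile.readEntry() // pull the next central-directory record
      })
      zipfile.on('end', () => resolve(entries))
      zipfile.on('error', reject)
      zipfile.readEntry() // start iteration
    })
  })
}
```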
**RAR (via `unrar` binary):**
- Runs `unrar l -v <file>` as a child process
- Parses stdout for file list with sizes and CRC
- **No extraction** — `l` (list) mode only
- Collects same fields: `path`, `fileName`, `extension`, `compressedSize`, `uncompressedSize`, `crc32`
- Requires `unrar` installed in worker Docker image
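The RAR side shells out. A deliberately schematic sketch: the stdout column layout of `unrar l -v` varies across versions, so the parser should be written against the exact binary pinned in the worker image:
```typescript
import { execFile } from 'child_process'
import { promisify } from 'util'

const execFileAsync = promisify(execFile)

// Lists contents without extraction; parsing into FileEntry[] is left open
// until the unrar version in the image is pinned.
async function listRarContents(firstPartPath: string): Promise<string> {
  const { stdout } = await execFileAsync('unrar', ['l', '-v', firstPartPath], {
    maxBuffer: 16 * 1024 * 1024, // generous cap for very large listings
  })
  return stdout
}
```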
**Fallback:** If metadata reading fails (corrupted archive, unsupported format), the package is still ingested with `fileCount = 0` and no `PackageFile` rows. A warning is logged. The archive is still hashed, uploaded, and deduplicated — just without internal file listing.
### 6.6 Re-upload Strategy
| Scenario | Action |
|----------|--------|
| Single file ≤2GB | Upload as-is |
| Single file >2GB | Byte-level split into ≤2GB chunks, upload each as separate message |
| Originally multipart, each part ≤2GB | Re-upload each original part as-is (preserving original split) |
| Originally multipart, any part >2GB | This shouldn't happen (Telegram's own limit) — log error, skip |
**Byte-level splitting** uses `fs.createReadStream` with `start`/`end` byte offsets. Parts are named `filename.zip.001`, `.002`, etc. No decompression or recompression involved.
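A sketch of `byteLevelSplit` from the worker pseudocode (the part-size constant is illustrative; the real cap should sit just under Telegram's limit):
```typescript
import { createReadStream, createWriteStream } from 'fs'
import { stat } from 'fs/promises'
import { pipeline } from 'stream/promises'

const PART_SIZE = 2 * 1024 ** 3 // 2 GiB

// Raw byte-range copies (no decompression); parts named .001, .002, ...
async function byteLevelSplit(filePath: string): Promise<string[]> {
  const { size } = await stat(filePath)
  const parts: string[] = []
  for (let offset = 0, i = 1; offset < size; offset += PART_SIZE, i++) {
    const partPath = `${filePath}.${String(i).padStart(3, '0')}`
    const end = Math.min(offset + PART_SIZE, size) - 1 // `end` is inclusive
    await pipeline(
      createReadStream(filePath, { start: offset, end }),
      createWriteStream(partPath)
    )
    parts.push(partPath)
  }
  return parts
}
```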
### 6.7 Disk Usage Guarantees
- **Bounded by `WORKER_MAX_ZIP_SIZE_MB` env var** (default: 4096MB per archive set)
- **One archive set per worker at a time** (sequential per account)
- **Immediate deletion** of all temp files after upload or on any error (in `finally` block)
- **Startup cleanup:** Worker sweeps `TEMP_DIR` on boot
**Worst-case disk usage scenarios:**
| Scenario | Temp disk needed | Notes |
|----------|-----------------|-------|
| Single 2GB ZIP | 2GB | Trivial |
| Single 10GB ZIP → split for upload | ~20GB (original + parts) | Needs free disk space |
| Multipart RAR (10 × 2GB parts) | 20GB (parts) | No reassembly needed for RAR |
| Multipart ZIP (10 × 2GB parts, no reassembly) | 20GB (parts only) | Central dir read from last part |
| Multipart ZIP (10 × 2GB parts) + reassembly fallback | ~40GB (parts + reassembled) | Only if last-part read fails |
Disk space is bounded per-archive-set, not globally. Worker processes one set at a time and deletes everything before moving to the next. Ensure the host has sufficient free disk space for the largest expected archive set.
**Optimization for multipart archives:** Avoid reassembly into a single file when possible:
- **Hashing:** Stream parts in order — no reassembly needed
- **ZIP metadata:** Read central directory from last part only (it's stored at the end) — avoids full reassembly in most cases
- **RAR metadata:** Run `unrar l -v` on the first part (it auto-discovers subsequent parts if co-located) — no reassembly needed
- **Full reassembly** only needed if the above approaches fail (corrupted or non-standard split)
---
## 7. API Route Plan
### 7.1 Endpoint List
| Method | Path | Description | Auth Required |
|--------|------|-------------|---------------|
| `GET` | `/api/zips` | List packages with pagination & filters | Yes |
| `GET` | `/api/zips/:id` | Get single package details | Yes |
| `GET` | `/api/zips/:id/files` | List files inside a package | Yes |
| `GET` | `/api/zips/search?q=` | Search packages/files by name | Yes |
| `POST` | `/api/ingestion/trigger` | Trigger manual ingestion for account(s) | Yes (ADMIN) |
| `GET` | `/api/ingestion/status` | Get current ingestion status | Yes |
### 7.2 Endpoint Details
#### `GET /api/zips`
**Query Parameters:**
```
?page=1 (default: 1)
&limit=25 (default: 25, max: 100)
&channelId=... (filter by source channel)
&sortBy=indexedAt (indexedAt | fileName | fileSize)
&order=desc (asc | desc)
```
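A sketch of the matching Zod schema (name illustrative), per the validation rule in §7.4:
```typescript
import { z } from 'zod'

// Coerces query-string values and enforces the documented defaults and caps
export const zipListQuerySchema = z.object({
  page: z.coerce.number().int().min(1).default(1),
  limit: z.coerce.number().int().min(1).max(100).default(25),
  channelId: z.string().optional(),
  sortBy: z.enum(['indexedAt', 'fileName', 'fileSize']).default('indexedAt'),
  order: z.enum(['asc', 'desc']).default('desc'),
})
```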
**Response:**
```json
{
"items": [
{
"id": "clx...",
"fileName": "model-pack-v2.zip",
"fileSize": 1073741824,
"contentHash": "a1b2c3...",
"archiveType": "ZIP",
"fileCount": 47,
"sourceChannel": { "id": "...", "title": "3D Models Group" },
"isMultipart": false,
"indexedAt": "2026-02-24T10:00:00Z"
}
],
"pagination": {
"page": 1,
"limit": 25,
"total": 1234,
"totalPages": 50
}
}
```
#### `GET /api/zips/:id`
**Response:**
```json
{
"id": "clx...",
"fileName": "model-pack-v2.zip",
"fileSize": 1073741824,
"contentHash": "a1b2c3d4...",
"archiveType": "ZIP",
"fileCount": 47,
"sourceChannel": { "id": "...", "title": "3D Models Group" },
"destChannel": { "id": "...", "title": "Archive Channel" },
"destMessageId": 12345,
"isMultipart": false,
"partCount": 1,
"indexedAt": "2026-02-24T10:00:00Z",
"ingestionRun": { "id": "...", "startedAt": "..." }
}
```
#### `GET /api/zips/:id/files`
**Query Parameters:**
```
?page=1
&limit=50 (default: 50, max: 500)
&extension=stl (filter by extension)
```
**Response:**
```json
{
"items": [
{
"id": "clx...",
"path": "models/dragon/body.stl",
"fileName": "body.stl",
"extension": "stl",
"compressedSize": 524288,
"uncompressedSize": 1048576,
"crc32": "deadbeef"
}
],
"pagination": { ... }
}
```
#### `GET /api/zips/search?q=`
**Query Parameters:**
```
?q=dragon (search term — matches filename, file paths)
&page=1
&limit=25
&searchIn=files (packages | files | both; default: both)
```
**Response:** Same format as `GET /api/zips` but with additional `matchedFiles` count per package.
#### `POST /api/ingestion/trigger`
**Request Body:**
```json
{
"accountId": "clx..." // Optional — omit to trigger all active accounts
}
```
**Response:**
```json
{
"triggered": true,
"accountIds": ["clx..."],
"message": "Ingestion queued for 1 account(s)"
}
```
**Implementation:** Inserts into an `ingestion_triggers` table or sends `pg_notify('ingestion_trigger', accountId)`. Returns immediately — does NOT wait for ingestion to complete.
#### `GET /api/ingestion/status`
**Response:**
```json
{
"accounts": [
{
"id": "clx...",
"displayName": "Bot Account 1",
"isActive": true,
"lastRun": {
"id": "clx...",
"status": "COMPLETED",
"startedAt": "2026-02-24T09:00:00Z",
"finishedAt": "2026-02-24T09:12:34Z",
"messagesScanned": 150,
"zipsFound": 12,
"zipsDuplicate": 3,
"zipsIngested": 9
},
"currentRun": null
}
]
}
```
### 7.3 Authentication Strategy
**For existing admin UI routes:** Use the existing NextAuth.js session — these routes are already behind the middleware auth check.
**For external app API access (MVP):** Single static API key via `TELEGRAM_API_KEY` env var. API route middleware checks `X-API-Key` header against this value first, then falls back to NextAuth session. No DB table needed. Upgrade to dynamic key management later if needed.
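A sketch of that check (function name illustrative); a constant-time comparison avoids leaking key material through timing:
```typescript
import { timingSafeEqual } from 'crypto'

// Returns true only when the X-API-Key header value matches TELEGRAM_API_KEY
export function isValidApiKey(headerValue: string | null): boolean {
  const expected = process.env.TELEGRAM_API_KEY
  if (!headerValue || !expected || headerValue.length !== expected.length) return false
  return timingSafeEqual(Buffer.from(headerValue), Buffer.from(expected))
}
```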
### 7.4 Security Considerations
- All endpoints require authentication (no public access)
- `POST /api/ingestion/trigger` requires ADMIN role
- Rate limiting on search endpoint (prevent abuse)
- No binary data returned — metadata only
- Input validation with Zod on all query parameters
- Pagination enforced with max limits to prevent large responses
---
## 8. Environment Audit Checklist
### Node.js & Runtime
| Check | Status | Notes |
|-------|--------|-------|
| Node.js version | **20.x** (current) | Compatible with TDLib bindings. Node 20 is in maintenance until 2026-04; plan a move to a newer LTS. |
| `node:20-alpine` for Next.js | **OK** | Keep as-is for app container |
| `node:20-bookworm-slim` for worker | **Required** | TDLib needs glibc, not musl (Alpine). Debian Bookworm is the right base. |
| ES module support | **OK** | tsconfig targets ES2017, worker can use same |
### TDLib
| Check | Status | Notes |
|-------|--------|-------|
| TDLib Node.js binding | Use `tdl` npm package | Wraps `libtdjson.so` via FFI |
| `libtdjson.so` availability | Must compile or use pre-built | Pre-built for Debian available via GitHub releases |
| Required OS packages | `libssl-dev`, `zlib1g-dev`, `unrar` | TDLib runtime + RAR metadata reading. Build needs `cmake`, `g++`, `git` (multi-stage). |
| TDLib state persistence | Volume-mount `/data/tdlib` | One subdirectory per account. Losing this = re-auth required. |
| TDLib version | Use latest stable (1.8.x+) | Check `tdl` compatibility matrix |
### PostgreSQL
| Check | Status | Notes |
|-------|--------|-------|
| PostgreSQL version | **16-alpine** (current) | Fully compatible, supports advisory locks, `pg_trgm`, `BigInt` |
| Connection pooling | **`pg.Pool`** via `@prisma/adapter-pg` | Worker needs its own pool (separate process). Default pool size = 10 is fine. |
| Max connections | Check `max_connections` | Default is 100. App + worker + Prisma Studio = ~30 connections typical. Safe. |
| Advisory lock support | **Built-in** | `pg_try_advisory_lock()` / `pg_advisory_unlock()` — no extensions needed |
| `BigInt` column support | **OK** | Prisma 7.4 supports `BigInt` natively. Telegram IDs need `BigInt`. |
| `pg_trgm` extension | **Not installed** | Optional — only needed if full-text search on filenames becomes a requirement |
### Docker
| Check | Status | Notes |
|-------|--------|-------|
| Multi-service compose | **Supported** | Current compose already has `app` + `db`. Adding `worker` is straightforward. |
| Disk-backed temp volume | **Supported** | Bounded temp archive storage; deliberately not tmpfs (see §5.4) |
| Health checks | **Exists for `db`** | Add health check for worker (e.g., check DB connectivity) |
| Resource limits | **Not set** | Recommend adding `memory: 1G` limit for worker |
### Disk I/O
| Check | Status | Notes |
|-------|--------|-------|
| Temp archive storage | Disk-backed Docker volume | Not RAM-bound. Cleaned by worker on startup + after each set. |
| Max single set I/O | Depends on archive size | One set at a time. Bounded by `WORKER_MAX_ZIP_SIZE_MB`. |
| TDLib state I/O | Low | Session DB is small (<10MB per account) |
| PostgreSQL I/O | Moderate | Package metadata is small. 10k packages ≈ few MB. |
### Logging
| Check | Status | Notes |
|-------|--------|-------|
| App logging | Next.js default (console) | No change needed |
| Worker logging | **Needs structured logging** | Use `pino` for JSON-structured logs. Docker captures stdout. |
| Log volume | Moderate | Log ingestion run summaries, not per-message details |
---
## 9. Risk Assessment
### Risk Matrix
| # | Risk | Likelihood | Impact | Mitigation |
|---|------|-----------|--------|------------|
| R1 | **Telegram rate limiting (420 FLOOD_WAIT)** | High | Medium — ingestion paused | Exponential backoff with jitter (sketched after this table). Respect `retry_after` from Telegram. Sequential processing per account. Configurable inter-message delay (default: 1s). |
| R2 | **DB contention on `packages.contentHash` unique check** | Low | Low — single writer per account | Advisory locks serialize writes per account. Unique constraint handles races at DB level. No read contention (separate queries). |
| R3 | **Multi-account race on same ZIP** | Medium | Low — duplicate insert fails safely | `contentHash` UNIQUE constraint is the ultimate guard. Worker catches unique violation and treats as duplicate. No data corruption possible. |
| R4 | **TDLib session invalidation** | Medium | High — account becomes unusable | Monitor `lastSeenAt`. Alert in admin UI when >2 hours stale. Document re-authentication procedure. Store session in persistent volume. |
| R5 | **Worker OOM on large ZIP** | Low | Medium — worker crashes | Streaming hash (O(1) memory). Central directory reading is O(entries), not O(file size). Temp volume is disk-backed, so large archives do not consume RAM. Container memory limit enforced. |
| R6 | **Temp files not cleaned up** | Low | Low — bounded per archive set | `finally` blocks on all paths. Startup sweep of temp dir clears any leftovers after a crash or restart. |
| R7 | **TDLib native dependency breakage on upgrade** | Medium | High — worker won't start | Pin TDLib version. Test upgrades in CI. Multi-stage Docker build isolates build dependencies. |
| R8 | **PostgreSQL connection exhaustion** | Low | High — all services affected | Worker uses own pool (max 5 connections). App pool unchanged. Monitor with `pg_stat_activity`. Total < 50% of `max_connections`. |
| R9 | **Schema migration breaks existing app** | Very Low | High — production down | New tables only — no modifications to existing tables. Test migration on staging first. Rollback = drop new tables. |
| R10 | **Telegram account banned** | Medium | Medium — one account lost | Use multiple accounts across channels. Don't exceed rate limits. Implement per-account disable toggle. Monitor in admin UI. |
| R11 | **Multipart ZIP reassembly failure** | Low | Low — single ZIP skipped | Log error, mark run as partial. Don't block ingestion of other ZIPs. Admin can investigate specific failures. |
| R12 | **Database grows too large** | Low (long-term) | Medium | `PackageFile` is metadata only (~200 bytes/row). 100k ZIPs × 100 files = 10M rows ≈ 2GB. Add retention policy if needed later. |
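A sketch of the R1 mitigation (the shape of TDLib's flood-wait error, including the `retry_after` field name, is an assumption to verify against the `tdl` error payload):
```typescript
// Retries fn() with exponential backoff + jitter, honoring retry_after if present
async function withFloodWaitRetry<T>(fn: () => Promise<T>, maxRetries = 5): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err: any) {
      if (attempt >= maxRetries) throw err
      const retryAfterMs = typeof err?.retry_after === 'number' ? err.retry_after * 1000 : 0
      const backoffMs = Math.min(2 ** attempt * 1000, 60_000)
      const jitterMs = Math.random() * 1000
      await new Promise((r) => setTimeout(r, Math.max(retryAfterMs, backoffMs) + jitterMs))
    }
  }
}
```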
### Critical Path Risks (ordered by priority)
1. **TDLib compilation & runtime in Docker** — This is the highest-risk item. TDLib native compilation is complex. Mitigate by using pre-built binaries from `https://github.com/nicknisi/tdlib-builds` or building in a dedicated multi-stage Dockerfile.
2. **Telegram rate limits** — Primary bottleneck for ingestion throughput. Cannot be eliminated, only managed. Design must be rate-limit-aware from day one.
3. **TDLib session persistence** — Losing session state means manual re-authentication (phone code). Volume mount is critical and must survive container rebuilds.
---
## 10. Assumptions & Open Questions
### Assumptions Made
| # | Assumption | Impact if Wrong |
|---|-----------|----------------|
| A1 | One Telegram account maps to multiple source channels | Schema supports this via `AccountChannelMap` |
| A2 | One shared destination channel for all re-uploads | If multiple destinations needed, `Package.destChannelId` already supports it |
| A3 | ZIP files are single-message attachments (not split across messages by Telegram) | Multipart detection logic may need adjustment |
| A4 | Worker runs 24/7 in Docker alongside the app | If serverless/on-demand execution needed, architecture changes |
| A5 | All Telegram accounts share the same `apiId`/`apiHash` | If not, the schema already supports per-account credentials |
| A6 | No need for real-time notifications (webhooks) on new ZIPs | If needed, add a webhook/event system later |
| A7 | Admin users manage Telegram config; regular users only query ZIPs | Role-based access matches existing `ADMIN`/`USER` enum |
### Decisions (Confirmed)
| # | Question | Decision | Implications |
|---|----------|----------|-------------|
| Q1 | Prisma schema sharing | **Shared** — single `prisma/schema.prisma` | Worker copies `prisma/` at build time. One migration path. Worker runs `prisma generate` in its own Dockerfile. |
| Q2 | TDLib authentication flow | **Admin UI** — Next.js page for phone code entry | Requires an `auth_state` column on `TelegramAccount` + a polling/SSE mechanism. Worker watches DB for auth completion. New page at `src/app/(app)/telegram/accounts/[id]/auth/`. |
| Q3 | Last processed message tracking | **In DB** — `lastProcessedMessageId BigInt?` on `AccountChannelMap` | Worker updates after each channel scan. Allows manual reset for re-processing. Survives TDLib session loss. |
| Q4 | API key management | **Env var for MVP** — single `TELEGRAM_API_KEY` in `.env` | API routes check `X-API-Key` header against env var. No DB table needed yet. Upgrade to dynamic keys later if needed. |
| Q5 | File search strategy | **Prisma `contains`** (case-insensitive `ILIKE`) | No extra extensions or indexes. Revisit with `pg_trgm` GIN index if search exceeds ~100k `PackageFile` rows. |
| Q6 | Repack strategy for >2GB | **Byte-level split** — raw file splitting into ≤2GB chunks | No decompression. Fast. Uses `fs.createReadStream` with `start`/`end` options. Parts named `filename.zip.001`, `.002`, etc. |
| Q7 | Worker package structure | **Standalone** `package.json` | Own `node_modules`, own lockfile. No npm workspace config. Simpler Docker builds. Copies `prisma/` from root at build time. |
| Q8 | Archive format support | **ZIP + RAR (full index)** | Both formats supported. RAR metadata via `unrar l -v` binary (no extraction). Worker Dockerfile includes `unrar` package. `ArchiveType` enum on `Package` model. |
| Q9 | Multipart hashing strategy | **Concatenate then hash** | All parts streamed in order through a single SHA-256 hasher. True content-level dedup. Disk must hold all parts simultaneously. Volume is disk-backed (not tmpfs) to avoid RAM pressure. |
| Q10 | Multipart metadata indexing | **Yes, full indexing** | ZIP: read central directory from last part. RAR: `unrar l -v` on first part auto-discovers siblings. Fallback: ingest without file listing if reading fails. |
---
## Summary of Changes to Existing System
| File/Area | Change Type | Risk |
|-----------|-------------|------|
| `prisma/schema.prisma` | **Add** new models (no modify) | Very Low |
| `prisma/migrations/` | **Add** new migration | Very Low |
| `docker-compose.yml` | **Modify** — add worker service + volumes | Low |
| `docker-compose.dev.yml` | **Modify** — add worker service | Low |
| `src/app/(app)/telegram/` | **Add** new pages | None — new route group |
| `src/app/api/zips/` | **Add** new API routes | None — new routes |
| `src/app/api/ingestion/` | **Add** new API routes | None — new routes |
| `src/lib/telegram/` | **Add** shared types & queries | None — new files |
| `worker/` | **Add** entire new directory | None — isolated process |
| Existing code | **No changes** | Zero risk |
**Total impact on existing system: Minimal.** All changes are additive. No existing files are modified except `prisma/schema.prisma` (additive models) and the two docker-compose files (additive worker service).