76 Commits

Author SHA1 Message Date
admin
1fc2d3e1ae feat: add migration for archive extract requests and invite user relation
Some checks failed
continuous-integration/drone/push Build is failing
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 00:12:26 +01:00
admin
ab558e00f5 feat: add preview management, channel controls, invite polish, and recovery
- Auto-extract preview images from ZIP/RAR/7z archives during ingestion
- Upload custom preview images via package drawer
- Select preview from archive contents with on-demand extraction UI
- Manually add Telegram channels by t.me link, username, or invite link
- Invite code UX: bulk create, copy link, usage tracking, delete confirm
- Incomplete upload recovery: verify dest messages on worker startup
- Rebuild package DB by scanning destination channel with live progress

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 00:09:59 +01:00
admin
bf093cdfca fix: 7z parser handles solid archives with empty Compressed column 2026-03-21 21:18:33 +01:00
admin
a90f653314 feat: add 7z archive content listing via p7zip
- Add p7zip-full to worker Docker image
- New read7zContents() parser using 7z l output
- 7z archives now get full file listings like ZIP/RAR
- Standalone DOCUMENT types still show as single entry
2026-03-21 21:13:58 +01:00
admin
9ac66e9d7d feat: manual creator editing on packages and bulk set
- Click creator cell in STL Files table to edit
- Server action for updating/clearing package creator
- Bulk set creator action for multiple packages
2026-03-21 20:55:22 +01:00
admin
36a7e3d5f4 feat: add channel categories and improved creator detection
- Add category field to TelegramChannel (filterable tag like STL, PDF, D&D)
- Category column in channels table with edit via dropdown menu
- Improved creator extraction: filename patterns + channel title fallback
- extractCreatorFromChannelTitle strips [Completed], (Paid), emoji, etc.
- Fix ArchiveType in PackageListItem and PackageRow for new types
- Add Prisma migration for category column
2026-03-21 20:37:44 +01:00
admin
53a76a8136 feat: add support for 7z, PDF, STL, and other document types
- Add 7Z and DOCUMENT to ArchiveType enum
- Detect .7z, .pdf, .stl, .obj, .3mf, .step, .blend, .gcode, .svg,
  .dxf, .ai, .eps, .psd files as fetchable documents
- Handle DOCUMENT and 7Z formats in worker pipeline (skip extraction,
  record file as single entry)
- Add Prisma migration for new enum values
2026-03-21 20:25:00 +01:00
admin
ba3d3a6040 fix: use searchChatMessages instead of getChatHistory for channel scanning
getChatHistory fails silently in supergroups with hidden history for new
members, returning only system messages. searchChatMessages with document
and photo filters works regardless of history visibility settings.

Also adds getChats call after TDLib client creation to populate the chat
list, preventing 'Chat not found' errors.
2026-03-21 20:15:18 +01:00
admin
fe7a548fef fix: add getChat and sync delay after openChat for proper history loading 2026-03-21 19:27:43 +01:00
admin
4a44374bb7 fix: call openChat before getChatHistory to load remote messages
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-21 18:20:30 +01:00
admin
c7eb077e0d fix: resolve TypeScript null-check errors in bot tdlib client
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-21 16:06:03 +01:00
admin
031a4687fb feat: add invite code system and multi-image Drone pipeline
Some checks failed
continuous-integration/drone/push Build is failing
- Add InviteCode model with code, maxUses, expiry, usage tracking
- Registration now requires a valid invite code
- New users get USER role instead of ADMIN
- Admin-only /invites page to create, manage, and share invite codes
- Invite links auto-fill code via ?code= URL param
- Drone pipeline now builds app, worker, and bot images separately
- Add NEXT_PUBLIC_APP_URL build arg to fix URL redirects
2026-03-21 15:41:12 +01:00
admin
30fb96b3f9 fix: replace drone-ssh with alpine SSH and fix YAML indentation 2026-03-21 15:41:12 +01:00
xCyanGrizzly
9a077a3648 Update .drone.yml
All checks were successful
continuous-integration/drone/push Build is passing
2026-03-21 13:28:27 +01:00
xCyanGrizzly
2ceba66313 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 13:25:56 +01:00
xCyanGrizzly
036dadcb21 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 13:07:56 +01:00
xCyanGrizzly
541ae0c614 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 13:03:27 +01:00
xCyanGrizzly
b7a76fd932 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-21 12:51:05 +01:00
xCyanGrizzly
b75b0e1f91 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-21 12:40:24 +01:00
xCyanGrizzly
50e7e02b2d Update .drone.yml
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-21 12:29:49 +01:00
xCyanGrizzly
dea419b778 Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 12:26:34 +01:00
xCyanGrizzly
053eeed6be Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 12:24:17 +01:00
xCyanGrizzly
d5725bd52e Update .drone.yml
Some checks failed
continuous-integration/drone/push Build encountered an error
2026-03-21 12:21:24 +01:00
xCyanGrizzly
48726b9122 Update README.md
Some checks failed
continuous-integration/drone/push Build is failing
2026-03-20 00:31:05 +01:00
xCyanGrizzly
1b8df48768 Update README.md 2026-03-20 00:22:04 +01:00
xCyanGrizzly
726f55a943 Update .drone.yml 2026-03-19 23:39:00 +01:00
xCyanGrizzly
b08140b4f9 Create .drone.yml
Some checks failed
continuous-integration/drone/push Build was killed
2026-03-19 23:27:51 +01:00
xCyanGrizzly
761d5e0790 add TG skill 2026-03-17 12:59:05 +01:00
xCyanGrizzly
d7bbb7587e Update tg issues 2026-03-16 16:51:30 +01:00
2763de2711 Fix multiple issues 2026-03-07 21:33:40 +01:00
xCyanGrizzly
6926df9a2c Merge pull request #15 from xCyanGrizzly/copilot/fix-channel-tab-issues
Fix inactive source channels and add Fetch Channels button to Channels tab
2026-03-05 23:50:03 +01:00
copilot-swe-agent[bot]
651e9e6bdd Simplify redundant conditional in handleFetchChannels
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 22:05:50 +00:00
copilot-swe-agent[bot]
8d508d5a86 Fix channels not active after selection and add Fetch Channels button to Channels tab
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 22:04:28 +00:00
copilot-swe-agent[bot]
2bb3caf7d9 Initial plan 2026-03-05 21:57:43 +00:00
xCyanGrizzly
8d95752106 Merge pull request #14 from xCyanGrizzly/copilot/fix-telegram-worker-issue
Fix APP_PORT: align container listen port, port mapping, and healthcheck
2026-03-05 21:40:23 +01:00
copilot-swe-agent[bot]
22419106c1 Fix APP_PORT: make container listen port and healthcheck follow APP_PORT
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:39:26 +00:00
copilot-swe-agent[bot]
e45de85c69 Add Rescan Channel option to channels tab
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:34:53 +00:00
copilot-swe-agent[bot]
71a2e6a5e8 Fix Telegram worker: countdown timer, orphaned runs, fetch-listener reconnection, and logging
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:21:18 +00:00
copilot-swe-agent[bot]
1436b630e2 Initial plan 2026-03-05 20:05:41 +00:00
xCyanGrizzly
43af23d3be Merge pull request #13 from xCyanGrizzly/copilot/fix-docker-sync-issues
Fix worker getting stuck during channel message sync
2026-03-05 15:08:39 +01:00
copilot-swe-agent[bot]
49b82a352b Fix review issues: race condition in invokeWithTimeout and mutex queue entry
- Add settled flag to invokeWithTimeout to prevent double-settling
- Create mutex queue entry with wrapped resolve before pushing to queue

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:17:56 +00:00
copilot-swe-agent[bot]
2e242912af Remove worker/dist build artifacts from git, add to .gitignore
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:15:17 +00:00
copilot-swe-agent[bot]
9adbdb2a77 Fix worker getting stuck during sync: add timeouts, stuck detection, and safety limits
- Add invokeWithTimeout wrapper for TDLib API calls (2min timeout per call)
- Add stuck detection to getChannelMessages: break if from_message_id doesn't advance
- Add stuck detection to getTopicMessages: same protection for topic scanning
- Add stuck detection to getForumTopicList: break if pagination offsets don't advance
- Add max page limit (5000) to all scanning loops to prevent infinite pagination
- Add mutex wait timeout (30min) to prevent indefinite blocking when holder hangs
- Add cycle timeout (4h default, configurable via WORKER_CYCLE_TIMEOUT_MINUTES)
- Fix end-of-page detection to use actual limit value instead of hardcoded 100

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:14:53 +00:00
copilot-swe-agent[bot]
ad71346468 Initial plan 2026-03-05 13:02:41 +00:00
xCyanGrizzly
e19a80897d Merge pull request #12 from xCyanGrizzly/copilot/fix-worker-functionality-visibility
Fix worker activity tracking, add scan progress, default channels to disabled
2026-03-05 09:42:48 +01:00
copilot-swe-agent[bot]
22da4dfad2 Fix messagesScanned consistency: use totalScanned from scan results, remove double-counting
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:36:40 +00:00
copilot-swe-agent[bot]
22bcacf3bd Add live message scanning progress, channel/topic counters to worker activity
- Add progress callbacks to getChannelMessages and getTopicMessages that
  fire after each page of messages is fetched
- Worker now shows channel progress (e.g. "[2/5] Channel Name") when
  processing multiple source channels
- Worker now shows topic progress (e.g. "topic 3/12") when scanning forums
- Worker now shows live message scanning count during channel/topic scans
  (e.g. "Scanning Channel — 300 messages scanned")
- UI stats line now always shows messagesScanned count
- messagesScanned counter now increments during the scanning phase, not
  just during archive processing

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:33:26 +00:00
copilot-swe-agent[bot]
15da57b8c0 Fix worker stuck on "Working..." and default channels to disabled
1. Worker trigger: Add ingestion_trigger pg_notify listener so the worker
   picks up on-demand triggers from the UI and runs an immediate cycle with
   full activity tracking (currentActivity, currentStep, etc).

2. Remove orphaned IngestionRun creation from triggerIngestion server action.
   Previously the UI created RUNNING runs without activity fields, causing
   the UI to show "Working..." with no details. Now only the worker creates
   runs with proper activity tracking.

3. Default channels to disabled (isActive: false) in schema and all creation
   paths. Destination channels are explicitly set to active since they must
   receive uploads. Includes Prisma migration.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:27:37 +00:00
copilot-swe-agent[bot]
8f1a912ccb Initial plan 2026-03-05 08:11:20 +00:00
xCyanGrizzly
81b65912aa Merge pull request #11 from xCyanGrizzly/copilot/fix-nextjs-deployment-issues
Fix Docker deployment: file permissions, missing env vars, healthcheck timing
2026-03-04 23:24:05 +01:00
copilot-swe-agent[bot]
5eb2cf05b9 Fix Docker deployment: file permissions, missing env vars, healthcheck timing, error handling
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 22:20:49 +00:00
copilot-swe-agent[bot]
f73d06b3d9 Initial plan 2026-03-04 22:06:36 +00:00
xCyanGrizzly
cac3d518e1 Merge pull request #10 from xCyanGrizzly/copilot/debug-docker-compose-worker
Enable worker service by default in docker-compose
2026-03-04 22:49:52 +01:00
copilot-swe-agent[bot]
987167de0c Enable worker service by default in docker-compose
Remove profiles from worker service in both docker-compose.yml and
docker-compose.dev.yml so the worker starts automatically with
`docker compose up`. This fixes the issue where verification SMS and
the scheduler timer were not working because the worker was never
started. The bot remains as an optional profile.

Update README to reflect the change.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 21:13:00 +00:00
copilot-swe-agent[bot]
4f331d5411 Initial plan 2026-03-04 21:09:51 +00:00
xCyanGrizzly
8088a86feb Merge pull request #9 from xCyanGrizzly/copilot/fix-admin-access-issue
Make all users admins in self-hosted deployment
2026-03-04 21:58:04 +01:00
copilot-swe-agent[bot]
b53934ebf2 Make all users admins: update schema default, add migration, simplify registration and OAuth flows
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 20:23:54 +00:00
copilot-swe-agent[bot]
464c86b32a Initial plan 2026-03-04 20:16:22 +00:00
xCyanGrizzly
fc00fb6f2e Merge pull request #8 from xCyanGrizzly/copilot/fix-admin-account-login
Fix first user not getting ADMIN role when signing up via OAuth
2026-03-04 20:24:38 +01:00
copilot-swe-agent[bot]
0c0c9c7f23 Fix first user not getting ADMIN role when signing up via OAuth
The createUser event in auth.ts now promotes the first user to ADMIN
if no admin exists yet. The JWT callback also fetches the role from the
database on sign-in to pick up the freshly assigned ADMIN role.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 19:21:25 +00:00
copilot-swe-agent[bot]
82d5fc1812 Initial plan 2026-03-04 19:15:27 +00:00
xCyanGrizzly
9120f0fb5d Merge pull request #7 from xCyanGrizzly/copilot/fix-telegram-page-redirect
Fix telegram page redirect: auto-admin first user, hide admin-only nav
2026-03-04 20:06:12 +01:00
copilot-swe-agent[bot]
5d88f9beb3 Wrap first-user admin check in transaction to prevent race condition
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 18:55:41 +00:00
copilot-swe-agent[bot]
3704708970 Fix telegram page redirect: make first user admin and hide admin-only nav items from non-admins
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 18:55:02 +00:00
copilot-swe-agent[bot]
0c789eabd6 Initial plan 2026-03-04 18:24:56 +00:00
xCyanGrizzly
9a88914f11 Merge pull request #6 from xCyanGrizzly/copilot/fix-module-not-found-error
Fix: replace selective node_modules allowlist with full copy to prevent missing Prisma CLI deps
2026-03-04 17:51:05 +01:00
copilot-swe-agent[bot]
6cc8e1185a Fix: Copy full node_modules to production image to prevent missing module errors
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 15:31:01 +00:00
copilot-swe-agent[bot]
066fb5a046 Fix: Copy valibot to production Docker image for Prisma CLI
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 15:22:59 +00:00
copilot-swe-agent[bot]
bed99f8167 Initial plan 2026-03-04 15:20:17 +00:00
xCyanGrizzly
80a8833f2c Merge pull request #5 from xCyanGrizzly/copilot/fix-prisma-schema-error
Fix ENOENT for prisma_schema_build_bg.wasm in production Docker image
2026-03-04 16:17:55 +01:00
copilot-swe-agent[bot]
7303d5c6d3 Fix missing prisma_schema_build_bg.wasm by using symlink for .bin/prisma in Dockerfile
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 14:37:15 +00:00
copilot-swe-agent[bot]
c5ca9a7460 Initial plan 2026-03-04 14:31:43 +00:00
xCyanGrizzly
186aae38b5 Merge pull request #4 from xCyanGrizzly/copilot/fix-portainer-deployment-issues
fix: replace pull_policy: build with pull_policy: never to unbreak Portainer redeploy
2026-03-04 15:25:30 +01:00
copilot-swe-agent[bot]
df006636de fix: change pull_policy from build to never; remove dead fonts.ts
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 14:21:46 +00:00
copilot-swe-agent[bot]
373f1f2f08 Initial plan 2026-03-04 14:01:13 +00:00
xCyanGrizzly
81d322a91c Merge pull request #3 from xCyanGrizzly/copilot/fix-redeploy-error
fix: eliminate Google Fonts CDN dependency from Docker builds
2026-03-04 14:55:27 +01:00
102 changed files with 10369 additions and 546 deletions

5
.claude/settings.json Normal file
View File

@@ -0,0 +1,5 @@
{
"enabledPlugins": {
"superpowers@superpowers-marketplace": true
}
}

View File

@@ -83,7 +83,13 @@
"Bash(git -C /mnt/c/Users/A00963355/OneDrive - Amaris Zorggroep/Documents/VScodeProjects/DragonsStash log --oneline -10)",
"Bash(git -C \"C:/Users/A00963355/OneDrive - Amaris Zorggroep/Documents/VScodeProjects/DragonsStash\" status --short)",
"Bash(timeout:*)",
"mcp__Claude_Preview__preview_start"
"mcp__Claude_Preview__preview_start",
"Bash(cat:*)",
"Bash(grep:*)",
"Bash(wait:*)",
"WebSearch",
"Bash(SKILL_CREATOR_PATH=\"C:\\\\Users\\\\A00963355\\\\.claude\\\\plugins\\\\cache\\\\claude-plugins-official\\\\skill-creator\\\\d5c15b861cd2\\\\skills\\\\skill-creator\" && WORKSPACE=\"C:\\\\Users\\\\A00963355\\\\OneDrive - Amaris Zorggroep\\\\Documents\\\\VScodeProjects\\\\DragonsStash\\\\.claude\\\\skills\\\\tdlib-telegram-workspace\\\\iteration-1\" && python \"$SKILL_CREATOR_PATH/eval-viewer/generate_review.py\" \"$WORKSPACE\" --skill-name \"tdlib-telegram\" --benchmark \"$WORKSPACE/benchmark.json\" --static \"$WORKSPACE/review.html\" 2>&1)",
"Bash(start:*)"
]
}
}

View File

@@ -0,0 +1,46 @@
{
"skill_name": "tdlib-telegram",
"iteration": 1,
"configs": [
{
"name": "with_skill",
"pass_rate": {"mean": 1.0, "stddev": 0.0},
"tokens": {"mean": 53200, "stddev": 14800},
"time_seconds": {"mean": 123.5, "stddev": 16.7}
},
{
"name": "without_skill",
"pass_rate": {"mean": 0.857, "stddev": 0.134},
"tokens": {"mean": 56467, "stddev": 12100},
"time_seconds": {"mean": 156.4, "stddev": 39.7}
}
],
"delta": {
"pass_rate": "+14.3%",
"tokens": "-5.8%",
"time": "-21.0%"
},
"evals": [
{
"name": "broadcast-to-all-users",
"with_skill": {"pass_rate": 1.0, "passed": 5, "total": 5, "tokens": 35365, "time_seconds": 107.6},
"without_skill": {"pass_rate": 0.6, "passed": 3, "total": 5, "tokens": 69214, "time_seconds": 200.2}
},
{
"name": "flood-wait-during-scan",
"with_skill": {"pass_rate": 1.0, "passed": 4, "total": 4, "tokens": 63079, "time_seconds": 140.9},
"without_skill": {"pass_rate": 1.0, "passed": 4, "total": 4, "tokens": 45601, "time_seconds": 122.3}
},
{
"name": "download-and-reupload-file",
"with_skill": {"pass_rate": 1.0, "passed": 5, "total": 5, "tokens": 61157, "time_seconds": 122.1},
"without_skill": {"pass_rate": 1.0, "passed": 5, "total": 5, "tokens": 54587, "time_seconds": 146.7}
}
],
"analyst_notes": [
"The skill's biggest impact was on Eval 1 (broadcast): the baseline MISSED both withFloodWait retry wrapping and inter-message delay — the two most critical patterns for avoiding rate limits during bulk sends. This is exactly the kind of bug the skill is designed to prevent.",
"Eval 2 (FLOOD_WAIT debugging) was a near-tie. Both versions correctly diagnosed the problem and proposed adaptive backoff. The skill version was slightly more thorough: it added pagination-level retry with sleep(waitSec) instead of just re-throwing, meaning it can survive even after withFloodWait's retries are exhausted.",
"Eval 3 (download/reupload) was also close. Both correctly composed existing primitives. The skill version was more explicit about WHY certain patterns matter (referencing the skill's documentation), which helps future maintainers understand the code.",
"The skill version was faster on average (-21% time) and used fewer tokens (-5.8%), likely because the skill front-loaded the knowledge instead of requiring the agent to discover it by reading source files."
]
}

View File

@@ -0,0 +1,12 @@
{
"eval_id": 1,
"eval_name": "broadcast-to-all-users",
"prompt": "Add a new bot command /broadcast that sends a text message to ALL users who have a TelegramLink in the database. The admin triggers it from the web app. Add it to the bot's command handler and create an API endpoint that triggers it.",
"assertions": [
{"text": "uses_sequential_queue: Messages sent one at a time, no Promise.all or concurrent sends", "type": "required"},
{"text": "uses_flood_wait_retry: Each send call wrapped in withFloodWait or equivalent FLOOD_WAIT-aware retry", "type": "required"},
{"text": "has_per_message_delay: Includes a sleep/delay between individual sends", "type": "recommended"},
{"text": "handles_per_user_errors: One failed send does not abort the entire broadcast loop", "type": "required"},
{"text": "has_api_endpoint: Creates an API route or server action to trigger the broadcast from the web app", "type": "required"}
]
}

View File

@@ -0,0 +1,10 @@
{
"run_id": "eval-1-with_skill",
"expectations": [
{"text": "uses_sequential_queue", "passed": true, "evidence": "Uses a for...of loop iterating links one at a time. No Promise.all. Comment: '~20 msgs/sec — well under the 30 msgs/sec global bot limit'"},
{"text": "uses_flood_wait_retry", "passed": true, "evidence": "Each send wrapped in withFloodWait(): 'await withFloodWait(() => sendTextMessage(link.telegramUserId, data.message, \"textParseModeHTML\"))'. Created a new bot/src/util/flood-wait.ts with full extractFloodWaitSeconds + retry logic."},
{"text": "has_per_message_delay", "passed": true, "evidence": "await sleep(50) after each send — explicitly commented as '~20 msgs/sec'"},
{"text": "handles_per_user_errors", "passed": true, "evidence": "try/catch around each send with 'failed++' counter, continues to next user"},
{"text": "has_api_endpoint", "passed": true, "evidence": "Created src/app/api/telegram/bot/broadcast/route.ts with admin auth, 4096-char validation, pg_notify('bot_broadcast')"}
]
}

View File

@@ -0,0 +1,340 @@
# /broadcast Implementation Plan
## Approach
The broadcast feature has two parts:
1. **API endpoint** (`POST /api/telegram/bot/broadcast`) — called from the web app by an admin. It accepts a `message` string, looks up ALL `TelegramLink` records in the database, and triggers the bot to send the message to each linked user via `pg_notify`.
2. **Bot-side handler** — a new `bot_broadcast` pg_notify channel listener in `send-listener.ts` that receives the broadcast payload and sequentially sends the text message to every linked Telegram user.
The `/broadcast` bot command itself is not a user-facing Telegram command (regular users should not be able to trigger it). It is triggered exclusively through the admin API endpoint.
## Skill Patterns Applied
- **Sequential Send Queue** (from skill): Never fire concurrent sends to multiple users. The broadcast iterates users sequentially with `await sleep(50)` between sends (~20 msgs/sec, well under the 30 msgs/sec global bot limit).
- **FLOOD_WAIT handling** (from skill): Every `sendTextMessage` call is wrapped with `withFloodWait()` which extracts the wait duration from errors and retries with jitter.
- **Anti-pattern avoidance**: No `Promise.all(users.map(...))` — that would instantly hit the 30 msg/sec global limit.
- **Message text length limit**: The API endpoint validates that the broadcast message does not exceed 4,096 characters (Telegram's limit from the skill).
---
## File 1: `bot/src/util/flood-wait.ts` (NEW)
Extracted from the skill's recommended FLOOD_WAIT pattern so it can be reused by both existing send logic and the new broadcast logic.
```typescript
import { childLogger } from "./logger.js";
const log = childLogger("flood-wait");
/** Resolve after `ms` milliseconds have elapsed. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
* Extract the mandatory wait duration (in seconds) from a Telegram
* FLOOD_WAIT error. Returns null when the error is not rate-limit related.
*/
/**
 * Pull the mandatory wait time (in seconds) out of a Telegram
 * rate-limit error. Returns null for errors that are not
 * FLOOD_WAIT related.
 */
export function extractFloodWaitSeconds(err: unknown): number | null {
  const text = err instanceof Error ? err.message : String(err);

  // "FLOOD_WAIT_30" style errors carry the wait duration inline.
  const floodMatch = /FLOOD_WAIT_(\d+)/i.exec(text);
  if (floodMatch) return Number.parseInt(floodMatch[1], 10);

  // "retry after 30" phrasing used by some transports.
  const retryMatch = /retry after (\d+)/i.exec(text);
  if (retryMatch) return Number.parseInt(retryMatch[1], 10);

  // Plain HTTP 429 with no explicit duration: fall back to 30 s.
  const code = (err as { code?: unknown } | null | undefined)?.code;
  if (String(code) === "429") return 30;

  return null; // Not a rate limit error
}
/**
* Wrap any async Telegram operation with automatic FLOOD_WAIT retry.
* Adds random jitter (1-5 s) to prevent thundering-herd retries.
*/
/**
 * Run an async Telegram operation, transparently retrying when the
 * API answers with a FLOOD_WAIT error. Before each retry it waits
 * the server-mandated duration plus 1-5 s of random jitter so
 * concurrent callers do not retry in lockstep. Non-rate-limit
 * errors, and failures after `maxRetries` retries, propagate as-is.
 */
export async function withFloodWait<T>(
  fn: () => Promise<T>,
  maxRetries = 5
): Promise<T> {
  let attempt = 0;
  while (true) {
    try {
      return await fn();
    } catch (err) {
      const waitSec = extractFloodWaitSeconds(err);
      // Not a rate limit, or retry budget exhausted: rethrow.
      if (waitSec === null || attempt >= maxRetries) throw err;
      const jitterMs = 1000 + Math.random() * 4000;
      log.warn(
        { wait: waitSec, attempt, jitter: Math.round(jitterMs) },
        "FLOOD_WAIT received — backing off"
      );
      await sleep(waitSec * 1000 + jitterMs);
      attempt++;
    }
  }
}
export { sleep };
```
---
## File 2: `bot/src/db/queries.ts` (MODIFIED — add one function)
Add this function at the bottom of the existing file, after the `getGlobalDestinationChannel` function:
```typescript
// ── Broadcast ──
/**
 * Return every TelegramLink row — one per user who linked their
 * Telegram account. Selects only the fields the broadcast loop
 * actually needs.
 */
export async function getAllTelegramLinks() {
  const links = await db.telegramLink.findMany({
    select: {
      telegramUserId: true,
      telegramName: true,
    },
  });
  return links;
}
```
---
## File 3: `bot/src/send-listener.ts` (MODIFIED — add broadcast channel)
Add the `bot_broadcast` channel to the existing listener. The changes are:
### 3a. Add import for the new query and flood-wait utility
At the top of the file, update the imports:
```typescript
import {
getPendingSendRequest,
updateSendRequest,
findMatchingSubscriptions,
getGlobalDestinationChannel,
getAllTelegramLinks, // ← NEW
} from "./db/queries.js";
import { copyMessageToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/client.js";
import { withFloodWait, sleep } from "./util/flood-wait.js"; // ← NEW
```
### 3b. Subscribe to the new pg_notify channel
Inside `connectListener()`, after the existing LISTEN statements, add:
```typescript
await pgClient.query("LISTEN bot_broadcast");
```
### 3c. Add the notification handler
Inside the `pgClient.on("notification", ...)` callback, add the new branch:
```typescript
pgClient.on("notification", (msg) => {
if (msg.channel === "bot_send" && msg.payload) {
handleBotSend(msg.payload);
} else if (msg.channel === "new_package" && msg.payload) {
handleNewPackage(msg.payload);
} else if (msg.channel === "bot_broadcast" && msg.payload) { // ← NEW
handleBroadcast(msg.payload);
}
});
```
Update the log message:
```typescript
log.info("Send listener started (bot_send, new_package, bot_broadcast)");
```
### 3d. Add the broadcast handler function
Add this at the bottom of the file (before the existing `escapeHtml` helper):
```typescript
// ── bot_broadcast handler ──
/**
 * Process a bot_broadcast notification. Payload is a JSON string:
 *   { message: string }
 *
 * Delivers the message to every linked Telegram user one at a time,
 * pausing 50 ms between sends (~20 msgs/sec) to stay comfortably
 * under Telegram's 30 msgs/sec global bot limit. Each send is
 * wrapped in withFloodWait so rate-limit errors retry automatically,
 * and a failure for one user never aborts the rest of the loop.
 */
async function handleBroadcast(payload: string): Promise<void> {
  try {
    const data = JSON.parse(payload) as { message: string };
    if (!data.message) {
      log.warn("Broadcast payload missing message — ignoring");
      return;
    }

    const recipients = await getAllTelegramLinks();
    if (recipients.length === 0) {
      log.info("Broadcast requested but no linked users found");
      return;
    }

    log.info(
      { recipientCount: recipients.length },
      "Starting broadcast to all linked users"
    );

    let delivered = 0;
    let errored = 0;
    for (const recipient of recipients) {
      try {
        await withFloodWait(() =>
          sendTextMessage(recipient.telegramUserId, data.message, "textParseModeHTML")
        );
        delivered++;
      } catch (err) {
        errored++;
        log.warn(
          { err, telegramUserId: recipient.telegramUserId.toString() },
          "Broadcast send failed for user"
        );
      }
      // ~20 msgs/sec — well under the 30 msgs/sec global bot limit
      await sleep(50);
    }

    log.info(
      { sent: delivered, failed: errored, total: recipients.length },
      "Broadcast completed"
    );
  } catch (err) {
    log.error({ err, payload }, "Failed to process broadcast");
  }
}
```
---
## File 4: `src/app/api/telegram/bot/broadcast/route.ts` (NEW)
This is the Next.js API endpoint that the admin triggers from the web app.
```typescript
import { NextResponse } from "next/server";
import { auth } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
export const dynamic = "force-dynamic";
/**
 * POST /api/telegram/bot/broadcast
 * Queue a broadcast text message for every user with a linked
 * Telegram account.
 *
 * Body: { message: string }
 *
 * Admin-only. The bot process performs the actual sends — this
 * handler only validates the request and emits a
 * pg_notify('bot_broadcast', ...) signal for the bot to pick up.
 */
export async function POST(request: Request) {
  const session = await auth();
  if (!session?.user?.id) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  // Broadcasting is restricted to admins.
  if (session.user.role !== "ADMIN") {
    return NextResponse.json(
      { error: "Only admins can send broadcast messages" },
      { status: 403 }
    );
  }

  let body: { message?: string };
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ error: "Invalid JSON body" }, { status: 400 });
  }

  const rawMessage = body.message;
  if (!rawMessage || typeof rawMessage !== "string") {
    return NextResponse.json(
      { error: "message is required and must be a string" },
      { status: 400 }
    );
  }

  const trimmed = rawMessage.trim();
  if (trimmed.length === 0) {
    return NextResponse.json(
      { error: "message must not be empty" },
      { status: 400 }
    );
  }

  // Telegram caps message text at 4096 characters.
  if (trimmed.length > 4096) {
    return NextResponse.json(
      { error: "message exceeds Telegram's 4096-character limit" },
      { status: 400 }
    );
  }

  // Reject a broadcast with nobody to receive it.
  const recipientCount = await prisma.telegramLink.count();
  if (recipientCount === 0) {
    return NextResponse.json(
      { error: "No users have linked Telegram accounts" },
      { status: 400 }
    );
  }

  // Hand off to the bot process via pg_notify.
  const payload = JSON.stringify({ message: trimmed });
  try {
    await prisma.$queryRawUnsafe(
      `SELECT pg_notify('bot_broadcast', $1)`,
      payload
    );
  } catch (err) {
    return NextResponse.json(
      { error: "Failed to queue broadcast — bot notification channel unavailable" },
      { status: 500 }
    );
  }

  return NextResponse.json({
    status: "QUEUED",
    recipientCount,
    message: `Broadcast queued for ${recipientCount} user(s)`,
  });
}
```
---
## Summary of Changes
| File | Action | Description |
|------|--------|-------------|
| `bot/src/util/flood-wait.ts` | NEW | Reusable FLOOD_WAIT retry + sleep utilities (from skill) |
| `bot/src/db/queries.ts` | MODIFIED | Add `getAllTelegramLinks()` query |
| `bot/src/send-listener.ts` | MODIFIED | Subscribe to `bot_broadcast` channel; add `handleBroadcast()` with sequential send loop |
| `src/app/api/telegram/bot/broadcast/route.ts` | NEW | Admin-only API endpoint that validates input and fires `pg_notify` |
## Skill Patterns Used
1. **Sequential Send Queue**: The broadcast loop sends messages one at a time with `await sleep(50)` between each, keeping throughput at ~20 msgs/sec — safely under the 30 msgs/sec bot global limit documented in the skill.
2. **FLOOD_WAIT handling with `withFloodWait()`**: Every `sendTextMessage` call is wrapped so that if Telegram returns a `FLOOD_WAIT_X` error, the bot automatically waits the required duration plus random jitter (1-5s) before retrying. This prevents the entire broadcast from failing due to a temporary rate limit.
3. **Message text length validation**: The API endpoint rejects messages over 4,096 characters, which is the `message_text_length_max` limit documented in the skill.
4. **No concurrent sends (anti-pattern avoidance)**: The skill explicitly warns against `Promise.all(users.map(...))` for bulk sends. The implementation uses a sequential `for...of` loop instead.
5. **pg_notify pattern**: Follows the existing project convention (seen in `bot_send` and `new_package`) where the web app fires a `pg_notify` and the bot's `send-listener.ts` picks it up — keeping the two processes decoupled.

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 35365,
"duration_ms": 107616,
"total_duration_seconds": 107.6
}

View File

@@ -0,0 +1,10 @@
{
"run_id": "eval-1-without_skill",
"expectations": [
{"text": "uses_sequential_queue", "passed": true, "evidence": "Uses for...of loop over links, sends one at a time. No Promise.all."},
{"text": "uses_flood_wait_retry", "passed": false, "evidence": "No withFloodWait wrapper. Bare sendTextMessage calls in both send-listener handleBroadcast and commands handleBroadcastCommand. No flood-wait.ts utility created."},
{"text": "has_per_message_delay", "passed": false, "evidence": "No sleep/delay between sends in either the send-listener handler or the command handler."},
{"text": "handles_per_user_errors", "passed": true, "evidence": "try/catch per user with sent++/failed++ counters, continues to next user."},
{"text": "has_api_endpoint", "passed": true, "evidence": "Created src/app/api/telegram/bot/broadcast/route.ts with admin auth, 4096-char limit, pg_notify."}
]
}

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 69214,
"duration_ms": 200249,
"total_duration_seconds": 200.2
}

View File

@@ -0,0 +1,11 @@
{
"eval_id": 2,
"eval_name": "flood-wait-during-scan",
"prompt": "The worker keeps crashing with 'FLOOD_WAIT_35' errors when scanning a source channel that has about 10,000 messages. It happens during the getChannelMessages pagination loop. How do I fix this?",
"assertions": [
{"text": "identifies_retry_as_fix: Recommends wrapping pagination calls in FLOOD_WAIT-aware retry logic", "type": "required"},
{"text": "recommends_pagination_delay: Mentions adding or increasing sleep between pages", "type": "required"},
{"text": "respects_retry_after: Fix extracts and uses the actual wait duration from the error, not a hardcoded value", "type": "required"},
{"text": "does_not_reduce_frequency_as_primary_fix: Does not suggest scanning less often as the main solution", "type": "required"}
]
}

View File

@@ -0,0 +1,9 @@
{
"run_id": "eval-2-with_skill",
"expectations": [
{"text": "identifies_retry_as_fix", "passed": true, "evidence": "Identifies that withFloodWait already exists but the pagination loop needs its own adaptive backoff layer. Adds extractFloodWaitSeconds at pagination level."},
{"text": "recommends_pagination_delay", "passed": true, "evidence": "Keeps existing sleep(config.apiDelayMs) and makes it adaptive: doubles on FLOOD_WAIT, relaxes by 0.8x on success."},
{"text": "respects_retry_after", "passed": true, "evidence": "Extracts actual wait duration: 'await sleep(waitSec * 1000 + jitter)' using extractFloodWaitSeconds. Explicitly cites skill rule: 'Always respect the wait duration.'"},
{"text": "does_not_reduce_frequency_as_primary_fix", "passed": true, "evidence": "Primary fix is adaptive backoff at pagination level, not reducing scan frequency. Does not suggest lowering WORKER_INTERVAL_MINUTES."}
]
}

View File

@@ -0,0 +1,378 @@
# Diagnosis: FLOOD_WAIT_35 During Channel Scanning
## Root Cause
The `getChannelMessages` function in `worker/src/tdlib/download.ts` (line 146) paginates through channel history using `getChatHistory` with a fixed 1-second delay between pages (`config.apiDelayMs = 1000`). While each individual `getChatHistory` call is wrapped in `invokeWithTimeout` which delegates to `withFloodWait` for retry logic, the pagination loop itself has a fundamental problem for large channels:
**A 10,000-message channel at 100 messages/page = 100 consecutive API calls.** With only a 1-second delay between pages, this sustained burst of ~1 req/sec for 100+ seconds eventually triggers Telegram's dynamic rate limiter. Once `FLOOD_WAIT_35` fires, `withFloodWait` does correctly sleep for 35+ seconds and retry that single call -- but the loop immediately resumes its 1-second cadence, triggering another FLOOD_WAIT shortly after. After 5 retries (`config.maxRetries = 5`), the function throws and the worker crashes.
The same issue exists in `getTopicMessages` in `worker/src/tdlib/topics.ts` (line 170), which uses identical pagination logic.
## Why the Current Code Is Almost Right But Not Quite
The codebase already has the right building blocks:
1. `withFloodWait` in `worker/src/util/retry.ts` correctly extracts wait durations and adds jitter -- this matches the skill's recommended FLOOD_WAIT handler pattern exactly.
2. `invokeWithTimeout` in `worker/src/tdlib/download.ts` wraps every TDLib call through `withFloodWait`.
3. There is a 1-second inter-page delay (`config.apiDelayMs`).
**The gap:** After a FLOOD_WAIT recovery, the pagination loop does not back off its inter-page delay. It goes right back to 1-second spacing, which is what triggers repeated FLOOD_WAITs until max retries is exhausted.
## The Fix
Apply **adaptive backoff** to the pagination delay: when a FLOOD_WAIT is encountered during scanning, increase the inter-page delay for subsequent pages. This prevents the "recover then immediately re-trigger" cycle.
### Fix 1: Add adaptive delay to `getChannelMessages` (`worker/src/tdlib/download.ts`)
Replace lines 146-250 with:
```typescript
/**
 * Fetch messages from a channel, stopping once we've scanned past the
 * last-processed boundary (with one page of lookback for multipart safety).
 * Collects both archive attachments AND photo messages (for preview matching).
 * Returns messages in chronological order (oldest first).
 *
 * When `lastProcessedMessageId` is null (first run), scans everything.
 * The worker applies a post-grouping filter to skip fully-processed sets,
 * and keeps `packageExistsBySourceMessage` as a safety net.
 *
 * Safety features:
 * - Max page limit to prevent infinite loops
 * - Stuck detection: breaks if from_message_id stops advancing
 * - Timeout on each TDLib API call
 * - Adaptive delay: backs off when FLOOD_WAIT is encountered
 *
 * @param client - TDLib client used for every API call
 * @param chatId - id of the channel chat to scan
 * @param lastProcessedMessageId - boundary message id from the previous run;
 *   null/undefined means a first run that scans the full history
 * @param limit - requested page size (clamped to Telegram's maximum of 100)
 * @param onProgress - optional callback fired with the running scanned-message
 *   count after each page
 * @returns archives and photos (both oldest-first) plus the total scanned count
 */
export async function getChannelMessages(
  client: Client,
  chatId: bigint,
  lastProcessedMessageId?: bigint | null,
  limit = 100,
  onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
  const archives: TelegramMessage[] = [];
  const photos: TelegramPhoto[] = [];
  const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
  let currentFromId = 0;
  let totalScanned = 0;
  let pageCount = 0;
  let currentDelay = config.apiDelayMs; // starts at 1000ms, adapts on FLOOD_WAIT
  // eslint-disable-next-line no-constant-condition
  while (true) {
    if (pageCount >= MAX_SCAN_PAGES) {
      log.warn(
        { chatId: chatId.toString(), pageCount, totalScanned },
        "Hit max page limit for channel scan, stopping"
      );
      break;
    }
    pageCount++;
    const previousFromId = currentFromId;
    let result: { messages: TdMessage[] };
    try {
      result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
        _: "getChatHistory",
        chat_id: Number(chatId),
        from_message_id: currentFromId,
        offset: 0,
        limit: Math.min(limit, 100),
        only_local: false,
      });
    } catch (err) {
      // If invokeWithTimeout exhausted its retries on FLOOD_WAIT, check if
      // we can recover at the pagination level by increasing the delay further.
      const waitSec = extractFloodWaitSeconds(err);
      if (waitSec !== null) {
        // The retry wrapper already slept; bump the inter-page delay to
        // prevent the next page from immediately re-triggering.
        currentDelay = Math.min(currentDelay * 2, 30_000);
        log.warn(
          { chatId: chatId.toString(), newDelay: currentDelay, totalScanned },
          "FLOOD_WAIT persisted after retries — increasing inter-page delay and retrying"
        );
        // Sleep the full flood wait duration + jitter before continuing
        const jitter = 1000 + Math.random() * 4000;
        await sleep(waitSec * 1000 + jitter);
        // NOTE: re-entering the loop increments pageCount again, so a page
        // that stays rate-limited forever is still bounded by MAX_SCAN_PAGES.
        continue; // retry this page with the new delay
      }
      throw err; // non-rate-limit error — propagate
    }
    // Successful call — gradually relax the delay back toward baseline
    if (currentDelay > config.apiDelayMs) {
      currentDelay = Math.max(config.apiDelayMs, Math.floor(currentDelay * 0.8));
    }
    if (!result.messages || result.messages.length === 0) break;
    totalScanned += result.messages.length;
    for (const msg of result.messages) {
      // Check for archive documents
      const doc = msg.content?.document;
      if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
        archives.push({
          id: BigInt(msg.id),
          fileName: doc.file_name,
          fileId: String(doc.document.id),
          fileSize: BigInt(doc.document.size),
          date: new Date(msg.date * 1000),
        });
        continue;
      }
      // Check for photo messages (potential previews)
      const photo = msg.content?.photo;
      const caption = msg.content?.caption?.text ?? "";
      if (photo?.sizes && photo.sizes.length > 0) {
        const smallest = photo.sizes[0];
        photos.push({
          id: BigInt(msg.id),
          date: new Date(msg.date * 1000),
          caption,
          // falls back to expected_size when size is 0 or absent
          fileSize: smallest.photo.size || smallest.photo.expected_size,
          fileId: String(smallest.photo.id),
        });
      }
    }
    // Report scanning progress after each page
    onProgress?.(totalScanned);
    currentFromId = result.messages[result.messages.length - 1].id;
    // Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
    if (currentFromId === previousFromId) {
      log.warn(
        { chatId: chatId.toString(), currentFromId, totalScanned },
        "Pagination stuck (from_message_id not advancing), breaking"
      );
      break;
    }
    // Stop scanning once we've gone past the boundary (this page is the lookback)
    if (boundary && currentFromId < boundary) break;
    if (result.messages.length < Math.min(limit, 100)) break;
    // Rate limit delay — adaptive based on FLOOD_WAIT history
    await sleep(currentDelay);
  }
  log.info(
    { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
    "Channel scan complete"
  );
  // Reverse to chronological order (oldest first) so worker processes old→new
  return {
    archives: archives.reverse(),
    photos: photos.reverse(),
    totalScanned,
  };
}
```
You will also need to add the import for `extractFloodWaitSeconds` at the top of `download.ts`:
```typescript
import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
```
### Fix 2: Apply the same pattern to `getTopicMessages` (`worker/src/tdlib/topics.ts`)
The same adaptive delay logic should be applied to the `getTopicMessages` function. Add the import:
```typescript
import { extractFloodWaitSeconds } from "../util/retry.js";
```
Then apply the same changes to the pagination loop (the structure is identical):
```typescript
/**
 * Fetch messages from a single forum topic, mirroring getChannelMessages:
 * scans newest→oldest via searchChatMessages, stops one page past the
 * last-processed boundary, and collects archive documents plus photo
 * messages (potential previews).
 *
 * Safety features (same as the channel scan):
 * - Max page limit to prevent infinite loops
 * - Stuck detection: breaks if from_message_id stops advancing
 * - Adaptive inter-page delay that doubles on FLOOD_WAIT and relaxes on success
 *
 * @param client - TDLib client used for every API call
 * @param chatId - id of the forum chat containing the topic
 * @param topicId - message thread id of the topic to scan
 * @param lastProcessedMessageId - boundary message id; null/undefined scans all
 * @param limit - requested page size (clamped to Telegram's maximum of 100)
 * @param onProgress - optional callback fired with the running scan count
 * @returns archives and photos (both oldest-first) plus the total scanned count
 */
export async function getTopicMessages(
  client: Client,
  chatId: bigint,
  topicId: bigint,
  lastProcessedMessageId?: bigint | null,
  limit = 100,
  onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
  const archives: TelegramMessage[] = [];
  const photos: TelegramPhoto[] = [];
  const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
  let currentFromId = 0;
  let totalScanned = 0;
  let pageCount = 0;
  // Adaptive inter-page delay; doubled on FLOOD_WAIT, relaxed on success.
  let currentDelay = config.apiDelayMs;
  // eslint-disable-next-line no-constant-condition
  while (true) {
    if (pageCount >= MAX_SCAN_PAGES) {
      log.warn(
        { chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
        "Hit max page limit for topic scan, stopping"
      );
      break;
    }
    pageCount++;
    const previousFromId = currentFromId;
    // Inline shape of the slice of TDLib's messages response that we read.
    let result: {
      messages?: {
        id: number;
        date: number;
        content: {
          _: string;
          document?: {
            file_name?: string;
            document?: {
              id: number;
              size: number;
            };
          };
          photo?: {
            sizes?: {
              type: string;
              photo: { id: number; size: number; expected_size: number };
              width: number;
              height: number;
            }[];
          };
          caption?: { text?: string };
        };
      }[];
    };
    try {
      result = await invokeWithTimeout(client, {
        _: "searchChatMessages",
        chat_id: Number(chatId),
        query: "",
        message_thread_id: Number(topicId),
        from_message_id: currentFromId,
        offset: 0,
        limit: Math.min(limit, 100),
        filter: null,
        sender_id: null,
        saved_messages_topic_id: 0,
      });
    } catch (err) {
      // Pagination-level FLOOD_WAIT recovery: only rate-limit errors are
      // retried here; everything else propagates immediately.
      const waitSec = extractFloodWaitSeconds(err);
      if (waitSec !== null) {
        currentDelay = Math.min(currentDelay * 2, 30_000);
        log.warn(
          { chatId: chatId.toString(), topicId: topicId.toString(), newDelay: currentDelay, totalScanned },
          "FLOOD_WAIT persisted after retries — increasing inter-page delay and retrying"
        );
        // Honor the full server-mandated wait plus 1-5s jitter, then retry
        // this page (pageCount increments again, so MAX_SCAN_PAGES still bounds us).
        const jitter = 1000 + Math.random() * 4000;
        await sleep(waitSec * 1000 + jitter);
        continue;
      }
      throw err;
    }
    // Successful call — gradually relax the delay back toward baseline
    if (currentDelay > config.apiDelayMs) {
      currentDelay = Math.max(config.apiDelayMs, Math.floor(currentDelay * 0.8));
    }
    if (!result.messages || result.messages.length === 0) break;
    totalScanned += result.messages.length;
    for (const msg of result.messages) {
      // Archive attachments become package candidates.
      const doc = msg.content?.document;
      if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
        archives.push({
          id: BigInt(msg.id),
          fileName: doc.file_name,
          fileId: String(doc.document.id),
          fileSize: BigInt(doc.document.size),
          date: new Date(msg.date * 1000),
        });
        continue;
      }
      // Photos are collected for later preview matching.
      const photo = msg.content?.photo;
      const caption = msg.content?.caption?.text ?? "";
      if (photo?.sizes && photo.sizes.length > 0) {
        const smallest = photo.sizes[0];
        photos.push({
          id: BigInt(msg.id),
          date: new Date(msg.date * 1000),
          caption,
          fileId: String(smallest.photo.id),
          // falls back to expected_size when size is 0 or absent
          fileSize: smallest.photo.size || smallest.photo.expected_size,
        });
      }
    }
    onProgress?.(totalScanned);
    currentFromId = result.messages[result.messages.length - 1].id;
    // Stuck detection: break if pagination stops advancing.
    if (currentFromId === previousFromId) {
      log.warn(
        { chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
        "Topic pagination stuck (from_message_id not advancing), breaking"
      );
      break;
    }
    // Stop once we've scanned past the boundary (this page is the lookback).
    if (boundary && currentFromId < boundary) break;
    if (result.messages.length < Math.min(limit, 100)) break;
    await sleep(currentDelay);
  }
  log.info(
    { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
    "Topic scan complete"
  );
  // Reverse to chronological order (oldest first).
  return {
    archives: archives.reverse(),
    photos: photos.reverse(),
    totalScanned,
  };
}
```
## Skill Patterns Applied
### 1. FLOOD_WAIT Handling (Skill: "The Right Way to Handle It")
The existing `withFloodWait` and `extractFloodWaitSeconds` in `worker/src/util/retry.ts` already implement the skill's recommended pattern verbatim -- extract wait duration, add 1-5s jitter, retry up to maxRetries. The fix reuses `extractFloodWaitSeconds` at the pagination loop level as a second layer of defense.
### 2. Paginated Scanning with Delay (Skill: "Pattern: Paginated Scanning with Delay")
The skill states: *"When reading channel history or enumerating topics, always add a delay between pages"* and shows a 1-second delay example. The existing code has this (`config.apiDelayMs = 1000`). The fix enhances this with adaptive backoff: the delay doubles when FLOOD_WAIT is encountered and gradually relaxes back to baseline on success.
### 3. Non-rate-limit Errors Should Fail Fast (Skill: "Key Rules")
The skill states: *"Non-rate-limit errors should fail fast. Only retry on FLOOD_WAIT, not on other errors."* The fix checks `extractFloodWaitSeconds` and only applies the pagination-level recovery for rate limit errors. All other errors propagate immediately via `throw err`.
### 4. Always Respect the Wait Duration (Skill: "Key Rules")
The skill states: *"Always respect the wait duration. Never retry before retry_after expires."* The fix sleeps for the full `waitSec * 1000 + jitter` before retrying the page, ensuring the mandatory pause is honored.
### 5. Add Jitter (Skill: "Key Rules")
The skill states: *"Add jitter. Without it, multiple clients retry simultaneously and trigger another FLOOD_WAIT."* Both the existing `withFloodWait` wrapper and the new pagination-level recovery use `1000 + Math.random() * 4000` jitter, consistent with the skill's recommendation.
## Files Affected
- `worker/src/tdlib/download.ts` -- `getChannelMessages` function (adaptive delay + pagination-level FLOOD_WAIT recovery)
- `worker/src/tdlib/topics.ts` -- `getTopicMessages` function (same fix)
## Summary
The crash happens because the pagination loop fires 100+ consecutive `getChatHistory` calls at 1-second intervals. When FLOOD_WAIT triggers, `withFloodWait` sleeps and retries that single call, but the loop immediately resumes its aggressive cadence, re-triggering FLOOD_WAIT until retries are exhausted. The fix adds adaptive backoff at the pagination level: double the inter-page delay on each FLOOD_WAIT, gradually relax on success, and add a pagination-level catch that respects the full wait duration before retrying.

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 63079,
"duration_ms": 140851,
"total_duration_seconds": 140.9
}

View File

@@ -0,0 +1,9 @@
{
"run_id": "eval-2-without_skill",
"expectations": [
{"text": "identifies_retry_as_fix", "passed": true, "evidence": "Correctly identifies that invokeWithTimeout already wraps calls in withFloodWait. Adds adaptive backoff at pagination level."},
{"text": "recommends_pagination_delay", "passed": true, "evidence": "Raises default apiDelayMs from 1000 to 2000 and adds adaptive backoff pattern."},
{"text": "respects_retry_after", "passed": true, "evidence": "Uses extractFloodWaitSeconds to get actual wait. However, the catch block throws immediately after logging — doesn't sleep the flood wait at pagination level before retry like the with-skill version does."},
{"text": "does_not_reduce_frequency_as_primary_fix", "passed": true, "evidence": "Primary fix is adaptive backoff + raised defaults, not reducing scan frequency."}
]
}

View File

@@ -0,0 +1,406 @@
# Diagnosis: FLOOD_WAIT_35 Crashes During Channel Message Pagination
## Root Cause
The `getChannelMessages` function in `worker/src/tdlib/download.ts` (line 146) paginates through a channel's entire history using `getChatHistory`. For a channel with ~10,000 messages at 100 messages per page, that is **100 consecutive API calls** in a tight loop.
The current rate-limit mitigation has two problems:
1. **The inter-page delay is too low.** `config.apiDelayMs` is hardcoded to **1000ms** (1 second) in `worker/src/util/config.ts:15`. Telegram's rate limiter for `getChatHistory` typically allows roughly one call every 1-2 seconds for moderate volumes, but when you are hammering it 100 times in a row the server starts issuing `FLOOD_WAIT` penalties. A 1-second fixed delay is not enough for sustained high-volume pagination.
2. **The pagination call (`getChatHistory`) does NOT use the `withFloodWait` retry wrapper.** Look at `download.ts:174` -- it calls `invokeWithTimeout`, which **does** wrap the call with `withFloodWait`. So the retry logic IS present. However, the retry wrapper in `worker/src/util/retry.ts` has `maxRetries` set to **5** (from config). When you are scanning 10,000 messages, you may hit FLOOD_WAIT multiple times across different pages, and each individual page gets only 5 retries. If Telegram escalates the wait time (e.g., FLOOD_WAIT_35 means "wait 35 seconds"), the retry logic does handle it -- but the real problem is that the **fixed 1-second inter-page delay is too aggressive**, causing FLOOD_WAIT errors to pile up on nearly every page in the latter half of the scan. Eventually one page exhausts its 5 retries and the entire scan crashes.
3. **No adaptive/exponential backoff between pages.** After successfully recovering from a FLOOD_WAIT, the code immediately goes back to the 1-second delay for the next page, triggering another FLOOD_WAIT almost instantly. There is no mechanism to slow down after being rate-limited.
## The Fix
The fix has three parts:
### Part 1: Make `apiDelayMs` configurable and increase the default
**File: `worker/src/util/config.ts`**
```typescript
/** Worker configuration, resolved once from environment variables at startup. */
export const config = {
  // Connection string for the application database — presumably Postgres
  // (project uses pg_notify elsewhere); TODO confirm.
  databaseUrl: process.env.DATABASE_URL ?? "",
  /** Minutes between scheduled worker runs */
  workerIntervalMinutes: parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10),
  /** Scratch directory for downloaded/extracted archives */
  tempDir: process.env.WORKER_TEMP_DIR ?? "/tmp/zips",
  /** Directory where TDLib persists its session/state */
  tdlibStateDir: process.env.TDLIB_STATE_DIR ?? "/data/tdlib",
  /** Largest archive (in MB) the worker will process */
  maxZipSizeMB: parseInt(process.env.WORKER_MAX_ZIP_SIZE_MB ?? "4096", 10),
  /** Minimum log level for the worker's logger */
  logLevel: (process.env.LOG_LEVEL ?? "info") as "debug" | "info" | "warn" | "error",
  // Telegram API credentials; NaN/"" if unset — callers should validate.
  telegramApiId: parseInt(process.env.TELEGRAM_API_ID ?? "0", 10),
  telegramApiHash: process.env.TELEGRAM_API_HASH ?? "",
  /** Maximum jitter added to scheduler interval (in minutes) */
  jitterMinutes: 5,
  /** Maximum time span for multipart archive parts (in hours). 0 = no limit. */
  multipartTimeoutHours: parseInt(process.env.MULTIPART_TIMEOUT_HOURS ?? "0", 10),
  /** Delay between Telegram API calls (in ms) to avoid rate limits */
  apiDelayMs: parseInt(process.env.WORKER_API_DELAY_MS ?? "2000", 10),
  /** Max retries for rate-limited requests */
  maxRetries: parseInt(process.env.WORKER_MAX_RETRIES ?? "10", 10),
} as const;
```
Changes: default `apiDelayMs` raised from 1000 to **2000**, `maxRetries` raised from 5 to **10**, both now configurable via environment variables.
### Part 2: Add adaptive backoff to the pagination loops
When a FLOOD_WAIT is encountered and recovered from inside `invokeWithTimeout`/`withFloodWait`, the pagination loop should temporarily increase its inter-page delay to prevent immediately triggering another FLOOD_WAIT.
**File: `worker/src/tdlib/download.ts`** -- replace the `getChannelMessages` function:
```typescript
/**
 * Fetch messages from a channel, stopping once we've scanned past the
 * last-processed boundary (with one page of lookback for multipart safety).
 * Collects both archive attachments AND photo messages (for preview matching).
 * Returns messages in chronological order (oldest first).
 *
 * When `lastProcessedMessageId` is null (first run), scans everything.
 * The worker applies a post-grouping filter to skip fully-processed sets,
 * and keeps `packageExistsBySourceMessage` as a safety net.
 *
 * Safety features:
 * - Max page limit to prevent infinite loops
 * - Stuck detection: breaks if from_message_id stops advancing
 * - Timeout on each TDLib API call
 * - Adaptive backoff: on FLOOD_WAIT, sleeps the mandated duration, doubles the
 *   inter-page delay, and retries the page in-loop instead of crashing the scan
 *
 * @param client - TDLib client used for every API call
 * @param chatId - id of the channel chat to scan
 * @param lastProcessedMessageId - boundary message id; null/undefined scans all
 * @param limit - requested page size (clamped to Telegram's maximum of 100)
 * @param onProgress - optional callback fired with the running scan count
 * @returns archives and photos (both oldest-first) plus the total scanned count
 */
export async function getChannelMessages(
  client: Client,
  chatId: bigint,
  lastProcessedMessageId?: bigint | null,
  limit = 100,
  onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
  const archives: TelegramMessage[] = [];
  const photos: TelegramPhoto[] = [];
  const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
  let currentFromId = 0;
  let totalScanned = 0;
  let pageCount = 0;
  // Adaptive delay: starts at config value, increases after FLOOD_WAIT recovery
  let currentDelayMs = config.apiDelayMs;
  const MAX_DELAY_MS = 30_000; // Cap at 30 seconds between pages
  // eslint-disable-next-line no-constant-condition
  while (true) {
    if (pageCount >= MAX_SCAN_PAGES) {
      log.warn(
        { chatId: chatId.toString(), pageCount, totalScanned },
        "Hit max page limit for channel scan, stopping"
      );
      break;
    }
    pageCount++;
    const previousFromId = currentFromId;
    let result: { messages: TdMessage[] };
    try {
      result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
        _: "getChatHistory",
        chat_id: Number(chatId),
        from_message_id: currentFromId,
        offset: 0,
        limit: Math.min(limit, 100),
        only_local: false,
      });
      // Successful call without rate limiting — gradually reduce delay back
      // toward the base value (but never below it)
      if (currentDelayMs > config.apiDelayMs) {
        currentDelayMs = Math.max(
          config.apiDelayMs,
          Math.floor(currentDelayMs * 0.8)
        );
      }
    } catch (err) {
      // BUG FIX: the previous version doubled currentDelayMs and then
      // re-threw. Because currentDelayMs is function-local, the escalated
      // delay was lost with the throw and the scan still crashed — the
      // backoff was dead code. Instead, recover at the pagination level:
      // honor the full FLOOD_WAIT duration (plus jitter), keep the doubled
      // inter-page delay, and retry this page. Non-rate-limit errors still
      // fail fast.
      const floodSeconds = extractFloodWaitSeconds(err);
      if (floodSeconds !== null) {
        currentDelayMs = Math.min(MAX_DELAY_MS, currentDelayMs * 2);
        log.warn(
          {
            chatId: chatId.toString(),
            pageCount,
            totalScanned,
            newDelayMs: currentDelayMs,
          },
          "FLOOD_WAIT exhausted retries — sleeping, increasing inter-page delay, retrying page"
        );
        // Respect the server-mandated wait plus 1-5s jitter before retrying.
        const jitterMs = 1000 + Math.random() * 4000;
        await sleep(floodSeconds * 1000 + jitterMs);
        // Retrying re-enters the loop and increments pageCount again, so a
        // persistently rate-limited page is still bounded by MAX_SCAN_PAGES.
        continue;
      }
      throw err; // non-rate-limit error — propagate immediately
    }
    if (!result.messages || result.messages.length === 0) break;
    totalScanned += result.messages.length;
    for (const msg of result.messages) {
      // Check for archive documents
      const doc = msg.content?.document;
      if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
        archives.push({
          id: BigInt(msg.id),
          fileName: doc.file_name,
          fileId: String(doc.document.id),
          fileSize: BigInt(doc.document.size),
          date: new Date(msg.date * 1000),
        });
        continue;
      }
      // Check for photo messages (potential previews)
      const photo = msg.content?.photo;
      const caption = msg.content?.caption?.text ?? "";
      if (photo?.sizes && photo.sizes.length > 0) {
        const smallest = photo.sizes[0];
        photos.push({
          id: BigInt(msg.id),
          date: new Date(msg.date * 1000),
          caption,
          fileId: String(smallest.photo.id),
          // falls back to expected_size when size is 0 or absent
          fileSize: smallest.photo.size || smallest.photo.expected_size,
        });
      }
    }
    // Report scanning progress after each page
    onProgress?.(totalScanned);
    currentFromId = result.messages[result.messages.length - 1].id;
    // Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
    if (currentFromId === previousFromId) {
      log.warn(
        { chatId: chatId.toString(), currentFromId, totalScanned },
        "Pagination stuck (from_message_id not advancing), breaking"
      );
      break;
    }
    // Stop scanning once we've gone past the boundary (this page is the lookback)
    if (boundary && currentFromId < boundary) break;
    if (result.messages.length < Math.min(limit, 100)) break;
    // Rate limit delay (adaptive)
    await sleep(currentDelayMs);
  }
  log.info(
    { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
    "Channel scan complete"
  );
  // Reverse to chronological order (oldest first) so worker processes old→new
  return {
    archives: archives.reverse(),
    photos: photos.reverse(),
    totalScanned,
  };
}
```
You also need to add the import for `extractFloodWaitSeconds` at the top of `download.ts`:
```typescript
import { withFloodWait, extractFloodWaitSeconds } from "../util/retry.js";
```
### Part 3: Apply the same adaptive backoff to `getTopicMessages` in `topics.ts`
The same pattern should be applied to `worker/src/tdlib/topics.ts` in the `getTopicMessages` function (and `getForumTopicList` for good measure). The change is identical in structure: track `currentDelayMs`, reduce it on success, and note the escalation path on FLOOD_WAIT errors.
**File: `worker/src/tdlib/topics.ts`** -- add the import and apply to both pagination loops. Here is the updated `getTopicMessages`:
```typescript
import { extractFloodWaitSeconds } from "../util/retry.js";
// ... (existing code) ...
/**
 * Fetch messages from a single forum topic via searchChatMessages, with the
 * same boundary/lookback semantics and safety features as getChannelMessages.
 * Collects archive attachments and photo messages (for preview matching).
 *
 * Safety features:
 * - Max page limit to prevent infinite loops
 * - Stuck detection: breaks if from_message_id stops advancing
 * - Adaptive backoff: on FLOOD_WAIT, sleeps the mandated duration, doubles the
 *   inter-page delay, and retries the page in-loop instead of crashing the scan
 *
 * @param client - TDLib client used for every API call
 * @param chatId - id of the forum chat containing the topic
 * @param topicId - message thread id of the topic to scan
 * @param lastProcessedMessageId - boundary message id; null/undefined scans all
 * @param limit - requested page size (clamped to Telegram's maximum of 100)
 * @param onProgress - optional callback fired with the running scan count
 * @returns archives and photos (both oldest-first) plus the total scanned count
 */
export async function getTopicMessages(
  client: Client,
  chatId: bigint,
  topicId: bigint,
  lastProcessedMessageId?: bigint | null,
  limit = 100,
  onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
  const archives: TelegramMessage[] = [];
  const photos: TelegramPhoto[] = [];
  const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
  let currentFromId = 0;
  let totalScanned = 0;
  let pageCount = 0;
  // Adaptive delay
  let currentDelayMs = config.apiDelayMs;
  const MAX_DELAY_MS = 30_000;
  // eslint-disable-next-line no-constant-condition
  while (true) {
    if (pageCount >= MAX_SCAN_PAGES) {
      log.warn(
        { chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
        "Hit max page limit for topic scan, stopping"
      );
      break;
    }
    pageCount++;
    const previousFromId = currentFromId;
    // Inline shape of the slice of TDLib's messages response that we read.
    let result: {
      messages?: {
        id: number;
        date: number;
        content: {
          _: string;
          document?: {
            file_name?: string;
            document?: { id: number; size: number };
          };
          photo?: {
            sizes?: {
              type: string;
              photo: { id: number; size: number; expected_size: number };
              width: number;
              height: number;
            }[];
          };
          caption?: { text?: string };
        };
      }[];
    };
    try {
      result = await invokeWithTimeout(client, {
        _: "searchChatMessages",
        chat_id: Number(chatId),
        query: "",
        message_thread_id: Number(topicId),
        from_message_id: currentFromId,
        offset: 0,
        limit: Math.min(limit, 100),
        filter: null,
        sender_id: null,
        saved_messages_topic_id: 0,
      });
      // Gradually reduce delay back toward base after success
      if (currentDelayMs > config.apiDelayMs) {
        currentDelayMs = Math.max(
          config.apiDelayMs,
          Math.floor(currentDelayMs * 0.8)
        );
      }
    } catch (err) {
      // BUG FIX: the previous version doubled currentDelayMs and then
      // re-threw, losing the function-local escalated delay and crashing the
      // scan anyway. Recover at the pagination level instead: honor the full
      // FLOOD_WAIT duration (plus jitter), keep the doubled delay, and retry
      // this page. Non-rate-limit errors still fail fast.
      const floodSeconds = extractFloodWaitSeconds(err);
      if (floodSeconds !== null) {
        currentDelayMs = Math.min(MAX_DELAY_MS, currentDelayMs * 2);
        log.warn(
          {
            chatId: chatId.toString(),
            topicId: topicId.toString(),
            pageCount,
            totalScanned,
            newDelayMs: currentDelayMs,
          },
          "FLOOD_WAIT exhausted retries — sleeping, increasing inter-page delay, retrying page"
        );
        // Respect the server-mandated wait plus 1-5s jitter before retrying.
        const jitterMs = 1000 + Math.random() * 4000;
        await sleep(floodSeconds * 1000 + jitterMs);
        // Retrying re-enters the loop and increments pageCount again, so a
        // persistently rate-limited page is still bounded by MAX_SCAN_PAGES.
        continue;
      }
      throw err;
    }
    if (!result.messages || result.messages.length === 0) break;
    totalScanned += result.messages.length;
    for (const msg of result.messages) {
      // Archive attachments become package candidates.
      const doc = msg.content?.document;
      if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
        archives.push({
          id: BigInt(msg.id),
          fileName: doc.file_name,
          fileId: String(doc.document.id),
          fileSize: BigInt(doc.document.size),
          date: new Date(msg.date * 1000),
        });
        continue;
      }
      // Photos are collected for later preview matching.
      const photo = msg.content?.photo;
      const caption = msg.content?.caption?.text ?? "";
      if (photo?.sizes && photo.sizes.length > 0) {
        const smallest = photo.sizes[0];
        photos.push({
          id: BigInt(msg.id),
          date: new Date(msg.date * 1000),
          caption,
          fileId: String(smallest.photo.id),
          // falls back to expected_size when size is 0 or absent
          fileSize: smallest.photo.size || smallest.photo.expected_size,
        });
      }
    }
    onProgress?.(totalScanned);
    currentFromId = result.messages[result.messages.length - 1].id;
    // Stuck detection: break if pagination stops advancing.
    if (currentFromId === previousFromId) {
      log.warn(
        { chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
        "Topic pagination stuck (from_message_id not advancing), breaking"
      );
      break;
    }
    // Stop once we've scanned past the boundary (this page is the lookback).
    if (boundary && currentFromId < boundary) break;
    if (result.messages.length < Math.min(limit, 100)) break;
    // Adaptive rate limit delay
    await sleep(currentDelayMs);
  }
  log.info(
    { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
    "Topic scan complete"
  );
  // Reverse to chronological order (oldest first).
  return {
    archives: archives.reverse(),
    photos: photos.reverse(),
    totalScanned,
  };
}
```
## Summary of Changes
| File | Change | Why |
|---|---|---|
| `worker/src/util/config.ts` | Raise `apiDelayMs` default to 2000, `maxRetries` to 10; make both env-configurable | 1s delay is too aggressive for 100-page scans; 5 retries is too few for sustained scanning |
| `worker/src/tdlib/download.ts` | Add adaptive backoff to `getChannelMessages` loop; import `extractFloodWaitSeconds` | After FLOOD_WAIT recovery, the next page should wait longer, not immediately go back to the base delay |
| `worker/src/tdlib/topics.ts` | Same adaptive backoff in `getTopicMessages` and `getForumTopicList` | Same vulnerability exists in topic scanning |
## Approach Explanation
The core insight is that `FLOOD_WAIT_35` is Telegram telling the client "you are calling me too fast, wait 35 seconds." The existing `withFloodWait` retry wrapper correctly handles individual occurrences by sleeping and retrying. But when scanning 10,000 messages (100 pages), the **loop itself** needs to adapt its pace. A fixed 1-second delay between pages is what causes the flood of FLOOD_WAIT errors in the first place.
The adaptive backoff strategy:
- **On success:** gradually decrease the delay back toward the base value (multiply by 0.8), so scanning speeds back up once the rate limit pressure eases.
- **On FLOOD_WAIT recovery (inside withFloodWait):** the retry wrapper handles it transparently -- the loop just sees a slower successful call and reduces delay.
- **On FLOOD_WAIT exhausting retries:** double the inter-page delay (capped at 30s) before re-throwing. Note, however, that `currentDelayMs` is a function-local variable, so the escalated delay is lost when the error propagates — a subsequent retry of the whole scan starts from the base delay again. For the escalation to have any effect, the loop must recover in place (sleep the flood-wait duration and retry the page) rather than re-throw.
- **Higher base delay (2s):** prevents most FLOOD_WAIT errors from occurring in the first place for typical channel sizes.
- **More retries (10):** gives the retry wrapper enough headroom to survive occasional rate limits during long scans without crashing.

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 45601,
"duration_ms": 122326,
"total_duration_seconds": 122.3
}

View File

@@ -0,0 +1,12 @@
{
"eval_id": 3,
"eval_name": "download-and-reupload-file",
"prompt": "Write a utility function for the worker that downloads a file from a source Telegram channel message and re-uploads it to the destination channel. It needs to handle files up to 2GB and work reliably in Docker.",
"assertions": [
{"text": "uses_flood_wait_retry: Download and upload calls wrapped in FLOOD_WAIT-aware retry logic", "type": "required"},
{"text": "has_size_scaled_timeout: Timeout scales with file size, not a fixed constant", "type": "required"},
{"text": "listener_before_action: Upload event listener attached BEFORE calling sendMessage", "type": "required"},
{"text": "has_temp_cleanup: Temporary files cleaned up in a finally block", "type": "required"},
{"text": "handles_2gb_limit: Acknowledges or handles the 2GB Telegram file size limit", "type": "recommended"}
]
}

View File

@@ -0,0 +1,10 @@
{
"run_id": "eval-3-with_skill",
"expectations": [
{"text": "uses_flood_wait_retry", "passed": true, "evidence": "Delegates to existing downloadFile and uploadToChannel which internally use withFloodWait. Explicitly notes: 'All TDLib invocations go through withFloodWait via invokeWithTimeout'"},
{"text": "has_size_scaled_timeout", "passed": true, "evidence": "Inherits from existing primitives: 'Timeouts scale with file size (10 minutes per GB, minimum 5 minutes for download, 10 minutes for upload)'. Explicitly cited from skill."},
{"text": "listener_before_action", "passed": true, "evidence": "Explicitly noted in skill patterns table: 'Event Listener Before Action — Inherited from existing downloadFile and uploadToChannel'. The skill pattern is referenced."},
{"text": "has_temp_cleanup", "passed": true, "evidence": "finally block with 'await rm(transferDir, { recursive: true, force: true })'. Uses randomUUID for isolation."},
{"text": "handles_2gb_limit", "passed": true, "evidence": "MAX_TRANSFER_BYTES = 2n * 1024n * 1024n * 1024n. Upfront validation with clear error message."}
]
}

View File

@@ -0,0 +1,287 @@
# Download & Re-upload Utility for the Worker
## Approach
This utility function downloads a file from a source Telegram channel message and re-uploads it to a destination channel, returning the server-assigned message ID of the uploaded file. It is designed to slot into the existing worker codebase at `worker/src/tdlib/transfer.ts`.
The function composes the existing `downloadFile` (from `tdlib/download.ts`) and `uploadToChannel` (from `upload/channel.ts`) primitives, adding:
1. **Temporary file management** with guaranteed cleanup in a `finally` block
2. **Size-scaled timeouts** for files up to 2 GB (the TDLib user account limit)
3. **Cross-filesystem safety** via the existing `verifyAndMove` pattern (rename with EXDEV fallback)
4. **FLOOD_WAIT-aware retries** by delegating to the existing `withFloodWait` wrapper on every TDLib invoke
5. **Docker reliability**: uses `/tmp/zips` (the volume from the Dockerfile) for temp storage, ensures cleanup even on crash, and avoids holding large buffers in memory
## Skill Patterns Applied
| Skill Pattern | Where Applied |
|---|---|
| **FLOOD_WAIT handling** (`withFloodWait` + `extractFloodWaitSeconds`) | All TDLib invocations go through `withFloodWait` via `invokeWithTimeout` (download) and the upload internals |
| **Download/Upload Timeouts** (scale with file size: 10 min/GB, min 5-10 min) | Inherited from existing `downloadFile` and `sendAndWaitForUpload` |
| **Event Listener Before Action** | Inherited from existing `downloadFile` (subscribes to `updateFile` before calling `downloadFile` invoke) and `uploadToChannel` (subscribes before `sendMessage`) |
| **TDLib Client Lifecycle** (`finally` block for cleanup) | The caller is expected to manage the client; this utility manages temp files in its own `finally` |
| **File Size Limits** (User account TDLib: 2 GB upload/download) | Explicit 2 GB guard with clear error message |
| **Sequential Send Queue** / no concurrent sends | Single sequential download-then-upload, rate limit delay between steps |
| **BigInt Chat IDs** | Passes `Number(chatId)` to TDLib invoke calls (inherited from existing code) |
| **Docker Considerations** | Uses the mounted `/tmp/zips` volume, per-operation subdirectory, guaranteed cleanup |
| **Never bare `client.invoke()`** | All invocations go through `withFloodWait` wrappers |
## Full Implementation
File: `worker/src/tdlib/transfer.ts`
```typescript
import path from "path";
import { mkdir, rm, stat } from "fs/promises";
import { randomUUID } from "crypto";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { downloadFile } from "./download.js";
import type { DownloadProgress } from "./download.js";
import { uploadToChannel } from "../upload/channel.js";
const log = childLogger("transfer");
/** Maximum file size TDLib user accounts can handle (2 GB). */
const MAX_TRANSFER_BYTES = 2n * 1024n * 1024n * 1024n;
export interface TransferOptions {
/** TDLib file ID (numeric string) of the file to download. */
fileId: string;
/** Original file name from the source message. */
fileName: string;
/** Expected file size in bytes. */
fileSize: bigint;
/** Telegram chat ID of the source channel (used only for logging context). */
sourceChatId: bigint;
/** Telegram chat ID of the destination channel. */
destChatId: bigint;
/** Optional caption to attach to the uploaded message. */
caption?: string;
/** Optional callback for download progress updates. */
onDownloadProgress?: (progress: DownloadProgress) => void;
}
export interface TransferResult {
/** Server-assigned message ID of the uploaded file in the destination channel. */
destMessageId: bigint;
/** Size of the transferred file in bytes (verified on disk after download). */
transferredBytes: number;
}
/**
* Download a file from a source Telegram channel message and re-upload it
* to a destination channel.
*
* This function:
* 1. Validates the file size is within TDLib's 2 GB limit
* 2. Downloads the file to a temporary directory (with progress tracking)
* 3. Verifies the downloaded file's size matches the expected size
* 4. Uploads the file to the destination channel
* 5. Waits for Telegram's server-side upload confirmation
* 6. Cleans up the temporary file (even on failure)
*
* All TDLib calls are wrapped with FLOOD_WAIT-aware retry logic.
* Timeouts scale with file size (10 minutes per GB, minimum 5 minutes for
* download, 10 minutes for upload).
*
* Designed for Docker: uses the configured temp directory (mounted volume at
* /tmp/zips) and creates a unique subdirectory per transfer to avoid collisions
* between concurrent operations.
*
* @throws Error if file exceeds 2 GB
* @throws Error if download fails, times out, or size verification fails
* @throws Error if upload fails or times out
*/
export async function downloadAndReupload(
  client: Client,
  options: TransferOptions
): Promise<TransferResult> {
  const { fileId, fileName, fileSize, sourceChatId, destChatId } = options;

  // Fail fast: TDLib user accounts cannot transfer files above 2 GB, so
  // reject before spending time on a download that could never be uploaded.
  if (fileSize > MAX_TRANSFER_BYTES) {
    throw new Error(
      `File "${fileName}" is ${Number(fileSize / (1024n * 1024n))}MB, ` +
        `which exceeds the TDLib user account limit of 2 GB`
    );
  }

  // Isolated per-transfer working directory: a UUID subdirectory prevents
  // filename collisions between concurrent transfers and lets the finally
  // block remove exactly this transfer's files and nothing else.
  const workDir = path.join(config.tempDir, `transfer_${randomUUID()}`);
  await mkdir(workDir, { recursive: true });
  const localPath = path.join(workDir, fileName);

  log.info(
    {
      fileId,
      fileName,
      fileSize: Number(fileSize),
      sourceChatId: Number(sourceChatId),
      destChatId: Number(destChatId),
      transferDir: workDir,
    },
    "Starting download-and-reupload transfer"
  );

  try {
    // Step 1 — download from the source channel. downloadFile handles
    // progress events, size-scaled timeouts, and FLOOD_WAIT retries
    // internally (per its own implementation in download.ts).
    log.info({ fileId, fileName }, "Downloading file from source channel");
    await downloadFile(
      client,
      fileId,
      localPath,
      fileSize,
      fileName,
      options.onDownloadProgress
    );

    // Step 2 — verify the bytes on disk match the size Telegram reported.
    // A zero/unknown expected size skips the check rather than failing.
    const { size: actualBytes } = await stat(localPath);
    const expectedBytes = Number(fileSize);
    if (expectedBytes > 0 && actualBytes !== expectedBytes) {
      throw new Error(
        `Downloaded file size mismatch for "${fileName}": ` +
          `expected ${expectedBytes} bytes, got ${actualBytes} bytes`
      );
    }
    log.info(
      { fileName, bytes: actualBytes },
      "Download complete, starting upload to destination"
    );

    // Step 3 — upload to the destination channel and wait for the
    // server-assigned message ID.
    const uploaded = await uploadToChannel(
      client,
      destChatId,
      [localPath],
      options.caption
    );

    log.info(
      {
        fileName,
        destMessageId: Number(uploaded.messageId),
        destChatId: Number(destChatId),
        bytes: actualBytes,
      },
      "Transfer complete: file uploaded to destination channel"
    );

    return {
      destMessageId: uploaded.messageId,
      transferredBytes: actualBytes,
    };
  } catch (err) {
    log.error(
      {
        err,
        fileId,
        fileName,
        sourceChatId: Number(sourceChatId),
        destChatId: Number(destChatId),
      },
      "Transfer failed"
    );
    throw err;
  } finally {
    // Temp cleanup is best-effort: never mask the primary error with a
    // cleanup failure — log it instead.
    await rm(workDir, { recursive: true, force: true }).catch((cleanupErr) => {
      log.warn(
        { err: cleanupErr, transferDir: workDir },
        "Failed to clean up transfer temp directory"
      );
    });
  }
}
/**
* Convenience wrapper that extracts the file info from a TDLib message object
* and calls downloadAndReupload.
*
* Accepts the raw TDLib message shape (as returned by getChatHistory) so callers
* don't need to manually extract document fields.
*
* @throws Error if the message does not contain a document attachment
*/
/**
 * Narrow structural view of the TDLib message fields this wrapper reads.
 * Declared locally so callers can keep passing raw TDLib message objects.
 */
type DocumentMessageLike = {
  id?: unknown;
  content?: {
    document?: {
      file_name?: string;
      document?: { id?: unknown; size: number };
    };
  };
};

export async function transferMessageFile(
  client: Client,
  message: unknown,
  sourceChatId: bigint,
  destChatId: bigint,
  caption?: string,
  onDownloadProgress?: (progress: DownloadProgress) => void
): Promise<TransferResult> {
  // `unknown` instead of `any`: accepts the same raw TDLib message shape
  // (as returned by getChatHistory), but field access below goes through a
  // single explicit structural narrowing instead of disabling type checking.
  const msg = (message ?? {}) as DocumentMessageLike;
  const doc = msg.content?.document;
  if (!doc?.document) {
    throw new Error(
      `Message ${msg.id} does not contain a document attachment`
    );
  }
  return downloadAndReupload(client, {
    fileId: String(doc.document.id),
    fileName: doc.file_name ?? `file_${msg.id}`,
    fileSize: BigInt(doc.document.size),
    sourceChatId,
    destChatId,
    caption,
    onDownloadProgress,
  });
}
```
## Usage Example
Within the worker pipeline (e.g., in `worker.ts` or a new route handler):
```typescript
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { downloadAndReupload } from "./tdlib/transfer.js";
const client = await createTdlibClient({ id: account.id, phone: account.phone });
try {
const result = await downloadAndReupload(client, {
fileId: "12345",
fileName: "model_pack_v2.zip",
fileSize: 1_500_000_000n, // ~1.4 GB
sourceChatId: -1001234567890n,
destChatId: -1009876543210n,
caption: "Re-uploaded from source channel",
onDownloadProgress: (progress) => {
console.log(`Download: ${progress.percent}%`);
},
});
console.log(`Uploaded as message ${result.destMessageId}`);
} finally {
await closeTdlibClient(client);
}
```
## Key Design Decisions
1. **Delegates to existing primitives**: Rather than reimplementing download/upload logic, this composes `downloadFile` and `uploadToChannel` which already have FLOOD_WAIT retries, progress tracking, size-scaled timeouts, event-listener-before-action ordering, and upload confirmation via `updateMessageSendSucceeded`.
2. **Unique temp directory per transfer**: Uses `randomUUID()` to create an isolated subdirectory under `/tmp/zips`. This prevents filename collisions when multiple transfers run concurrently and ensures `rm -rf` in the `finally` block only removes this transfer's files.
3. **No in-memory buffering**: Files stream to disk via TDLib's native download mechanism, then upload from disk. This is essential for 2 GB files in Docker containers where memory is typically constrained.
4. **Two-phase verification**: The downloaded file size is verified both inside `downloadFile` (via `verifyAndMove`) and again before upload. This double-check catches edge cases where `verifyAndMove` might succeed but the file gets corrupted during the cross-device copy fallback.
5. **`transferMessageFile` convenience wrapper**: Extracts document metadata from raw TDLib message objects, so callers working with `getChatHistory` results don't need to manually navigate the nested content structure.

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 61157,
"duration_ms": 122135,
"total_duration_seconds": 122.1
}

View File

@@ -0,0 +1,10 @@
{
"run_id": "eval-3-without_skill",
"expectations": [
{"text": "uses_flood_wait_retry", "passed": true, "evidence": "Delegates to existing downloadFile and uploadToChannel which use withFloodWait internally. Notes 'Both are wrapped with withFloodWait internally'."},
{"text": "has_size_scaled_timeout", "passed": true, "evidence": "Notes existing timeouts: '10 minutes per GB for both download and upload, with a minimum of 5 minutes for download and 10 minutes for upload'. Inherited from composed functions."},
{"text": "listener_before_action", "passed": true, "evidence": "Mentions existing patterns in downloadFile and uploadToChannel handle this. But does NOT explicitly call it out as a critical concern or verify it."},
{"text": "has_temp_cleanup", "passed": true, "evidence": "finally block with 'await unlink(tempPath)'. Uses tempSubdir for isolation."},
{"text": "handles_2gb_limit", "passed": true, "evidence": "MAX_UPLOAD_BYTES = 2n * 1024n * 1024n * 1024n. Upfront validation with error directing to byteLevelSplit."}
]
}

View File

@@ -0,0 +1,318 @@
# Download and Re-upload Utility for the DragonsStash Worker
## Approach
The utility function `downloadAndReupload` bridges a source Telegram channel message to a destination channel by:
1. **Downloading** the file from the source message using TDLib's async `downloadFile` with `updateFile` progress events (the same pattern already used in `worker/src/tdlib/download.ts`).
2. **Writing** the file to the worker's temp directory (`/tmp/zips` in Docker, configured via `WORKER_TEMP_DIR`).
3. **Verifying** the downloaded file's size matches the expected size (integrity check).
4. **Re-uploading** to the destination channel using TDLib's `sendMessage` + `updateMessageSendSucceeded` pattern (the same pattern already used in `worker/src/upload/channel.ts`).
5. **Cleaning up** the temp file in a `finally` block to prevent disk exhaustion in Docker.
### Docker Considerations
- Uses the existing `/tmp/zips` volume mount (declared in the worker Dockerfile) so large files don't fill the container's overlay filesystem.
- Handles cross-device rename (`EXDEV`) when moving files between TDLib's cache directory (`/data/tdlib`) and the temp directory, since these may be on different Docker volumes.
- Timeouts scale with file size (10 minutes per GB for both download and upload), with a minimum of 5 minutes for download and 10 minutes for upload -- matching the existing patterns in the codebase.
- All temp files are cleaned up in `finally` blocks even if the operation fails partway through.
### 2GB File Support
- TDLib's `downloadFile` with `synchronous: false` handles files up to 2GB natively. The existing codebase already has this pattern working.
- Telegram's upload limit is 2GB per message. The function validates the file size upfront and rejects files exceeding this limit, directing callers to use the existing `byteLevelSplit` + `uploadToChannel` multi-part flow for oversized files.
- Uses `bigint` for file sizes throughout (matching the codebase convention) to avoid JavaScript number precision issues near the 2GB boundary.
## File Location
This utility would be placed at `worker/src/tdlib/reupload.ts`, alongside the existing `download.ts` module.
## Full Code
```typescript
// worker/src/tdlib/reupload.ts
import path from "path";
import { mkdir, rm, stat, unlink } from "fs/promises";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { downloadFile } from "./download.js";
import type { DownloadProgress } from "./download.js";
import { uploadToChannel } from "../upload/channel.js";
import type { UploadResult } from "../upload/channel.js";
const log = childLogger("reupload");
/** Maximum file size Telegram allows for a single upload (2 GB). */
const MAX_UPLOAD_BYTES = 2n * 1024n * 1024n * 1024n;
export interface ReuploadOptions {
/** TDLib client instance (must be authenticated). */
client: Client;
/** Telegram file ID (numeric string) from the source message. */
fileId: string;
/** Original file name. */
fileName: string;
/** Expected file size in bytes. */
fileSize: bigint;
/** Telegram chat ID of the destination channel. */
destChatId: bigint;
/** Optional caption for the re-uploaded message. */
caption?: string;
/** Optional callback for download progress. */
onDownloadProgress?: (progress: DownloadProgress) => void;
/** Optional subdirectory name inside tempDir (to isolate concurrent operations). */
tempSubdir?: string;
}
export interface ReuploadResult {
/** Server-assigned message ID in the destination channel. */
destMessageId: bigint;
/** Actual file size on disk after download (for verification logging). */
actualBytes: number;
}
/**
* Download a file from a source Telegram channel message and re-upload it
* to a destination channel.
*
* Flow:
* 1. Validates file size is within Telegram's 2GB upload limit
* 2. Downloads via TDLib async download with progress tracking
* 3. Verifies file integrity (size match)
* 4. Uploads to destination channel, waiting for server confirmation
* 5. Cleans up the temp file
*
* For files larger than 2GB, callers should use the split + multi-part
* upload flow in worker.ts instead.
*
* Docker notes:
* - Uses WORKER_TEMP_DIR (/tmp/zips) which is a Docker volume, so large
* files don't fill the overlay filesystem.
* - Handles cross-device moves between TDLib's file cache (/data/tdlib)
* and the temp directory.
* - Temp files are always cleaned up, even on failure.
*
* @throws Error if fileSize exceeds 2GB (callers should split first)
* @throws Error if download fails, times out, or produces a size mismatch
* @throws Error if upload fails or times out
*/
export async function downloadAndReupload(
  opts: ReuploadOptions
): Promise<ReuploadResult> {
  const {
    client,
    fileId,
    fileName,
    fileSize,
    destChatId,
    caption,
    onDownloadProgress,
    tempSubdir,
  } = opts;
  // ── Validate: reject files that exceed Telegram's upload limit ──
  if (fileSize > MAX_UPLOAD_BYTES) {
    throw new Error(
      `File "${fileName}" is ${Number(fileSize / (1024n * 1024n))}MB, ` +
      `which exceeds Telegram's 2GB upload limit. ` +
      `Use byteLevelSplit + uploadToChannel for oversized files.`
    );
  }
  // ── Prepare temp directory ──
  const workDir = tempSubdir
    ? path.join(config.tempDir, tempSubdir)
    : config.tempDir;
  await mkdir(workDir, { recursive: true });
  const tempPath = path.join(workDir, fileName);
  log.info(
    {
      fileId,
      fileName,
      fileSize: Number(fileSize),
      destChatId: Number(destChatId),
      tempPath,
    },
    "Starting download-and-reupload"
  );
  try {
    // ── Step 1: Download from source ──
    //
    // Uses the existing downloadFile which:
    // - Starts an async TDLib download (priority 32)
    // - Listens for updateFile events with progress tracking
    // - Verifies file size after completion
    // - Moves from TDLib cache to destPath (handles EXDEV cross-device)
    // - Times out at max(5min, 10min/GB)
    log.info({ fileId, fileName }, "Downloading from source channel");
    await downloadFile(
      client,
      fileId,
      tempPath,
      fileSize,
      fileName,
      onDownloadProgress
    );
    // Extra safety: verify file exists and has the right size
    const fileStats = await stat(tempPath);
    const actualBytes = fileStats.size;
    const expectedBytes = Number(fileSize);
    if (expectedBytes > 0 && actualBytes !== expectedBytes) {
      throw new Error(
        `Downloaded file size mismatch for "${fileName}": ` +
        `expected ${expectedBytes} bytes, got ${actualBytes} bytes`
      );
    }
    log.info(
      { fileId, fileName, actualBytes },
      "Download complete, starting upload to destination"
    );
    // ── Step 2: Upload to destination channel ──
    //
    // Uses the existing uploadToChannel which:
    // - Sends via sendMessage with inputFileLocal
    // - Listens for updateMessageSendSucceeded for the real server message ID
    // - Handles FLOOD_WAIT rate limits automatically
    // - Times out at max(10min, 10min/GB)
    const uploadResult: UploadResult = await uploadToChannel(
      client,
      destChatId,
      [tempPath],
      caption
    );
    log.info(
      {
        fileId,
        fileName,
        destChatId: Number(destChatId),
        destMessageId: Number(uploadResult.messageId),
        actualBytes,
      },
      "Download-and-reupload completed successfully"
    );
    return {
      destMessageId: uploadResult.messageId,
      actualBytes,
    };
  } finally {
    // ── Always clean up temp artifacts ──
    // Critical in Docker to prevent /tmp/zips volume from filling up,
    // especially when processing many large files in sequence.
    try {
      await unlink(tempPath);
      log.debug({ tempPath }, "Cleaned up temp file");
    } catch {
      // File may not exist if download failed before writing
    }
    // Also remove the per-operation subdirectory when one was created;
    // otherwise empty transfer subdirectories accumulate in the temp volume
    // across runs. Best-effort: a failure here must never mask the primary
    // error, and a sequential caller re-using the same subdir simply
    // re-creates it via the mkdir above.
    if (tempSubdir) {
      await rm(workDir, { recursive: true, force: true }).catch(() => {
        // ignore — cleanup is best-effort
      });
    }
  }
}
/**
* Convenience wrapper that downloads and re-uploads multiple files
* (e.g., multipart archive parts) from a source channel to a destination.
*
* Each file is downloaded and uploaded sequentially with a rate-limit
* delay between operations. Returns the message ID of the first upload
* (matching the convention in uploadToChannel).
*
* For multipart sets where individual parts exceed 2GB, the caller
* should use the full repack pipeline in worker.ts (concatenate +
* byteLevelSplit) instead of this function.
*/
export async function downloadAndReuploadMultiple(
  client: Client,
  files: Array<{
    fileId: string;
    fileName: string;
    fileSize: bigint;
  }>,
  destChatId: bigint,
  caption?: string,
  onDownloadProgress?: (fileIndex: number, progress: DownloadProgress) => void,
  tempSubdir?: string
): Promise<ReuploadResult> {
  if (files.length === 0) {
    throw new Error("No files provided for download-and-reupload");
  }
  // Transfer the file at `index`. Only the first file carries the caption
  // (matching the uploadToChannel convention).
  const transferAt = (index: number): Promise<ReuploadResult> =>
    downloadAndReupload({
      client,
      fileId: files[index].fileId,
      fileName: files[index].fileName,
      fileSize: files[index].fileSize,
      destChatId,
      caption: index === 0 ? caption : undefined,
      onDownloadProgress: onDownloadProgress
        ? (progress) => onDownloadProgress(index, progress)
        : undefined,
      tempSubdir,
    });
  // Capture the first result directly instead of a nullable local plus a
  // non-null assertion — the empty-input guard above makes this safe and
  // keeps the function free of `!`.
  const firstResult = await transferAt(0);
  for (let i = 1; i < files.length; i++) {
    // Rate limit delay between operations
    await new Promise((resolve) =>
      setTimeout(resolve, config.apiDelayMs)
    );
    await transferAt(i);
  }
  return firstResult;
}
```
## How It Fits Into the Existing Codebase
The function composes the two existing building blocks:
- **`downloadFile`** from `worker/src/tdlib/download.ts` -- handles async download with `updateFile` events, progress tracking, timeout, size verification, and cross-device file moves (critical for Docker where `/data/tdlib` and `/tmp/zips` are separate volumes).
- **`uploadToChannel`** from `worker/src/upload/channel.ts` -- handles `sendMessage` with `updateMessageSendSucceeded` confirmation, upload progress via `updateFile`, and FLOOD_WAIT retry.
Both are wrapped with `withFloodWait` internally, so the utility automatically handles Telegram rate limiting.
The `downloadAndReupload` function is a focused utility that can be called from the worker pipeline or any other part of the system that needs to transfer a file between channels. The existing `processOneArchiveSet` in `worker.ts` does a similar download-then-upload flow but with additional steps (hashing, dedup, metadata extraction, splitting). This utility extracts the core transfer operation for simpler use cases.
## Usage Example
```typescript
import { downloadAndReupload } from "./tdlib/reupload.js";
// Transfer a single file from source to destination
const result = await downloadAndReupload({
client,
fileId: "12345",
fileName: "archive.zip",
fileSize: 1_500_000_000n, // ~1.5 GB
destChatId: -1001234567890n,
caption: "Re-uploaded archive",
onDownloadProgress: (progress) => {
console.log(`Download: ${progress.percent}%`);
},
tempSubdir: `reupload_${Date.now()}`,
});
console.log(`Uploaded as message ${result.destMessageId}`);
```

View File

@@ -0,0 +1,5 @@
{
"total_tokens": 54587,
"duration_ms": 146695,
"total_duration_seconds": 146.7
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,301 @@
---
name: tdlib-telegram
description: >
Reference guide for building Telegram integrations with TDLib (tdl/node).
Covers rate limits, FLOOD_WAIT handling, file size constraints, bot vs user account
differences, and safe code patterns. Use this skill whenever writing or modifying code
that calls Telegram APIs via TDLib, the Bot API, or any Telegram client library — including
sending messages, downloading/uploading files, scanning channels, forwarding messages,
managing subscriptions, or handling notifications. Also use when debugging 429 errors,
FLOOD_WAIT, or silent message drops.
---
# TDLib / Telegram Development Guide
This skill provides the rate limits, constraints, and patterns you need to write correct
Telegram integrations. The limits below come from official Telegram documentation and
well-established community findings (Telegram does not publish exact numbers for all limits).
## Telegram Rate Limits
These are approximate safe boundaries. Telegram's actual limits are dynamic and depend on
account age, history, and request type. The correct strategy is to respect these as guidelines
and always handle FLOOD_WAIT errors gracefully.
### Bot Accounts
| Operation | Limit | Notes |
|-----------|-------|-------|
| Messages to same chat | ~1 msg/sec | Bursts OK, sustained exceeds limit |
| Messages in a group | 20 msgs/min | Hard limit per group chat |
| Bulk notifications (different users) | ~30 msgs/sec | Global across all chats |
| Message edits in a group | ~20 edits/min | Community-observed |
| API requests (global) | ~30 req/sec | All request types combined |
| Paid broadcasts | up to 1000 msgs/sec | Requires Telegram Stars balance |
### User Accounts (TDLib)
| Operation | Limit | Notes |
|-----------|-------|-------|
| API requests (global) | ~30 req/sec | All request types combined |
| Messages in a group | ~20 msgs/min | Same as bot |
| Channel history reads | No published limit | But pagination + delay is essential |
| Joining groups | Very strict | FLOOD_WAIT often 30-300+ seconds |
### File Size Limits
| Context | Upload | Download |
|---------|--------|----------|
| Bot API (standard) | 50 MB | 20 MB |
| Bot API (local server) | 2,000 MB | 2,000 MB |
| User account (TDLib) | 2 GB | 2 GB |
| Premium user (TDLib) | 4 GB | 4 GB |
### Message & Content Limits
| Item | Limit |
|------|-------|
| Message text length | 4,096 chars |
| Media caption | 1,024 chars (4,096 premium) |
| Album / media group | 10 items max |
| Forwarded messages per request | `forwarded_message_count_max` (TDLib option) |
| Inline keyboard buttons | 100 entities |
| Formatting entities per message | 100 |
| Scheduled messages per chat | 100 |
| Bot commands | 100 max |
### Forum & Group Limits
| Item | Limit |
|------|-------|
| Topics per group | 1,000,000 |
| Topic title | 128 chars |
| Group members | 200,000 |
| Admins per group | 50 |
| Bots per group | 20 |
| Pinned topics | 5 |
## FLOOD_WAIT — How It Works
When you exceed rate limits, Telegram returns a `FLOOD_WAIT_X` error (or HTTP 429 with
`retry_after`). This is a **mandatory pause** — the value `X` is the number of seconds you
must wait before ANY request will succeed. It blocks the entire client, not just the
operation that triggered it.
### The Right Way to Handle It
```typescript
// Extract the wait duration from the error
function extractFloodWaitSeconds(err: unknown): number | null {
  const text = err instanceof Error ? err.message : String(err);
  // Pattern 1: FLOOD_WAIT_30
  const floodMatch = /FLOOD_WAIT_(\d+)/i.exec(text);
  if (floodMatch) return parseInt(floodMatch[1], 10);
  // Pattern 2: "retry after 30"
  const retryMatch = /retry after (\d+)/i.exec(text);
  if (retryMatch) return parseInt(retryMatch[1], 10);
  // Pattern 3: HTTP 429 without explicit seconds — assume a 30s pause
  const code = (err as { code?: unknown } | null | undefined)?.code;
  if (String(code) === "429") return 30;
  // Not a rate limit error
  return null;
}
// Wrap any TDLib call with automatic retry
async function withFloodWait<T>(fn: () => Promise<T>, maxRetries = 5): Promise<T> {
  let attempt = 0;
  // Retry only on FLOOD_WAIT-style errors; everything else fails fast.
  while (true) {
    try {
      return await fn();
    } catch (err) {
      const waitSeconds = extractFloodWaitSeconds(err);
      if (waitSeconds === null || attempt >= maxRetries) throw err;
      // Add 1-5s jitter to prevent thundering herd
      await sleep(waitSeconds * 1000 + 1000 + Math.random() * 4000);
      attempt++;
    }
  }
}
```
### Key Rules
- **Always respect the wait duration.** Never retry before `retry_after` expires.
- **Add jitter.** Without it, multiple clients retry simultaneously and trigger another FLOOD_WAIT.
- **Non-rate-limit errors should fail fast.** Only retry on FLOOD_WAIT, not on other errors.
- **Don't artificially throttle below ~1 req/sec.** Telegram's own guidance (via grammY docs)
is to send requests as fast as you need and handle 429 errors. Fixed low-frequency throttling
wastes throughput without preventing floods.
## Code Patterns
### Pattern: Sequential Send Queue
When sending notifications to multiple users, use a sequential queue with a per-message delay.
Never fire concurrent sends — you will hit the 30 msg/sec global limit instantly.
```typescript
let sendQueue: Promise<void> = Promise.resolve();
function queueSend(chatId: bigint, text: string): void {
sendQueue = sendQueue
.then(() => withFloodWait(() => sendTextMessage(chatId, text)))
.then(() => sleep(50)) // ~20 msgs/sec, well under 30 limit
.catch((err) => log.error({ err, chatId }, "Send failed"));
}
```
### Pattern: Paginated Scanning with Delay
When reading channel history or enumerating topics, always add a delay between pages:
```typescript
while (hasMorePages) {
const result = await invokeWithTimeout(client, { _: "getChatHistory", ... });
processMessages(result.messages);
if (result.messages.length < limit) break;
await sleep(1000); // 1 second between pages — prevents FLOOD_WAIT on large channels
}
```
### Pattern: Event Listener Before Action
When waiting for TDLib async events (upload confirmation, download completion), always
attach the event listener BEFORE starting the operation. If you attach after, fast
operations can complete before the listener exists, causing the promise to hang forever.
```typescript
// CORRECT: listener first, then action
client.on("update", handleUpdate);
const tempMsg = await client.invoke({ _: "sendMessage", ... });
tempMsgId = tempMsg.id; // handler now knows which message to match
// WRONG: action first, then listener — race condition!
const tempMsg = await client.invoke({ _: "sendMessage", ... });
client.on("update", handleUpdate); // may miss updateMessageSendSucceeded
```
### Pattern: Download/Upload Timeouts
Scale timeouts with file size. TDLib downloads/uploads are asynchronous — without a timeout,
a stalled transfer hangs the entire pipeline.
```typescript
const timeoutMs = Math.max(
10 * 60_000, // minimum 10 minutes
(fileSizeMB / 1024) * 10 * 60_000 // 10 minutes per GB
);
```
### Pattern: TDLib Client Lifecycle
Always close TDLib clients in a `finally` block. Unclosed clients leak memory and file
descriptors, and can leave TDLib's internal database locked.
```typescript
const client = await createTdlibClient(account);
try {
// ... use client ...
} finally {
await closeTdlibClient(client);
}
```
## Anti-Patterns
### Never: Concurrent TDLib Sends Without Queue
```typescript
// BAD: fires all sends concurrently — will trigger FLOOD_WAIT immediately
await Promise.all(users.map((u) => sendTextMessage(u.chatId, msg)));
// GOOD: sequential with delay
for (const user of users) {
await withFloodWait(() => sendTextMessage(user.chatId, msg));
await sleep(50);
}
```
### Never: Bare client.invoke() Without Retry
Every `client.invoke()` call can return FLOOD_WAIT at any time. Bare calls will crash
on rate limits instead of retrying.
```typescript
// BAD: crashes on FLOOD_WAIT
await client.invoke({ _: "sendMessage", ... });
// GOOD: retries automatically
await withFloodWait(() => client.invoke({ _: "sendMessage", ... }));
```
### Never: Retry Without Respecting retry_after
```typescript
// BAD: fixed 1-second retry ignores Telegram's wait requirement
catch (err) { await sleep(1000); retry(); }
// GOOD: extract and respect the actual wait time
catch (err) {
const wait = extractFloodWaitSeconds(err);
if (wait !== null) await sleep(wait * 1000 + jitter);
else throw err;
}
```
### Never: Ignore FLOOD_WAIT in Bots
Bot accounts get the same FLOOD_WAIT as user accounts. The bot API's 429 response
blocks ALL operations for the specified duration — not just the chat that triggered it.
A single unhandled flood in a notification loop can make the entire bot unresponsive.
## Bot vs User Account Differences
| Capability | Bot | User (TDLib) |
|-----------|-----|-------------|
| Read channel history | No (unless admin) | Yes |
| Send to users who haven't started bot | No | Yes (subject to the user's privacy settings) |
| Join groups via invite link | No (must be added) | Yes |
| Forward messages (send_copy) | Yes | Yes |
| File upload limit | 50 MB (standard API) | 2 GB |
| File download limit | 20 MB (standard API) | 2 GB |
| Auth method | Bot token | Phone + SMS code |
| Rate limit profile | Same FLOOD_WAIT | Same FLOOD_WAIT |
## TDLib-Specific Notes
### BigInt Chat IDs
TDLib uses numeric chat IDs. Supergroups and channels use negative IDs (e.g., `-1001234567890`).
When passing to `client.invoke()`, convert with `Number(chatId)` — TDLib's JSON interface
doesn't handle BigInt. Be aware that very large IDs may lose precision with `Number()`,
though current Telegram IDs are within safe integer range.
### TDLib Options (Runtime Queryable)
These are read-only values you can query at runtime via `getOption`:
- `message_text_length_max` — max message text length
- `message_caption_length_max` — max caption length
- `forwarded_message_count_max` — max forwards per request
### Session State
TDLib persists session state to disk. Each account needs its own state directory.
Running two clients on the same state directory simultaneously will corrupt the database.
Use separate directories per account, and separate volumes in Docker for worker vs bot.
## Docker Considerations
- **prebuilt-tdlib**: The `prebuilt-tdlib` npm package provides platform-specific TDLib
binaries. Container base image must match (e.g., `node:20-bookworm-slim` for Debian x64).
- **Volumes**: Mount persistent volumes for TDLib state directories — losing state forces
full re-authentication.
- **Graceful shutdown**: Wait for active operations to finish before closing DB connections.
TDLib operations in flight will fail if the database pool is closed underneath them.
- **Health checks**: TDLib services don't expose HTTP — use database connectivity as the
health signal instead.

View File

@@ -0,0 +1,23 @@
{
"skill_name": "tdlib-telegram",
"evals": [
{
"id": 1,
"prompt": "Add a new bot command /broadcast that sends a text message to ALL users who have a TelegramLink in the database. The admin triggers it from the web app. Add it to the bot's command handler and create an API endpoint that triggers it.",
"expected_output": "Code that uses a sequential send queue with withFloodWait wrapping each sendTextMessage call, a delay between sends (~50ms), and does NOT use Promise.all or concurrent sends. Should handle errors per-user without stopping the broadcast.",
"files": []
},
{
"id": 2,
"prompt": "The worker keeps crashing with 'FLOOD_WAIT_35' errors when scanning a source channel that has about 10,000 messages. It happens during the getChannelMessages pagination loop. How do I fix this?",
"expected_output": "Diagnosis that the apiDelayMs between pages may be too low or the retry logic isn't wrapping the pagination calls. Should recommend ensuring all getChatHistory/searchChatMessages calls go through withFloodWait/invokeWithTimeout, and that sleep(config.apiDelayMs) exists between pages. Should NOT suggest reducing scan frequency as the primary fix.",
"files": []
},
{
"id": 3,
"prompt": "Write a utility function for the worker that downloads a file from a source Telegram channel message and re-uploads it to the destination channel. It needs to handle files up to 2GB and work reliably in Docker.",
"expected_output": "Code that: (1) wraps download in withFloodWait with size-scaled timeout, (2) attaches upload event listener BEFORE calling sendMessage, (3) uses temp directory with cleanup in finally block, (4) handles the 2GB Telegram limit correctly, (5) uses try/finally for client cleanup if applicable.",
"files": []
}
]
}

66
.drone.yml Normal file
View File

@@ -0,0 +1,66 @@
---
kind: pipeline
type: docker
name: build-and-deploy

trigger:
  branch: [main]
  event: [push]

steps:
  # Build and push the Next.js app image to the Gitea container registry.
  - name: build-app
    image: plugins/docker
    settings:
      repo: git.samagsteribbe.nl/admin/dragonsstash
      registry: git.samagsteribbe.nl
      dockerfile: Dockerfile
      tags:
        - latest
        - "${DRONE_COMMIT_SHA:0:8}"
      build_args:
        # Baked into the client bundle at build time — must match the public URL.
        - NEXT_PUBLIC_APP_URL=https://dragonsstash.samagsteribbe.nl
      username:
        from_secret: gitea_username
      password:
        from_secret: gitea_password
  # Build and push the Telegram worker image.
  - name: build-worker
    image: plugins/docker
    settings:
      repo: git.samagsteribbe.nl/admin/dragonsstash-worker
      registry: git.samagsteribbe.nl
      dockerfile: worker/Dockerfile
      tags:
        - latest
        - "${DRONE_COMMIT_SHA:0:8}"
      username:
        from_secret: gitea_username
      password:
        from_secret: gitea_password
  # Build and push the Telegram bot image.
  - name: build-bot
    image: plugins/docker
    settings:
      repo: git.samagsteribbe.nl/admin/dragonsstash-bot
      registry: git.samagsteribbe.nl
      dockerfile: bot/Dockerfile
      tags:
        - latest
        - "${DRONE_COMMIT_SHA:0:8}"
      username:
        from_secret: gitea_username
      password:
        from_secret: gitea_password
  # Pull the freshly pushed images on the target host and restart the stack.
  - name: deploy
    image: alpine
    environment:
      SSH_KEY:
        from_secret: ssh_key
    commands:
      - apk add --no-cache openssh-client
      - mkdir -p ~/.ssh
      - printf "%s" "$SSH_KEY" > ~/.ssh/id_ed25519
      - chmod 600 ~/.ssh/id_ed25519
      # NOTE(review): ssh-keyscan at deploy time trusts whatever key the host
      # presents (trust-on-first-use, every run) — consider pinning the host
      # key in a secret instead.
      - ssh-keyscan -t ed25519 192.168.68.68 > ~/.ssh/known_hosts 2>/dev/null
      - ssh sam@192.168.68.68 "cd /opt/stacks/DragonsStash && docker compose pull && docker compose up -d"

View File

@@ -13,6 +13,8 @@ AUTH_GITHUB_ID=""
AUTH_GITHUB_SECRET=""
# App
# APP_PORT controls the port the container listens on AND how it is exposed on the host.
# If you change APP_PORT, also update NEXT_PUBLIC_APP_URL to match.
NEXT_PUBLIC_APP_URL="http://localhost:3000"
APP_PORT=3000

1
.gitignore vendored
View File

@@ -18,6 +18,7 @@ worker/node_modules
# production
/build
worker/dist
# misc
.DS_Store

102
CLAUDE.md Normal file
View File

@@ -0,0 +1,102 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Dragon's Stash is a self-hosted inventory management system for 3D printing filament, SLA resin, miniature paints, and supplies. It includes an integrated Telegram archive worker that scans channels for ZIP/RAR archives, indexes their contents, and a bot that lets users search and receive packages via Telegram.
## Tech Stack
- **App**: Next.js 16 (App Router), TypeScript 5.9 (strict), Tailwind CSS 4, shadcn/ui
- **Database**: PostgreSQL 16+ via Prisma v7.4 with `@prisma/adapter-pg`
- **Auth**: Auth.js v5 (NextAuth) with credentials + optional GitHub OAuth
- **Worker**: TypeScript + TDLib (via `tdl`) for Telegram channel scanning
- **Bot**: TypeScript + TDLib for Telegram bot interface
- **Forms**: React Hook Form + Zod v4
## Commands
### App (root package.json)
```bash
npm run dev # Next.js dev server with hot reload
npm run build # Production build (standalone output)
npm run start # Production server
npm run lint # ESLint (next/core-web-vitals + TypeScript)
```
### Database
```bash
npm run db:generate # Generate Prisma client
npm run db:migrate # Run migrations (dev mode)
npm run db:push # Push schema without migrations
npm run db:seed # Seed database with test data
npm run db:studio # Prisma Studio UI
npx prisma migrate dev --name <description> # Create new migration
```
### Worker & Bot (each in their own directory)
```bash
cd worker && npm run dev # Dev mode with tsx watch
cd worker && npm run build # TypeScript compile to dist/
cd bot && npm run dev # Dev mode with tsx watch
cd bot && npm run build # TypeScript compile to dist/
```
### Dev Environment Setup
```bash
docker compose -f docker-compose.dev.yml up -d # Start PostgreSQL + worker
npm run dev # Run app locally
```
## Architecture
### Three-Service Design
The project is split into three independent services sharing one PostgreSQL database:
1. **App** (root `src/`): Next.js web UI for inventory management and Telegram admin
2. **Worker** (`worker/`): Scans Telegram source channels, processes archives, uploads to destination channel
3. **Bot** (`bot/`): Telegram bot for user search, package delivery, keyword subscriptions
Services communicate asynchronously via `pg_notify` (e.g., on-demand channel fetches, bot send requests).
### App Source Layout (`src/`)
- `app/(auth)/` — Login/register pages (public)
- `app/(app)/` — Protected routes behind auth middleware (dashboard, filaments, resins, paints, supplies, vendors, locations, settings, stls, telegram, usage)
- `app/api/` — API routes (NextAuth, health check, bot endpoints)
- `data/` — Server-side Prisma query functions (`*.queries.ts`), one file per domain model
- `schemas/` — Zod validation schemas, one file per domain model
- `components/ui/` — shadcn/ui primitives
- `components/shared/` — Reusable business components (data-table, status-badge, color-swatch, stat-card, page-header)
- `components/layout/` — Sidebar and header
- `lib/` — Auth config, Prisma singleton, constants, utilities, Telegram query helpers
- `hooks/` — Custom React hooks (use-modal, use-debounce, use-current-user)
- `types/` — Shared TypeScript types
### Key Patterns
- **Server Components by default** — pages are async server components that fetch data directly. Only interactive components use `"use client"`.
- **Server Actions for mutations** — each page directory has an `actions.ts` file with create/update/delete actions.
- **Data queries centralized** — all Prisma reads go through `src/data/*.queries.ts`, not inline in components.
- **Modal-based CRUD** — add/edit forms use dialog modals, not separate pages.
- **TanStack Table** with server-side pagination for all inventory tables.
- **All Prisma PKs use `cuid()`** string IDs.
### Worker Pipeline
1. Authenticate Telegram account via TDLib (SMS code flow, managed via admin UI)
2. Scan source channels for messages since `lastProcessedMessageId`
3. Detect archives (ZIP/RAR), group multipart sets, extract file listings
4. Hash for dedup, match preview images, extract creator from filename
5. Split files >2GB, upload to destination channel, track progress
### ESLint Scope
ESLint covers `src/` only. The `worker/`, `bot/`, `scripts/`, and `prisma/seed.ts` directories are excluded from linting.
## Docker Deployment
- `docker-compose.yml` — Production: app + worker + bot + db
- `docker-compose.dev.yml` — Dev: db + worker only (app runs locally)
- `docker-entrypoint.sh` — Runs migrations, optional seeding, then starts app
- Bot service uses Docker Compose profiles (`bot` or `full`) — not started by default
## Testing
No test framework is configured. Testing is manual.

View File

@@ -17,6 +17,8 @@ COPY --from=deps /app/node_modules ./node_modules
COPY . .
ENV NEXT_TELEMETRY_DISABLED=1
ARG NEXT_PUBLIC_APP_URL=http://localhost:3000
ENV NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL}
RUN npm run build
# --- Production image ---
@@ -30,22 +32,23 @@ RUN addgroup --system --gid 1001 nodejs && \
adduser --system --uid 1001 nextjs
# Copy public assets
COPY --from=builder /app/public ./public
# Copy prisma schema + migrations for runtime migrate deploy
COPY --from=builder /app/prisma ./prisma
COPY --from=builder /app/prisma.config.ts ./prisma.config.ts
COPY --from=builder --chown=nextjs:nodejs /app/public ./public
# Copy standalone build output
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
# Copy node_modules for prisma CLI (needed for migrate deploy at startup)
COPY --from=builder /app/node_modules/.prisma ./node_modules/.prisma
COPY --from=builder /app/node_modules/@prisma ./node_modules/@prisma
COPY --from=builder /app/node_modules/prisma ./node_modules/prisma
COPY --from=builder /app/node_modules/.bin/prisma ./node_modules/.bin/prisma
COPY --from=builder /app/node_modules/dotenv ./node_modules/dotenv
# Copy prisma schema + migrations for runtime migrate deploy
COPY --from=builder --chown=nextjs:nodejs /app/prisma ./prisma
COPY --from=builder --chown=nextjs:nodejs /app/prisma.config.ts ./prisma.config.ts
# Copy node_modules for prisma CLI (needed for migrate deploy at startup).
# Copying the full directory ensures all transitive dependencies are present.
COPY --from=builder --chown=nextjs:nodejs /app/node_modules ./node_modules
# Recreate the .bin/prisma symlink so Node resolves __dirname to prisma/build/,
# where the WASM files live (COPY dereferences symlinks, breaking WASM resolution)
RUN mkdir -p ./node_modules/.bin && \
ln -sf ../prisma/build/index.js ./node_modules/.bin/prisma
# Copy entrypoint script
COPY --chown=nextjs:nodejs docker-entrypoint.sh ./
@@ -53,6 +56,7 @@ RUN chmod +x docker-entrypoint.sh
USER nextjs
# Default port — overridden at runtime by the PORT env var (set via docker-compose APP_PORT)
EXPOSE 3000
ENV PORT=3000
ENV HOSTNAME="0.0.0.0"

View File

@@ -125,18 +125,15 @@ docker compose up -d
The app will be available at [http://localhost:3000](http://localhost:3000).
### Adding Telegram Services
### Adding the Telegram Bot
The worker and bot run as optional profiles so `docker compose up` works with just the app + database:
The worker starts by default with `docker compose up`. The bot runs as an optional profile:
```bash
# App + DB + Telegram worker (needs TELEGRAM_API_ID + TELEGRAM_API_HASH in .env)
docker compose --profile telegram up -d
# App + DB + Worker + Bot (also needs BOT_TOKEN in .env)
docker compose --profile full up -d
# Or just the bot (alongside app + db)
# Or just the bot (alongside app + db + worker)
docker compose --profile bot up -d
```
@@ -297,5 +294,7 @@ curl http://localhost:3000/api/health
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

View File

@@ -1,7 +1,7 @@
import { config } from "./util/config.js";
import { logger } from "./util/logger.js";
import { db, pool } from "./db/client.js";
import { createBotClient, closeBotClient, onBotUpdate } from "./tdlib/client.js";
import { createBotClient, closeBotClient, onBotUpdate, getUser } from "./tdlib/client.js";
import { startSendListener, stopSendListener } from "./send-listener.js";
import { handleMessage } from "./commands.js";
import { mkdir } from "fs/promises";
@@ -49,14 +49,27 @@ async function main(): Promise<void> {
const userId = senderId.user_id as number;
if (text && userId) {
// Get user info for display name (async but fire-and-forget for perf)
handleMessage({
(async () => {
let firstName = "User";
let lastName: string | undefined;
let username: string | undefined;
try {
const userInfo = await getUser(userId);
firstName = userInfo.firstName;
lastName = userInfo.lastName;
username = userInfo.username;
} catch {
// Fall back to defaults if getUser fails
}
await handleMessage({
chatId: BigInt(chatId),
userId: BigInt(userId),
text,
firstName: "User", // TDLib provides this via a separate getUser call
username: undefined,
}).catch((err) => {
firstName,
lastName,
username,
});
})().catch((err) => {
log.error({ err, chatId, userId }, "Failed to handle message");
});
}

View File

@@ -8,17 +8,31 @@ import {
getGlobalDestinationChannel,
} from "./db/queries.js";
import { copyMessageToUser, sendTextMessage, sendPhotoMessage } from "./tdlib/client.js";
import { sleep } from "./util/flood-wait.js";
const log = childLogger("send-listener");
let pgClient: pg.PoolClient | null = null;
let stopped = false;
/** Delay (ms) before attempting to reconnect after a connection loss. */
const RECONNECT_DELAY_MS = 5_000;
/**
* Start listening for pg_notify signals:
* - `bot_send` — payload = requestId → send a package to a user
* - `new_package` — payload = JSON { packageId, fileName, creator } → notify subscribers
*
* If the underlying connection is lost, the listener automatically reconnects
* so that pg_notify signals are never silently dropped.
*/
export async function startSendListener(): Promise<void> {
stopped = false;
await connectListener();
}
async function connectListener(): Promise<void> {
try {
pgClient = await pool.connect();
await pgClient.query("LISTEN bot_send");
await pgClient.query("LISTEN new_package");
@@ -31,10 +45,46 @@ export async function startSendListener(): Promise<void> {
}
});
// Reconnect automatically when the connection ends unexpectedly
pgClient.on("end", () => {
if (!stopped) {
log.warn("Send listener connection lost — reconnecting");
pgClient = null;
scheduleReconnect();
}
});
pgClient.on("error", (err) => {
log.error({ err }, "Send listener connection error");
if (!stopped && pgClient) {
try {
pgClient.release(true);
} catch (releaseErr) {
log.debug({ err: releaseErr }, "Failed to release pg client after error");
}
pgClient = null;
scheduleReconnect();
}
});
log.info("Send listener started (bot_send, new_package)");
} catch (err) {
log.error({ err }, "Failed to start send listener — retrying");
scheduleReconnect();
}
}
/**
 * Queue a single reconnect attempt after RECONNECT_DELAY_MS.
 * No-op once stopSendListener() has set the `stopped` flag.
 */
function scheduleReconnect(): void {
  if (stopped) return;
  const attemptReconnect = (): void => {
    // Re-check the flag: stopSendListener() may have run while we waited.
    if (stopped) return;
    connectListener();
  };
  setTimeout(attemptReconnect, RECONNECT_DELAY_MS);
}
export function stopSendListener(): void {
stopped = true;
if (pgClient) {
pgClient.release();
pgClient = null;
@@ -133,7 +183,7 @@ async function handleNewPackage(payload: string): Promise<void> {
userSubs.set(key, patterns);
}
const creator = data.creator ? ` by ${data.creator}` : "";
const creator = data.creator ? ` by ${escapeHtml(data.creator)}` : "";
for (const [telegramUserId, patterns] of userSubs) {
const msg = [
`🔔 <b>New package matching your subscriptions:</b>`,
@@ -151,6 +201,9 @@ async function handleNewPackage(payload: string): Promise<void> {
"Failed to notify subscriber"
);
});
// Rate limit delay between notifications (~20 msgs/sec, under 30 msgs/sec bot limit)
await sleep(50);
}
} catch (err) {
log.error({ err, payload }, "Failed to process new_package notification");

View File

@@ -2,6 +2,7 @@ import tdl from "tdl";
import { getTdjson } from "prebuilt-tdlib";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/flood-wait.js";
const log = childLogger("tdlib-bot");
@@ -33,7 +34,7 @@ export async function createBotClient(): Promise<tdl.Client> {
await client.login(() => ({
type: "bot",
token: config.botToken,
getToken: () => Promise.resolve(config.botToken),
}));
log.info("Bot client authenticated successfully");
@@ -54,7 +55,10 @@ export async function closeBotClient(): Promise<void> {
/**
* Forward a message from a channel to a user's DM.
* Uses copyMessage to make it appear as sent by the bot.
* Uses forwardMessages with send_copy to make it appear as sent by the bot.
*
* The fromChatId is the TDLib chat ID stored in the DB — already in the correct
* format (negative for supergroups/channels, e.g. -1001234567890).
*/
export async function copyMessageToUser(
fromChatId: bigint,
@@ -62,19 +66,20 @@ export async function copyMessageToUser(
toUserId: bigint
): Promise<void> {
if (!client) throw new Error("Bot client not initialized");
const c = client;
// TDLib uses negative chat IDs for channels/supergroups
// The telegramId from the DB is the raw Telegram ID; for channels it needs -100 prefix
const fromChatIdNum = Number(-100n * 1n) + Number(fromChatId);
await client.invoke({
await withFloodWait(
() =>
c.invoke({
_: "forwardMessages",
chat_id: Number(toUserId),
from_chat_id: Number(fromChatId) > 0 ? -Number(fromChatId) : Number(fromChatId),
from_chat_id: Number(fromChatId),
message_ids: [Number(messageId)],
send_copy: true,
remove_caption: false,
});
}),
"copyMessageToUser"
);
}
/**
@@ -86,22 +91,31 @@ export async function sendTextMessage(
parseMode: "textParseModeMarkdown" | "textParseModeHTML" = "textParseModeMarkdown"
): Promise<void> {
if (!client) throw new Error("Bot client not initialized");
const c = client;
// Parse the text first
const parsed = await client.invoke({
const parsed = await withFloodWait(
() =>
c.invoke({
_: "parseTextEntities",
text,
parse_mode: { _: parseMode, version: parseMode === "textParseModeMarkdown" ? 2 : 0 },
});
}),
"parseTextEntities"
);
await client.invoke({
await withFloodWait(
() =>
c.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageText",
text: parsed,
},
});
}),
"sendTextMessage"
);
}
/**
@@ -113,6 +127,7 @@ export async function sendPhotoMessage(
caption: string
): Promise<void> {
if (!client) throw new Error("Bot client not initialized");
const c = client;
// Write the photo to a temp file
const { writeFile, unlink } = await import("fs/promises");
@@ -122,13 +137,19 @@ export async function sendPhotoMessage(
try {
await writeFile(tempPath, photoData);
const parsedCaption = await client.invoke({
const parsedCaption = await withFloodWait(
() =>
c.invoke({
_: "parseTextEntities",
text: caption,
parse_mode: { _: "textParseModeMarkdown", version: 2 },
});
}),
"parsePhotoCaption"
);
await client.invoke({
await withFloodWait(
() =>
c.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
@@ -138,12 +159,41 @@ export async function sendPhotoMessage(
width: 0,
height: 0,
},
});
}),
"sendPhotoMessage"
);
} finally {
await unlink(tempPath).catch(() => {});
}
}
/**
 * Get basic info about a Telegram user (name, username) via TDLib `getUser`.
 *
 * @param userId - Numeric Telegram user ID (as delivered in bot updates).
 * @returns Display fields with safe fallbacks: `firstName` defaults to
 *   "User" when TDLib returns no first name; empty strings become `undefined`.
 * @throws Error when the bot client has not been initialized yet.
 */
export async function getUser(
  userId: number
): Promise<{ firstName: string; lastName?: string; username?: string }> {
  if (!client) throw new Error("Bot client not initialized");
  // Capture into a local so the closure below keeps the non-null narrowing.
  const c = client;
  // Wrapped in withFloodWait so a FLOOD_WAIT response retries automatically.
  const user = (await withFloodWait(
    () =>
      c.invoke({
        _: "getUser",
        user_id: userId,
      }),
    "getUser"
  )) as {
    first_name?: string;
    last_name?: string;
    // NOTE(review): assumes the display handle lives in
    // usernames.editable_username — TDLib also exposes active_usernames;
    // confirm which field is intended here.
    usernames?: { editable_username?: string };
  };
  return {
    firstName: user.first_name ?? "User",
    // `|| undefined` intentionally converts empty strings to undefined.
    lastName: user.last_name || undefined,
    username: user.usernames?.editable_username || undefined,
  };
}
/**
* Get updates from TDLib. The bot listens for new messages this way.
*/

View File

@@ -0,0 +1,60 @@
import { childLogger } from "./logger.js";
const log = childLogger("flood-wait");
/** Promise-based delay: resolves (with no value) after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
* Extract the mandatory wait duration (in seconds) from a Telegram
* FLOOD_WAIT error. Returns null when the error is not rate-limit related.
*/
/**
 * Extract the mandatory wait duration (in seconds) from a Telegram
 * FLOOD_WAIT error. Returns null when the error is not rate-limit related.
 */
export function extractFloodWaitSeconds(err: unknown): number | null {
  const message = err instanceof Error ? err.message : String(err);
  // "FLOOD_WAIT_30" style, then "retry after 30" style, checked in order.
  const patterns = [/FLOOD_WAIT_(\d+)/i, /retry after (\d+)/i];
  for (const pattern of patterns) {
    const match = pattern.exec(message);
    if (match) return parseInt(match[1], 10);
  }
  // Bare HTTP 429 without an explicit duration — assume a 30-second wait.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  if (String((err as any)?.code) === "429") return 30;
  return null;
}
/**
* Wrap any async Telegram operation with automatic FLOOD_WAIT retry.
* Adds random jitter (1-5s) to prevent thundering-herd retries.
*
* Non-rate-limit errors are re-thrown immediately (fail-fast).
*/
/**
 * Wrap any async Telegram operation with automatic FLOOD_WAIT retry.
 * Adds random jitter (1-5s) to prevent thundering-herd retries.
 *
 * Non-rate-limit errors are re-thrown immediately (fail-fast).
 */
export async function withFloodWait<T>(
  fn: () => Promise<T>,
  context?: string,
  maxRetries = 5
): Promise<T> {
  let attempt = 0;
  for (;;) {
    try {
      return await fn();
    } catch (err) {
      const wait = extractFloodWaitSeconds(err);
      // Not a rate-limit error, or all retries used up: propagate as-is.
      if (wait === null || attempt >= maxRetries) throw err;
      attempt += 1;
      const jitter = 1000 + Math.random() * 4000;
      log.warn(
        { context, wait, attempt, maxRetries, jitter: Math.round(jitter) },
        "FLOOD_WAIT received — backing off"
      );
      await sleep(wait * 1000 + jitter);
    }
  }
}
export { sleep };

View File

@@ -16,7 +16,6 @@ services:
retries: 5
worker:
profiles: ["telegram", "full"]
build:
context: .
dockerfile: worker/Dockerfile

View File

@@ -3,25 +3,31 @@ services:
build:
context: .
dockerfile: Dockerfile
pull_policy: build
pull_policy: never
ports:
- "${APP_PORT:-3000}:3000"
- "${APP_PORT:-3000}:${APP_PORT:-3000}"
environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
- AUTH_TRUST_HOST=true
- AUTH_GITHUB_ID=${AUTH_GITHUB_ID:-}
- AUTH_GITHUB_SECRET=${AUTH_GITHUB_SECRET:-}
- NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
- TELEGRAM_API_KEY=${TELEGRAM_API_KEY:-}
- BOT_TOKEN=${BOT_TOKEN:-}
- BOT_USERNAME=${BOT_USERNAME:-}
- LOG_LEVEL=${LOG_LEVEL:-info}
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
- PORT=${APP_PORT:-3000}
depends_on:
db:
condition: service_healthy
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health"]
test: ["CMD-SHELL", "wget -q --spider http://localhost:$$PORT/api/health || exit 1"]
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
start_period: 60s
restart: unless-stopped
deploy:
resources:
@@ -31,11 +37,10 @@ services:
- frontend
worker:
profiles: ["telegram", "full"]
build:
context: .
dockerfile: worker/Dockerfile
pull_policy: build
pull_policy: never
environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
- TELEGRAM_API_ID=${TELEGRAM_API_ID:-}
@@ -67,7 +72,7 @@ services:
build:
context: .
dockerfile: bot/Dockerfile
pull_policy: build
pull_policy: never
environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
- BOT_TOKEN=${BOT_TOKEN:-}

View File

@@ -10,7 +10,10 @@ if [ "$AUTH_SECRET" = "change-me-to-a-random-secret-in-production" ] || [ -z "$A
fi
echo "Running database migrations..."
./node_modules/.bin/prisma migrate deploy
if ! ./node_modules/.bin/prisma migrate deploy; then
echo "ERROR: Database migration failed. Check DATABASE_URL and database connectivity."
exit 1
fi
if [ "$SEED_DATABASE" = "true" ]; then
echo "Seeding database..."

221
install.cmd Normal file
View File

@@ -0,0 +1,221 @@
@echo off
setlocal enabledelayedexpansion
REM Claude Code Windows CMD Bootstrap Script
REM Installs Claude Code for environments where PowerShell is not available
REM NOTE(review): this looks like the stock Claude Code bootstrap installer —
REM confirm it is intentionally committed to this repository.
REM Parse command line argument
set "TARGET=%~1"
if "!TARGET!"=="" set "TARGET=latest"
REM Validate target parameter
REM Accepted values: "stable", "latest", or an explicit version like 1.0.58
if /i "!TARGET!"=="stable" goto :target_valid
if /i "!TARGET!"=="latest" goto :target_valid
echo !TARGET! | findstr /r "^[0-9][0-9]*\.[0-9][0-9]*\.[0-9][0-9]*" >nul
if !ERRORLEVEL! equ 0 goto :target_valid
echo Usage: %0 [stable^|latest^|VERSION] >&2
echo Example: %0 1.0.58 >&2
exit /b 1
:target_valid
REM Check for 64-bit Windows
REM PROCESSOR_ARCHITEW6432 covers a 32-bit cmd.exe running on 64-bit Windows.
if /i "%PROCESSOR_ARCHITECTURE%"=="AMD64" goto :arch_valid
if /i "%PROCESSOR_ARCHITECTURE%"=="ARM64" goto :arch_valid
if /i "%PROCESSOR_ARCHITEW6432%"=="AMD64" goto :arch_valid
if /i "%PROCESSOR_ARCHITEW6432%"=="ARM64" goto :arch_valid
echo Claude Code does not support 32-bit Windows. Please use a 64-bit version of Windows. >&2
exit /b 1
:arch_valid
REM Set constants
set "GCS_BUCKET=https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases"
set "DOWNLOAD_DIR=%USERPROFILE%\.claude\downloads"
REM Use native ARM64 binary on ARM64 Windows, x64 otherwise
if /i "%PROCESSOR_ARCHITECTURE%"=="ARM64" (
    set "PLATFORM=win32-arm64"
) else (
    set "PLATFORM=win32-x64"
)
REM Create download directory
if not exist "!DOWNLOAD_DIR!" mkdir "!DOWNLOAD_DIR!"
REM Check for curl availability
curl --version >nul 2>&1
if !ERRORLEVEL! neq 0 (
    echo curl is required but not available. Please install curl or use PowerShell installer. >&2
    exit /b 1
)
REM Always download latest version (which has the most up-to-date installer)
call :download_file "!GCS_BUCKET!/latest" "!DOWNLOAD_DIR!\latest"
if !ERRORLEVEL! neq 0 (
    echo Failed to get latest version >&2
    exit /b 1
)
REM Read version from file
set /p VERSION=<"!DOWNLOAD_DIR!\latest"
del "!DOWNLOAD_DIR!\latest"
REM Download manifest
call :download_file "!GCS_BUCKET!/!VERSION!/manifest.json" "!DOWNLOAD_DIR!\manifest.json"
if !ERRORLEVEL! neq 0 (
    echo Failed to get manifest >&2
    exit /b 1
)
REM Extract checksum from manifest
REM Sets EXPECTED_CHECKSUM on success (consumed by :verify_checksum below).
call :parse_manifest "!DOWNLOAD_DIR!\manifest.json" "!PLATFORM!"
if !ERRORLEVEL! neq 0 (
    echo Platform !PLATFORM! not found in manifest >&2
    del "!DOWNLOAD_DIR!\manifest.json" 2>nul
    exit /b 1
)
del "!DOWNLOAD_DIR!\manifest.json"
REM Download binary
set "BINARY_PATH=!DOWNLOAD_DIR!\claude-!VERSION!-!PLATFORM!.exe"
call :download_file "!GCS_BUCKET!/!VERSION!/!PLATFORM!/claude.exe" "!BINARY_PATH!"
if !ERRORLEVEL! neq 0 (
    echo Failed to download binary >&2
    if exist "!BINARY_PATH!" del "!BINARY_PATH!"
    exit /b 1
)
REM Verify checksum
REM Aborts the install when the SHA256 does not match the manifest value.
call :verify_checksum "!BINARY_PATH!" "!EXPECTED_CHECKSUM!"
if !ERRORLEVEL! neq 0 (
    echo Checksum verification failed >&2
    del "!BINARY_PATH!"
    exit /b 1
)
REM Run claude install to set up launcher and shell integration
echo Setting up Claude Code...
"!BINARY_PATH!" install "!TARGET!"
set "INSTALL_RESULT=!ERRORLEVEL!"
REM Clean up downloaded file
REM Wait a moment for any file handles to be released
timeout /t 1 /nobreak >nul 2>&1
del /f "!BINARY_PATH!" >nul 2>&1
if exist "!BINARY_PATH!" (
    echo Warning: Could not remove temporary file: !BINARY_PATH!
)
if !INSTALL_RESULT! neq 0 (
    echo Installation failed >&2
    exit /b 1
)
echo.
echo Installation complete^^!
echo.
exit /b 0
REM ============================================================================
REM SUBROUTINES
REM ============================================================================
:download_file
REM Downloads a file using curl
REM Args: %1=URL, %2=OutputPath
REM Returns curl's exit code (0 = success).
set "URL=%~1"
set "OUTPUT=%~2"
curl -fsSL "!URL!" -o "!OUTPUT!"
exit /b !ERRORLEVEL!
:parse_manifest
REM Parse JSON manifest to extract checksum for platform
REM Args: %1=ManifestPath, %2=Platform
REM On success: sets EXPECTED_CHECKSUM (caller reads it) and exits 0.
REM Exits 1 when the platform or its checksum is not found.
set "MANIFEST_PATH=%~1"
set "PLATFORM_NAME=%~2"
set "EXPECTED_CHECKSUM="
REM Use findstr to find platform section, then look for checksum
set "FOUND_PLATFORM="
set "IN_PLATFORM_SECTION="
REM Read the manifest line by line
REM (a crude state machine — assumes pretty-printed JSON, one key per line)
for /f "usebackq tokens=*" %%i in ("!MANIFEST_PATH!") do (
    set "LINE=%%i"
    REM Check if this line contains our platform
    echo !LINE! | findstr /c:"\"%PLATFORM_NAME%\":" >nul
    if !ERRORLEVEL! equ 0 (
        set "IN_PLATFORM_SECTION=1"
    )
    REM If we're in the platform section, look for checksum
    if defined IN_PLATFORM_SECTION (
        echo !LINE! | findstr /c:"\"checksum\":" >nul
        if !ERRORLEVEL! equ 0 (
            REM Extract checksum value
            for /f "tokens=2 delims=:" %%j in ("!LINE!") do (
                set "CHECKSUM_PART=%%j"
                REM Remove quotes, whitespace, and comma
                set "CHECKSUM_PART=!CHECKSUM_PART: =!"
                set "CHECKSUM_PART=!CHECKSUM_PART:"=!"
                set "CHECKSUM_PART=!CHECKSUM_PART:,=!"
                REM Check if it looks like a SHA256 (64 hex chars)
                if not "!CHECKSUM_PART!"=="" (
                    call :check_length "!CHECKSUM_PART!" 64
                    if !ERRORLEVEL! equ 0 (
                        set "EXPECTED_CHECKSUM=!CHECKSUM_PART!"
                        exit /b 0
                    )
                )
            )
        )
        REM Check if we've left the platform section (closing brace)
        echo !LINE! | findstr /c:"}" >nul
        if !ERRORLEVEL! equ 0 set "IN_PLATFORM_SECTION="
    )
)
if "!EXPECTED_CHECKSUM!"=="" exit /b 1
exit /b 0
:check_length
REM Check if string length equals expected length
REM Args: %1=String, %2=ExpectedLength
REM Exits 0 when the length matches, 1 otherwise.
set "STR=%~1"
set "EXPECTED_LEN=%~2"
set "LEN=0"
:count_loop
REM %LEN% (percent expansion) is safe here: each GOTO re-parses the line.
if "!STR:~%LEN%,1!"=="" goto :count_done
set /a LEN+=1
goto :count_loop
:count_done
if %LEN%==%EXPECTED_LEN% exit /b 0
exit /b 1
:verify_checksum
REM Verify file checksum using certutil
REM Args: %1=FilePath, %2=ExpectedChecksum
REM certutil prints a header (skipped), the hash, then a status line.
set "FILE_PATH=%~1"
set "EXPECTED=%~2"
for /f "skip=1 tokens=*" %%i in ('certutil -hashfile "!FILE_PATH!" SHA256') do (
    set "ACTUAL=%%i"
    REM Strip spaces — older certutil versions print the hash as spaced byte pairs.
    set "ACTUAL=!ACTUAL: =!"
    REM Reached the trailing status line without a match: fail.
    if "!ACTUAL!"=="CertUtil:Thecommandcompletedsuccessfully." goto :verify_done
    if "!ACTUAL!" neq "" (
        if /i "!ACTUAL!"=="!EXPECTED!" (
            exit /b 0
        ) else (
            exit /b 1
        )
    )
)
:verify_done
exit /b 1

2
package-lock.json generated
View File

@@ -49,7 +49,7 @@
"ts-node": "^10.9.2",
"tsx": "^4.21.0",
"tw-animate-css": "^1.4.0",
"typescript": "^5"
"typescript": "5.9.3"
}
},
"node_modules/@alloc/quick-lru": {

View File

@@ -58,6 +58,6 @@
"ts-node": "^10.9.2",
"tsx": "^4.21.0",
"tw-animate-css": "^1.4.0",
"typescript": "^5"
"typescript": "5.9.3"
}
}

View File

@@ -0,0 +1,5 @@
-- Promote all existing users to ADMIN (self-hosted: every user is an admin)
-- Idempotent: only rows still holding the old 'USER' role are touched.
UPDATE "User" SET "role" = 'ADMIN' WHERE "role" = 'USER';
-- Change the default role for new users to ADMIN
-- Affects future INSERTs only; existing rows were handled above.
ALTER TABLE "User" ALTER COLUMN "role" SET DEFAULT 'ADMIN';

View File

@@ -0,0 +1,3 @@
-- Change the default for new channels to disabled (isActive = false).
-- Existing channels are not affected — admins can manually enable/disable them.
-- Applies to future INSERTs only; no data rewrite happens in this migration.
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;

View File

@@ -0,0 +1,21 @@
-- CreateTable
CREATE TABLE "invite_codes" (
"id" TEXT NOT NULL,
"code" VARCHAR(32) NOT NULL,
"maxUses" INTEGER NOT NULL DEFAULT 1,
"uses" INTEGER NOT NULL DEFAULT 0,
"expiresAt" TIMESTAMP(3),
"createdBy" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "invite_codes_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "invite_codes_code_key" ON "invite_codes"("code");
-- CreateIndex
-- NOTE(review): this secondary index is redundant — the unique index above
-- already serves lookups by "code"; harmless, but it doubles write overhead.
CREATE INDEX "invite_codes_code_idx" ON "invite_codes"("code");
-- AddForeignKey
-- Deleting the creating user cascades to all invite codes they issued.
ALTER TABLE "invite_codes" ADD CONSTRAINT "invite_codes_createdBy_fkey" FOREIGN KEY ("createdBy") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -0,0 +1,3 @@
-- AlterEnum
-- NOTE(review): ALTER TYPE ... ADD VALUE cannot run inside a transaction
-- block on PostgreSQL < 12 — confirm the target PG version, or that the
-- migration runner applies this file outside a transaction.
ALTER TYPE "ArchiveType" ADD VALUE 'SEVEN_Z';
ALTER TYPE "ArchiveType" ADD VALUE 'DOCUMENT';

View File

@@ -0,0 +1,5 @@
-- AlterTable
-- Nullable tag column; uncategorized channels simply stay NULL.
ALTER TABLE "telegram_channels" ADD COLUMN "category" VARCHAR(64);
-- CreateIndex
CREATE INDEX "telegram_channels_category_idx" ON "telegram_channels"("category");

View File

@@ -0,0 +1,32 @@
-- CreateEnum
CREATE TYPE "ExtractStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
-- AlterTable
-- Nullable back-reference: which invite code this user registered with.
ALTER TABLE "User" ADD COLUMN "usedInviteId" TEXT;
-- CreateTable
-- NOTE(review): "updatedAt" has no DB-level default — Prisma Client fills it
-- on write, so rows inserted outside Prisma must supply it explicitly.
CREATE TABLE "archive_extract_requests" (
"id" TEXT NOT NULL,
"packageId" TEXT NOT NULL,
"filePath" VARCHAR(1024) NOT NULL,
"status" "ExtractStatus" NOT NULL DEFAULT 'PENDING',
"imageData" BYTEA,
"contentType" VARCHAR(64),
"error" TEXT,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,
CONSTRAINT "archive_extract_requests_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "archive_extract_requests_packageId_filePath_idx" ON "archive_extract_requests"("packageId", "filePath");
-- CreateIndex
CREATE INDEX "archive_extract_requests_status_idx" ON "archive_extract_requests"("status");
-- AddForeignKey
-- Deleting an invite code keeps its users; the reference is nulled out.
ALTER TABLE "User" ADD CONSTRAINT "User_usedInviteId_fkey" FOREIGN KEY ("usedInviteId") REFERENCES "invite_codes"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
-- Deleting a package cascades to its pending/finished extract requests.
ALTER TABLE "archive_extract_requests" ADD CONSTRAINT "archive_extract_requests_packageId_fkey" FOREIGN KEY ("packageId") REFERENCES "packages"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -22,7 +22,7 @@ model User {
emailVerified DateTime?
image String?
hashedPassword String?
role Role @default(USER)
role Role @default(ADMIN)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@ -38,6 +38,9 @@ model User {
tags Tag[]
settings UserSettings?
telegramLink TelegramLink?
inviteCodes InviteCode[] @relation("InviteCreator")
usedInvite InviteCode? @relation("InviteUser", fields: [usedInviteId], references: [id], onDelete: SetNull)
usedInviteId String?
}
model Account {
@@ -376,6 +379,8 @@ enum ChannelRole {
enum ArchiveType {
ZIP
RAR
SEVEN_Z
DOCUMENT
}
enum IngestionStatus {
@@ -417,7 +422,8 @@ model TelegramChannel {
title String
type ChannelType
isForum Boolean @default(false)
isActive Boolean @default(true)
isActive Boolean @default(false)
category String? @db.VarChar(64)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@ -425,6 +431,7 @@ model TelegramChannel {
packages Package[]
@@index([type, isActive])
@@index([category])
@@map("telegram_channels")
}
@@ -471,6 +478,7 @@ model Package {
ingestionRun IngestionRun? @relation(fields: [ingestionRunId], references: [id])
ingestionRunId String?
sendRequests BotSendRequest[]
extractRequests ArchiveExtractRequest[]
@@index([sourceChannelId])
@@index([destChannelId])
@@ -554,6 +562,22 @@ model GlobalSetting {
@@map("global_settings")
}
model InviteCode {
id String @id @default(cuid())
code String @unique @db.VarChar(32)
maxUses Int @default(1)
uses Int @default(0)
expiresAt DateTime?
createdBy String
createdAt DateTime @default(now())
creator User @relation("InviteCreator", fields: [createdBy], references: [id], onDelete: Cascade)
usedBy User[] @relation("InviteUser")
@@index([code])
@@map("invite_codes")
}
model ChannelFetchRequest {
id String @id @default(cuid())
accountId String
@@ -626,3 +650,35 @@ model BotSubscription {
@@index([telegramUserId])
@@map("bot_subscriptions")
}
// ───────────────────────────────────────
// Archive image extraction (worker-mediated)
// ───────────────────────────────────────
enum ExtractStatus {
PENDING
IN_PROGRESS
COMPLETED
FAILED
}
/// A request for the worker to extract an image from an archive.
/// The web app creates this, sends a pg_notify, and the worker
/// downloads the archive, extracts the file, and writes the result.
model ArchiveExtractRequest {
id String @id @default(cuid())
packageId String
filePath String @db.VarChar(1024) // path within archive to extract
status ExtractStatus @default(PENDING)
imageData Bytes? // extracted image bytes (JPEG/PNG/WebP)
contentType String? @db.VarChar(64) // MIME type of extracted image
error String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
package Package @relation(fields: [packageId], references: [id], onDelete: Cascade)
@@index([packageId, filePath])
@@index([status])
@@map("archive_extract_requests")
}

View File

@@ -0,0 +1,415 @@
"use client";
import { useState, useTransition } from "react";
import { Copy, Link2, Plus, Trash2 } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { Switch } from "@/components/ui/switch";
import {
Table,
TableBody,
TableCell,
TableHead,
TableHeader,
TableRow,
} from "@/components/ui/table";
import { Badge } from "@/components/ui/badge";
import {
AlertDialog,
AlertDialogAction,
AlertDialogCancel,
AlertDialogContent,
AlertDialogDescription,
AlertDialogFooter,
AlertDialogHeader,
AlertDialogTitle,
AlertDialogTrigger,
} from "@/components/ui/alert-dialog";
import {
Tooltip,
TooltipContent,
TooltipTrigger,
} from "@/components/ui/tooltip";
import { createInviteCode, createBulkInviteCodes, deleteInviteCode } from "../actions";
// A user who redeemed an invite code — subset of the User model. Dates
// arrive as ISO strings because the server page JSON-serializes the data.
type InviteUser = {
  id: string;
  name: string | null;
  email: string | null;
  createdAt: string;
};

// One invite-code row as rendered by the table, including its creator
// and every user who registered with it.
type InviteCode = {
  id: string;
  code: string;
  maxUses: number;
  uses: number;
  expiresAt: string | null;
  createdAt: string;
  creator: { name: string | null };
  usedBy: InviteUser[];
};
/**
 * Admin UI for creating, sharing, and deleting invite codes.
 *
 * Receives the (JSON-serialized) invite list from the server component and
 * mutates it through server actions; Next.js path revalidation refreshes the
 * table after each action completes.
 */
export function InviteManager({
  inviteCodes,
  appUrl,
}: {
  inviteCodes: InviteCode[];
  appUrl: string;
}) {
  const [maxUses, setMaxUses] = useState(1);
  const [expiresInDays, setExpiresInDays] = useState(7);
  const [noExpiry, setNoExpiry] = useState(false);
  const [bulkCount, setBulkCount] = useState(5);
  const [isPending, startTransition] = useTransition();
  // Which row's copy button last succeeded, and whether it was the raw
  // code or the registration link — drives the transient "Copied!" label.
  const [copiedId, setCopiedId] = useState<string | null>(null);
  const [copiedType, setCopiedType] = useState<"code" | "link" | null>(null);

  // Create a single invite with the currently configured options.
  function handleCreate() {
    startTransition(async () => {
      await createInviteCode({
        maxUses,
        expiresInDays: noExpiry ? null : expiresInDays,
      });
    });
  }

  // Create `bulkCount` invites sharing the same options.
  function handleBulkCreate() {
    startTransition(async () => {
      await createBulkInviteCodes({
        count: bulkCount,
        maxUses,
        expiresInDays: noExpiry ? null : expiresInDays,
      });
    });
  }

  function handleDelete(id: string) {
    startTransition(async () => {
      await deleteInviteCode(id);
    });
  }

  function copyToClipboard(text: string, id: string, type: "code" | "link") {
    // writeText returns a promise; only show "Copied!" once the write has
    // actually succeeded, and swallow rejections (e.g. insecure context or
    // denied permission) instead of surfacing an unhandled rejection.
    navigator.clipboard.writeText(text).then(
      () => {
        setCopiedId(id);
        setCopiedType(type);
        setTimeout(() => {
          setCopiedId(null);
          setCopiedType(null);
        }, 2000);
      },
      () => {
        // Clipboard unavailable — leave the UI unchanged.
      }
    );
  }

  // "used" takes precedence over "expired": a fully redeemed code is done
  // regardless of its expiry date.
  function getStatus(invite: InviteCode): "active" | "used" | "expired" {
    if (invite.uses >= invite.maxUses) return "used";
    if (invite.expiresAt && new Date(invite.expiresAt) < new Date()) return "expired";
    return "active";
  }

  // Human-readable time-until-expiry; Math.ceil so a partial remaining day
  // still counts as one day.
  function formatRelativeDate(dateStr: string) {
    const date = new Date(dateStr);
    const now = new Date();
    const diffMs = date.getTime() - now.getTime();
    const diffDays = Math.ceil(diffMs / (1000 * 60 * 60 * 24));
    if (diffDays < 0) return "Expired";
    if (diffDays === 0) return "Today";
    if (diffDays === 1) return "Tomorrow";
    return `${diffDays} days`;
  }

  const activeCount = inviteCodes.filter((i) => getStatus(i) === "active").length;
  const usedCount = inviteCodes.filter((i) => getStatus(i) === "used").length;

  return (
    <div className="max-w-5xl space-y-6">
      {/* Create Card */}
      <Card>
        <CardHeader>
          <CardTitle>Generate Invite Codes</CardTitle>
          <CardDescription>
            Create single or bulk invite codes to share with new users
          </CardDescription>
        </CardHeader>
        <CardContent className="space-y-4">
          <div className="flex flex-wrap items-end gap-4">
            <div className="space-y-2">
              <Label htmlFor="maxUses">Max Uses</Label>
              <Input
                id="maxUses"
                type="number"
                min={1}
                max={100}
                value={maxUses}
                onChange={(e) => setMaxUses(Number(e.target.value))}
                className="w-24"
              />
            </div>
            <div className="space-y-2">
              <Label htmlFor="expiresInDays">Expires in (days)</Label>
              <Input
                id="expiresInDays"
                type="number"
                min={1}
                max={365}
                value={expiresInDays}
                onChange={(e) => setExpiresInDays(Number(e.target.value))}
                disabled={noExpiry}
                className="w-24"
              />
            </div>
            <div className="flex items-center gap-2 pb-1">
              <Switch
                id="noExpiry"
                checked={noExpiry}
                onCheckedChange={setNoExpiry}
              />
              <Label htmlFor="noExpiry" className="text-sm">
                No expiry
              </Label>
            </div>
          </div>
          <div className="flex flex-wrap items-end gap-3 border-t pt-4">
            <Button onClick={handleCreate} disabled={isPending}>
              <Plus className="mr-2 h-4 w-4" />
              {isPending ? "Creating..." : "Create One"}
            </Button>
            <div className="flex items-end gap-2">
              <div className="space-y-2">
                <Label htmlFor="bulkCount">Count</Label>
                <Input
                  id="bulkCount"
                  type="number"
                  min={2}
                  max={25}
                  value={bulkCount}
                  onChange={(e) => setBulkCount(Number(e.target.value))}
                  className="w-20"
                />
              </div>
              <Button
                variant="secondary"
                onClick={handleBulkCreate}
                disabled={isPending}
              >
                <Plus className="mr-2 h-4 w-4" />
                {isPending ? "Creating..." : `Create ${bulkCount}`}
              </Button>
            </div>
          </div>
        </CardContent>
      </Card>
      {/* Codes Table */}
      <Card>
        <CardHeader>
          <CardTitle>Invite Codes</CardTitle>
          <CardDescription>
            {inviteCodes.length} total &middot; {activeCount} active &middot; {usedCount} fully used
          </CardDescription>
        </CardHeader>
        <CardContent>
          {inviteCodes.length === 0 ? (
            <p className="text-sm text-muted-foreground">
              No invite codes yet. Create one above.
            </p>
          ) : (
            <Table>
              <TableHeader>
                <TableRow>
                  <TableHead>Code</TableHead>
                  <TableHead>Status</TableHead>
                  <TableHead>Uses</TableHead>
                  <TableHead>Redeemed By</TableHead>
                  <TableHead>Expires</TableHead>
                  <TableHead>Created</TableHead>
                  <TableHead className="text-right">Actions</TableHead>
                </TableRow>
              </TableHeader>
              <TableBody>
                {inviteCodes.map((invite) => {
                  const status = getStatus(invite);
                  const isCopiedCode =
                    copiedId === invite.id && copiedType === "code";
                  const isCopiedLink =
                    copiedId === invite.id && copiedType === "link";
                  return (
                    <TableRow key={invite.id}>
                      <TableCell className="font-mono text-sm">
                        {invite.code}
                      </TableCell>
                      <TableCell>
                        <Badge
                          variant={
                            status === "active"
                              ? "default"
                              : status === "used"
                                ? "secondary"
                                : "destructive"
                          }
                        >
                          {status}
                        </Badge>
                      </TableCell>
                      <TableCell>
                        {invite.uses} / {invite.maxUses}
                      </TableCell>
                      <TableCell>
                        {invite.usedBy.length === 0 ? (
                          <span className="text-muted-foreground">--</span>
                        ) : (
                          <div className="space-y-0.5">
                            {invite.usedBy.map((user) => (
                              <Tooltip key={user.id}>
                                <TooltipTrigger asChild>
                                  <div className="text-sm cursor-default">
                                    {user.name ?? user.email ?? "Unknown"}
                                  </div>
                                </TooltipTrigger>
                                <TooltipContent>
                                  <div className="text-xs">
                                    {user.email && <div>{user.email}</div>}
                                    <div>
                                      Joined{" "}
                                      {new Date(user.createdAt).toLocaleDateString()}
                                    </div>
                                  </div>
                                </TooltipContent>
                              </Tooltip>
                            ))}
                          </div>
                        )}
                      </TableCell>
                      <TableCell>
                        {invite.expiresAt ? (
                          <Tooltip>
                            <TooltipTrigger asChild>
                              <span className="cursor-default">
                                {formatRelativeDate(invite.expiresAt)}
                              </span>
                            </TooltipTrigger>
                            <TooltipContent>
                              {new Date(invite.expiresAt).toLocaleString()}
                            </TooltipContent>
                          </Tooltip>
                        ) : (
                          <span className="text-muted-foreground">Never</span>
                        )}
                      </TableCell>
                      <TableCell>
                        <Tooltip>
                          <TooltipTrigger asChild>
                            <span className="cursor-default">
                              {new Date(invite.createdAt).toLocaleDateString()}
                            </span>
                          </TooltipTrigger>
                          <TooltipContent>
                            by {invite.creator.name ?? "Unknown"}
                          </TooltipContent>
                        </Tooltip>
                      </TableCell>
                      <TableCell className="text-right">
                        <div className="flex justify-end gap-1">
                          <Tooltip>
                            <TooltipTrigger asChild>
                              <Button
                                variant="outline"
                                size="sm"
                                onClick={() =>
                                  copyToClipboard(
                                    invite.code,
                                    invite.id,
                                    "code"
                                  )
                                }
                              >
                                <Copy className="h-3 w-3" />
                                {isCopiedCode && (
                                  <span className="ml-1">Copied!</span>
                                )}
                              </Button>
                            </TooltipTrigger>
                            <TooltipContent>Copy code</TooltipContent>
                          </Tooltip>
                          <Tooltip>
                            <TooltipTrigger asChild>
                              <Button
                                variant="outline"
                                size="sm"
                                onClick={() =>
                                  copyToClipboard(
                                    `${appUrl}/register?code=${invite.code}`,
                                    invite.id,
                                    "link"
                                  )
                                }
                                disabled={status !== "active"}
                              >
                                <Link2 className="h-3 w-3" />
                                {isCopiedLink && (
                                  <span className="ml-1">Copied!</span>
                                )}
                              </Button>
                            </TooltipTrigger>
                            <TooltipContent>Copy registration link</TooltipContent>
                          </Tooltip>
                          <AlertDialog>
                            <Tooltip>
                              <TooltipTrigger asChild>
                                <AlertDialogTrigger asChild>
                                  <Button
                                    variant="destructive"
                                    size="sm"
                                    disabled={isPending}
                                  >
                                    <Trash2 className="h-3 w-3" />
                                  </Button>
                                </AlertDialogTrigger>
                              </TooltipTrigger>
                              <TooltipContent>Delete code</TooltipContent>
                            </Tooltip>
                            <AlertDialogContent>
                              <AlertDialogHeader>
                                <AlertDialogTitle>
                                  Delete invite code?
                                </AlertDialogTitle>
                                <AlertDialogDescription>
                                  This will permanently delete the invite code{" "}
                                  <span className="font-mono font-semibold">
                                    {invite.code}
                                  </span>
                                  .{" "}
                                  {status === "active" &&
                                    "Anyone with this code will no longer be able to register."}
                                </AlertDialogDescription>
                              </AlertDialogHeader>
                              <AlertDialogFooter>
                                <AlertDialogCancel>Cancel</AlertDialogCancel>
                                <AlertDialogAction
                                  onClick={() => handleDelete(invite.id)}
                                >
                                  Delete
                                </AlertDialogAction>
                              </AlertDialogFooter>
                            </AlertDialogContent>
                          </AlertDialog>
                        </div>
                      </TableCell>
                    </TableRow>
                  );
                })}
              </TableBody>
            </Table>
          )}
        </CardContent>
      </Card>
    </div>
  );
}

View File

@@ -0,0 +1,96 @@
"use server";
import crypto from "crypto";
import { auth } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
import type { ActionResult } from "@/types/api.types";
import { revalidatePath } from "next/cache";
/**
 * Create a single invite code (admin only).
 *
 * @param input.maxUses       how many registrations may redeem the code (1-100)
 * @param input.expiresInDays days until expiry, or null for no expiry
 * @returns the generated code on success, or an error message
 */
export async function createInviteCode(input: {
  maxUses: number;
  expiresInDays: number | null;
}): Promise<ActionResult<{ code: string }>> {
  const session = await auth();
  if (!session?.user?.id || session.user.role !== "ADMIN") {
    return { success: false, error: "Unauthorized" };
  }
  // Server-side validation: server actions are directly callable by any
  // client, so the form's min/max bounds cannot be trusted.
  if (!Number.isInteger(input.maxUses) || input.maxUses < 1 || input.maxUses > 100) {
    return { success: false, error: "maxUses must be between 1 and 100" };
  }
  if (
    input.expiresInDays !== null &&
    (!Number.isInteger(input.expiresInDays) || input.expiresInDays < 1)
  ) {
    return { success: false, error: "expiresInDays must be a positive integer" };
  }
  // 6 random bytes -> 12 hex chars; fits VARCHAR(32) and is unguessable.
  const code = crypto.randomBytes(6).toString("hex");
  const expiresAt = input.expiresInDays
    ? new Date(Date.now() + input.expiresInDays * 24 * 60 * 60 * 1000)
    : null;
  await prisma.inviteCode.create({
    data: {
      code,
      maxUses: input.maxUses,
      expiresAt,
      createdBy: session.user.id,
    },
  });
  revalidatePath("/invites");
  return { success: true, data: { code } };
}
/**
 * Create several invite codes at once (admin only).
 *
 * @param input.count         number of codes to generate (1-25)
 * @param input.maxUses       max redemptions per code (1-100)
 * @param input.expiresInDays days until expiry, or null for no expiry
 * @returns the generated codes on success, or an error message
 */
export async function createBulkInviteCodes(input: {
  count: number;
  maxUses: number;
  expiresInDays: number | null;
}): Promise<ActionResult<{ codes: string[] }>> {
  const session = await auth();
  if (!session?.user?.id || session.user.role !== "ADMIN") {
    return { success: false, error: "Unauthorized" };
  }
  if (!Number.isInteger(input.count) || input.count < 1 || input.count > 25) {
    return { success: false, error: "Can generate between 1 and 25 codes at a time" };
  }
  if (!Number.isInteger(input.maxUses) || input.maxUses < 1 || input.maxUses > 100) {
    return { success: false, error: "maxUses must be between 1 and 100" };
  }
  const expiresAt = input.expiresInDays
    ? new Date(Date.now() + input.expiresInDays * 24 * 60 * 60 * 1000)
    : null;
  const createdBy = session.user.id;
  // Generate all codes up-front: mutating an outer array inside an
  // interactive-transaction callback would push duplicates if the
  // transaction is retried. A single createMany is also one round trip
  // instead of `count` sequential inserts.
  const codes = Array.from({ length: input.count }, () =>
    crypto.randomBytes(6).toString("hex")
  );
  await prisma.inviteCode.createMany({
    data: codes.map((code) => ({
      code,
      maxUses: input.maxUses,
      expiresAt,
      createdBy,
    })),
  });
  revalidatePath("/invites");
  return { success: true, data: { codes } };
}
/**
 * Delete an invite code by id (admin only).
 *
 * @returns success, or a clean error when the code does not exist
 */
export async function deleteInviteCode(id: string): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id || session.user.role !== "ADMIN") {
    return { success: false, error: "Unauthorized" };
  }
  try {
    await prisma.inviteCode.delete({ where: { id } });
  } catch {
    // Record already gone (e.g. deleted from another tab): Prisma throws
    // P2025 here, which would otherwise surface as an unhandled server
    // error instead of an ActionResult.
    return { success: false, error: "Invite code not found" };
  }
  revalidatePath("/invites");
  return { success: true, data: undefined };
}
export async function getInviteCodes() {
const codes = await prisma.inviteCode.findMany({
orderBy: { createdAt: "desc" },
include: {
creator: { select: { name: true } },
usedBy: { select: { id: true, name: true, email: true, createdAt: true } },
},
});
return codes;
}

View File

@@ -0,0 +1,26 @@
import { auth } from "@/lib/auth";
import { redirect } from "next/navigation";
import { PageHeader } from "@/components/shared/page-header";
import { getInviteCodes } from "./actions";
import { InviteManager } from "./_components/invite-manager";
/** Admin-only page for managing registration invite codes. */
export default async function InvitesPage() {
  const session = await auth();
  // Unauthenticated visitors go to login; authenticated non-admins to dashboard.
  if (!session?.user?.id) redirect("/login");
  if (session.user.role !== "ADMIN") redirect("/dashboard");
  const codes = await getInviteCodes();
  // Round-trip through JSON to turn Date objects into ISO strings before
  // handing the rows to the client component.
  const serializedCodes = JSON.parse(JSON.stringify(codes));
  return (
    <div className="space-y-6">
      <PageHeader
        title="Invite Codes"
        description="Manage invite codes for new user registration"
      />
      <InviteManager
        inviteCodes={serializedCodes}
        appUrl={process.env.NEXT_PUBLIC_APP_URL ?? ""}
      />
    </div>
  );
}

View File

@@ -0,0 +1,399 @@
"use client";
import { useEffect, useState, useCallback, useRef, useTransition } from "react";
import {
Image as ImageIcon,
Loader2,
Check,
AlertCircle,
ImageOff,
} from "lucide-react";
import {
Dialog,
DialogContent,
DialogHeader,
DialogTitle,
DialogDescription,
} from "@/components/ui/dialog";
import { ScrollArea } from "@/components/ui/scroll-area";
import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { setPreviewFromExtract } from "../actions";
// One image entry inside the package's archive, as returned by
// GET /api/zips/[packageId]/images. `size` is a stringified byte count.
interface ArchiveImage {
  id: string;
  path: string;
  fileName: string;
  extension: string | null;
  size: string;
}

// Client-side lifecycle of one thumbnail extraction request.
// requestId/imageUrl are only set once a worker request exists.
interface ThumbnailState {
  status: "idle" | "loading" | "loaded" | "failed";
  requestId?: string;
  imageUrl?: string;
  error?: string;
}

interface ArchivePreviewPickerProps {
  packageId: string;
  packageName: string;
  open: boolean;
  onOpenChange: (open: boolean) => void;
  onPreviewSet?: () => void;
}
/**
 * Format a byte count (given as a decimal string) as a human-readable size.
 *
 * Returns "0 B" for zero, negative, or non-numeric input (the original
 * rendered "NaN undefined" for unparseable strings), and clamps the unit
 * index so TB-scale values render in GB instead of indexing past `sizes`.
 */
function formatBytes(bytesStr: string): string {
  const bytes = Number(bytesStr);
  if (!Number.isFinite(bytes) || bytes <= 0) return "0 B";
  const k = 1024;
  const sizes = ["B", "KB", "MB", "GB"];
  // Clamp: values >= 1 TB would otherwise produce sizes[4] === undefined.
  const i = Math.min(Math.floor(Math.log(bytes) / Math.log(k)), sizes.length - 1);
  return `${parseFloat((bytes / Math.pow(k, i)).toFixed(1))} ${sizes[i]}`;
}
/**
 * Dialog that lists the images inside a package's archive and lets the admin
 * pick one as the package preview.
 *
 * Thumbnails are produced asynchronously: a POST creates an extraction
 * request, then this component polls the request until the worker marks it
 * COMPLETED or FAILED. One interval timer per in-flight file path is kept in
 * `pollTimers`; `requestedPaths` prevents duplicate requests for a path.
 */
export function ArchivePreviewPicker({
  packageId,
  packageName,
  open,
  onOpenChange,
  onPreviewSet,
}: ArchivePreviewPickerProps) {
  const [images, setImages] = useState<ArchiveImage[]>([]);
  const [loading, setLoading] = useState(false);
  // filePath -> thumbnail lifecycle state; replaced immutably on every update.
  const [thumbnails, setThumbnails] = useState<Map<string, ThumbnailState>>(new Map());
  const [selectedPath, setSelectedPath] = useState<string | null>(null);
  const [isPending, startTransition] = useTransition();
  // filePath -> active setInterval handle (refs, not state: timers must not
  // trigger re-renders and must survive across renders).
  const pollTimers = useRef<Map<string, ReturnType<typeof setInterval>>>(new Map());
  // Track which paths have already been requested to avoid re-requesting
  const requestedPaths = useRef<Set<string>>(new Set());
  // Cleanup poll timers on unmount
  // NOTE(review): reads pollTimers.current inside the cleanup closure — fine
  // here since the map identity never changes, but eslint may warn.
  useEffect(() => {
    return () => {
      for (const timer of pollTimers.current.values()) {
        clearInterval(timer);
      }
    };
  }, []);
  // Fetch image list when opened; also resets all per-open state so a
  // reopened dialog starts from a clean slate.
  useEffect(() => {
    if (!open) return;
    setImages([]);
    setThumbnails(new Map());
    setSelectedPath(null);
    requestedPaths.current.clear();
    // Clear any leftover poll timers
    for (const timer of pollTimers.current.values()) {
      clearInterval(timer);
    }
    pollTimers.current.clear();
    const fetchImages = async () => {
      setLoading(true);
      try {
        const res = await fetch(`/api/zips/${packageId}/images`);
        if (!res.ok) throw new Error("Failed to fetch images");
        const data = await res.json();
        setImages(data.images);
      } catch {
        toast.error("Failed to load archive images");
      } finally {
        setLoading(false);
      }
    };
    fetchImages();
  }, [open, packageId]);
  // Poll callback for a specific request: checks the extraction status every
  // 2 seconds until it resolves to COMPLETED or FAILED.
  const startPolling = useCallback(
    (filePath: string, requestId: string) => {
      // Clear any existing poll for this path
      const existing = pollTimers.current.get(filePath);
      if (existing) clearInterval(existing);
      const pollId = setInterval(async () => {
        try {
          const pollRes = await fetch(
            `/api/zips/${packageId}/extract/${requestId}`
          );
          if (!pollRes.ok) return;
          const pollData = await pollRes.json();
          if (pollData.status === "COMPLETED") {
            clearInterval(pollId);
            pollTimers.current.delete(filePath);
            setThumbnails((prev) => {
              const next = new Map(prev);
              next.set(filePath, {
                status: "loaded",
                requestId,
                imageUrl: `/api/zips/${packageId}/extract/${requestId}?image=true`,
              });
              return next;
            });
          } else if (pollData.status === "FAILED") {
            clearInterval(pollId);
            pollTimers.current.delete(filePath);
            setThumbnails((prev) => {
              const next = new Map(prev);
              next.set(filePath, {
                status: "failed",
                error: pollData.error || "Extraction failed",
              });
              return next;
            });
          }
        } catch {
          // Silently retry on network error
        }
      }, 2000);
      pollTimers.current.set(filePath, pollId);
    },
    [packageId]
  );
  // Request extraction for a specific image. Fast path: the server may have
  // the result cached and answer COMPLETED immediately; otherwise we poll.
  const requestThumbnail = useCallback(
    async (filePath: string) => {
      // Don't re-request if already in progress
      if (requestedPaths.current.has(filePath)) return;
      requestedPaths.current.add(filePath);
      setThumbnails((prev) => {
        const next = new Map(prev);
        next.set(filePath, { status: "loading" });
        return next;
      });
      try {
        const res = await fetch(`/api/zips/${packageId}/extract`, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ filePath }),
        });
        if (!res.ok) {
          const err = await res.json();
          throw new Error(err.error || "Extract failed");
        }
        const data = await res.json();
        if (data.status === "COMPLETED") {
          setThumbnails((prev) => {
            const next = new Map(prev);
            next.set(filePath, {
              status: "loaded",
              requestId: data.requestId,
              imageUrl: `/api/zips/${packageId}/extract/${data.requestId}?image=true`,
            });
            return next;
          });
          return;
        }
        // Pending or in-progress: start polling
        setThumbnails((prev) => {
          const next = new Map(prev);
          next.set(filePath, { status: "loading", requestId: data.requestId });
          return next;
        });
        startPolling(filePath, data.requestId);
      } catch (err) {
        // Remove from requestedPaths so the user can click to retry.
        requestedPaths.current.delete(filePath);
        setThumbnails((prev) => {
          const next = new Map(prev);
          next.set(filePath, {
            status: "failed",
            error: err instanceof Error ? err.message : "Failed to extract",
          });
          return next;
        });
      }
    },
    [packageId, startPolling]
  );
  // Auto-request thumbnails for the first batch of images
  useEffect(() => {
    if (!open || images.length === 0) return;
    // Request the first 12 images automatically
    const toRequest = images.slice(0, 12);
    for (const img of toRequest) {
      requestThumbnail(img.path);
    }
    // Only trigger when images list changes, not on every requestThumbnail change
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [images, open]);
  // Handle selection confirmation: hand the completed extraction's requestId
  // to the server action, which copies the bytes into the package preview.
  const handleConfirm = () => {
    if (!selectedPath) return;
    const thumbState = thumbnails.get(selectedPath);
    // Only loaded thumbnails are selectable, so requestId should exist here.
    if (!thumbState?.requestId) return;
    startTransition(async () => {
      const result = await setPreviewFromExtract(packageId, thumbState.requestId!);
      if (result.success) {
        toast.success("Preview updated from archive image");
        onOpenChange(false);
        onPreviewSet?.();
      } else {
        toast.error(result.error);
      }
    });
  };
  return (
    <Dialog open={open} onOpenChange={onOpenChange}>
      <DialogContent className="sm:max-w-2xl max-h-[80vh] flex flex-col gap-0 p-0">
        <DialogHeader className="px-6 pt-6 pb-4 border-b border-border space-y-1">
          <DialogTitle>Select Preview Image</DialogTitle>
          <DialogDescription className="text-sm">
            Choose an image from the archive to use as the preview for{" "}
            <span className="font-medium text-foreground">{packageName}</span>
          </DialogDescription>
        </DialogHeader>
        <ScrollArea className="flex-1 min-h-0">
          <div className="p-4">
            {loading ? (
              <div className="flex flex-col items-center justify-center gap-2 py-12">
                <Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
                <span className="text-sm text-muted-foreground">
                  Loading image list...
                </span>
              </div>
            ) : images.length === 0 ? (
              <div className="flex flex-col items-center justify-center gap-2 py-12">
                <ImageOff className="h-6 w-6 text-muted-foreground/50" />
                <span className="text-sm text-muted-foreground">
                  No images found in this archive
                </span>
              </div>
            ) : (
              <div className="grid grid-cols-3 sm:grid-cols-4 gap-3">
                {images.map((img) => {
                  const thumbState = thumbnails.get(img.path);
                  const isSelected = selectedPath === img.path;
                  const isLoaded = thumbState?.status === "loaded";
                  const isLoading = thumbState?.status === "loading";
                  const isFailed = thumbState?.status === "failed";
                  return (
                    <button
                      key={img.id}
                      type="button"
                      className={cn(
                        "relative aspect-square rounded-lg overflow-hidden border-2 transition-all",
                        "hover:border-primary/50 cursor-pointer group",
                        isSelected
                          ? "border-primary ring-2 ring-primary/30"
                          : "border-border",
                        isFailed && "opacity-60"
                      )}
                      onClick={() => {
                        // Loaded: select. Failed: clear the request guard and
                        // retry. Idle/unknown: kick off a first request.
                        if (isLoaded) {
                          setSelectedPath(img.path);
                        } else if (isFailed) {
                          // Allow retry on failed
                          requestedPaths.current.delete(img.path);
                          requestThumbnail(img.path);
                        } else if (!thumbState || thumbState.status === "idle") {
                          requestThumbnail(img.path);
                        }
                      }}
                      title={img.path}
                    >
                      {isLoaded && thumbState.imageUrl ? (
                        <img
                          src={thumbState.imageUrl}
                          alt={img.fileName}
                          className="h-full w-full object-cover"
                          loading="lazy"
                        />
                      ) : isLoading ? (
                        <div className="h-full w-full flex items-center justify-center bg-muted">
                          <Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
                        </div>
                      ) : isFailed ? (
                        <div className="h-full w-full flex flex-col items-center justify-center bg-muted gap-1">
                          <AlertCircle className="h-4 w-4 text-destructive" />
                          <span className="text-[10px] text-destructive px-1 text-center">
                            Click to retry
                          </span>
                        </div>
                      ) : (
                        <div className="h-full w-full flex items-center justify-center bg-muted">
                          <ImageIcon className="h-5 w-5 text-muted-foreground" />
                        </div>
                      )}
                      {/* Selection checkmark */}
                      {isSelected && (
                        <div className="absolute top-1.5 right-1.5 h-5 w-5 rounded-full bg-primary flex items-center justify-center">
                          <Check className="h-3 w-3 text-primary-foreground" />
                        </div>
                      )}
                      {/* File info overlay */}
                      <div className="absolute bottom-0 left-0 right-0 bg-black/60 px-1.5 py-1 opacity-0 group-hover:opacity-100 transition-opacity">
                        <p className="text-[10px] text-white truncate">
                          {img.fileName}
                        </p>
                        <p className="text-[9px] text-white/70">
                          {formatBytes(img.size)}
                        </p>
                      </div>
                    </button>
                  );
                })}
              </div>
            )}
          </div>
        </ScrollArea>
        {/* Footer */}
        {images.length > 0 && (
          <div className="px-6 py-4 border-t border-border flex items-center justify-between">
            <span className="text-sm text-muted-foreground">
              {images.length} image{images.length !== 1 ? "s" : ""} found
            </span>
            <div className="flex gap-2">
              <Button
                variant="outline"
                size="sm"
                onClick={() => onOpenChange(false)}
              >
                Cancel
              </Button>
              <Button
                size="sm"
                disabled={!selectedPath || isPending}
                onClick={handleConfirm}
              >
                {isPending ? (
                  <>
                    <Loader2 className="h-3.5 w-3.5 animate-spin mr-1" />
                    Setting...
                  </>
                ) : (
                  "Use as Preview"
                )}
              </Button>
            </div>
          </div>
        )}
      </DialogContent>
    </Dialog>
  );
}

View File

@@ -1,7 +1,7 @@
"use client";
import { type ColumnDef } from "@tanstack/react-table";
import { FileArchive, Eye, ImageIcon } from "lucide-react";
import { FileArchive, Eye, Pencil } from "lucide-react";
import { DataTableColumnHeader } from "@/components/shared/data-table-column-header";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
@@ -12,7 +12,7 @@ export interface PackageRow {
fileName: string;
fileSize: string;
contentHash: string;
archiveType: "ZIP" | "RAR";
archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT";
fileCount: number;
isMultipart: boolean;
hasPreview: boolean;
@@ -26,6 +26,7 @@ export interface PackageRow {
interface PackageColumnsProps {
onViewFiles: (pkg: PackageRow) => void;
onSetCreator: (pkg: PackageRow) => void;
}
function formatBytes(bytesStr: string): string {
@@ -57,6 +58,7 @@ function PreviewCell({ pkg }: { pkg: PackageRow }) {
export function getPackageColumns({
onViewFiles,
onSetCreator,
}: PackageColumnsProps): ColumnDef<PackageRow, unknown>[] {
return [
{
@@ -113,9 +115,13 @@ export function getPackageColumns({
accessorKey: "creator",
header: ({ column }) => <DataTableColumnHeader column={column} title="Creator" />,
cell: ({ row }) => (
<span className="text-sm text-muted-foreground truncate max-w-[160px] block">
{row.original.creator ?? "\u2014"}
</span>
<button
className="text-sm text-muted-foreground truncate max-w-[160px] block hover:text-foreground hover:underline cursor-pointer text-left"
onClick={() => onSetCreator(row.original)}
title="Click to edit creator"
>
{row.original.creator || "\u2014"}
</button>
),
},
{

View File

@@ -1,6 +1,7 @@
"use client";
import { useEffect, useState, useCallback, useMemo } from "react";
import { useEffect, useState, useCallback, useMemo, useRef } from "react";
import { toast } from "sonner";
import {
FileText,
Folder,
@@ -9,6 +10,9 @@ import {
Search,
ChevronDown,
ChevronRight,
Upload,
ImagePlus,
Images,
} from "lucide-react";
import {
Dialog,
@@ -24,6 +28,8 @@ import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
import type { PackageRow } from "./package-columns";
import { SendToTelegramButton } from "./send-to-telegram-button";
import { uploadPackagePreview } from "../actions";
import { ArchivePreviewPicker } from "./archive-preview-picker";
interface FileItem {
id: string;
@@ -224,6 +230,46 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
const [loadingMore, setLoadingMore] = useState(false);
const [search, setSearch] = useState("");
const [page, setPage] = useState(1);
const [uploading, setUploading] = useState(false);
const [localPreviewUrl, setLocalPreviewUrl] = useState<string | null>(null);
const [showPreviewPicker, setShowPreviewPicker] = useState(false);
const fileInputRef = useRef<HTMLInputElement>(null);
const handlePreviewUpload = useCallback(
async (e: React.ChangeEvent<HTMLInputElement>) => {
const file = e.target.files?.[0];
if (!file || !pkg) return;
// Reset file input so the same file can be re-selected
e.target.value = "";
setUploading(true);
try {
const formData = new FormData();
formData.append("file", file);
const result = await uploadPackagePreview(pkg.id, formData);
if (result.success) {
toast.success("Preview image uploaded");
// Show uploaded image immediately via local object URL
setLocalPreviewUrl(URL.createObjectURL(file));
} else {
toast.error(result.error);
}
} catch {
toast.error("Failed to upload preview image");
} finally {
setUploading(false);
}
},
[pkg]
);
// Clean up local preview URL when drawer closes or package changes
useEffect(() => {
return () => {
if (localPreviewUrl) URL.revokeObjectURL(localPreviewUrl);
};
}, [localPreviewUrl]);
const fetchFiles = useCallback(
async (pageNum: number, append: boolean) => {
@@ -258,6 +304,7 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
setTotal(0);
setSearch("");
setPage(1);
setLocalPreviewUrl(null);
fetchFiles(1, false);
}
}, [open, pkg, fetchFiles]);
@@ -293,12 +340,49 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
<DialogHeader className="px-6 pt-6 pb-4 border-b border-border space-y-3">
{/* Preview image + title row */}
<div className="flex gap-4">
{pkg?.hasPreview && (
<img
src={`/api/zips/${pkg.id}/preview`}
alt=""
className="h-20 w-20 rounded-lg object-cover bg-muted shrink-0"
{/* Preview image area with upload capability */}
<input
ref={fileInputRef}
type="file"
accept="image/jpeg,image/png,image/webp"
className="hidden"
onChange={handlePreviewUpload}
/>
{(pkg?.hasPreview || localPreviewUrl) ? (
<button
type="button"
className="relative group h-20 w-20 shrink-0 rounded-lg overflow-hidden bg-muted"
onClick={() => fileInputRef.current?.click()}
disabled={uploading}
title="Click to replace preview image"
>
<img
src={localPreviewUrl ?? `/api/zips/${pkg!.id}/preview`}
alt=""
className="h-full w-full object-cover"
/>
<div className="absolute inset-0 bg-black/50 opacity-0 group-hover:opacity-100 transition-opacity flex items-center justify-center">
{uploading ? (
<Loader2 className="h-5 w-5 text-white animate-spin" />
) : (
<Upload className="h-5 w-5 text-white" />
)}
</div>
</button>
) : (
<button
type="button"
className="flex h-20 w-20 shrink-0 items-center justify-center rounded-lg border border-dashed border-muted-foreground/30 bg-muted/50 hover:bg-muted hover:border-muted-foreground/50 transition-colors cursor-pointer"
onClick={() => fileInputRef.current?.click()}
disabled={uploading}
title="Upload preview image"
>
{uploading ? (
<Loader2 className="h-5 w-5 text-muted-foreground animate-spin" />
) : (
<ImagePlus className="h-5 w-5 text-muted-foreground" />
)}
</button>
)}
<div className="min-w-0 flex-1">
<DialogTitle className="truncate pr-8">
@@ -308,11 +392,22 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
{total.toLocaleString()} file{total !== 1 ? "s" : ""} in archive
</DialogDescription>
{pkg && (
<div className="mt-2">
<div className="mt-2 flex items-center gap-2">
<SendToTelegramButton
packageId={pkg.id}
packageName={pkg.fileName}
/>
{pkg.archiveType !== "DOCUMENT" && !pkg.isMultipart && (
<Button
variant="outline"
size="sm"
className="h-8 gap-1.5 text-xs"
onClick={() => setShowPreviewPicker(true)}
>
<Images className="h-3.5 w-3.5" />
Pick Preview
</Button>
)}
</div>
)}
</div>
@@ -416,6 +511,20 @@ export function PackageFilesDrawer({ pkg, open, onOpenChange }: PackageFilesDraw
</div>
</ScrollArea>
</DialogContent>
{/* Archive preview picker modal */}
{pkg && pkg.archiveType !== "DOCUMENT" && !pkg.isMultipart && (
<ArchivePreviewPicker
packageId={pkg.id}
packageName={pkg.fileName}
open={showPreviewPicker}
onOpenChange={setShowPreviewPicker}
onPreviewSet={() => {
// Refresh the preview by setting a cache-busting URL
setLocalPreviewUrl(`/api/zips/${pkg.id}/preview?t=${Date.now()}`);
}}
/>
)}
</Dialog>
);
}

View File

@@ -1,7 +1,8 @@
"use client";
import { useState, useCallback } from "react";
import { useState, useCallback, useTransition } from "react";
import { useRouter, usePathname, useSearchParams } from "next/navigation";
import { toast } from "sonner";
import { Search, FileBox } from "lucide-react";
import { useDataTable } from "@/hooks/use-data-table";
import { getPackageColumns, type PackageRow } from "./package-columns";
@@ -13,6 +14,7 @@ import { DataTableViewOptions } from "@/components/shared/data-table-view-option
import { PageHeader } from "@/components/shared/page-header";
import { Input } from "@/components/ui/input";
import type { IngestionAccountStatus } from "@/lib/telegram/types";
import { updatePackageCreator } from "../actions";
interface StlTableProps {
data: PackageRow[];
@@ -33,6 +35,7 @@ export function StlTable({
const [searchValue, setSearchValue] = useState(searchParams.get("search") ?? "");
const [viewPkg, setViewPkg] = useState<PackageRow | null>(null);
const [, startTransition] = useTransition();
const updateSearch = useCallback(
(value: string) => {
@@ -51,6 +54,19 @@ export function StlTable({
const columns = getPackageColumns({
onViewFiles: (pkg) => setViewPkg(pkg),
onSetCreator: (pkg) => {
const value = prompt("Enter creator name:", pkg.creator ?? "");
if (value === null) return;
startTransition(async () => {
const result = await updatePackageCreator(pkg.id, value || null);
if (result.success) {
toast.success(value ? `Creator set to "${value}"` : "Creator removed");
router.refresh();
} else {
toast.error(result.error);
}
});
},
});
const { table } = useDataTable({ data, columns, pageCount });

View File

@@ -0,0 +1,137 @@
"use server";
import { auth } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
import type { ActionResult } from "@/types/api.types";
import { revalidatePath } from "next/cache";
const ALLOWED_IMAGE_TYPES = [
"image/jpeg",
"image/png",
"image/webp",
] as const;
const MAX_IMAGE_SIZE = 2 * 1024 * 1024; // 2 MB
/**
 * Update (or clear) the manually-assigned creator name on a package.
 *
 * @param packageId - ID of the package row to modify.
 * @param creator - New creator name; empty/whitespace-only or null clears it.
 * @returns ActionResult; failure carries a user-facing error message.
 */
export async function updatePackageCreator(
  packageId: string,
  creator: string | null
): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id) {
    return { success: false, error: "Unauthorized" };
  }
  // Normalize: trim, and collapse empty strings to null so the DB column
  // is cleared rather than storing "".
  const normalized = creator?.trim() || null;
  try {
    await prisma.package.update({
      where: { id: packageId },
      data: { creator: normalized },
    });
    // Invalidate the STL listing so the edited creator shows immediately.
    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to update creator" };
  }
}
/**
 * Store a user-uploaded preview image on a package.
 *
 * Validates that the form field is a real File, that its MIME type is one of
 * ALLOWED_IMAGE_TYPES, and that its size is non-zero and under MAX_IMAGE_SIZE
 * before writing the raw bytes into `Package.previewData`.
 *
 * @param packageId - ID of the package receiving the preview.
 * @param formData - Must contain the image under the "file" key.
 * @returns ActionResult; failure carries a user-facing error message.
 */
export async function uploadPackagePreview(
  packageId: string,
  formData: FormData
): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };
  const file = formData.get("file");
  if (!(file instanceof File)) {
    return { success: false, error: "No file provided" };
  }
  // Widen to readonly string[] so .includes() accepts an arbitrary MIME
  // string — no type assertion needed (the previous `as` cast bypassed
  // type checking without adding safety).
  const allowedTypes: readonly string[] = ALLOWED_IMAGE_TYPES;
  if (!allowedTypes.includes(file.type)) {
    return { success: false, error: "Only JPG, PNG, and WebP images are accepted" };
  }
  // Reject zero-byte uploads: they pass the size ceiling but would store an
  // empty, unrenderable preview.
  if (file.size === 0) {
    return { success: false, error: "Image file is empty" };
  }
  if (file.size > MAX_IMAGE_SIZE) {
    return { success: false, error: "Image must be smaller than 2 MB" };
  }
  try {
    const buffer = Buffer.from(await file.arrayBuffer());
    await prisma.package.update({
      where: { id: packageId },
      data: {
        previewData: buffer,
        // Set previewMsgId to 0 as sentinel so hasPreview checks work
        previewMsgId: 0n,
      },
    });
    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to upload preview image" };
  }
}
/**
 * Set the same creator name on multiple packages at once.
 *
 * @param packageIds - IDs of the packages to update; must be non-empty.
 * @param creator - Creator name; must be non-blank after trimming.
 * @returns ActionResult; failure carries a user-facing error message.
 */
export async function bulkSetCreator(
  packageIds: string[],
  creator: string
): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id) return { success: false, error: "Unauthorized" };
  // Guard: an empty selection would issue a pointless updateMany and then
  // report success, which is misleading in the UI.
  if (packageIds.length === 0) {
    return { success: false, error: "No packages selected" };
  }
  // Guard: previously a whitespace-only name was stored as "", which is
  // inconsistent with updatePackageCreator (it normalizes "" to null).
  // For a bulk *set* action, an empty name is a caller mistake — reject it.
  const trimmed = creator.trim();
  if (!trimmed) {
    return { success: false, error: "Creator name cannot be empty" };
  }
  try {
    await prisma.package.updateMany({
      where: { id: { in: packageIds } },
      data: { creator: trimmed },
    });
    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to update creators" };
  }
}
/**
 * Set a package's preview from an image previously extracted out of its
 * archive by the worker (a completed ArchiveExtractRequest).
 *
 * @param packageId - Package whose preview should be replaced.
 * @param extractRequestId - The extract request holding the image bytes.
 * @returns ActionResult; failure carries a user-facing error message.
 */
export async function setPreviewFromExtract(
  packageId: string,
  extractRequestId: string
): Promise<ActionResult> {
  const session = await auth();
  if (!session?.user?.id) {
    return { success: false, error: "Unauthorized" };
  }
  try {
    const request = await prisma.archiveExtractRequest.findUnique({
      where: { id: extractRequestId },
      select: { packageId: true, status: true, imageData: true },
    });
    // Guard chain: request must exist, belong to this package, and have
    // finished with actual image bytes before we copy them over.
    if (!request) {
      return { success: false, error: "Extract request not found" };
    }
    if (request.packageId !== packageId) {
      return { success: false, error: "Extract request does not belong to this package" };
    }
    if (request.status !== "COMPLETED" || !request.imageData) {
      return { success: false, error: "Image extraction not yet completed" };
    }
    await prisma.package.update({
      where: { id: packageId },
      data: {
        previewData: request.imageData,
        // Set previewMsgId to 0 as sentinel so hasPreview checks work
        // (original Telegram-matched previews have the actual message ID)
        previewMsgId: 0n,
      },
    });
    revalidatePath("/stls");
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to set preview from archive image" };
  }
}

View File

@@ -7,6 +7,8 @@ import {
Power,
ArrowDownToLine,
ArrowUpFromLine,
RefreshCcw,
Tag,
} from "lucide-react";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
@@ -23,12 +25,16 @@ interface ChannelColumnsProps {
onToggleActive: (id: string) => void;
onDelete: (id: string) => void;
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
onRescan: (id: string) => void;
onSetCategory: (id: string, category: string | null) => void;
}
export function getChannelColumns({
onToggleActive,
onDelete,
onSetType,
onRescan,
onSetCategory,
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
return [
{
@@ -60,6 +66,18 @@ export function getChannelColumns({
</Badge>
),
},
{
accessorKey: "category",
header: "Category",
cell: ({ row }) => {
const category = row.original.category;
return category ? (
<Badge variant="outline">{category}</Badge>
) : (
<span className="text-xs text-muted-foreground"></span>
);
},
},
{
accessorKey: "isActive",
header: "Status",
@@ -121,6 +139,23 @@ export function getChannelColumns({
Set as Source
</DropdownMenuItem>
)}
{row.original.type === "SOURCE" && (
<DropdownMenuItem
onClick={() => onRescan(row.original.id)}
>
<RefreshCcw className="mr-2 h-3.5 w-3.5" />
Rescan Channel
</DropdownMenuItem>
)}
<DropdownMenuItem
onClick={() => {
const cat = prompt("Enter category (e.g. STL, PDF, D&D, Cosplay):", row.original.category ?? "");
if (cat !== null) onSetCategory(row.original.id, cat || null);
}}
>
<Tag className="mr-2 h-3.5 w-3.5" />
Set Category
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => onToggleActive(row.original.id)}
>

View File

@@ -2,26 +2,39 @@
import { useState, useTransition } from "react";
import { toast } from "sonner";
import { Download, Plus } from "lucide-react";
import { getChannelColumns } from "./channel-columns";
import { DestinationCard } from "./destination-card";
import { ChannelPickerDialog } from "./channel-picker-dialog";
import { JoinChannelDialog } from "./join-channel-dialog";
import {
deleteChannel,
toggleChannelActive,
setChannelType,
setChannelCategory,
rescanChannel,
} from "../actions";
import { DataTable } from "@/components/shared/data-table";
import { DeleteDialog } from "@/components/shared/delete-dialog";
import type { ChannelRow, GlobalDestination } from "@/lib/telegram/admin-queries";
import { Button } from "@/components/ui/button";
import type { AccountRow, ChannelRow, GlobalDestination } from "@/lib/telegram/admin-queries";
import { useDataTable } from "@/hooks/use-data-table";
interface ChannelsTabProps {
channels: ChannelRow[];
globalDestination: GlobalDestination;
accounts: AccountRow[];
}
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsTabProps) {
const [isPending, startTransition] = useTransition();
const [deleteId, setDeleteId] = useState<string | null>(null);
const [rescanId, setRescanId] = useState<string | null>(null);
const [fetchChannelsAccountId, setFetchChannelsAccountId] = useState<string | null>(null);
const [joinDialogOpen, setJoinDialogOpen] = useState(false);
// Find the first authenticated account for "Fetch Channels"
const authenticatedAccounts = accounts.filter((a) => a.authState === "AUTHENTICATED" && a.isActive);
const columns = getChannelColumns({
onToggleActive: (id) => {
@@ -39,6 +52,14 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
else toast.error(result.error);
});
},
onRescan: (id) => setRescanId(id),
onSetCategory: (id, category) => {
startTransition(async () => {
const result = await setChannelCategory(id, category);
if (result.success) toast.success(category ? `Category set to "${category}"` : "Category removed");
else toast.error(result.error);
});
},
});
const { table } = useDataTable({
@@ -60,19 +81,59 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
});
};
const handleRescan = () => {
if (!rescanId) return;
startTransition(async () => {
const result = await rescanChannel(rescanId);
if (result.success) {
toast.success("Channel scan progress reset — it will be fully rescanned on the next sync");
setRescanId(null);
} else {
toast.error(result.error);
}
});
};
const handleFetchChannels = () => {
if (authenticatedAccounts.length > 0) {
setFetchChannelsAccountId(authenticatedAccounts[0].id);
} else {
toast.error("No authenticated accounts available. Add and authenticate an account first.");
}
};
return (
<div className="space-y-4">
<DestinationCard destination={globalDestination} />
<DestinationCard destination={globalDestination} channels={channels} />
<div className="flex items-center gap-2">
<Button
variant="outline"
onClick={handleFetchChannels}
disabled={authenticatedAccounts.length === 0}
>
<Download className="mr-2 h-4 w-4" />
Fetch Channels
</Button>
<Button
variant="outline"
onClick={() => setJoinDialogOpen(true)}
disabled={authenticatedAccounts.length === 0}
>
<Plus className="mr-2 h-4 w-4" />
Add Channel
</Button>
</div>
{channels.length > 0 && (
<p className="text-xs text-muted-foreground">
Source channels are added per-account via the &quot;Fetch Channels&quot; button on the Accounts tab.
Channels discovered via &quot;Fetch Channels&quot; are automatically activated as sources.
</p>
)}
<DataTable
table={table}
emptyMessage="No channels yet. Use &quot;Fetch Channels&quot; on an account to discover and add source channels."
emptyMessage="No channels yet. Click &quot;Fetch Channels&quot; above to discover and add source channels."
/>
<DeleteDialog
@@ -83,6 +144,29 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
onConfirm={handleDelete}
isLoading={isPending}
/>
<DeleteDialog
open={!!rescanId}
onOpenChange={(open) => !open && setRescanId(null)}
title="Rescan Channel"
description="This will reset all scan progress for this channel. On the next sync the worker will re-process every message from the beginning. Packages that are already in the library will be skipped (deduplication by hash), but any missing files will be re-downloaded and re-uploaded. This may take a long time for large channels."
confirmLabel="Rescan"
onConfirm={handleRescan}
isLoading={isPending}
/>
<ChannelPickerDialog
accountId={fetchChannelsAccountId}
open={!!fetchChannelsAccountId}
onOpenChange={(open) => {
if (!open) setFetchChannelsAccountId(null);
}}
/>
<JoinChannelDialog
open={joinDialogOpen}
onOpenChange={setJoinDialogOpen}
/>
</div>
);
}

View File

@@ -1,9 +1,21 @@
"use client";
import { useState, useEffect, useTransition } from "react";
import { Database, AlertTriangle, Link2, Plus, Loader2 } from "lucide-react";
import {
Database,
AlertTriangle,
Link2,
Plus,
Loader2,
ArrowRight,
RefreshCw,
} from "lucide-react";
import { toast } from "sonner";
import { createDestinationViaWorker } from "../actions";
import {
createDestinationViaWorker,
setGlobalDestination,
rebuildPackageDatabase,
} from "../actions";
import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
@@ -17,10 +29,19 @@ import {
DialogTitle,
DialogFooter,
} from "@/components/ui/dialog";
import type { GlobalDestination } from "@/lib/telegram/admin-queries";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
import type { GlobalDestination, ChannelRow } from "@/lib/telegram/admin-queries";
interface DestinationCardProps {
destination: GlobalDestination;
channels?: ChannelRow[];
}
type CreateState =
@@ -29,11 +50,34 @@ type CreateState =
| { phase: "done"; title: string; telegramId: string }
| { phase: "error"; message: string };
export function DestinationCard({ destination }: DestinationCardProps) {
type RebuildState =
| { phase: "idle" }
| { phase: "running"; requestId: string }
| { phase: "done"; created: number; skipped: number; scanned: number }
| { phase: "error"; message: string };
interface RebuildProgress {
status: string;
messagesScanned: number;
documentsFound: number;
packagesCreated: number;
packagesSkipped: number;
error?: string;
}
export function DestinationCard({ destination, channels = [] }: DestinationCardProps) {
const [isPending, startTransition] = useTransition();
const [createOpen, setCreateOpen] = useState(false);
const [title, setTitle] = useState("dragonsstash db");
const [createState, setCreateState] = useState<CreateState>({ phase: "idle" });
const [selectedChannelId, setSelectedChannelId] = useState<string>("");
const [rebuildState, setRebuildState] = useState<RebuildState>({ phase: "idle" });
const [rebuildProgress, setRebuildProgress] = useState<RebuildProgress | null>(null);
// Channels that can be assigned as destination (SOURCE channels only, exclude current destination)
const assignableChannels = channels.filter(
(c) => c.type === "SOURCE" && c.id !== destination?.id
);
// Poll for worker result when creating
useEffect(() => {
@@ -90,6 +134,86 @@ export function DestinationCard({ destination }: DestinationCardProps) {
return () => { mounted = false; };
}, [createState]);
// Poll for rebuild progress
useEffect(() => {
if (rebuildState.phase !== "running") return;
let mounted = true;
const requestId = rebuildState.requestId;
const poll = async () => {
for (let i = 0; i < 300; i++) {
await new Promise((r) => setTimeout(r, 2000));
if (!mounted) return;
try {
const res = await fetch(
`/api/telegram/worker-request?requestId=${requestId}`
);
if (!res.ok) continue;
const data = await res.json();
// Update live progress from resultJson
if (data.result && typeof data.result === "object") {
if (mounted) setRebuildProgress(data.result as RebuildProgress);
}
if (data.status === "COMPLETED" && data.result) {
const result = data.result as RebuildProgress;
if (mounted) {
setRebuildState({
phase: "done",
created: result.packagesCreated,
skipped: result.packagesSkipped,
scanned: result.messagesScanned,
});
setRebuildProgress(null);
toast.success(
`Rebuild complete: ${result.packagesCreated} packages restored, ${result.packagesSkipped} skipped`
);
}
return;
} else if (data.status === "FAILED") {
if (mounted) {
setRebuildState({
phase: "error",
message: data.error || "Rebuild failed",
});
setRebuildProgress(null);
}
return;
}
} catch {
// Network blip — keep polling
}
}
if (mounted) {
setRebuildState({ phase: "error", message: "Timed out waiting for rebuild" });
setRebuildProgress(null);
}
};
poll();
return () => {
mounted = false;
};
}, [rebuildState]);
const handleRebuild = () => {
startTransition(async () => {
const result = await rebuildPackageDatabase();
if (result.success) {
setRebuildState({ phase: "running", requestId: result.data.requestId });
setRebuildProgress(null);
toast.info("Rebuild started — scanning destination channel...");
} else {
toast.error(result.error ?? "Failed to start rebuild");
}
});
};
const handleCreate = () => {
if (!title.trim()) return;
@@ -103,6 +227,21 @@ export function DestinationCard({ destination }: DestinationCardProps) {
});
};
const handleAssignExisting = () => {
if (!selectedChannelId) return;
startTransition(async () => {
const result = await setGlobalDestination(selectedChannelId);
if (result.success) {
toast.success("Channel set as destination!");
setCreateOpen(false);
setSelectedChannelId("");
} else {
toast.error(result.error ?? "Failed to set destination");
}
});
};
const handleOpenChange = (open: boolean) => {
setCreateOpen(open);
if (!open) {
@@ -110,6 +249,7 @@ export function DestinationCard({ destination }: DestinationCardProps) {
if (createState.phase !== "creating") {
setCreateState({ phase: "idle" });
}
setSelectedChannelId("");
}
};
@@ -132,19 +272,23 @@ export function DestinationCard({ destination }: DestinationCardProps) {
</div>
<Button size="sm" onClick={() => setCreateOpen(true)}>
<Plus className="mr-2 h-3.5 w-3.5" />
Create Destination
Set Destination
</Button>
</CardContent>
</Card>
<CreateDestinationDialog
<DestinationDialog
open={createOpen}
onOpenChange={handleOpenChange}
title={title}
setTitle={setTitle}
onSubmit={handleCreate}
onSubmitCreate={handleCreate}
createState={createState}
isPending={isPending}
assignableChannels={assignableChannels}
selectedChannelId={selectedChannelId}
setSelectedChannelId={setSelectedChannelId}
onSubmitAssign={handleAssignExisting}
/>
</>
);
@@ -153,7 +297,8 @@ export function DestinationCard({ destination }: DestinationCardProps) {
return (
<>
<Card>
<CardContent className="flex items-center justify-between gap-4 py-4">
<CardContent className="py-4 space-y-3">
<div className="flex items-center justify-between gap-4">
<div className="flex items-center gap-3">
<Database className="h-5 w-5 text-purple-500 shrink-0" />
<div>
@@ -177,6 +322,21 @@ export function DestinationCard({ destination }: DestinationCardProps) {
</div>
</div>
</div>
<div className="flex items-center gap-2">
<Button
variant="outline"
size="sm"
onClick={handleRebuild}
disabled={isPending || rebuildState.phase === "running"}
title="Scan destination channel and rebuild the package database"
>
{rebuildState.phase === "running" ? (
<Loader2 className="h-3.5 w-3.5 animate-spin mr-1.5" />
) : (
<RefreshCw className="h-3.5 w-3.5 mr-1.5" />
)}
Rebuild DB
</Button>
<Button
variant="outline"
size="sm"
@@ -184,49 +344,124 @@ export function DestinationCard({ destination }: DestinationCardProps) {
>
Change
</Button>
</div>
</div>
{/* Rebuild progress */}
{rebuildState.phase === "running" && rebuildProgress && (
<div className="border-t pt-3">
<div className="flex items-center gap-2">
<Loader2 className="h-3.5 w-3.5 animate-spin text-primary shrink-0" />
<span className="text-xs text-muted-foreground">
Rebuilding package database...
</span>
</div>
<div className="flex items-center gap-4 pl-6 mt-1 text-xs text-muted-foreground">
<span>
<span className="text-foreground tabular-nums">
{rebuildProgress.messagesScanned}
</span>{" "}
messages scanned
</span>
<span>
<span className="text-foreground tabular-nums">
{rebuildProgress.documentsFound}
</span>{" "}
archives found
</span>
<span>
<span className="text-foreground tabular-nums">
{rebuildProgress.packagesCreated}
</span>{" "}
restored
</span>
<span>
<span className="text-foreground tabular-nums">
{rebuildProgress.packagesSkipped}
</span>{" "}
skipped
</span>
</div>
</div>
)}
{rebuildState.phase === "done" && (
<div className="border-t pt-3">
<div className="flex items-center gap-2 text-xs text-emerald-500">
<Database className="h-3.5 w-3.5 shrink-0" />
<span>
Rebuild complete: {rebuildState.created} packages restored,{" "}
{rebuildState.skipped} skipped ({rebuildState.scanned} messages
scanned)
</span>
</div>
</div>
)}
{rebuildState.phase === "error" && (
<div className="border-t pt-3">
<div className="flex items-center gap-2 text-xs text-red-500">
<AlertTriangle className="h-3.5 w-3.5 shrink-0" />
<span>Rebuild failed: {rebuildState.message}</span>
</div>
</div>
)}
</CardContent>
</Card>
<CreateDestinationDialog
<DestinationDialog
open={createOpen}
onOpenChange={handleOpenChange}
title={title}
setTitle={setTitle}
onSubmit={handleCreate}
onSubmitCreate={handleCreate}
createState={createState}
isPending={isPending}
assignableChannels={assignableChannels}
selectedChannelId={selectedChannelId}
setSelectedChannelId={setSelectedChannelId}
onSubmitAssign={handleAssignExisting}
/>
</>
);
}
function CreateDestinationDialog({
function DestinationDialog({
open,
onOpenChange,
title,
setTitle,
onSubmit,
onSubmitCreate,
createState,
isPending,
assignableChannels,
selectedChannelId,
setSelectedChannelId,
onSubmitAssign,
}: {
open: boolean;
onOpenChange: (open: boolean) => void;
title: string;
setTitle: (v: string) => void;
onSubmit: () => void;
onSubmitCreate: () => void;
createState: CreateState;
isPending: boolean;
assignableChannels: ChannelRow[];
selectedChannelId: string;
setSelectedChannelId: (v: string) => void;
onSubmitAssign: () => void;
}) {
const isCreating = createState.phase === "creating";
const hasAssignable = assignableChannels.length > 0;
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent className="sm:max-w-md">
<DialogHeader>
<DialogTitle>Create Destination Channel</DialogTitle>
<DialogTitle>Set Destination Channel</DialogTitle>
<DialogDescription>
A private Telegram group will be created automatically using one of
your authenticated accounts. All accounts will write archives here.
Choose an existing channel or create a new private group. All
accounts will write archives to this destination.
</DialogDescription>
</DialogHeader>
@@ -241,7 +476,71 @@ function CreateDestinationDialog({
</p>
</div>
) : (
<div className="space-y-4">
<Tabs defaultValue={hasAssignable ? "existing" : "create"} className="w-full">
<TabsList className="grid w-full grid-cols-2">
<TabsTrigger value="existing" disabled={!hasAssignable}>
<ArrowRight className="mr-1.5 h-3.5 w-3.5" />
Use Existing
</TabsTrigger>
<TabsTrigger value="create">
<Plus className="mr-1.5 h-3.5 w-3.5" />
Create New
</TabsTrigger>
</TabsList>
<TabsContent value="existing" className="space-y-4 pt-2">
{createState.phase === "error" && (
<div className="rounded-md border border-destructive/50 bg-destructive/10 p-3">
<p className="text-sm text-destructive">{createState.message}</p>
</div>
)}
<div className="space-y-2">
<Label>Select Channel</Label>
<Select
value={selectedChannelId}
onValueChange={setSelectedChannelId}
>
<SelectTrigger>
<SelectValue placeholder="Pick a channel..." />
</SelectTrigger>
<SelectContent>
{assignableChannels.map((ch) => (
<SelectItem key={ch.id} value={ch.id}>
{ch.title}{" "}
<span className="text-muted-foreground text-xs">
({ch.telegramId})
</span>
</SelectItem>
))}
</SelectContent>
</Select>
<p className="text-xs text-muted-foreground">
The selected channel will become the destination. All accounts
will be linked as writers automatically.
</p>
</div>
<DialogFooter>
<Button
variant="outline"
onClick={() => onOpenChange(false)}
>
Cancel
</Button>
<Button
onClick={onSubmitAssign}
disabled={isPending || !selectedChannelId}
>
{isPending && (
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
)}
Set as Destination
</Button>
</DialogFooter>
</TabsContent>
<TabsContent value="create" className="space-y-4 pt-2">
{createState.phase === "error" && (
<div className="rounded-md border border-destructive/50 bg-destructive/10 p-3">
<p className="text-sm text-destructive">{createState.message}</p>
@@ -257,30 +556,31 @@ function CreateDestinationDialog({
onChange={(e) => setTitle(e.target.value)}
/>
<p className="text-xs text-muted-foreground">
This will be the name of the Telegram group. You can rename it later in Telegram.
A new private Telegram group will be created using one of your
authenticated accounts. You can rename it later in Telegram.
</p>
</div>
</div>
)}
<DialogFooter>
<Button
variant="outline"
onClick={() => onOpenChange(false)}
disabled={isCreating}
>
Cancel
</Button>
<Button
onClick={onSubmit}
disabled={isPending || isCreating || !title.trim()}
onClick={onSubmitCreate}
disabled={isPending || !title.trim()}
>
{(isPending || isCreating) && (
{isPending && (
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
)}
Create Group
</Button>
</DialogFooter>
</TabsContent>
</Tabs>
)}
</DialogContent>
</Dialog>
);

View File

@@ -0,0 +1,179 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { Loader2, Link as LinkIcon } from "lucide-react";
import { toast } from "sonner";
import { joinChannelByLink } from "../actions";
import {
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
DialogFooter,
} from "@/components/ui/dialog";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
interface JoinChannelDialogProps {
open: boolean;
onOpenChange: (open: boolean) => void;
}
type JoinState =
| { phase: "idle" }
| { phase: "submitting"; requestId?: string }
| { phase: "success"; title: string }
| { phase: "error"; message: string };
/**
 * Dialog for manually adding a Telegram channel as an ingestion source.
 * Accepts @username, t.me/<name>, or t.me/+<invite> input, submits a
 * joinChannelByLink server action, then polls the worker-request endpoint
 * until the worker reports success or failure.
 */
export function JoinChannelDialog({
  open,
  onOpenChange,
}: JoinChannelDialogProps) {
  const [input, setInput] = useState("");
  const [joinState, setJoinState] = useState<JoinState>({ phase: "idle" });

  // Reset the form whenever the dialog closes so reopening starts fresh.
  // NOTE(review): a poll that was started before closing is not cancelled
  // and may still update state / re-close the dialog afterwards — confirm
  // whether that needs an explicit cancellation flag.
  useEffect(() => {
    if (!open) {
      setInput("");
      setJoinState({ phase: "idle" });
    }
  }, [open]);

  // Poll the worker request until COMPLETED/FAILED or ~60s (30 x 2s) elapse.
  const pollForResult = useCallback(async (requestId: string) => {
    for (let attempt = 0; attempt < 30; attempt++) {
      await new Promise((resolve) => setTimeout(resolve, 2000));
      try {
        const res = await fetch(
          `/api/telegram/worker-request?requestId=${requestId}`
        );
        if (!res.ok) continue;
        const data = await res.json();
        if (data.status === "COMPLETED") {
          const result = data.result;
          setJoinState({
            phase: "success",
            title: result?.title ?? "Unknown channel",
          });
          toast.success(`Channel "${result?.title}" added as source`);
          // Auto-close after short delay
          setTimeout(() => onOpenChange(false), 1500);
          return;
        } else if (data.status === "FAILED") {
          setJoinState({
            phase: "error",
            message: data.error || "Failed to join channel",
          });
          return;
        }
      } catch {
        // Network error, keep polling
      }
    }
    setJoinState({
      phase: "error",
      message: "Request timed out. The worker may be busy -- try again later.",
    });
  }, [onOpenChange]);

  const handleSubmit = async () => {
    if (!input.trim()) return;
    setJoinState({ phase: "submitting" });
    try {
      const result = await joinChannelByLink(input);
      if (!result.success) {
        setJoinState({ phase: "error", message: result.error ?? "Unknown error" });
        return;
      }
      // Fix: the previous code used a non-null assertion (result.data!),
      // which would throw at runtime if a success result ever arrived
      // without a payload. Treat that case as an error instead.
      const requestId = result.data?.requestId;
      if (!requestId) {
        setJoinState({ phase: "error", message: "Unknown error" });
        return;
      }
      setJoinState({ phase: "submitting", requestId });
      await pollForResult(requestId);
    } catch (err) {
      const message = err instanceof Error ? err.message : "Network error";
      setJoinState({ phase: "error", message });
    }
  };

  const isSubmitting = joinState.phase === "submitting";

  return (
    <Dialog open={open} onOpenChange={onOpenChange}>
      <DialogContent className="sm:max-w-md">
        <DialogHeader>
          <DialogTitle>Add Channel</DialogTitle>
          <DialogDescription>
            Join a Telegram channel or group by link, username, or invite link.
            The channel will be added as an active source.
          </DialogDescription>
        </DialogHeader>
        <div className="space-y-4 py-2">
          <div className="space-y-2">
            <Label htmlFor="channel-input">Channel link or username</Label>
            <Input
              id="channel-input"
              placeholder="@channel, t.me/channel, or t.me/+invite"
              value={input}
              onChange={(e) => setInput(e.target.value)}
              onKeyDown={(e) => {
                if (e.key === "Enter" && !isSubmitting && input.trim()) {
                  handleSubmit();
                }
              }}
              disabled={isSubmitting}
            />
            <p className="text-xs text-muted-foreground">
              Supported formats: @username, https://t.me/username, https://t.me/+invitecode
            </p>
          </div>
          {joinState.phase === "submitting" && (
            <div className="flex items-center gap-2 text-sm text-muted-foreground">
              <Loader2 className="h-4 w-4 animate-spin" />
              {joinState.requestId
                ? "Joining channel via worker..."
                : "Sending request..."}
            </div>
          )}
          {joinState.phase === "error" && (
            <p className="text-sm text-destructive">{joinState.message}</p>
          )}
          {joinState.phase === "success" && (
            <p className="text-sm text-emerald-600">
              Successfully added &quot;{joinState.title}&quot;
            </p>
          )}
        </div>
        <DialogFooter>
          <Button variant="outline" onClick={() => onOpenChange(false)}>
            {joinState.phase === "success" ? "Close" : "Cancel"}
          </Button>
          {joinState.phase !== "success" && (
            <Button
              onClick={handleSubmit}
              disabled={isSubmitting || !input.trim()}
            >
              {isSubmitting ? (
                <Loader2 className="mr-2 h-4 w-4 animate-spin" />
              ) : (
                <LinkIcon className="mr-2 h-4 w-4" />
              )}
              Add Channel
            </Button>
          )}
        </DialogFooter>
      </DialogContent>
    </Dialog>
  );
}

View File

@@ -16,6 +16,7 @@ interface TelegramAdminProps {
ingestionStatus: IngestionAccountStatus[];
globalDestination: GlobalDestination;
sendHistory: SendHistoryRow[];
workerIntervalMinutes: number;
}
export function TelegramAdmin({
@@ -24,6 +25,7 @@ export function TelegramAdmin({
ingestionStatus,
globalDestination,
sendHistory,
workerIntervalMinutes,
}: TelegramAdminProps) {
return (
<div className="space-y-4">
@@ -32,7 +34,7 @@ export function TelegramAdmin({
description="Manage Telegram accounts, channels, and ingestion"
/>
<WorkerStatusPanel initialStatus={ingestionStatus} />
<WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
<Tabs defaultValue="accounts" className="space-y-4">
<TabsList>
@@ -51,7 +53,7 @@ export function TelegramAdmin({
<AccountsTab accounts={accounts} />
</TabsContent>
<TabsContent value="channels">
<ChannelsTab channels={channels} globalDestination={globalDestination} />
<ChannelsTab channels={channels} globalDestination={globalDestination} accounts={accounts} />
</TabsContent>
<TabsContent value="sends">
<BotSendsTab history={sendHistory} />

View File

@@ -1,6 +1,6 @@
"use client";
import { useEffect, useState, useCallback } from "react";
import { useEffect, useState, useCallback, useTransition } from "react";
import {
Loader2,
CheckCircle2,
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { triggerIngestion } from "../actions";
import type { IngestionAccountStatus } from "@/lib/telegram/types";
interface WorkerStatusPanelProps {
initialStatus: IngestionAccountStatus[];
initialIntervalMinutes?: number;
}
const AUTH_STATE_CONFIG: Record<
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
};
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
const [accounts, setAccounts] = useState(initialStatus);
const [error, setError] = useState(false);
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
const [isPending, startTransition] = useTransition();
// Find active run
const activeRun = accounts.find((a) => a.currentRun);
const isRunning = !!activeRun;
const handleSyncNow = useCallback(() => {
startTransition(async () => {
const result = await triggerIngestion();
if (result.success) {
toast.success("Sync triggered — worker will start shortly");
} else {
toast.error(result.error ?? "Failed to trigger sync");
}
});
}, []);
// Poll for status
useEffect(() => {
let timer: ReturnType<typeof setTimeout>;
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
const data = await res.json();
if (mounted) {
setAccounts(data.accounts ?? []);
if (data.workerIntervalMinutes) {
setWorkerIntervalMinutes(data.workerIntervalMinutes);
}
setError(false);
}
} catch {
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return;
}
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter)
// Estimate next run based on last run finish time + configured interval + up to 5 min jitter
const lastFinished = accounts
.filter((a) => a.lastRun?.finishedAt)
.map((a) => new Date(a.lastRun!.finishedAt!).getTime())
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return;
}
const intervalMs = 5 * 60 * 1000; // 5 min base
const intervalMs = workerIntervalMinutes * 60 * 1000;
const estimatedNext = lastFinished + intervalMs;
const tick = () => {
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
tick();
const interval = setInterval(tick, 1_000);
return () => clearInterval(interval);
}, [isRunning, accounts]);
}, [isRunning, accounts, workerIntervalMinutes]);
if (accounts.length === 0 && !error) {
return (
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
) : isRunning && activeRun?.currentRun ? (
<RunningStatus run={activeRun.currentRun} />
) : (
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} />
<IdleStatus
accounts={accounts}
nextRunCountdown={nextRunCountdown}
onSyncNow={handleSyncNow}
isSyncing={isPending}
/>
)}
</CardContent>
</Card>
@@ -233,6 +257,11 @@ function RunningStatus({
</span>
</span>
)}
{run.messagesScanned > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
</span>
)}
{run.zipsIngested > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
@@ -251,9 +280,13 @@ function RunningStatus({
function IdleStatus({
accounts,
nextRunCountdown,
onSyncNow,
isSyncing,
}: {
accounts: IngestionAccountStatus[];
nextRunCountdown: string | null;
onSyncNow: () => void;
isSyncing: boolean;
}) {
const lastRun = accounts
.filter((a) => a.lastRun)
@@ -316,14 +349,32 @@ function IdleStatus({
)}
</div>
<div className="flex items-center gap-2 shrink-0">
{nextRunCountdown && hasAuthenticated && (
<div className="flex items-center gap-1.5 shrink-0">
<div className="flex items-center gap-1.5">
<RefreshCw className="h-3 w-3 text-muted-foreground" />
<span className="text-xs text-muted-foreground tabular-nums">
Next: {nextRunCountdown}
</span>
</div>
)}
{hasAuthenticated && (
<Button
variant="outline"
size="sm"
className="h-7 text-xs px-2"
onClick={onSyncNow}
disabled={isSyncing}
>
{isSyncing ? (
<Loader2 className="h-3 w-3 animate-spin mr-1" />
) : (
<RefreshCw className="h-3 w-3 mr-1" />
)}
Sync Now
</Button>
)}
</div>
</div>
);
}

View File

@@ -173,6 +173,7 @@ export async function createChannel(
telegramId: BigInt(parsed.data.telegramId),
title: parsed.data.title,
type: parsed.data.type,
isActive: false,
},
});
revalidatePath(REVALIDATE_PATH);
@@ -258,6 +259,25 @@ export async function deleteChannel(id: string): Promise<ActionResult> {
}
}
export async function setChannelCategory(
id: string,
category: string | null
): Promise<ActionResult> {
const admin = await requireAdmin();
if (!admin.success) return admin;
try {
await prisma.telegramChannel.update({
where: { id },
data: { category: category?.trim() || null },
});
revalidatePath("/telegram");
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to update category" };
}
}
export async function setChannelType(
id: string,
type: "SOURCE" | "DESTINATION"
@@ -269,6 +289,13 @@ export async function setChannelType(
if (!existing) return { success: false, error: "Channel not found" };
try {
if (type === "DESTINATION") {
// Setting as destination: use the full global destination logic
// so it updates the global settings key, creates WRITER links, etc.
return await setGlobalDestination(id);
}
// Setting as SOURCE — just change the type
await prisma.telegramChannel.update({
where: { id },
data: { type },
@@ -280,19 +307,49 @@ export async function setChannelType(
}
}
export async function triggerChannelSync(): Promise<ActionResult> {
/**
* Reset all scan progress for a channel so the worker will re-process it
* from the very beginning on the next ingestion cycle.
*
* This clears:
* - `lastProcessedMessageId` on every AccountChannelMap linked to this channel
* - All TopicProgress records for those maps (for forum channels)
*/
export async function rescanChannel(channelId: string): Promise<ActionResult> {
const admin = await requireAdmin();
if (!admin.success) return admin;
const channel = await prisma.telegramChannel.findUnique({
where: { id: channelId },
});
if (!channel) return { success: false, error: "Channel not found" };
try {
// Signal the worker to do a channel sync via pg_notify
await prisma.$queryRawUnsafe(
`SELECT pg_notify('channel_sync', 'requested')`
);
// Find all account-channel maps for this channel
const maps = await prisma.accountChannelMap.findMany({
where: { channelId },
select: { id: true },
});
const mapIds = maps.map((m) => m.id);
// Delete all topic progress records for these maps (forum channels)
if (mapIds.length > 0) {
await prisma.topicProgress.deleteMany({
where: { accountChannelMapId: { in: mapIds } },
});
}
// Reset the scan cursor so the worker re-processes from the start
await prisma.accountChannelMap.updateMany({
where: { channelId },
data: { lastProcessedMessageId: null },
});
revalidatePath(REVALIDATE_PATH);
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to trigger channel sync" };
return { success: false, error: "Failed to reset channel scan progress" };
}
}
@@ -371,23 +428,12 @@ export async function triggerIngestion(
return { success: false, error: "No eligible accounts found" };
}
// Create ingestion runs — the worker picks these up
for (const account of accounts) {
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// pg_notify for immediate worker pickup
// Signal the worker to run an immediate ingestion cycle via pg_notify.
// The worker will create its own IngestionRun records with proper activity tracking.
try {
await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",")
accounts.map((a: { id: string }) => a.id).join(",")
);
} catch {
// Best-effort
@@ -417,7 +463,7 @@ export async function saveChannelSelections(
try {
let linked = 0;
for (const ch of channels) {
// Upsert the channel record
// Upsert the channel record and activate it (user explicitly selected it)
const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(ch.telegramId) },
create: {
@@ -425,10 +471,12 @@ export async function saveChannelSelections(
title: ch.title,
type: "SOURCE",
isForum: ch.isForum,
isActive: true,
},
update: {
title: ch.title,
isForum: ch.isForum,
isActive: true,
},
});
@@ -453,6 +501,56 @@ export async function saveChannelSelections(
}
}
// ── Join channel by link/username ──
/**
* Request the worker to join a channel by t.me link, invite link, or @username.
* Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify.
* Returns the requestId so the UI can poll for completion.
*/
export async function joinChannelByLink(
  input: string
): Promise<ActionResult<{ requestId: string }>> {
  const admin = await requireAdmin();
  if (!admin.success) return admin;

  const normalizedInput = input.trim();
  if (!normalizedInput) return { success: false, error: "Input is required" };

  try {
    // Joining requires TDLib, so at least one authenticated account must exist.
    const tdlibAccount = await prisma.telegramAccount.findFirst({
      where: { isActive: true, authState: "AUTHENTICATED" },
      select: { id: true },
    });
    if (!tdlibAccount) {
      return { success: false, error: "At least one authenticated account is needed" };
    }

    // Track progress through a ChannelFetchRequest row the UI can poll.
    const trackingRequest = await prisma.channelFetchRequest.create({
      data: {
        accountId: tdlibAccount.id,
        status: "PENDING",
      },
    });

    // Wake the worker immediately; the payload tells it what to join and
    // which request row to report progress on.
    const payload = JSON.stringify({
      requestId: trackingRequest.id,
      input: normalizedInput,
      accountId: tdlibAccount.id,
    });
    await prisma.$queryRawUnsafe(
      `SELECT pg_notify('join_channel', $1)`,
      payload
    );

    return { success: true, data: { requestId: trackingRequest.id } };
  } catch {
    return { success: false, error: "Failed to request channel join" };
  }
}
// ── Global destination channel ──
export async function setGlobalDestination(
@@ -467,10 +565,10 @@ export async function setGlobalDestination(
if (!channel) return { success: false, error: "Channel not found" };
try {
// Set the channel type to DESTINATION
// Set the channel type to DESTINATION and ensure it's active
await prisma.telegramChannel.update({
where: { id: channelId },
data: { type: "DESTINATION" },
data: { type: "DESTINATION", isActive: true },
});
// Save as global destination
@@ -521,17 +619,19 @@ export async function createDestinationChannel(
if (!admin.success) return admin;
try {
// Create the channel as DESTINATION
// Create the channel as DESTINATION (active by default — needed for uploads)
const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(telegramId) },
create: {
telegramId: BigInt(telegramId),
title,
type: "DESTINATION",
isActive: true,
},
update: {
title,
type: "DESTINATION",
isActive: true,
},
});
@@ -581,6 +681,63 @@ export async function createDestinationChannel(
}
}
/**
* Request the worker to rebuild the package database by scanning the
* destination channel for uploaded archives and recreating Package records.
* Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify.
* Returns the requestId so the UI can poll for progress.
*/
export async function rebuildPackageDatabase(): Promise<
  ActionResult<{ requestId: string }>
> {
  const admin = await requireAdmin();
  if (!admin.success) return admin;

  try {
    // Scanning the destination channel requires TDLib, so at least one
    // authenticated, active account must exist.
    const account = await prisma.telegramAccount.findFirst({
      where: { isActive: true, authState: "AUTHENTICATED" },
      select: { id: true },
    });
    if (!account) {
      return {
        success: false,
        error:
          "At least one authenticated account is needed to scan the destination channel",
      };
    }

    // Without a configured destination there is nothing to scan.
    const destinationSetting = await prisma.globalSetting.findUnique({
      where: { key: "destination_channel_id" },
    });
    if (!destinationSetting) {
      return {
        success: false,
        error: "No destination channel configured",
      };
    }

    // Track progress through a ChannelFetchRequest row the UI can poll.
    const trackingRequest = await prisma.channelFetchRequest.create({
      data: {
        accountId: account.id,
        status: "PENDING",
      },
    });

    // Wake the worker immediately with the request id as the payload.
    await prisma.$queryRawUnsafe(
      `SELECT pg_notify('rebuild_packages', $1)`,
      trackingRequest.id
    );

    return { success: true, data: { requestId: trackingRequest.id } };
  } catch {
    return { success: false, error: "Failed to request package database rebuild" };
  }
}
/**
* Request the worker to create a new Telegram supergroup as the destination.
* Uses ChannelFetchRequest as a generic DB-mediated request with pg_notify.

View File

@@ -25,7 +25,7 @@ export default async function TelegramPage() {
}),
]);
const serializedHistory = sendHistory.map((r) => ({
const serializedHistory = sendHistory.map((r: typeof sendHistory[number]) => ({
id: r.id,
packageName: r.package.fileName,
recipientName: r.telegramLink.telegramName,
@@ -42,6 +42,7 @@ export default async function TelegramPage() {
ingestionStatus={ingestionStatus}
globalDestination={globalDestination}
sendHistory={serializedHistory}
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
/>
);
}

View File

@@ -11,6 +11,23 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
return { success: false, error: "Validation failed" };
}
// Validate invite code
const invite = await prisma.inviteCode.findUnique({
where: { code: parsed.data.inviteCode },
});
if (!invite) {
return { success: false, error: "Invalid invite code. Please check the code and try again." };
}
if (invite.uses >= invite.maxUses) {
return { success: false, error: "This invite code has reached its maximum number of uses" };
}
if (invite.expiresAt && invite.expiresAt < new Date()) {
return { success: false, error: "This invite code has expired. Please request a new one." };
}
const existing = await prisma.user.findUnique({
where: { email: parsed.data.email },
});
@@ -21,12 +38,15 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
const hashedPassword = await bcrypt.hash(parsed.data.password, 10);
const user = await prisma.user.create({
// Create user and increment invite usage in a transaction
const user = await prisma.$transaction(async (tx) => {
const newUser = await tx.user.create({
data: {
name: parsed.data.name,
email: parsed.data.email,
hashedPassword,
role: "USER",
usedInviteId: invite.id,
settings: {
create: {
lowStockThreshold: 10,
@@ -38,5 +58,13 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
},
});
await tx.inviteCode.update({
where: { id: invite.id },
data: { uses: { increment: 1 } },
});
return newUser;
});
return { success: true, data: { id: user.id } };
}

View File

@@ -1,7 +1,7 @@
"use client";
import { useState, useTransition } from "react";
import { useRouter } from "next/navigation";
import { useRouter, useSearchParams } from "next/navigation";
import Link from "next/link";
import { useForm } from "react-hook-form";
import { zodResolver } from "@hookform/resolvers/zod";
@@ -24,12 +24,19 @@ import { APP_NAME } from "@/lib/constants";
export default function RegisterPage() {
const router = useRouter();
const searchParams = useSearchParams();
const [error, setError] = useState<string | null>(null);
const [isPending, startTransition] = useTransition();
const form = useForm<RegisterInput>({
resolver: zodResolver(registerSchema),
defaultValues: { name: "", email: "", password: "", confirmPassword: "" },
defaultValues: {
name: "",
email: "",
password: "",
confirmPassword: "",
inviteCode: searchParams.get("code") ?? "",
},
});
function onSubmit(values: RegisterInput) {
@@ -75,7 +82,7 @@ export default function RegisterPage() {
<Card>
<CardHeader>
<CardTitle>Create Account</CardTitle>
<CardDescription>Fill in your details below</CardDescription>
<CardDescription>You need an invite code to register</CardDescription>
</CardHeader>
<CardContent>
<Form {...form}>
@@ -86,6 +93,24 @@ export default function RegisterPage() {
</div>
)}
<FormField
control={form.control}
name="inviteCode"
render={({ field }) => (
<FormItem>
<FormLabel>Invite Code</FormLabel>
<FormControl>
<Input
placeholder="Enter your invite code"
autoComplete="off"
{...field}
/>
</FormControl>
<FormMessage />
</FormItem>
)}
/>
<FormField
control={form.control}
name="name"

View File

@@ -9,5 +9,9 @@ export async function GET(request: Request) {
if ("error" in authResult) return authResult.error;
const accounts = await getIngestionStatus();
return NextResponse.json({ accounts });
const workerIntervalMinutes = parseInt(
process.env.WORKER_INTERVAL_MINUTES ?? "60",
10
);
return NextResponse.json({ accounts, workerIntervalMinutes });
}

View File

@@ -45,33 +45,20 @@ export async function POST(request: Request) {
);
}
// Create ingestion runs marked as RUNNING — the worker will pick these up
// when it next polls, or we use pg_notify for immediate pickup
for (const account of accounts) {
// Only create if no run is already RUNNING for this account
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// Send pg_notify for immediate worker pickup
// Send pg_notify for immediate worker pickup.
// The worker creates its own IngestionRun records with proper activity tracking.
try {
await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",")
accounts.map((a: { id: string }) => a.id).join(",")
);
} catch {
// pg_notify is best-effort — worker will pick up on next cycle anyway
// pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
}
return NextResponse.json({
triggered: true,
accountIds: accounts.map((a) => a.id),
message: `Ingestion queued for ${accounts.length} account(s)`,
accountIds: accounts.map((a: { id: string }) => a.id),
message: `Ingestion triggered for ${accounts.length} account(s)`,
});
}

View File

@@ -0,0 +1,73 @@
import { NextResponse } from "next/server";
import { authenticateApiRequest } from "@/lib/telegram/api-auth";
import { prisma } from "@/lib/prisma";
export const dynamic = "force-dynamic";
/**
* GET /api/zips/:id/extract/:requestId
* Get the status and/or image data for an extraction request.
* Query param: ?image=true returns the raw image bytes if completed.
* Otherwise returns status JSON.
*/
export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string; requestId: string }> }
) {
  const authResult = await authenticateApiRequest(request);
  if ("error" in authResult) return authResult.error;

  const { requestId } = await params;
  const wantsImageBytes =
    new URL(request.url).searchParams.get("image") === "true";

  if (!wantsImageBytes) {
    // Status-only JSON; the image payload is deliberately excluded so
    // polling stays cheap.
    const record = await prisma.archiveExtractRequest.findUnique({
      where: { id: requestId },
      select: { id: true, status: true, error: true, contentType: true },
    });
    if (!record) {
      return NextResponse.json({ error: "Request not found" }, { status: 404 });
    }
    return NextResponse.json({
      requestId: record.id,
      status: record.status,
      error: record.error,
      contentType: record.contentType,
    });
  }

  // ?image=true — serve the extracted bytes once the worker has finished.
  const record = await prisma.archiveExtractRequest.findUnique({
    where: { id: requestId },
    select: { status: true, imageData: true, contentType: true, error: true },
  });
  if (!record) {
    return new NextResponse(null, { status: 404 });
  }
  if (record.status !== "COMPLETED" || !record.imageData) {
    // 202 while pending/in-progress, 400 once the worker reported failure.
    return NextResponse.json(
      { status: record.status, error: record.error },
      { status: record.status === "FAILED" ? 400 : 202 }
    );
  }

  // Prisma may hand back a Buffer or a raw byte array depending on driver.
  const imageBuffer = Buffer.isBuffer(record.imageData)
    ? record.imageData
    : Buffer.from(record.imageData);

  return new NextResponse(imageBuffer, {
    status: 200,
    headers: {
      "Content-Type": record.contentType || "image/jpeg",
      "Content-Length": String(imageBuffer.length),
      "Cache-Control": "public, max-age=3600, immutable",
    },
  });
}

View File

@@ -0,0 +1,118 @@
import { NextResponse } from "next/server";
import { authenticateApiRequest } from "@/lib/telegram/api-auth";
import { prisma } from "@/lib/prisma";
export const dynamic = "force-dynamic";
/**
* POST /api/zips/:id/extract
* Request extraction of an image from a package archive.
* Body: { filePath: string }
* Returns: { requestId: string, status: string }
*
* If a completed extraction already exists for this package+filePath,
* returns it immediately.
*/
export async function POST(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const authResult = await authenticateApiRequest(request);
  if ("error" in authResult) return authResult.error;

  const { id } = await params;

  // Parse the body defensively: request.json() throws on malformed JSON,
  // which would otherwise surface as an unhandled 500 instead of a 400.
  let body: { filePath?: unknown } | null;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json(
      { error: "filePath is required" },
      { status: 400 }
    );
  }
  const filePath = body?.filePath;
  if (!filePath || typeof filePath !== "string") {
    return NextResponse.json(
      { error: "filePath is required" },
      { status: 400 }
    );
  }

  // Verify the package exists and is in a state extraction can work with.
  const pkg = await prisma.package.findUnique({
    where: { id },
    select: { id: true, destChannelId: true, destMessageId: true, archiveType: true, isMultipart: true, partCount: true },
  });
  if (!pkg) {
    return NextResponse.json({ error: "Package not found" }, { status: 404 });
  }
  // The worker fetches the archive from the destination channel, so the
  // package must already have been uploaded there.
  if (!pkg.destChannelId || !pkg.destMessageId) {
    return NextResponse.json(
      { error: "Package has not been uploaded to destination channel" },
      { status: 400 }
    );
  }
  // Standalone DOCUMENT entries are not archives — nothing to extract from.
  if (pkg.archiveType === "DOCUMENT") {
    return NextResponse.json(
      { error: "Cannot extract images from standalone documents" },
      { status: 400 }
    );
  }
  // Multipart archives would require downloading and joining every part.
  if (pkg.isMultipart && pkg.partCount > 1) {
    return NextResponse.json(
      { error: "Image extraction is not supported for multipart archives" },
      { status: 400 }
    );
  }

  // Reuse a previously completed extraction for this package+filePath.
  const existing = await prisma.archiveExtractRequest.findFirst({
    where: {
      packageId: id,
      filePath,
      status: "COMPLETED",
      imageData: { not: null },
    },
    select: { id: true, status: true },
  });
  if (existing) {
    return NextResponse.json({
      requestId: existing.id,
      status: "COMPLETED",
    });
  }

  // Piggyback on an in-flight request rather than queueing a duplicate.
  // NOTE(review): this check-then-create is not atomic; concurrent callers
  // could still create duplicates (harmless — worker handles each) — confirm.
  const pending = await prisma.archiveExtractRequest.findFirst({
    where: {
      packageId: id,
      filePath,
      status: { in: ["PENDING", "IN_PROGRESS"] },
    },
    select: { id: true, status: true },
  });
  if (pending) {
    return NextResponse.json({
      requestId: pending.id,
      status: pending.status,
    });
  }

  // Queue a new extraction request and wake the worker via pg_notify.
  const extractRequest = await prisma.archiveExtractRequest.create({
    data: {
      packageId: id,
      filePath,
    },
  });
  await prisma.$queryRawUnsafe(
    `SELECT pg_notify('archive_extract', $1)`,
    extractRequest.id
  );

  return NextResponse.json({
    requestId: extractRequest.id,
    status: "PENDING",
  });
}

View File

@@ -0,0 +1,56 @@
import { NextResponse } from "next/server";
import { authenticateApiRequest } from "@/lib/telegram/api-auth";
import { prisma } from "@/lib/prisma";
export const dynamic = "force-dynamic";
const IMAGE_EXTENSIONS = ["jpg", "jpeg", "png", "webp", "gif", "bmp"];
/**
* GET /api/zips/:id/images
* Lists image files inside a package's archive (from PackageFile records).
* Returns a list of image file paths that can be used as preview candidates.
*/
export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const authResult = await authenticateApiRequest(request);
  if ("error" in authResult) return authResult.error;

  const { id } = await params;

  // Confirm the package exists before querying its file list.
  const pkg = await prisma.package.findUnique({
    where: { id },
    select: { id: true, archiveType: true },
  });
  if (!pkg) {
    return NextResponse.json({ error: "Package not found" }, { status: 404 });
  }

  // Preview candidates come from the already-indexed PackageFile records,
  // filtered down to known image extensions.
  const imageFiles = await prisma.packageFile.findMany({
    where: {
      packageId: id,
      extension: { in: IMAGE_EXTENSIONS },
    },
    orderBy: { path: "asc" },
    select: {
      id: true,
      path: true,
      fileName: true,
      extension: true,
      uncompressedSize: true,
    },
  });

  return NextResponse.json({
    images: imageFiles.map((file) => ({
      id: file.id,
      path: file.path,
      fileName: file.fileName,
      extension: file.extension,
      // BigInt is not JSON-serializable, so sizes go out as strings.
      size: file.uncompressedSize.toString(),
    })),
  });
}

View File

@@ -2,6 +2,7 @@
import Link from "next/link";
import { usePathname } from "next/navigation";
import { useSession } from "next-auth/react";
import {
LayoutDashboard,
Cylinder,
@@ -14,30 +15,21 @@ import {
Building2,
MapPin,
Settings,
UserPlus,
Flame,
} from "lucide-react";
import { cn } from "@/lib/utils";
import { APP_NAME } from "@/lib/constants";
import { APP_NAME, NAV_ITEMS } from "@/lib/constants";
import { SheetHeader, SheetTitle } from "@/components/ui/sheet";
const icons = { LayoutDashboard, Cylinder, Droplets, Paintbrush, Gem, FileBox, Send, ClipboardList, Building2, MapPin, Settings };
const navItems = [
{ label: "Dashboard", href: "/dashboard", icon: "LayoutDashboard" as const },
{ label: "Filaments", href: "/filaments", icon: "Cylinder" as const },
{ label: "Resins", href: "/resins", icon: "Droplets" as const },
{ label: "Paints", href: "/paints", icon: "Paintbrush" as const },
{ label: "Supplies", href: "/supplies", icon: "Gem" as const },
{ label: "STL Files", href: "/stls", icon: "FileBox" as const },
{ label: "Telegram", href: "/telegram", icon: "Send" as const },
{ label: "Usage", href: "/usage", icon: "ClipboardList" as const },
{ label: "Vendors", href: "/vendors", icon: "Building2" as const },
{ label: "Locations", href: "/locations", icon: "MapPin" as const },
{ label: "Settings", href: "/settings", icon: "Settings" as const },
];
const icons = { LayoutDashboard, Cylinder, Droplets, Paintbrush, Gem, FileBox, Send, ClipboardList, Building2, MapPin, Settings, UserPlus };
export function MobileSidebar() {
const pathname = usePathname();
const { data: session } = useSession();
const isAdmin = session?.user?.role === "ADMIN";
const visibleItems = NAV_ITEMS.filter((item) => !item.adminOnly || isAdmin);
return (
<div className="flex h-full flex-col">
@@ -48,7 +40,7 @@ export function MobileSidebar() {
</SheetTitle>
</SheetHeader>
<nav className="flex-1 space-y-1 p-2">
{navItems.map((item) => {
{visibleItems.map((item) => {
const Icon = icons[item.icon];
const isActive = pathname.startsWith(item.href);

View File

@@ -3,6 +3,7 @@
import { useState } from "react";
import Link from "next/link";
import { usePathname } from "next/navigation";
import { useSession } from "next-auth/react";
import {
LayoutDashboard,
Cylinder,
@@ -15,12 +16,13 @@ import {
Building2,
MapPin,
Settings,
UserPlus,
Flame,
PanelLeftClose,
PanelLeft,
} from "lucide-react";
import { cn } from "@/lib/utils";
import { APP_NAME } from "@/lib/constants";
import { APP_NAME, NAV_ITEMS } from "@/lib/constants";
import { Button } from "@/components/ui/button";
import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip";
@@ -36,25 +38,16 @@ const icons = {
Building2,
MapPin,
Settings,
UserPlus,
} as const;
const navItems = [
{ label: "Dashboard", href: "/dashboard", icon: "LayoutDashboard" as const },
{ label: "Filaments", href: "/filaments", icon: "Cylinder" as const },
{ label: "Resins", href: "/resins", icon: "Droplets" as const },
{ label: "Paints", href: "/paints", icon: "Paintbrush" as const },
{ label: "Supplies", href: "/supplies", icon: "Gem" as const },
{ label: "STL Files", href: "/stls", icon: "FileBox" as const },
{ label: "Telegram", href: "/telegram", icon: "Send" as const },
{ label: "Usage", href: "/usage", icon: "ClipboardList" as const },
{ label: "Vendors", href: "/vendors", icon: "Building2" as const },
{ label: "Locations", href: "/locations", icon: "MapPin" as const },
{ label: "Settings", href: "/settings", icon: "Settings" as const },
];
export function Sidebar() {
const pathname = usePathname();
const [collapsed, setCollapsed] = useState(false);
const { data: session } = useSession();
const isAdmin = session?.user?.role === "ADMIN";
const visibleItems = NAV_ITEMS.filter((item) => !item.adminOnly || isAdmin);
return (
<aside
@@ -73,7 +66,7 @@ export function Sidebar() {
{/* Navigation */}
<nav className="flex-1 space-y-1 p-2">
{navItems.map((item) => {
{visibleItems.map((item) => {
const Icon = icons[item.icon];
const isActive = pathname.startsWith(item.href);

View File

@@ -18,6 +18,8 @@ interface DeleteDialogProps {
description?: string;
onConfirm: () => void;
isLoading?: boolean;
confirmLabel?: string;
confirmLoadingLabel?: string;
}
export function DeleteDialog({
@@ -27,6 +29,8 @@ export function DeleteDialog({
description = "This action cannot be undone.",
onConfirm,
isLoading,
confirmLabel = "Delete",
confirmLoadingLabel,
}: DeleteDialogProps) {
return (
<AlertDialog open={open} onOpenChange={onOpenChange}>
@@ -42,7 +46,7 @@ export function DeleteDialog({
disabled={isLoading}
className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
>
{isLoading ? "Deleting..." : "Delete"}
{isLoading ? (confirmLoadingLabel ?? `${confirmLabel}...`) : confirmLabel}
</AlertDialogAction>
</AlertDialogFooter>
</AlertDialogContent>

View File

@@ -18,7 +18,12 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
async jwt({ token, user }) {
if (user) {
token.id = user.id!;
token.role = user.role ?? "USER";
// Fetch the role from the database to ensure token reflects current role
const dbUser = await prisma.user.findUnique({
where: { id: user.id! },
select: { role: true },
});
token.role = dbUser?.role ?? user.role ?? "ADMIN";
}
return token;
},
@@ -33,6 +38,12 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
events: {
async createUser({ user }) {
if (user.id) {
// Self-hosted: all users are admins
await prisma.user.update({
where: { id: user.id },
data: { role: "ADMIN" },
});
await prisma.userSettings.upsert({
where: { userId: user.id },
update: {},

View File

@@ -1,17 +1,18 @@
export const APP_NAME = "Dragon's Stash";
export const NAV_ITEMS = [
{ label: "Dashboard", href: "/dashboard", icon: "LayoutDashboard" },
{ label: "Filaments", href: "/filaments", icon: "Cylinder" },
{ label: "Resins", href: "/resins", icon: "Droplets" },
{ label: "Paints", href: "/paints", icon: "Paintbrush" },
{ label: "Supplies", href: "/supplies", icon: "Gem" },
{ label: "STL Files", href: "/stls", icon: "FileBox" },
{ label: "Telegram", href: "/telegram", icon: "Send" },
{ label: "Usage", href: "/usage", icon: "ClipboardList" },
{ label: "Vendors", href: "/vendors", icon: "Building2" },
{ label: "Locations", href: "/locations", icon: "MapPin" },
{ label: "Settings", href: "/settings", icon: "Settings" },
{ label: "Dashboard", href: "/dashboard", icon: "LayoutDashboard", adminOnly: false },
{ label: "Filaments", href: "/filaments", icon: "Cylinder", adminOnly: false },
{ label: "Resins", href: "/resins", icon: "Droplets", adminOnly: false },
{ label: "Paints", href: "/paints", icon: "Paintbrush", adminOnly: false },
{ label: "Supplies", href: "/supplies", icon: "Gem", adminOnly: false },
{ label: "STL Files", href: "/stls", icon: "FileBox", adminOnly: false },
{ label: "Telegram", href: "/telegram", icon: "Send", adminOnly: true },
{ label: "Invites", href: "/invites", icon: "UserPlus", adminOnly: true },
{ label: "Usage", href: "/usage", icon: "ClipboardList", adminOnly: false },
{ label: "Vendors", href: "/vendors", icon: "Building2", adminOnly: false },
{ label: "Locations", href: "/locations", icon: "MapPin", adminOnly: false },
{ label: "Settings", href: "/settings", icon: "Settings", adminOnly: false },
] as const;
export const MATERIALS = [

View File

@@ -42,6 +42,7 @@ export async function listChannels() {
title: c.title,
type: c.type,
isActive: c.isActive,
category: c.category,
createdAt: c.createdAt.toISOString(),
accountCount: c._count.accountMaps,
packageCount: c._count.packages,

View File

@@ -3,7 +3,7 @@ export interface PackageListItem {
fileName: string;
fileSize: string; // BigInt serialized as string
contentHash: string;
archiveType: "ZIP" | "RAR";
archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT";
fileCount: number;
isMultipart: boolean;
hasPreview: boolean;

View File

@@ -11,6 +11,7 @@ export const registerSchema = z
email: z.email("Invalid email address"),
password: z.string().min(6, "Password must be at least 6 characters"),
confirmPassword: z.string(),
inviteCode: z.string().min(1, "Invite code is required"),
})
.refine((data) => data.password === data.confirmPassword, {
message: "Passwords do not match",

View File

@@ -3,7 +3,7 @@ FROM node:20-bookworm-slim AS deps
RUN sed -i 's/^Components: main$/Components: main non-free/' /etc/apt/sources.list.d/debian.sources && \
apt-get update && apt-get install -y \
libssl-dev zlib1g-dev unrar \
libssl-dev zlib1g-dev unzip unrar p7zip-full \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
@@ -26,7 +26,7 @@ FROM node:20-bookworm-slim AS runner
RUN sed -i 's/^Components: main$/Components: main non-free/' /etc/apt/sources.list.d/debian.sources && \
apt-get update && apt-get install -y \
libssl3 zlib1g unrar \
libssl3 zlib1g unzip unrar p7zip-full \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app

View File

@@ -1,21 +1,71 @@
/**
* Extract a creator name from common archive file naming patterns.
*
* Priority in the worker: topic name > filename extraction.
* This is the fallback when no forum topic name is available.
* Priority in the worker: topic name > filename extraction > channel title > null.
*
* Patterns handled (split on ` - `):
* Patterns handled:
* "Mammoth Factory - 2026-01.zip" → "Mammoth Factory"
* "Artist Name - Pack Title.part01.rar" → "Artist Name"
* "ArtistName_PackTitle.zip" → null (ambiguous)
* "some_random_file.zip" → null
*/
export function extractCreatorFromFileName(fileName: string): string | null {
// Strip archive extensions (.zip, .rar, .part01.rar, .z01, etc.)
const bare = fileName.replace(/(\.(part\d+\.rar|z\d{2}|zip|rar))+$/i, "");
// Strip archive/document extensions
const bare = fileName.replace(
/(\.(part\d+\.rar|z\d{2}|zip|rar|7z|pdf|stl|obj|3mf|step|stp|blend|gcode|svg|dxf|ai|eps|psd))+$/i,
""
);
const idx = bare.indexOf(" - ");
if (idx <= 0) return null;
// Pattern 1: "Creator - Title" (most common)
const dashIdx = bare.indexOf(" - ");
if (dashIdx > 0) {
const creator = bare.slice(0, dashIdx).trim();
if (creator.length > 1) return creator;
}
const creator = bare.slice(0, idx).trim();
return creator.length > 0 ? creator : null;
// Pattern 2: "Creator_Title" with underscores where first segment looks like a name
// Only match if the first segment has a space or capital letter pattern suggesting a name
const underscoreIdx = bare.indexOf("_");
if (underscoreIdx > 2) {
const candidate = bare.slice(0, underscoreIdx).trim();
// Accept if it contains a space (multi-word) or starts with upper + has lower (proper name)
if (candidate.includes(" ") || /^[A-Z][a-z]/.test(candidate)) {
return candidate;
}
}
return null;
}
/**
 * Extract a creator name from a Telegram channel title.
 *
 * Strips common decorations — bracketed tags like "[Completed]",
 * parenthesized notes like "(Paid)", and emoji — then takes the part
 * before a " - " separator, if any.
 *
 * @param title Raw channel title as reported by Telegram.
 * @returns The cleaned creator name, or null when the remainder is too
 *          short or matches a known-generic channel name.
 */
export function extractCreatorFromChannelTitle(title: string): string | null {
  let clean = title
    // Remove bracketed suffixes: [Completed], [Open], [Closed], etc.
    .replace(/\s*\[.*?\]\s*/g, " ")
    // Remove parenthesized suffixes: (Paid), (partial upload...), etc.
    .replace(/\s*\(.*?\)\s*/g, " ")
    // Remove common emoji
    .replace(/[\u{1F300}-\u{1FAFF}\u{2600}-\u{27BF}]/gu, "")
    // Collapse whitespace runs left behind by the removals above, so a
    // title like "Loot 🎲 Studios" yields "Loot Studios", not "Loot  Studios"
    .replace(/\s+/g, " ")
    .trim();

  // If there's a " - " separator, take the first part as creator
  const dashIdx = clean.indexOf(" - ");
  if (dashIdx > 0) {
    clean = clean.slice(0, dashIdx).trim();
  }

  // Too generic or too short
  if (clean.length < 2) return null;

  // Skip overly generic channel names
  const generic = [
    "3d printing", "stl", "free stl", "stl zone", "stl forest", "stl all",
    "marvel stl", "dc stl", "star wars stl", "pokemon stl",
  ];
  if (generic.includes(clean.toLowerCase())) return null;

  return clean;
}

View File

@@ -1,4 +1,4 @@
export type ArchiveFormat = "ZIP" | "RAR";
export type ArchiveFormat = "ZIP" | "RAR" | "7Z" | "DOCUMENT";
export interface MultipartInfo {
baseName: string;
@@ -48,6 +48,9 @@ const patterns: {
},
];
/** Extensions we recognize as fetchable documents (archives + standalone files) */
const DOCUMENT_EXTENSIONS = /\.(pdf|stl|obj|3mf|step|stp|blend|gcode|svg|dxf|ai|eps|psd)$/i;
/**
* Detect if a filename is an archive and extract multipart info.
*/
@@ -85,11 +88,32 @@ export function detectArchive(fileName: string): MultipartInfo | null {
};
}
// Single .7z file
if (/\.7z$/i.test(fileName)) {
return {
baseName: fileName.replace(/\.7z$/i, ""),
partNumber: -1,
format: "7Z",
pattern: "SINGLE",
};
}
// Standalone documents (PDFs, STLs, 3D files, etc.)
if (DOCUMENT_EXTENSIONS.test(fileName)) {
const ext = fileName.match(DOCUMENT_EXTENSIONS)![0];
return {
baseName: fileName.replace(DOCUMENT_EXTENSIONS, ""),
partNumber: -1,
format: "DOCUMENT",
pattern: "SINGLE",
};
}
return null;
}
/**
* Check if a filename looks like any archive attachment we should process.
* Check if a filename looks like any attachment we should process.
*/
export function isArchiveAttachment(fileName: string): boolean {
return detectArchive(fileName) !== null;

View File

@@ -0,0 +1,33 @@
import path from "path";
const IMAGE_EXTENSIONS = new Set(["jpg", "jpeg", "png", "webp", "gif", "bmp"]);

/**
 * Check if a file path within an archive is an image.
 * The comparison is case-insensitive and based purely on the extension.
 */
export function isImageFile(filePath: string): boolean {
  const extension = path.extname(filePath).slice(1).toLowerCase();
  return IMAGE_EXTENSIONS.has(extension);
}
/**
 * Get the MIME type for an image file extension.
 * Unknown extensions fall back to "application/octet-stream".
 */
export function getImageMimeType(filePath: string): string {
  // Extension → MIME lookup; both jpg and jpeg map to image/jpeg.
  const mimeByExtension: Record<string, string> = {
    jpg: "image/jpeg",
    jpeg: "image/jpeg",
    png: "image/png",
    webp: "image/webp",
    gif: "image/gif",
    bmp: "image/bmp",
  };
  const extension = path.extname(filePath).toLowerCase().slice(1);
  return mimeByExtension[extension] ?? "application/octet-stream";
}

View File

@@ -0,0 +1,88 @@
import { execFile } from "child_process";
import { promisify } from "util";
import path from "path";
import { childLogger } from "../util/logger.js";
import type { FileEntry } from "./zip-reader.js";
const execFileAsync = promisify(execFile);
const log = childLogger("7z-reader");
/**
 * List the contents of a 7z archive by shelling out to `7z l <file>`.
 *
 * Example output parsed:
 *    Date      Time    Attr         Size   Compressed  Name
 * ------------------- ----- ------------ ------------ ------------------------
 * 2024-01-15 10:30:00 ....A        12345        10234  folder/file.stl
 * ------------------- ----- ------------ ------------ ------------------------
 *
 * Returns an empty list when the tool fails or times out, so callers can
 * treat an unreadable archive as simply having no known entries.
 */
export async function read7zContents(
  filePath: string
): Promise<FileEntry[]> {
  const execOptions = {
    timeout: 30000, // cap runtime — 7z can stall on damaged archives
    maxBuffer: 10 * 1024 * 1024, // listings of very large archives can be big
  };
  try {
    const listing = await execFileAsync("7z", ["l", filePath], execOptions);
    return parse7zOutput(listing.stdout);
  } catch (err) {
    log.warn({ err, file: filePath }, "Failed to read 7z contents");
    return [];
  }
}
/**
 * Parse the file-list section of `7z l` output into FileEntry records.
 *
 * The listing is delimited by two `-----` separator lines. Columns are
 * fixed-width; in solid archives the Compressed column is blank for all
 * but the first file of each solid block.
 *
 * The Name column offset is derived from the first separator line (its
 * last dash group marks where names start), so file names that themselves
 * begin with "<digits> " are parsed correctly. A whole-line regex remains
 * as a fallback when the fixed-column slice does not match.
 */
function parse7zOutput(output: string): FileEntry[] {
  const entries: FileEntry[] = [];
  let inFileList = false;
  let separatorCount = 0;
  // Start column of the Name field, taken from the first separator line.
  let nameStart = -1;

  for (const rawLine of output.split("\n")) {
    const line = rawLine.replace(/\r$/, "");
    const trimmed = line.trim();

    // Detect separator lines (------- pattern); the file list sits
    // between the first and second separators.
    if (/^-{5,}/.test(trimmed)) {
      separatorCount++;
      inFileList = separatorCount === 1;
      if (separatorCount === 1) {
        nameStart = line.lastIndexOf(" ") + 1;
      }
      continue;
    }
    if (!inFileList) continue;

    const parsed = parse7zLine(line, trimmed, nameStart);
    if (!parsed) continue;
    const { attr, uncompressed, compressed, filePath } = parsed;

    // Skip directory entries (D attribute or trailing slash)
    if (attr.startsWith("D") || filePath.endsWith("/") || filePath.endsWith("\\")) continue;
    // Skip entries with 0 size
    if (uncompressed === "0") continue;

    const ext = path.extname(filePath).toLowerCase();
    entries.push({
      path: filePath,
      fileName: path.basename(filePath),
      extension: ext ? ext.slice(1) : null,
      // Solid archives omit Compressed for follow-up files; fall back to
      // the uncompressed size, matching the original behavior.
      compressedSize: BigInt(compressed ?? uncompressed),
      uncompressedSize: BigInt(uncompressed),
      crc32: null,
    });
  }
  return entries;
}

/**
 * Split one data line into (attr, uncompressed, compressed?, name).
 * Prefers the fixed-column slice at nameStart; falls back to a whole-line
 * regex, which is ambiguous when the name starts with "<digits> ".
 */
function parse7zLine(
  line: string,
  trimmed: string,
  nameStart: number
): { attr: string; uncompressed: string; compressed: string | null; filePath: string } | null {
  if (nameStart > 0 && line.length > nameStart) {
    const head = line.slice(0, nameStart);
    const headMatch = head.match(
      /^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+(\S+)\s+(\d+)(?:\s+(\d+))?\s*$/
    );
    if (headMatch) {
      return {
        attr: headMatch[1],
        uncompressed: headMatch[2],
        compressed: headMatch[3] ?? null,
        filePath: line.slice(nameStart).trimEnd(),
      };
    }
  }
  // Fallback: Date Time Attr Size [Compressed] Name on the trimmed line.
  const match = trimmed.match(
    /^\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\s+(\S+)\s+(\d+)\s+(\d+\s+)?(.+)$/
  );
  if (!match) return null;
  return {
    attr: match[1],
    uncompressed: match[2],
    compressed: match[3]?.trim() ?? null,
    filePath: match[4],
  };
}

View File

@@ -1,8 +1,16 @@
import type pg from "pg";
import { pool } from "./client.js";
import { childLogger } from "../util/logger.js";
const log = childLogger("locks");
/**
* Holds the pooled connection for each active advisory lock.
* Session-level advisory locks are tied to the specific PostgreSQL connection,
* so we MUST keep the same connection checked out for the entire lock duration.
*/
const heldConnections = new Map<string, pg.PoolClient>();
/**
* Derive a stable 32-bit integer lock ID from an account ID string.
* PostgreSQL advisory locks use bigint, but we use 32-bit for safety.
@@ -20,6 +28,9 @@ function hashToLockId(accountId: string): number {
/**
* Try to acquire a PostgreSQL advisory lock for an account.
* Returns true if acquired, false if already held by another session.
*
* IMPORTANT: The pooled connection is kept checked out for the duration
* of the lock. You MUST call releaseLock() when done to return it to the pool.
*/
export async function tryAcquireLock(accountId: string): Promise<boolean> {
const lockId = hashToLockId(accountId);
@@ -31,26 +42,40 @@ export async function tryAcquireLock(accountId: string): Promise<boolean> {
);
const acquired = result.rows[0]?.pg_try_advisory_lock ?? false;
if (acquired) {
// Keep the connection checked out — lock is tied to this connection
heldConnections.set(accountId, client);
log.debug({ accountId, lockId }, "Advisory lock acquired");
return true;
} else {
log.debug({ accountId, lockId }, "Advisory lock already held");
}
return acquired;
} finally {
// Lock not acquired — release the connection back to the pool
client.release();
log.debug({ accountId, lockId }, "Advisory lock already held");
return false;
}
} catch (err) {
client.release();
throw err;
}
}
/**
* Release the advisory lock for an account.
* Uses the SAME connection that acquired the lock, then returns it to the pool.
*/
export async function releaseLock(accountId: string): Promise<void> {
const lockId = hashToLockId(accountId);
const client = await pool.connect();
const client = heldConnections.get(accountId);
if (!client) {
log.warn({ accountId, lockId }, "No held connection for lock release — lock may have already been released");
return;
}
try {
await client.query("SELECT pg_advisory_unlock($1)", [lockId]);
log.debug({ accountId, lockId }, "Advisory lock released");
} finally {
heldConnections.delete(accountId);
client.release();
}
}

View File

@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
title: string;
type: "SOURCE" | "DESTINATION";
isForum: boolean;
isActive?: boolean;
}
/**
* Upsert a channel by telegramId. Returns the channel record.
* If it already exists, update title and forum status.
* New channels default to disabled (isActive: false) so the admin must
* explicitly enable them before the worker processes them.
* Pass isActive: true for DESTINATION channels that must be active immediately.
*/
export async function upsertChannel(input: UpsertChannelInput) {
return db.telegramChannel.upsert({
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
title: input.title,
type: input.type,
isForum: input.isForum,
isActive: input.isActive ?? false,
},
update: {
title: input.title,
@@ -433,3 +438,35 @@ export async function getExistingChannelsByTelegramId(): Promise<Map<string, str
/** Look up a Telegram account row by its primary key; resolves to null when absent. */
export async function getAccountById(accountId: string) {
  const account = await db.telegramAccount.findUnique({ where: { id: accountId } });
  return account;
}
/**
 * Find packages that have a destMessageId set (appear uploaded) but may
 * reference messages that no longer exist in Telegram. These need
 * verification on startup.
 *
 * Groups by destChannelId so the caller can batch-verify per channel.
 */
export async function getPackagesWithDestMessage() {
  const uploadedFilter = {
    destMessageId: { not: null },
    destChannelId: { not: null },
  };
  const fields = {
    id: true,
    fileName: true,
    contentHash: true,
    destChannelId: true,
    destMessageId: true,
    sourceChannel: { select: { telegramId: true } },
  };
  return db.package.findMany({ where: uploadedFilter, select: fields });
}
/**
 * Clear a package's destination fields (channel + message) so the next
 * ingestion run treats it as not-yet-uploaded and re-processes it.
 */
export async function resetPackageDestination(packageId: string) {
  const clearedDestination = { destChannelId: null, destMessageId: null };
  return db.package.update({
    where: { id: packageId },
    data: clearedDestination,
  });
}

View File

@@ -0,0 +1,217 @@
import path from "path";
import { mkdir, rm } from "fs/promises";
import { db } from "./db/client.js";
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { withTdlibMutex } from "./util/mutex.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { downloadFile } from "./tdlib/download.js";
import { getActiveAccounts } from "./db/queries.js";
import { extractPreviewImage } from "./preview/extract.js";
import { getImageMimeType } from "./archive/extract-image.js";
const log = childLogger("extract-listener");
/**
 * Process a single archive extract request.
 * Downloads the archive from Telegram (dest channel), extracts the
 * requested image file, and writes the result to the DB.
 *
 * State machine on ArchiveExtractRequest.status:
 *   PENDING → IN_PROGRESS → COMPLETED | FAILED
 * Requests that are missing or no longer PENDING are silently skipped.
 *
 * @param requestId Primary key of the ArchiveExtractRequest row to process.
 */
export async function processExtractRequest(requestId: string): Promise<void> {
  const request = await db.archiveExtractRequest.findUnique({
    where: { id: requestId },
    include: {
      package: {
        select: {
          id: true,
          fileName: true,
          fileSize: true,
          archiveType: true,
          destChannelId: true,
          destMessageId: true,
          isMultipart: true,
          partCount: true,
        },
      },
    },
  });
  // Only PENDING requests are actionable; anything else was already
  // handled (or never existed).
  if (!request || request.status !== "PENDING") {
    log.debug({ requestId }, "Extract request not found or not pending");
    return;
  }
  const pkg = request.package;
  // Without a destination upload there is nothing to download from.
  if (!pkg.destChannelId || !pkg.destMessageId) {
    await db.archiveExtractRequest.update({
      where: { id: requestId },
      data: { status: "FAILED", error: "Package has no destination upload" },
    });
    return;
  }
  // Multipart archives require downloading and reassembling all parts,
  // which is too complex for on-demand extraction. Reject early.
  if (pkg.isMultipart && pkg.partCount > 1) {
    await db.archiveExtractRequest.update({
      where: { id: requestId },
      data: { status: "FAILED", error: "Image extraction is not supported for multipart archives" },
    });
    return;
  }
  // Check for a cached result first: if another request for the same
  // package+filePath already completed, reuse its data.
  const cached = await db.archiveExtractRequest.findFirst({
    where: {
      packageId: pkg.id,
      filePath: request.filePath,
      status: "COMPLETED",
      imageData: { not: null },
      id: { not: requestId },
    },
    select: { imageData: true, contentType: true },
  });
  if (cached?.imageData) {
    log.info({ requestId, filePath: request.filePath }, "Reusing cached extraction result");
    await db.archiveExtractRequest.update({
      where: { id: requestId },
      data: {
        status: "COMPLETED",
        imageData: cached.imageData,
        contentType: cached.contentType,
      },
    });
    return;
  }
  // Claim the request before starting the (slow) download/extract work.
  await db.archiveExtractRequest.update({
    where: { id: requestId },
    data: { status: "IN_PROGRESS" },
  });
  log.info(
    { requestId, packageId: pkg.id, filePath: request.filePath, archiveType: pkg.archiveType },
    "Processing extract request"
  );
  // Per-request scratch directory; always removed in the outer finally.
  const tempDir = path.join(config.tempDir, `extract_${requestId}`);
  try {
    await mkdir(tempDir, { recursive: true });
    // Wrap the entire TDLib session in the mutex so no other TDLib
    // operation can run concurrently (TDLib is single-session).
    await withTdlibMutex("extract", async () => {
      const accounts = await getActiveAccounts();
      if (accounts.length === 0) {
        throw new Error("No authenticated Telegram accounts available");
      }
      // Any authenticated account will do; the first active one is used.
      const account = accounts[0];
      const client = await createTdlibClient({ id: account.id, phone: account.phone });
      try {
        // Load chat list so TDLib can find the dest channel
        try {
          await client.invoke({
            _: "getChats",
            chat_list: { _: "chatListMain" },
            limit: 1000,
          });
        } catch {
          // May already be loaded
        }
        // Get the dest channel telegram ID
        const destChannel = await db.telegramChannel.findUnique({
          where: { id: pkg.destChannelId! },
          select: { telegramId: true },
        });
        if (!destChannel) {
          throw new Error("Destination channel not found in DB");
        }
        const chatId = Number(destChannel.telegramId);
        const messageId = Number(pkg.destMessageId);
        // Get the file_id from the destination message
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const message = await client.invoke({
          _: "getMessage",
          chat_id: chatId,
          message_id: messageId,
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
        }) as any;
        const doc = message?.content?.document;
        if (!doc?.document?.id) {
          throw new Error("Could not find document in destination message");
        }
        const fileId = String(doc.document.id);
        // Prefer the name Telegram reports; fall back to our stored name.
        const fileName = doc.file_name || pkg.fileName;
        const archivePath = path.join(tempDir, fileName);
        log.info(
          { requestId, fileName, fileId, chatId, messageId },
          "Downloading archive for extraction"
        );
        await downloadFile(
          client,
          fileId,
          archivePath,
          pkg.fileSize,
          fileName
        );
        // Extract the requested image using the existing CLI-based extractor.
        // This pipes the file to stdout (no temp files needed for the extracted image).
        const imageData = await extractPreviewImage(
          archivePath,
          pkg.archiveType as "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT",
          request.filePath
        );
        if (!imageData) {
          throw new Error(`Could not extract "${request.filePath}" from archive`);
        }
        // Cap at 5MB for safety
        if (imageData.length > 5 * 1024 * 1024) {
          throw new Error(`Extracted image is too large (${(imageData.length / 1024 / 1024).toFixed(1)}MB)`);
        }
        const contentType = getImageMimeType(request.filePath);
        await db.archiveExtractRequest.update({
          where: { id: requestId },
          data: {
            status: "COMPLETED",
            imageData: new Uint8Array(imageData),
            contentType,
          },
        });
        log.info(
          { requestId, filePath: request.filePath, bytes: imageData.length },
          "Image extracted successfully"
        );
      } finally {
        // Best-effort client shutdown; a close failure must not mask the
        // real outcome of the extraction.
        await closeTdlibClient(client).catch(() => {});
      }
    });
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    log.error({ err, requestId }, "Extract request failed");
    // Best-effort failure record; swallow DB errors so the worker survives.
    await db.archiveExtractRequest.update({
      where: { id: requestId },
      data: { status: "FAILED", error: errMsg },
    }).catch(() => {});
  } finally {
    // Always clean up the scratch directory, even on failure.
    await rm(tempDir, { recursive: true, force: true }).catch(() => {});
  }
}

View File

@@ -3,8 +3,11 @@ import { pool } from "./db/client.js";
import { childLogger } from "./util/logger.js";
import { withTdlibMutex } from "./util/mutex.js";
import { processFetchRequest } from "./worker.js";
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
import { processExtractRequest } from "./extract-listener.js";
import { rebuildPackageDatabase } from "./rebuild.js";
import { generateInviteLink, createSupergroup, searchPublicChat } from "./tdlib/chats.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { triggerImmediateCycle } from "./scheduler.js";
import {
getGlobalDestinationChannel,
getGlobalSetting,
@@ -12,11 +15,16 @@ import {
getActiveAccounts,
upsertChannel,
ensureAccountChannelLink,
updateFetchRequestStatus,
} from "./db/queries.js";
const log = childLogger("fetch-listener");
let pgClient: pg.PoolClient | null = null;
let stopped = false;
/** Delay (ms) before attempting to reconnect after a connection loss. */
const RECONNECT_DELAY_MS = 5_000;
/**
* Start listening for pg_notify signals from the web app.
@@ -25,12 +33,28 @@ let pgClient: pg.PoolClient | null = null;
* - `channel_fetch` — payload = requestId → fetch channels for an account
* - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle
* - `join_channel` — payload = JSON { requestId, input, accountId } → join/lookup channel by link/username
* - `rebuild_packages` — payload = requestId → rebuild package DB from destination channel
*
* If the underlying connection is lost, the listener automatically reconnects
* so that pg_notify signals are never silently dropped.
*/
export async function startFetchListener(): Promise<void> {
  // Clear the shutdown flag so automatic reconnects are permitted again
  // if the listener was previously stopped.
  stopped = false;
  await connectListener();
}
async function connectListener(): Promise<void> {
try {
pgClient = await pool.connect();
await pgClient.query("LISTEN channel_fetch");
await pgClient.query("LISTEN generate_invite");
await pgClient.query("LISTEN create_destination");
await pgClient.query("LISTEN ingestion_trigger");
await pgClient.query("LISTEN join_channel");
await pgClient.query("LISTEN archive_extract");
await pgClient.query("LISTEN rebuild_packages");
pgClient.on("notification", (msg) => {
if (msg.channel === "channel_fetch" && msg.payload) {
@@ -39,13 +63,57 @@ export async function startFetchListener(): Promise<void> {
handleGenerateInvite(msg.payload);
} else if (msg.channel === "create_destination" && msg.payload) {
handleCreateDestination(msg.payload);
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger();
} else if (msg.channel === "join_channel" && msg.payload) {
handleJoinChannel(msg.payload);
} else if (msg.channel === "archive_extract" && msg.payload) {
handleArchiveExtract(msg.payload);
} else if (msg.channel === "rebuild_packages" && msg.payload) {
handleRebuildPackages(msg.payload);
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
// Reconnect automatically when the connection ends unexpectedly
pgClient.on("end", () => {
if (!stopped) {
log.warn("Fetch listener connection lost — reconnecting");
pgClient = null;
scheduleReconnect();
}
});
pgClient.on("error", (err) => {
log.error({ err }, "Fetch listener connection error");
if (!stopped && pgClient) {
try {
pgClient.release(true);
} catch (releaseErr) {
log.debug({ err: releaseErr }, "Failed to release pg client after error");
}
pgClient = null;
scheduleReconnect();
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger, join_channel, archive_extract, rebuild_packages)");
} catch (err) {
log.error({ err }, "Failed to start fetch listener — retrying");
scheduleReconnect();
}
}
/**
 * Schedule a reconnect attempt after RECONNECT_DELAY_MS.
 * The stopped flag is checked both now and when the timer fires, because
 * shutdown may happen while the delay is pending.
 */
function scheduleReconnect(): void {
  if (stopped) return;
  setTimeout(() => {
    if (stopped) return;
    void connectListener();
  }, RECONNECT_DELAY_MS);
}
export function stopFetchListener(): void {
stopped = true;
if (pgClient) {
pgClient.release();
pgClient = null;
@@ -138,12 +206,13 @@ function handleCreateDestination(payload: string): void {
const result = await createSupergroup(client, parsed.title);
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
// Upsert it as a DESTINATION channel in the DB
// Upsert it as a DESTINATION channel in the DB (active by default)
const channel = await upsertChannel({
telegramId: result.chatId,
title: result.title,
type: "DESTINATION",
isForum: false,
isActive: true,
});
// Set as global destination
@@ -204,3 +273,241 @@ function handleCreateDestination(payload: string): void {
}
});
}
// ── Join channel handler ──
/**
 * Parse a Telegram link/username into its type and identifier.
 *
 * Supported formats:
 * - @username or username             → public chat search
 * - https://t.me/username             → public chat search
 * - https://t.me/+INVITE_HASH         → join by invite link
 * - https://t.me/joinchat/INVITE_HASH → join by invite link (legacy)
 *
 * Returns null when the input matches none of the supported shapes.
 */
function parseTelegramInput(input: string): { type: "username"; username: string } | { type: "invite"; link: string } | null {
  const value = input.trim();

  // Invite links: t.me/+HASH and t.me/joinchat/HASH (plus telegram.me
  // variants) — one alternation covers all four historical URL shapes.
  const inviteRe = /^https?:\/\/(?:t|telegram)\.me\/(?:\+|joinchat\/)[a-zA-Z0-9_-]+$/;
  if (inviteRe.test(value)) {
    return { type: "invite", link: value };
  }

  // Public channel link: https://t.me/username
  const publicLink = value.match(/^https?:\/\/(?:t\.me|telegram\.me)\/([a-zA-Z][a-zA-Z0-9_]{3,31})$/);
  if (publicLink) {
    return { type: "username", username: publicLink[1] };
  }

  // Bare @username or username
  const bareName = value.match(/^@?([a-zA-Z][a-zA-Z0-9_]{3,31})$/);
  if (bareName) {
    return { type: "username", username: bareName[1] };
  }

  return null;
}
/**
 * Handle a `join_channel` pg_notify payload: resolve the user-supplied
 * link/username to a Telegram chat (joining it if it is an invite link),
 * record it as an active SOURCE channel, and report the outcome on the
 * fetch-request row.
 *
 * Payload: JSON { requestId, input, accountId }. Work is chained onto
 * fetchQueue so it never overlaps other TDLib-bound jobs.
 */
function handleJoinChannel(payload: string): void {
  fetchQueue = fetchQueue.then(async () => {
    let requestId: string | undefined;
    try {
      const parsed = JSON.parse(payload) as { requestId: string; input: string; accountId: string };
      requestId = parsed.requestId;
      await withTdlibMutex("join-channel", async () => {
        await updateFetchRequestStatus(requestId!, "IN_PROGRESS");
        // Prefer the requested account; fall back to any active one.
        const accounts = await getActiveAccounts();
        const account = accounts.find((a) => a.id === parsed.accountId) ?? accounts[0];
        if (!account) {
          throw new Error("No authenticated accounts available");
        }
        const client = await createTdlibClient({ id: account.id, phone: account.phone });
        try {
          const linkInfo = parseTelegramInput(parsed.input);
          if (!linkInfo) {
            throw new Error(
              "Invalid input. Use a t.me link (e.g. https://t.me/channel_name), " +
              "an invite link (e.g. https://t.me/+abc123), or a @username."
            );
          }
          let chatInfo: { chatId: bigint; title: string; type: string; isForum: boolean };
          if (linkInfo.type === "username") {
            // Public chat: search by username
            const result = await searchPublicChat(client, linkInfo.username);
            if (!result) {
              throw new Error(`Public channel "@${linkInfo.username}" not found. Check the username and try again.`);
            }
            if (result.type !== "channel" && result.type !== "supergroup") {
              throw new Error(`"@${linkInfo.username}" is a ${result.type}, not a channel or group. Only channels and supergroups are supported.`);
            }
            chatInfo = { chatId: result.chatId, title: result.title, type: result.type, isForum: result.isForum };
          } else {
            // Private/invite link: join first, then get chat info
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            let joinResult: any;
            try {
              joinResult = await client.invoke({
                _: "joinChatByInviteLink",
                invite_link: linkInfo.link,
              });
            } catch (joinErr: unknown) {
              const msg = joinErr instanceof Error ? joinErr.message : String(joinErr);
              // "INVITE_REQUEST_SENT" means the chat requires admin approval
              if (msg.includes("INVITE_REQUEST_SENT")) {
                throw new Error("Join request sent. An admin of that channel must approve it before it can be added.");
              }
              // Already a member is fine
              if (!msg.includes("USER_ALREADY_PARTICIPANT") && !msg.includes("INVITE_HASH_EXPIRED")) {
                throw new Error(`Failed to join via invite link: ${msg}`);
              }
              // If already a participant, we need to get chat info from the link
              // Try checkChatInviteLink to get the chat id
              try {
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                const checkResult = (await client.invoke({
                  _: "checkChatInviteLink",
                  invite_link: linkInfo.link,
                })) as any;
                if (checkResult.chat_id) {
                  joinResult = { id: checkResult.chat_id };
                } else {
                  throw joinErr;
                }
              } catch {
                // Re-throw the original join error, not the check error.
                throw joinErr;
              }
            }
            // Get full chat info
            const chatId = joinResult?.id ?? joinResult?.chat_id;
            if (!chatId) {
              throw new Error("Joined channel but could not determine chat ID.");
            }
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            const chat = (await client.invoke({ _: "getChat", chat_id: chatId })) as any;
            let type: string = "other";
            let isForum = false;
            if (chat.type?._ === "chatTypeSupergroup") {
              try {
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                const sg = (await client.invoke({
                  _: "getSupergroup",
                  supergroup_id: chat.type.supergroup_id,
                })) as any;
                type = sg.is_channel ? "channel" : "supergroup";
                isForum = sg.is_forum ?? false;
              } catch {
                // If supergroup details are unavailable, assume supergroup.
                type = "supergroup";
              }
            } else if (chat.type?._ === "chatTypeBasicGroup") {
              type = "group";
            }
            if (type !== "channel" && type !== "supergroup") {
              throw new Error(`The joined chat is a ${type}, not a channel or group. Only channels and supergroups are supported.`);
            }
            chatInfo = { chatId: BigInt(chatId), title: chat.title ?? "Unknown", type, isForum };
          }
          // Upsert channel in DB (active as source by default since user explicitly added it)
          const channel = await upsertChannel({
            telegramId: chatInfo.chatId,
            title: chatInfo.title,
            type: "SOURCE",
            isForum: chatInfo.isForum,
            isActive: true,
          });
          // Link the account as READER
          await ensureAccountChannelLink(account.id, channel.id, "READER");
          log.info(
            { channelId: channel.id, telegramId: chatInfo.chatId.toString(), title: chatInfo.title },
            "Channel joined and added"
          );
          await updateFetchRequestStatus(requestId!, "COMPLETED", {
            resultJson: JSON.stringify({
              channelId: channel.id,
              telegramId: chatInfo.chatId.toString(),
              title: chatInfo.title,
              type: chatInfo.type,
              isForum: chatInfo.isForum,
            }),
          });
        } finally {
          await closeTdlibClient(client);
        }
      });
    } catch (err) {
      log.error({ err, payload }, "Failed to join channel");
      // Surface the failure on the request row so the UI can show it.
      if (requestId) {
        try {
          await updateFetchRequestStatus(requestId, "FAILED", {
            error: err instanceof Error ? err.message : String(err),
          });
        } catch {
          // Best-effort
        }
      }
    }
  });
}
// ── Archive extract handler ──

/**
 * Handle an `archive_extract` pg_notify payload (the request ID).
 * Work is chained onto the shared queue so it never overlaps other jobs.
 */
function handleArchiveExtract(requestId: string): void {
  const run = async (): Promise<void> => {
    try {
      log.info({ requestId }, "Archive extract request received");
      await processExtractRequest(requestId);
    } catch (err) {
      log.error({ err, requestId }, "Failed to process archive extract request");
    }
  };
  fetchQueue = fetchQueue.then(run);
}
// ── Ingestion trigger handler ──

/**
 * Handle an `ingestion_trigger` pg_notify signal (no payload): kick off an
 * immediate ingestion cycle, queued behind any in-flight work.
 */
function handleIngestionTrigger(): void {
  const run = async (): Promise<void> => {
    try {
      log.info("Ingestion trigger received from UI");
      await triggerImmediateCycle();
    } catch (err) {
      log.error({ err }, "Failed to trigger immediate ingestion cycle");
    }
  };
  fetchQueue = fetchQueue.then(run);
}
// ── Package database rebuild handler ──

/**
 * Handle a `rebuild_packages` pg_notify payload (the request ID).
 * Holds the TDLib mutex for the whole rebuild, since it drives a client
 * session, and is queued behind any in-flight work.
 */
function handleRebuildPackages(requestId: string): void {
  const run = async (): Promise<void> => {
    try {
      await withTdlibMutex("rebuild-packages", () => rebuildPackageDatabase(requestId));
    } catch (err) {
      log.error({ err, requestId }, "Failed to rebuild package database");
    }
  };
  fetchQueue = fetchQueue.then(run);
}

View File

@@ -3,6 +3,7 @@ import { config } from "./util/config.js";
import { logger } from "./util/logger.js";
import { markStaleRunsAsFailed } from "./db/queries.js";
import { cleanupTempDir } from "./worker.js";
import { recoverIncompleteUploads } from "./recovery.js";
import { startScheduler, stopScheduler } from "./scheduler.js";
import { startFetchListener, stopFetchListener } from "./fetch-listener.js";
import { db, pool } from "./db/client.js";
@@ -26,6 +27,10 @@ async function main(): Promise<void> {
await cleanupTempDir();
await markStaleRunsAsFailed();
// Verify destination messages exist for all "uploaded" packages.
// Resets any packages whose dest message is missing so they get re-processed.
await recoverIncompleteUploads();
// Start the fetch listener (pg_notify for on-demand channel fetching)
await startFetchListener();
@@ -36,11 +41,13 @@ async function main(): Promise<void> {
// Graceful shutdown
function shutdown(signal: string): void {
log.info({ signal }, "Shutdown signal received");
stopScheduler();
// Stop accepting new work
stopFetchListener();
// Close DB connections
Promise.all([db.$disconnect(), pool.end()])
// Wait for any active cycle to finish before closing DB
stopScheduler()
.then(() => Promise.all([db.$disconnect(), pool.end()]))
.then(() => {
log.info("Shutdown complete");
process.exit(0);

View File

@@ -0,0 +1,111 @@
import { execFile } from "child_process";
import { promisify } from "util";
import { childLogger } from "../util/logger.js";
import type { FileEntry } from "../archive/zip-reader.js";
const execFileAsync = promisify(execFile);
const log = childLogger("preview-extract");
/** Max bytes we'll accept for an extracted preview image (2MB). */
const MAX_PREVIEW_BYTES = 2 * 1024 * 1024;
/** Image extensions we consider valid previews, in priority order. */
const IMAGE_EXTENSIONS = new Set(["jpg", "jpeg", "png"]);
/**
 * Choose the most likely preview image from an archive's file listing.
 *
 * Files with dedicated preview-style names (preview.jpg, insta.jpg,
 * 01.jpg, …) and files near the archive root beat arbitrary images in
 * deep subdirectories. Images over 2MB uncompressed are rejected — they
 * are usually textures rather than previews.
 *
 * @param entries Archive file listing to choose from.
 * @returns The best candidate, or null when no eligible image exists.
 */
export function pickPreviewFile(entries: FileEntry[]): FileEntry | null {
  // Lower rank = more preview-like filename.
  const rankName = (fileName: string): number => {
    const lower = fileName.toLowerCase();
    if (/^(preview|thumb|cover|insta)\b/i.test(lower)) return 0;
    // 01.jpg, 1.jpg, 02.jpg — common preview filenames
    if (/^0*[1-2]\.(jpe?g|png)$/i.test(lower)) return 1;
    if (/^0*[3-9]\.(jpe?g|png)$/i.test(lower)) return 2;
    return 10; // arbitrary image with no special name
  };

  // Single pass keeping the lowest-scoring eligible entry. Ties keep the
  // first entry seen, matching a stable sort over the original order.
  let best: FileEntry | null = null;
  let bestScore = Number.POSITIVE_INFINITY;
  for (const entry of entries) {
    if (!entry.extension || !IMAGE_EXTENSIONS.has(entry.extension.toLowerCase())) {
      continue;
    }
    // Skip very large images — they're probably textures, not previews
    if (entry.uncompressedSize > BigInt(MAX_PREVIEW_BYTES)) {
      continue;
    }
    const depth = entry.path.split("/").length - 1;
    const score = rankName(entry.fileName) + depth;
    if (score < bestScore) {
      bestScore = score;
      best = entry;
    }
  }
  return best;
}
/**
 * Extract a single file from an archive and return its contents as a Buffer.
 *
 * Uses the appropriate CLI tool based on archive type:
 * - ZIP: unzip -p
 * - RAR: unrar p -inul
 * - 7Z: 7z e -so
 *
 * Returns null for DOCUMENT types (nothing to extract), for empty
 * extraction output, and for any extraction failure (missing tool,
 * timeout, or output exceeding the 2MB maxBuffer cap). Failures are
 * logged, never thrown.
 *
 * @param archivePath Path to the archive on local disk.
 * @param archiveType Archive container format.
 * @param filePath    Path of the entry inside the archive to extract.
 */
export async function extractPreviewImage(
  archivePath: string,
  archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT",
  filePath: string
): Promise<Buffer | null> {
  if (archiveType === "DOCUMENT") return null;
  // One lookup table instead of three near-identical exec branches.
  const extractors: Record<"ZIP" | "RAR" | "SEVEN_Z", { cmd: string; args: string[] }> = {
    ZIP: { cmd: "unzip", args: ["-p", archivePath, filePath] },
    RAR: { cmd: "unrar", args: ["p", "-inul", archivePath, filePath] },
    SEVEN_Z: { cmd: "7z", args: ["e", "-so", archivePath, filePath] },
  };
  const { cmd, args } = extractors[archiveType];
  try {
    const result = await execFileAsync(cmd, args, {
      timeout: 15000,
      // Oversized output makes execFile reject, which we treat as failure.
      maxBuffer: MAX_PREVIEW_BYTES,
      encoding: "buffer",
    });
    const stdout = result.stdout as unknown as Buffer;
    if (stdout.length === 0) {
      log.warn({ archivePath, filePath }, "Extracted preview image is empty");
      return null;
    }
    log.debug(
      { archivePath, filePath, bytes: stdout.length },
      "Extracted preview image from archive"
    );
    return stdout;
  } catch (err) {
    log.warn({ err, archivePath, filePath }, "Failed to extract preview image from archive");
    return null;
  }
}

411
worker/src/rebuild.ts Normal file
View File

@@ -0,0 +1,411 @@
import type { Client } from "tdl";
import { config } from "./util/config.js";
import { childLogger } from "./util/logger.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { invokeWithTimeout, MAX_SCAN_PAGES } from "./tdlib/download.js";
import { isArchiveAttachment } from "./archive/detect.js";
import { extractCreatorFromFileName } from "./archive/creator.js";
import { groupArchiveSets } from "./archive/multipart.js";
import type { TelegramMessage } from "./archive/multipart.js";
import {
getActiveAccounts,
getGlobalDestinationChannel,
} from "./db/queries.js";
import { db } from "./db/client.js";
const log = childLogger("rebuild");
export interface RebuildProgress {
status: "PENDING" | "IN_PROGRESS" | "COMPLETED" | "FAILED";
messagesScanned: number;
documentsFound: number;
packagesCreated: number;
packagesSkipped: number;
error?: string;
}
/**
 * Scan the destination channel for uploaded archive files and rebuild
 * the package database from what's actually there.
 *
 * Uses searchChatMessages (not getChatHistory) because the destination
 * channel may be a hidden-history supergroup.
 *
 * For each document found:
 * 1. Check if a Package record with that destMessageId already exists -> skip
 * 2. Try to match by fileName to an existing package without destMessageId -> update it
 * 3. Otherwise create a minimal Package record (no file listing, no content hash)
 *
 * This is a "best-effort" rebuild. It restores the mapping between destination
 * messages and package records so that the bot can deliver files. It does NOT
 * re-download archives or rebuild file listings (those require the source channel).
 *
 * @param requestId ID of the ChannelFetchRequest row used to report status
 *                  and progress back to the UI. Never throws: failures are
 *                  recorded on that row as FAILED.
 */
export async function rebuildPackageDatabase(
  requestId: string
): Promise<void> {
  log.info({ requestId }, "Starting package database rebuild");
  try {
    await db.channelFetchRequest.update({
      where: { id: requestId },
      data: { status: "IN_PROGRESS" },
    });
    // Get an authenticated account for TDLib
    const accounts = await getActiveAccounts();
    if (accounts.length === 0) {
      throw new Error("No authenticated accounts available");
    }
    const destChannel = await getGlobalDestinationChannel();
    if (!destChannel) {
      throw new Error("No destination channel configured");
    }
    // NOTE(review): always uses the first active account — assumes any
    // authenticated account can read the destination channel; verify.
    const account = accounts[0];
    const client = await createTdlibClient({
      id: account.id,
      phone: account.phone,
    });
    try {
      // In-memory progress snapshot; persisted (throttled) via
      // updateRebuildProgress so the UI can poll the request row.
      const progress: RebuildProgress = {
        status: "IN_PROGRESS",
        messagesScanned: 0,
        documentsFound: 0,
        packagesCreated: 0,
        packagesSkipped: 0,
      };
      // Write initial progress
      await updateRebuildProgress(requestId, progress);
      // Scan the destination channel for all document messages
      const archiveMessages = await scanDestinationChannel(
        client,
        destChannel.telegramId,
        async (scanned) => {
          progress.messagesScanned = scanned;
          await updateRebuildProgress(requestId, progress);
        }
      );
      progress.documentsFound = archiveMessages.length;
      await updateRebuildProgress(requestId, progress);
      log.info(
        {
          messagesScanned: progress.messagesScanned,
          documentsFound: archiveMessages.length,
        },
        "Destination channel scan complete"
      );
      // Group into archive sets (handles multipart)
      const archiveSets = groupArchiveSets(archiveMessages);
      log.info(
        { archiveSets: archiveSets.length, totalMessages: archiveMessages.length },
        "Grouped into archive sets"
      );
      // Get ALL source channels so we can try to match
      const sourceChannels = await db.telegramChannel.findMany({
        where: { type: "SOURCE" },
        select: { id: true, title: true },
      });
      // Use the first source channel as a fallback for unmatched packages
      const fallbackSourceId = sourceChannels[0]?.id ?? null;
      // Process each archive set
      for (const archiveSet of archiveSets) {
        // The first part's message represents the whole set (its id becomes
        // the package's destMessageId); sizes are summed across all parts.
        const firstPart = archiveSet.parts[0];
        const fileName = firstPart.fileName;
        const destMessageId = firstPart.id;
        const totalSize = archiveSet.parts.reduce(
          (sum, p) => sum + p.fileSize,
          0n
        );
        // 1. Check if a package with this destMessageId already exists
        const existingByDest = await db.package.findFirst({
          where: {
            destChannelId: destChannel.id,
            destMessageId,
          },
          select: { id: true },
        });
        if (existingByDest) {
          progress.packagesSkipped++;
          await updateRebuildProgress(requestId, progress);
          continue;
        }
        // 2. Try to match by fileName to an existing package without destMessageId
        const existingByName = await db.package.findFirst({
          where: {
            fileName,
            destMessageId: null,
          },
          select: { id: true },
        });
        if (existingByName) {
          // Update existing record with destination info
          await db.package.update({
            where: { id: existingByName.id },
            data: {
              destChannelId: destChannel.id,
              destMessageId,
              isMultipart: archiveSet.parts.length > 1,
              partCount: archiveSet.parts.length,
            },
          });
          // NOTE(review): re-linked packages are counted under
          // packagesCreated — intentional? (behavior kept as-is)
          progress.packagesCreated++;
          log.debug({ fileName, destMessageId: Number(destMessageId) }, "Updated existing package with dest info");
          await updateRebuildProgress(requestId, progress);
          continue;
        }
        // 3. Create a new minimal Package record
        // We don't have the source message or content hash, so generate a placeholder hash
        const placeholderHash = `rebuild:${destChannel.id}:${destMessageId}`;
        const creator = extractCreatorFromFileName(fileName) ?? null;
        const archiveType = archiveSet.type;
        // We need a sourceChannelId (required FK). Use fallback if available.
        if (!fallbackSourceId) {
          log.warn(
            { fileName },
            "No source channels exist — cannot create package record without a source channel"
          );
          progress.packagesSkipped++;
          await updateRebuildProgress(requestId, progress);
          continue;
        }
        try {
          await db.package.create({
            data: {
              contentHash: placeholderHash,
              fileName,
              fileSize: totalSize,
              archiveType,
              sourceChannelId: fallbackSourceId,
              sourceMessageId: 0n, // Unknown — rebuilt from destination
              destChannelId: destChannel.id,
              destMessageId,
              isMultipart: archiveSet.parts.length > 1,
              partCount: archiveSet.parts.length,
              fileCount: 0,
              creator,
            },
          });
          progress.packagesCreated++;
          log.debug(
            { fileName, destMessageId: Number(destMessageId), creator },
            "Created new package from destination"
          );
        } catch (err) {
          // Unique constraint on contentHash — might be a race or duplicate
          if (err instanceof Error && err.message.includes("Unique constraint")) {
            log.debug({ fileName, placeholderHash }, "Package already exists (hash conflict), skipping");
            progress.packagesSkipped++;
          } else {
            throw err;
          }
        }
        await updateRebuildProgress(requestId, progress);
      }
      // Done
      progress.status = "COMPLETED";
      await updateRebuildProgress(requestId, progress);
      await db.channelFetchRequest.update({
        where: { id: requestId },
        data: {
          status: "COMPLETED",
          resultJson: JSON.stringify(progress),
        },
      });
      log.info(
        {
          messagesScanned: progress.messagesScanned,
          documentsFound: progress.documentsFound,
          packagesCreated: progress.packagesCreated,
          packagesSkipped: progress.packagesSkipped,
        },
        "Package database rebuild complete"
      );
    } finally {
      // Always release the TDLib client, even on failure mid-scan.
      await closeTdlibClient(client);
    }
  } catch (err) {
    // Record the failure on the request row so the UI stops showing IN_PROGRESS.
    const message = err instanceof Error ? err.message : String(err);
    log.error({ err, requestId }, "Package database rebuild failed");
    await db.channelFetchRequest.update({
      where: { id: requestId },
      data: {
        status: "FAILED",
        error: message,
        resultJson: JSON.stringify({
          status: "FAILED",
          error: message,
        }),
      },
    });
  }
}
/**
 * Scan the destination channel for document messages using searchChatMessages.
 * Returns archive messages in chronological order (oldest first).
 *
 * @param client     Authenticated TDLib client.
 * @param chatId     Telegram chat ID of the destination channel.
 * @param onProgress Optional async callback receiving the running message
 *                   count; invoked at most every 2 seconds plus once at the end.
 */
async function scanDestinationChannel(
  client: Client,
  chatId: bigint,
  onProgress?: (messagesScanned: number) => Promise<void>
): Promise<TelegramMessage[]> {
  const archives: TelegramMessage[] = [];
  // 0 = start from the newest message (TDLib convention for from_message_id).
  let currentFromId = 0;
  let totalScanned = 0;
  let pageCount = 0;
  let lastProgressUpdate = 0;
  // eslint-disable-next-line no-constant-condition
  while (true) {
    // Hard page cap guarantees termination even if pagination misbehaves.
    if (pageCount >= MAX_SCAN_PAGES) {
      log.warn(
        { chatId: chatId.toString(), pageCount, totalScanned },
        "Hit max page limit for destination scan, stopping"
      );
      break;
    }
    pageCount++;
    const previousFromId = currentFromId;
    const result = await invokeWithTimeout<{
      messages?: {
        id: number;
        date: number;
        content: {
          _: string;
          document?: {
            file_name?: string;
            document?: {
              id: number;
              size: number;
            };
          };
        };
      }[];
    }>(client, {
      _: "searchChatMessages",
      chat_id: Number(chatId),
      query: "",
      from_message_id: currentFromId,
      offset: 0,
      limit: 100,
      filter: { _: "searchMessagesFilterDocument" },
      sender_id: null,
      message_thread_id: 0,
      saved_messages_topic_id: 0,
    });
    if (!result.messages || result.messages.length === 0) break;
    totalScanned += result.messages.length;
    for (const msg of result.messages) {
      const doc = msg.content?.document;
      // Keep only documents whose filename looks like an archive.
      if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
        archives.push({
          id: BigInt(msg.id),
          fileName: doc.file_name,
          fileId: String(doc.document.id),
          fileSize: BigInt(doc.document.size),
          date: new Date(msg.date * 1000),
        });
      }
    }
    // Throttle progress updates to every 2 seconds
    const now = Date.now();
    if (onProgress && now - lastProgressUpdate >= 2000) {
      lastProgressUpdate = now;
      await onProgress(totalScanned);
    }
    currentFromId = result.messages[result.messages.length - 1].id;
    // Stuck detection: if the cursor stops advancing we would loop forever
    // re-fetching the same page.
    if (currentFromId === previousFromId) {
      log.warn(
        { chatId: chatId.toString(), currentFromId, totalScanned },
        "Pagination stuck, breaking"
      );
      break;
    }
    // A short page means we reached the start of the history.
    if (result.messages.length < 100) break;
    await sleep(config.apiDelayMs);
  }
  // Final progress update
  if (onProgress) {
    await onProgress(totalScanned);
  }
  log.info(
    {
      chatId: chatId.toString(),
      archives: archives.length,
      totalScanned,
      pages: pageCount,
    },
    "Destination channel scan complete"
  );
  // Reverse to chronological order (oldest first)
  return archives.reverse();
}
/**
 * Persist the rebuild progress into the fetch request's resultJson field.
 * Writes are throttled to one every 2 seconds while IN_PROGRESS; any
 * terminal status change always flushes immediately. DB failures are
 * swallowed — progress reporting is best-effort.
 */
let lastUpdateTime = 0;
async function updateRebuildProgress(
  requestId: string,
  progress: RebuildProgress
): Promise<void> {
  const now = Date.now();
  const statusChanged = progress.status !== "IN_PROGRESS";
  const throttleExpired = now - lastUpdateTime >= 2000;
  if (!statusChanged && !throttleExpired) return;
  lastUpdateTime = now;
  try {
    await db.channelFetchRequest.update({
      where: { id: requestId },
      data: {
        resultJson: JSON.stringify(progress),
      },
    });
  } catch {
    // Best-effort — never let a progress write break the rebuild itself.
  }
}
/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

187
worker/src/recovery.ts Normal file
View File

@@ -0,0 +1,187 @@
import { childLogger } from "./util/logger.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { withFloodWait } from "./util/retry.js";
import {
getActiveAccounts,
getPackagesWithDestMessage,
resetPackageDestination,
getGlobalDestinationChannel,
} from "./db/queries.js";
import type { Client } from "tdl";
const log = childLogger("recovery");
/**
 * Verify that destination messages still exist in Telegram for all
 * packages that claim to be uploaded. If a message is missing (deleted
 * or never actually committed), reset the package so the next ingestion
 * run will re-download and re-upload it.
 *
 * This handles the case where the worker crashed mid-upload: TDLib may
 * have returned a temporary message ID that was stored as destMessageId
 * but the upload never completed server-side, or the message was later
 * deleted from the destination channel.
 *
 * Only packages whose destChannelId matches the configured global
 * destination channel are verified — that is the only channel whose
 * Telegram ID is known here, and checking other channels' packages
 * against it would produce false "missing" results.
 *
 * Called once on worker startup, before the scheduler begins.
 * Never throws: any failure is logged and recovery is retried next startup.
 */
export async function recoverIncompleteUploads(): Promise<void> {
  const packages = await getPackagesWithDestMessage();
  if (packages.length === 0) {
    log.debug("No packages with destination messages to verify");
    return;
  }
  // We need a TDLib client to verify messages. Use the first active account.
  const accounts = await getActiveAccounts();
  if (accounts.length === 0) {
    log.info("No active accounts available for upload verification, skipping recovery");
    return;
  }
  const destChannel = await getGlobalDestinationChannel();
  if (!destChannel) {
    log.info("No destination channel configured, skipping recovery");
    return;
  }
  // FIX: previously every package was verified against the global destination
  // channel's Telegram ID regardless of its own destChannelId, so a package
  // uploaded to any other channel always looked "missing" and was wrongly
  // reset. Verify only packages that actually live in the global destination
  // channel; skip (and log) the rest.
  const verifiable = packages.filter((pkg) => pkg.destChannelId === destChannel.id);
  const skippedOtherChannels = packages.length - verifiable.length;
  if (skippedOtherChannels > 0) {
    log.warn(
      { skipped: skippedOtherChannels },
      "Skipping packages in other destination channels (no Telegram ID available to verify them)"
    );
  }
  if (verifiable.length === 0) {
    log.info("No packages in the global destination channel to verify");
    return;
  }
  log.info(
    { totalPackages: verifiable.length, channels: 1 },
    "Verifying destination messages exist in Telegram"
  );
  const account = accounts[0];
  let client: Client | undefined;
  try {
    client = await createTdlibClient({ id: account.id, phone: account.phone });
    // Load the chat list so TDLib can resolve chat IDs
    try {
      await client.invoke({
        _: "getChats",
        chat_list: { _: "chatListMain" },
        limit: 1000,
      });
    } catch {
      // May already be loaded
    }
    let resetCount = 0;
    let verifiedCount = 0;
    for (const pkg of verifiable) {
      const exists = await verifyMessageExists(
        client,
        destChannel.telegramId,
        pkg.destMessageId!
      );
      if (exists) {
        verifiedCount++;
      } else {
        log.warn(
          {
            packageId: pkg.id,
            fileName: pkg.fileName,
            destMessageId: Number(pkg.destMessageId),
          },
          "Destination message missing in Telegram, resetting package for re-upload"
        );
        await resetPackageDestination(pkg.id);
        resetCount++;
      }
    }
    if (resetCount > 0) {
      log.info(
        { resetCount, verifiedCount, totalChecked: verifiable.length },
        "Upload recovery complete — packages reset for re-processing"
      );
    } else {
      log.info(
        { verifiedCount, totalChecked: verifiable.length },
        "Upload recovery complete — all destination messages verified"
      );
    }
  } catch (err) {
    log.error({ err }, "Upload recovery failed (non-fatal, will retry next startup)");
  } finally {
    if (client) {
      await closeTdlibClient(client);
    }
  }
}
/**
 * Check whether a message exists in a Telegram chat.
 * Returns false if the message was deleted or never existed, and true
 * on transient verification errors (to avoid resetting packages spuriously).
 */
async function verifyMessageExists(
  client: Client,
  chatTelegramId: bigint,
  messageId: bigint
): Promise<boolean> {
  try {
    const raw = await withFloodWait(
      () =>
        client.invoke({
          _: "getMessage",
          chat_id: Number(chatTelegramId),
          message_id: Number(messageId),
        }),
      "getMessage:verify"
    );
    // TDLib returns the message object when it exists; deleted messages may
    // throw instead. Require a real message with content.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const message = raw as any;
    const content = message?.content;
    if (!content) return false;
    // Our uploads are documents; anything else means the original upload
    // was cleared or replaced, so treat it as missing.
    if (content._ === "messageDocument") return true;
    log.debug(
      {
        messageId: Number(messageId),
        contentType: content._,
      },
      "Destination message exists but is not a document"
    );
    return false;
  } catch (err) {
    // TDLib throws "Message not found" (error code 404) for deleted messages.
    const text = err instanceof Error ? err.message : String(err);
    const code = (err as { code?: number })?.code;
    const looksDeleted =
      code === 404 || text.includes("not found") || text.includes("Not Found");
    if (looksDeleted) return false;
    // Other errors (network etc.) — assume the message exists so transient
    // failures never cause incorrect package resets.
    log.warn(
      { err, messageId: Number(messageId) },
      "Could not verify message (assuming it exists)"
    );
    return true;
  }
}

View File

@@ -9,6 +9,14 @@ const log = childLogger("scheduler");
let running = false;
let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0;
let activeCyclePromise: Promise<void> | null = null;
/**
* Maximum time for a single ingestion cycle (ms).
* After this, new accounts won't be started (in-progress work finishes).
* Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES.
*/
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
/**
* Run one ingestion cycle:
@@ -17,6 +25,10 @@ let cycleCount = 0;
*
* All TDLib operations are wrapped in the mutex to ensure only one client
* runs at a time (also shared with the fetch listener for on-demand requests).
*
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
* Once the timeout elapses, no new accounts will be started but any in-progress
* account processing is allowed to finish its current archive set.
*/
async function runCycle(): Promise<void> {
if (running) {
@@ -26,7 +38,8 @@ async function runCycle(): Promise<void> {
running = true;
cycleCount++;
log.info({ cycle: cycleCount }, "Starting ingestion cycle");
const cycleStart = Date.now();
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
try {
// ── Phase 1: Authenticate pending accounts ──
@@ -37,6 +50,10 @@ async function runCycle(): Promise<void> {
"Found pending accounts, starting authentication"
);
for (const account of pendingAccounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn("Cycle timeout reached during authentication phase, stopping");
break;
}
await withTdlibMutex(`auth:${account.phone}`, () =>
authenticateAccount(account)
);
@@ -54,12 +71,22 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn(
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
"Cycle timeout reached, skipping remaining accounts"
);
break;
}
await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account)
);
}
log.info("Ingestion cycle complete");
log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
"Ingestion cycle complete"
);
} catch (err) {
log.error({ err }, "Ingestion cycle failed");
} finally {
@@ -81,7 +108,9 @@ function scheduleNext(): void {
);
timer = setTimeout(async () => {
await runCycle();
activeCyclePromise = runCycle();
await activeCyclePromise;
activeCyclePromise = null;
scheduleNext();
}, delay);
}
@@ -99,19 +128,44 @@ export async function startScheduler(): Promise<void> {
);
// Run immediately on start
await runCycle();
activeCyclePromise = runCycle();
await activeCyclePromise;
activeCyclePromise = null;
// Then schedule recurring cycles
scheduleNext();
}
/**
* Stop the scheduler gracefully.
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
* If a cycle is already running, this is a no-op.
*/
export function stopScheduler(): void {
/**
 * Trigger an immediate ingestion cycle (e.g. from the admin UI).
 * If a cycle is already running, this is a no-op.
 */
export async function triggerImmediateCycle(): Promise<void> {
  if (!running) {
    log.info("Immediate cycle triggered via UI");
    await runCycle();
    return;
  }
  log.info("Cycle already running, ignoring trigger");
}
/**
* Stop the scheduler gracefully.
* Returns a promise that resolves when any active cycle finishes,
* so callers can wait before closing DB connections.
*/
/**
 * Stop the scheduler gracefully.
 * Cancels the pending timer, then resolves once any in-flight cycle has
 * finished, so callers can safely close DB connections afterwards.
 */
export function stopScheduler(): Promise<void> {
  if (timer !== null) {
    clearTimeout(timer);
    timer = null;
  }
  const pending = activeCyclePromise;
  if (!pending) {
    log.info("Scheduler stopped");
    return Promise.resolve();
  }
  log.info("Scheduler stopping — waiting for active cycle to finish");
  return pending.finally(() => {
    activeCyclePromise = null;
    log.info("Scheduler stopped");
  });
}

View File

@@ -1,6 +1,7 @@
import type { Client } from "tdl";
import { childLogger } from "../util/logger.js";
import { config } from "../util/config.js";
import { withFloodWait } from "../util/retry.js";
const log = childLogger("chats");
@@ -29,11 +30,14 @@ export async function getAccountChats(
while (hasMore) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
const result = (await withFloodWait(
() => client.invoke({
_: "getChats",
chat_list: { _: "chatListMain" },
limit: 100,
})) as { chat_ids: number[] };
}),
"getChats"
)) as { chat_ids: number[] };
if (!result.chat_ids || result.chat_ids.length === 0) {
break;
@@ -42,10 +46,13 @@ export async function getAccountChats(
for (const chatId of result.chat_ids) {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const chat = (await client.invoke({
const chat = (await withFloodWait(
() => client.invoke({
_: "getChat",
chat_id: chatId,
})) as any;
}),
"getChat"
)) as any;
const chatType = chat.type?._;
let type: TelegramChatInfo["type"] = "other";
@@ -55,10 +62,13 @@ export async function getAccountChats(
// Get supergroup details to check if it's a channel or group
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const sg = (await client.invoke({
const sg = (await withFloodWait(
() => client.invoke({
_: "getSupergroup",
supergroup_id: chat.type.supergroup_id,
})) as any;
}),
"getSupergroup"
)) as any;
type = sg.is_channel ? "channel" : "supergroup";
isForum = sg.is_forum ?? false;
@@ -109,12 +119,15 @@ export async function generateInviteLink(
chatId: bigint
): Promise<string> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
const result = (await withFloodWait(
() => client.invoke({
_: "createChatInviteLink",
chat_id: Number(chatId),
name: "DragonsStash Auto-Join",
creates_join_request: false,
})) as any;
}),
"createChatInviteLink"
)) as any;
const link = result.invite_link as string;
log.info({ chatId: chatId.toString(), link }, "Generated invite link");
@@ -130,13 +143,16 @@ export async function createSupergroup(
title: string
): Promise<{ chatId: bigint; title: string }> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
const result = (await withFloodWait(
() => client.invoke({
_: "createNewSupergroupChat",
title,
is_forum: false,
is_channel: false,
description: "DragonsStash archive destination — all accounts write here",
})) as any;
}),
"createNewSupergroupChat"
)) as any;
const chatId = BigInt(result.id);
log.info({ chatId: chatId.toString(), title }, "Created new supergroup");
@@ -150,13 +166,73 @@ export async function joinChatByInviteLink(
client: Client,
inviteLink: string
): Promise<void> {
await client.invoke({
await withFloodWait(
() => client.invoke({
_: "joinChatByInviteLink",
invite_link: inviteLink,
});
}),
"joinChatByInviteLink"
);
log.info({ inviteLink }, "Joined chat by invite link");
}
/**
 * Search for a public chat by username.
 * Resolves the chat's type (channel/supergroup/group/private/other) and
 * forum flag. Returns null if the chat cannot be found.
 */
export async function searchPublicChat(
  client: Client,
  username: string
): Promise<TelegramChatInfo | null> {
  try {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const chat = (await withFloodWait(
      () =>
        client.invoke({
          _: "searchPublicChat",
          username,
        }),
      "searchPublicChat"
    )) as any;

    let type: TelegramChatInfo["type"] = "other";
    let isForum = false;
    switch (chat.type?._) {
      case "chatTypeSupergroup":
        // Supergroups need a second lookup to distinguish channel vs group
        // and to read the forum flag.
        try {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const supergroup = (await withFloodWait(
            () =>
              client.invoke({
                _: "getSupergroup",
                supergroup_id: chat.type.supergroup_id,
              }),
            "getSupergroup"
          )) as any;
          type = supergroup.is_channel ? "channel" : "supergroup";
          isForum = supergroup.is_forum ?? false;
        } catch {
          type = "supergroup";
        }
        break;
      case "chatTypeBasicGroup":
        type = "group";
        break;
      case "chatTypePrivate":
      case "chatTypeSecret":
        type = "private";
        break;
    }

    log.info({ username, chatId: chat.id, type }, "Found public chat");
    return {
      chatId: BigInt(chat.id),
      title: chat.title ?? username,
      type,
      isForum,
    };
  } catch (err) {
    log.warn({ username, err }, "Public chat not found");
    return null;
  }
}
/** Promise-based delay helper used between rate-limited API calls. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}

View File

@@ -2,12 +2,19 @@ import type { Client } from "tdl";
import { readFile, rename, copyFile, unlink, stat } from "fs/promises";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
const log = childLogger("download");
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
export const MAX_SCAN_PAGES = 5000;
/** Timeout for a single TDLib API call (ms) */
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
interface TdPhotoSize {
type: string;
photo: {
@@ -66,6 +73,59 @@ interface TdFile {
export interface ChannelScanResult {
archives: TelegramMessage[];
photos: TelegramPhoto[];
totalScanned: number;
}
export type ScanProgressCallback = (messagesScanned: number) => void;
/**
 * Invoke a TDLib method with a timeout to prevent indefinite hangs,
 * and automatic retry on FLOOD_WAIT rate-limit errors.
 *
 * If TDLib does not respond within the timeout, the promise rejects.
 * If Telegram returns a rate limit error, sleeps for the required
 * duration and retries (up to maxRetries times).
 */
export async function invokeWithTimeout<T>(
  client: Client,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  request: Record<string, any>,
  timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
  // Single attempt with a deadline; withFloodWait re-runs it on rate limits.
  const attempt = (): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      let done = false;
      // Ensures exactly one of timeout/resolve/reject wins the race.
      const finish = (settle: () => void): void => {
        if (done) return;
        done = true;
        settle();
      };
      const timer = setTimeout(
        () =>
          finish(() =>
            reject(
              new Error(
                `TDLib invoke timed out after ${timeoutMs}ms for ${request._}`
              )
            )
          ),
        timeoutMs
      );
      (client.invoke(request) as Promise<T>).then(
        (value) =>
          finish(() => {
            clearTimeout(timer);
            resolve(value);
          }),
        (err) =>
          finish(() => {
            clearTimeout(timer);
            reject(err);
          })
      );
    });
  return withFloodWait(attempt, `TDLib:${request._}`);
}
/**
@@ -77,36 +137,74 @@ export interface ChannelScanResult {
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/
export async function getChannelMessages(
client: Client,
chatId: bigint,
lastProcessedMessageId?: bigint | null,
limit = 100
limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
// Open the chat so TDLib can access it
try {
await invokeWithTimeout(client, { _: "openChat", chat_id: Number(chatId) });
} catch {
// Ignore — may already be open
}
let totalScanned = 0;
let pageCount = 0;
// Use searchChatMessages with document filter — this works even when
// getChatHistory is restricted (e.g. hidden history for new members).
// We search for documents first, then photos separately.
for (const filter of [
{ _: "searchMessagesFilterDocument" as const, kind: "document" },
{ _: "searchMessagesFilterPhoto" as const, kind: "photo" },
]) {
let fromMessageId = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const result = (await client.invoke({
_: "getChatHistory",
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, totalScanned },
"Hit max page limit for channel scan, stopping"
);
break;
}
pageCount++;
const result = await invokeWithTimeout<{ messages: TdMessage[]; total_count?: number }>(client, {
_: "searchChatMessages",
chat_id: Number(chatId),
from_message_id: currentFromId,
query: "",
from_message_id: fromMessageId,
offset: 0,
limit: Math.min(limit, 100),
only_local: false,
})) as { messages: TdMessage[] };
filter,
message_thread_id: 0,
});
if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
if (doc?.file_name && doc.document && isArchiveAttachment(doc.file_name)) {
// Skip if we've already processed past this message
if (boundary && msg.id <= boundary) continue;
archives.push({
id: BigInt(msg.id),
fileName: doc.file_name,
@@ -121,6 +219,7 @@ export async function getChannelMessages(
const photo = msg.content?.photo;
const caption = msg.content?.caption?.text ?? "";
if (photo?.sizes && photo.sizes.length > 0) {
if (boundary && msg.id <= boundary) continue;
const smallest = photo.sizes[0];
photos.push({
id: BigInt(msg.id),
@@ -132,19 +231,24 @@ export async function getChannelMessages(
}
}
currentFromId = result.messages[result.messages.length - 1].id;
onProgress?.(totalScanned);
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
// Advance pagination
fromMessageId = result.messages[result.messages.length - 1].id;
if (result.messages.length < Math.min(limit, 100)) break;
if (result.messages.length < 100) break;
// Rate limit delay
await sleep(config.apiDelayMs);
}
}
// Close the chat after scanning
await invokeWithTimeout(client, {
_: "closeChat",
chat_id: Number(chatId),
}).catch(() => {});
log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Channel scan complete"
);
@@ -152,6 +256,7 @@ export async function getChannelMessages(
return {
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
};
}
@@ -334,15 +439,20 @@ export async function downloadFile(
client.on("update", handleUpdate);
// Start async download (non-blocking — progress via updateFile events)
client
.invoke({
// Wrapped in withFloodWait: if the initial invoke is rate-limited,
// it will sleep and retry before the download event loop begins.
withFloodWait(
() =>
client.invoke({
_: "downloadFile",
file_id: numericId,
priority: 32,
offset: 0,
limit: 0,
synchronous: false,
})
}),
`downloadFile:${fileName}`
)
.then((result: unknown) => {
// If the file was already cached locally, invoke returns immediately
const file = result as TdFile | undefined;

View File

@@ -4,7 +4,8 @@ import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js";
import type { ChannelScanResult } from "./download.js";
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
const log = childLogger("topics");
@@ -21,16 +22,16 @@ export async function isChatForum(
chatId: bigint
): Promise<boolean> {
try {
const chat = (await client.invoke({
_: "getChat",
chat_id: Number(chatId),
})) as {
const chat = await invokeWithTimeout<{
type?: {
_: string;
supergroup_id?: number;
is_forum?: boolean;
};
};
}>(client, {
_: "getChat",
chat_id: Number(chatId),
});
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
return true;
@@ -38,10 +39,10 @@ export async function isChatForum(
// Also check via getSupergroup for older TDLib versions
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
const sg = (await client.invoke({
const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
_: "getSupergroup",
supergroup_id: chat.type.supergroup_id,
})) as { is_forum?: boolean };
});
return sg.is_forum === true;
}
@@ -54,6 +55,7 @@ export async function isChatForum(
/**
* Get all forum topics in a supergroup.
* Includes stuck detection and timeout protection on API calls.
*/
export async function getForumTopicList(
client: Client,
@@ -63,18 +65,24 @@ export async function getForumTopicList(
let offsetDate = 0;
let offsetMessageId = 0;
let offsetMessageThreadId = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
const result = (await client.invoke({
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
})) as {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, topicCount: topics.length },
"Hit max page limit for topic enumeration, stopping"
);
break;
}
pageCount++;
const prevOffsetDate = offsetDate;
const prevOffsetMessageId = offsetMessageId;
const prevOffsetMessageThreadId = offsetMessageThreadId;
const result = await invokeWithTimeout<{
topics?: {
info?: {
message_thread_id?: number;
@@ -85,18 +93,24 @@ export async function getForumTopicList(
next_offset_date?: number;
next_offset_message_id?: number;
next_offset_message_thread_id?: number;
};
}>(client, {
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
});
if (!result.topics || result.topics.length === 0) break;
for (const t of result.topics) {
if (!t.info?.message_thread_id) continue;
// Skip the "General" topic — it's not creator-specific
if (t.info.is_general) continue;
topics.push({
topicId: BigInt(t.info.message_thread_id),
name: t.info.name ?? "Unnamed",
name: t.info.is_general ? "General" : (t.info.name ?? "Unnamed"),
});
}
@@ -113,6 +127,19 @@ export async function getForumTopicList(
offsetMessageId = result.next_offset_message_id ?? 0;
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
// Stuck detection: if offsets didn't advance, break
if (
offsetDate === prevOffsetDate &&
offsetMessageId === prevOffsetMessageId &&
offsetMessageThreadId === prevOffsetMessageThreadId
) {
log.warn(
{ chatId: chatId.toString(), topicCount: topics.length },
"Topic pagination stuck (offsets not advancing), breaking"
);
break;
}
await sleep(config.apiDelayMs);
}
@@ -134,35 +161,43 @@ export async function getForumTopicList(
* When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/
export async function getTopicMessages(
client: Client,
chatId: bigint,
topicId: bigint,
lastProcessedMessageId?: bigint | null,
limit = 100
limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0;
let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
"Hit max page limit for topic scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
})) as {
const result = await invokeWithTimeout<{
messages?: {
id: number;
date: number;
@@ -186,10 +221,23 @@ export async function getTopicMessages(
caption?: { text?: string };
};
}[];
};
}>(client, {
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
});
if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) {
// Check for archive documents
const doc = msg.content?.document;
@@ -219,18 +267,30 @@ export async function getTopicMessages(
}
}
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
"Topic pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break;
if (result.messages.length < Math.min(limit, 100)) break;
await sleep(config.apiDelayMs);
}
log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Topic scan complete"
);
@@ -238,6 +298,7 @@ export async function getTopicMessages(
return {
archives: archives.reverse(),
photos: photos.reverse(),
totalScanned,
};
}

View File

@@ -3,6 +3,7 @@ import { stat } from "fs/promises";
import type { Client } from "tdl";
import { config } from "../util/config.js";
import { childLogger } from "../util/logger.js";
import { withFloodWait } from "../util/retry.js";
const log = childLogger("upload");
@@ -75,6 +76,10 @@ export async function uploadToChannel(
/**
* Send a single file message and wait for Telegram to confirm the upload.
* Returns the final server-assigned message ID.
*
* IMPORTANT: The update listener is attached BEFORE sending the message to
* avoid a race where fast uploads (cached files) complete before the listener
* is registered, which would cause the promise to hang forever.
*/
async function sendAndWaitForUpload(
client: Client,
@@ -84,36 +89,10 @@ async function sendAndWaitForUpload(
fileName: string,
fileSizeMB: number
): Promise<bigint> {
// Send the message — this returns a temporary message immediately
const tempMsg = (await client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
})) as { id: number };
const tempMsgId = tempMsg.id;
log.debug(
{ fileName, tempMsgId },
"Message queued, waiting for upload confirmation"
);
// Wait for the actual upload to complete
return new Promise<bigint>((resolve, reject) => {
let settled = false;
let lastLoggedPercent = 0;
let tempMsgId: number | null = null;
// Timeout: 10 minutes per GB, minimum 10 minutes
const timeoutMs = Math.max(
@@ -156,7 +135,7 @@ async function sendAndWaitForUpload(
if (update?._ === "updateMessageSendSucceeded") {
const msg = update.message;
const oldMsgId = update.old_message_id;
if (oldMsgId === tempMsgId) {
if (tempMsgId !== null && oldMsgId === tempMsgId) {
if (!settled) {
settled = true;
cleanup();
@@ -173,7 +152,7 @@ async function sendAndWaitForUpload(
// Upload failed
if (update?._ === "updateMessageSendFailed") {
const oldMsgId = update.old_message_id;
if (oldMsgId === tempMsgId) {
if (tempMsgId !== null && oldMsgId === tempMsgId) {
if (!settled) {
settled = true;
cleanup();
@@ -189,7 +168,47 @@ async function sendAndWaitForUpload(
client.off("update", handleUpdate);
};
// Attach listener BEFORE sending to avoid missing fast completions
client.on("update", handleUpdate);
// Send the message — this returns a temporary message immediately.
// Wrapped in withFloodWait to handle Telegram rate limits on upload.
withFloodWait(
() =>
client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
}),
"sendMessage:upload"
)
.then((result) => {
const tempMsg = result as { id: number };
tempMsgId = tempMsg.id;
log.debug(
{ fileName, tempMsgId },
"Message queued, waiting for upload confirmation"
);
})
.catch((err) => {
if (!settled) {
settled = true;
cleanup();
reject(err);
}
});
});
}

View File

@@ -4,12 +4,21 @@ const log = childLogger("mutex");
let locked = false;
let holder = "";
const queue: Array<{ resolve: () => void; label: string }> = [];
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
/**
* Ensures only one TDLib client runs at a time across the entire worker process.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this
* before creating any TDLib client.
*
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
*/
export async function withTdlibMutex<T>(
label: string,
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
): Promise<T> {
if (locked) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
await new Promise<void>((resolve) => queue.push({ resolve, label }));
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
const idx = queue.indexOf(entry);
if (idx !== -1) {
queue.splice(idx, 1);
reject(new Error(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
`(waiting: ${label}, holder: ${holder})`
));
}
}, MUTEX_WAIT_TIMEOUT_MS);
const entry = {
resolve: () => {
clearTimeout(timer);
resolve();
},
reject,
label,
};
queue.push(entry);
});
}
locked = true;

Some files were not shown because too many files have changed in this diff Show More