28 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
651e9e6bdd Simplify redundant conditional in handleFetchChannels
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 22:05:50 +00:00
copilot-swe-agent[bot]
8d508d5a86 Fix channels not active after selection and add Fetch Channels button to Channels tab
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 22:04:28 +00:00
copilot-swe-agent[bot]
2bb3caf7d9 Initial plan 2026-03-05 21:57:43 +00:00
xCyanGrizzly
8d95752106 Merge pull request #14 from xCyanGrizzly/copilot/fix-telegram-worker-issue
Fix APP_PORT: align container listen port, port mapping, and healthcheck
2026-03-05 21:40:23 +01:00
copilot-swe-agent[bot]
22419106c1 Fix APP_PORT: make container listen port and healthcheck follow APP_PORT
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:39:26 +00:00
copilot-swe-agent[bot]
e45de85c69 Add Rescan Channel option to channels tab
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:34:53 +00:00
copilot-swe-agent[bot]
71a2e6a5e8 Fix Telegram worker: countdown timer, orphaned runs, fetch-listener reconnection, and logging
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 20:21:18 +00:00
copilot-swe-agent[bot]
1436b630e2 Initial plan 2026-03-05 20:05:41 +00:00
xCyanGrizzly
43af23d3be Merge pull request #13 from xCyanGrizzly/copilot/fix-docker-sync-issues
Fix worker getting stuck during channel message sync
2026-03-05 15:08:39 +01:00
copilot-swe-agent[bot]
49b82a352b Fix review issues: race condition in invokeWithTimeout and mutex queue entry
- Add settled flag to invokeWithTimeout to prevent double-settling
- Create mutex queue entry with wrapped resolve before pushing to queue

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:17:56 +00:00
copilot-swe-agent[bot]
2e242912af Remove worker/dist build artifacts from git, add to .gitignore
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:15:17 +00:00
copilot-swe-agent[bot]
9adbdb2a77 Fix worker getting stuck during sync: add timeouts, stuck detection, and safety limits
- Add invokeWithTimeout wrapper for TDLib API calls (2min timeout per call)
- Add stuck detection to getChannelMessages: break if from_message_id doesn't advance
- Add stuck detection to getTopicMessages: same protection for topic scanning
- Add stuck detection to getForumTopicList: break if pagination offsets don't advance
- Add max page limit (5000) to all scanning loops to prevent infinite pagination
- Add mutex wait timeout (30min) to prevent indefinite blocking when holder hangs
- Add cycle timeout (4h default, configurable via WORKER_CYCLE_TIMEOUT_MINUTES)
- Fix end-of-page detection to use actual limit value instead of hardcoded 100

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 13:14:53 +00:00
copilot-swe-agent[bot]
ad71346468 Initial plan 2026-03-05 13:02:41 +00:00
xCyanGrizzly
e19a80897d Merge pull request #12 from xCyanGrizzly/copilot/fix-worker-functionality-visibility
Fix worker activity tracking, add scan progress, default channels to disabled
2026-03-05 09:42:48 +01:00
copilot-swe-agent[bot]
22da4dfad2 Fix messagesScanned consistency: use totalScanned from scan results, remove double-counting
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:36:40 +00:00
copilot-swe-agent[bot]
22bcacf3bd Add live message scanning progress, channel/topic counters to worker activity
- Add progress callbacks to getChannelMessages and getTopicMessages that
  fire after each page of messages is fetched
- Worker now shows channel progress (e.g. "[2/5] Channel Name") when
  processing multiple source channels
- Worker now shows topic progress (e.g. "topic 3/12") when scanning forums
- Worker now shows live message scanning count during channel/topic scans
  (e.g. "Scanning Channel — 300 messages scanned")
- UI stats line now always shows messagesScanned count
- messagesScanned counter now increments during the scanning phase, not
  just during archive processing

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:33:26 +00:00
copilot-swe-agent[bot]
15da57b8c0 Fix worker stuck on "Working..." and default channels to disabled
1. Worker trigger: Add ingestion_trigger pg_notify listener so the worker
   picks up on-demand triggers from the UI and runs an immediate cycle with
   full activity tracking (currentActivity, currentStep, etc).

2. Remove orphaned IngestionRun creation from triggerIngestion server action.
   Previously the UI created RUNNING runs without activity fields, causing
   the UI to show "Working..." with no details. Now only the worker creates
   runs with proper activity tracking.

3. Default channels to disabled (isActive: false) in schema and all creation
   paths. Destination channels are explicitly set to active since they must
   receive uploads. Includes Prisma migration.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-05 08:27:37 +00:00
copilot-swe-agent[bot]
8f1a912ccb Initial plan 2026-03-05 08:11:20 +00:00
xCyanGrizzly
81b65912aa Merge pull request #11 from xCyanGrizzly/copilot/fix-nextjs-deployment-issues
Fix Docker deployment: file permissions, missing env vars, healthcheck timing
2026-03-04 23:24:05 +01:00
copilot-swe-agent[bot]
5eb2cf05b9 Fix Docker deployment: file permissions, missing env vars, healthcheck timing, error handling
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 22:20:49 +00:00
copilot-swe-agent[bot]
f73d06b3d9 Initial plan 2026-03-04 22:06:36 +00:00
xCyanGrizzly
cac3d518e1 Merge pull request #10 from xCyanGrizzly/copilot/debug-docker-compose-worker
Enable worker service by default in docker-compose
2026-03-04 22:49:52 +01:00
copilot-swe-agent[bot]
987167de0c Enable worker service by default in docker-compose
Remove profiles from worker service in both docker-compose.yml and
docker-compose.dev.yml so the worker starts automatically with
`docker compose up`. This fixes the issue where verification SMS and
the scheduler timer were not working because the worker was never
started. The bot remains as an optional profile.

Update README to reflect the change.

Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 21:13:00 +00:00
copilot-swe-agent[bot]
4f331d5411 Initial plan 2026-03-04 21:09:51 +00:00
xCyanGrizzly
8088a86feb Merge pull request #9 from xCyanGrizzly/copilot/fix-admin-access-issue
Make all users admins in self-hosted deployment
2026-03-04 21:58:04 +01:00
copilot-swe-agent[bot]
b53934ebf2 Make all users admins: update schema default, add migration, simplify registration and OAuth flows
Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
2026-03-04 20:23:54 +00:00
copilot-swe-agent[bot]
464c86b32a Initial plan 2026-03-04 20:16:22 +00:00
xCyanGrizzly
fc00fb6f2e Merge pull request #8 from xCyanGrizzly/copilot/fix-admin-account-login
Fix first user not getting ADMIN role when signing up via OAuth
2026-03-04 20:24:38 +01:00
30 changed files with 680 additions and 185 deletions

View File

@@ -13,6 +13,8 @@ AUTH_GITHUB_ID=""
AUTH_GITHUB_SECRET="" AUTH_GITHUB_SECRET=""
# App # App
# APP_PORT controls the port the container listens on AND how it is exposed on the host.
# If you change APP_PORT, also update NEXT_PUBLIC_APP_URL to match.
NEXT_PUBLIC_APP_URL="http://localhost:3000" NEXT_PUBLIC_APP_URL="http://localhost:3000"
APP_PORT=3000 APP_PORT=3000

1
.gitignore vendored
View File

@@ -18,6 +18,7 @@ worker/node_modules
# production # production
/build /build
worker/dist
# misc # misc
.DS_Store .DS_Store

View File

@@ -30,19 +30,19 @@ RUN addgroup --system --gid 1001 nodejs && \
adduser --system --uid 1001 nextjs adduser --system --uid 1001 nextjs
# Copy public assets # Copy public assets
COPY --from=builder /app/public ./public COPY --from=builder --chown=nextjs:nodejs /app/public ./public
# Copy prisma schema + migrations for runtime migrate deploy
COPY --from=builder /app/prisma ./prisma
COPY --from=builder /app/prisma.config.ts ./prisma.config.ts
# Copy standalone build output # Copy standalone build output
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./ COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
# Copy prisma schema + migrations for runtime migrate deploy
COPY --from=builder --chown=nextjs:nodejs /app/prisma ./prisma
COPY --from=builder --chown=nextjs:nodejs /app/prisma.config.ts ./prisma.config.ts
# Copy node_modules for prisma CLI (needed for migrate deploy at startup). # Copy node_modules for prisma CLI (needed for migrate deploy at startup).
# Copying the full directory ensures all transitive dependencies are present. # Copying the full directory ensures all transitive dependencies are present.
COPY --from=builder /app/node_modules ./node_modules COPY --from=builder --chown=nextjs:nodejs /app/node_modules ./node_modules
# Recreate the .bin/prisma symlink so Node resolves __dirname to prisma/build/, # Recreate the .bin/prisma symlink so Node resolves __dirname to prisma/build/,
# where the WASM files live (COPY dereferences symlinks, breaking WASM resolution) # where the WASM files live (COPY dereferences symlinks, breaking WASM resolution)
RUN mkdir -p ./node_modules/.bin && \ RUN mkdir -p ./node_modules/.bin && \
@@ -54,6 +54,7 @@ RUN chmod +x docker-entrypoint.sh
USER nextjs USER nextjs
# Default port — overridden at runtime by the PORT env var (set via docker-compose APP_PORT)
EXPOSE 3000 EXPOSE 3000
ENV PORT=3000 ENV PORT=3000
ENV HOSTNAME="0.0.0.0" ENV HOSTNAME="0.0.0.0"

View File

@@ -125,18 +125,15 @@ docker compose up -d
The app will be available at [http://localhost:3000](http://localhost:3000). The app will be available at [http://localhost:3000](http://localhost:3000).
### Adding Telegram Services ### Adding the Telegram Bot
The worker and bot run as optional profiles so `docker compose up` works with just the app + database: The worker starts by default with `docker compose up`. The bot runs as an optional profile:
```bash ```bash
# App + DB + Telegram worker (needs TELEGRAM_API_ID + TELEGRAM_API_HASH in .env)
docker compose --profile telegram up -d
# App + DB + Worker + Bot (also needs BOT_TOKEN in .env) # App + DB + Worker + Bot (also needs BOT_TOKEN in .env)
docker compose --profile full up -d docker compose --profile full up -d
# Or just the bot (alongside app + db) # Or just the bot (alongside app + db + worker)
docker compose --profile bot up -d docker compose --profile bot up -d
``` ```

View File

@@ -16,7 +16,6 @@ services:
retries: 5 retries: 5
worker: worker:
profiles: ["telegram", "full"]
build: build:
context: . context: .
dockerfile: worker/Dockerfile dockerfile: worker/Dockerfile

View File

@@ -5,23 +5,29 @@ services:
dockerfile: Dockerfile dockerfile: Dockerfile
pull_policy: never pull_policy: never
ports: ports:
- "${APP_PORT:-3000}:3000" - "${APP_PORT:-3000}:${APP_PORT:-3000}"
environment: environment:
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash} - DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env} - AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
- AUTH_TRUST_HOST=true - AUTH_TRUST_HOST=true
- AUTH_GITHUB_ID=${AUTH_GITHUB_ID:-}
- AUTH_GITHUB_SECRET=${AUTH_GITHUB_SECRET:-}
- NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL:-http://localhost:3000} - NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
- TELEGRAM_API_KEY=${TELEGRAM_API_KEY:-} - TELEGRAM_API_KEY=${TELEGRAM_API_KEY:-}
- BOT_TOKEN=${BOT_TOKEN:-} - BOT_TOKEN=${BOT_TOKEN:-}
- BOT_USERNAME=${BOT_USERNAME:-}
- LOG_LEVEL=${LOG_LEVEL:-info}
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
- PORT=${APP_PORT:-3000}
depends_on: depends_on:
db: db:
condition: service_healthy condition: service_healthy
healthcheck: healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health"] test: ["CMD-SHELL", "wget -q --spider http://localhost:$$PORT/api/health || exit 1"]
interval: 30s interval: 30s
timeout: 5s timeout: 5s
retries: 3 retries: 3
start_period: 30s start_period: 60s
restart: unless-stopped restart: unless-stopped
deploy: deploy:
resources: resources:
@@ -31,7 +37,6 @@ services:
- frontend - frontend
worker: worker:
profiles: ["telegram", "full"]
build: build:
context: . context: .
dockerfile: worker/Dockerfile dockerfile: worker/Dockerfile

View File

@@ -10,7 +10,10 @@ if [ "$AUTH_SECRET" = "change-me-to-a-random-secret-in-production" ] || [ -z "$A
fi fi
echo "Running database migrations..." echo "Running database migrations..."
./node_modules/.bin/prisma migrate deploy if ! ./node_modules/.bin/prisma migrate deploy; then
echo "ERROR: Database migration failed. Check DATABASE_URL and database connectivity."
exit 1
fi
if [ "$SEED_DATABASE" = "true" ]; then if [ "$SEED_DATABASE" = "true" ]; then
echo "Seeding database..." echo "Seeding database..."

2
package-lock.json generated
View File

@@ -49,7 +49,7 @@
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsx": "^4.21.0", "tsx": "^4.21.0",
"tw-animate-css": "^1.4.0", "tw-animate-css": "^1.4.0",
"typescript": "^5" "typescript": "5.9.3"
} }
}, },
"node_modules/@alloc/quick-lru": { "node_modules/@alloc/quick-lru": {

View File

@@ -58,6 +58,6 @@
"ts-node": "^10.9.2", "ts-node": "^10.9.2",
"tsx": "^4.21.0", "tsx": "^4.21.0",
"tw-animate-css": "^1.4.0", "tw-animate-css": "^1.4.0",
"typescript": "^5" "typescript": "5.9.3"
} }
} }

View File

@@ -0,0 +1,5 @@
-- Promote all existing users to ADMIN (self-hosted: every user is an admin)
UPDATE "User" SET "role" = 'ADMIN' WHERE "role" = 'USER';
-- Change the default role for new users to ADMIN
ALTER TABLE "User" ALTER COLUMN "role" SET DEFAULT 'ADMIN';

View File

@@ -0,0 +1,3 @@
-- Change the default for new channels to disabled (isActive = false).
-- Existing channels are not affected — admins can manually enable/disable them.
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;

View File

@@ -22,7 +22,7 @@ model User {
emailVerified DateTime? emailVerified DateTime?
image String? image String?
hashedPassword String? hashedPassword String?
role Role @default(USER) role Role @default(ADMIN)
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt
@@ -417,7 +417,7 @@ model TelegramChannel {
title String title String
type ChannelType type ChannelType
isForum Boolean @default(false) isForum Boolean @default(false)
isActive Boolean @default(true) isActive Boolean @default(false)
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt

View File

@@ -7,6 +7,7 @@ import {
Power, Power,
ArrowDownToLine, ArrowDownToLine,
ArrowUpFromLine, ArrowUpFromLine,
RefreshCcw,
} from "lucide-react"; } from "lucide-react";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
@@ -23,12 +24,14 @@ interface ChannelColumnsProps {
onToggleActive: (id: string) => void; onToggleActive: (id: string) => void;
onDelete: (id: string) => void; onDelete: (id: string) => void;
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void; onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
onRescan: (id: string) => void;
} }
export function getChannelColumns({ export function getChannelColumns({
onToggleActive, onToggleActive,
onDelete, onDelete,
onSetType, onSetType,
onRescan,
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] { }: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
return [ return [
{ {
@@ -121,6 +124,14 @@ export function getChannelColumns({
Set as Source Set as Source
</DropdownMenuItem> </DropdownMenuItem>
)} )}
{row.original.type === "SOURCE" && (
<DropdownMenuItem
onClick={() => onRescan(row.original.id)}
>
<RefreshCcw className="mr-2 h-3.5 w-3.5" />
Rescan Channel
</DropdownMenuItem>
)}
<DropdownMenuItem <DropdownMenuItem
onClick={() => onToggleActive(row.original.id)} onClick={() => onToggleActive(row.original.id)}
> >

View File

@@ -2,26 +2,36 @@
import { useState, useTransition } from "react"; import { useState, useTransition } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import { Download } from "lucide-react";
import { getChannelColumns } from "./channel-columns"; import { getChannelColumns } from "./channel-columns";
import { DestinationCard } from "./destination-card"; import { DestinationCard } from "./destination-card";
import { ChannelPickerDialog } from "./channel-picker-dialog";
import { import {
deleteChannel, deleteChannel,
toggleChannelActive, toggleChannelActive,
setChannelType, setChannelType,
rescanChannel,
} from "../actions"; } from "../actions";
import { DataTable } from "@/components/shared/data-table"; import { DataTable } from "@/components/shared/data-table";
import { DeleteDialog } from "@/components/shared/delete-dialog"; import { DeleteDialog } from "@/components/shared/delete-dialog";
import type { ChannelRow, GlobalDestination } from "@/lib/telegram/admin-queries"; import { Button } from "@/components/ui/button";
import type { AccountRow, ChannelRow, GlobalDestination } from "@/lib/telegram/admin-queries";
import { useDataTable } from "@/hooks/use-data-table"; import { useDataTable } from "@/hooks/use-data-table";
interface ChannelsTabProps { interface ChannelsTabProps {
channels: ChannelRow[]; channels: ChannelRow[];
globalDestination: GlobalDestination; globalDestination: GlobalDestination;
accounts: AccountRow[];
} }
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) { export function ChannelsTab({ channels, globalDestination, accounts }: ChannelsTabProps) {
const [isPending, startTransition] = useTransition(); const [isPending, startTransition] = useTransition();
const [deleteId, setDeleteId] = useState<string | null>(null); const [deleteId, setDeleteId] = useState<string | null>(null);
const [rescanId, setRescanId] = useState<string | null>(null);
const [fetchChannelsAccountId, setFetchChannelsAccountId] = useState<string | null>(null);
// Find the first authenticated account for "Fetch Channels"
const authenticatedAccounts = accounts.filter((a) => a.authState === "AUTHENTICATED" && a.isActive);
const columns = getChannelColumns({ const columns = getChannelColumns({
onToggleActive: (id) => { onToggleActive: (id) => {
@@ -39,6 +49,7 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
else toast.error(result.error); else toast.error(result.error);
}); });
}, },
onRescan: (id) => setRescanId(id),
}); });
const { table } = useDataTable({ const { table } = useDataTable({
@@ -60,19 +71,51 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
}); });
}; };
const handleRescan = () => {
if (!rescanId) return;
startTransition(async () => {
const result = await rescanChannel(rescanId);
if (result.success) {
toast.success("Channel scan progress reset — it will be fully rescanned on the next sync");
setRescanId(null);
} else {
toast.error(result.error);
}
});
};
const handleFetchChannels = () => {
if (authenticatedAccounts.length > 0) {
setFetchChannelsAccountId(authenticatedAccounts[0].id);
} else {
toast.error("No authenticated accounts available. Add and authenticate an account first.");
}
};
return ( return (
<div className="space-y-4"> <div className="space-y-4">
<DestinationCard destination={globalDestination} /> <DestinationCard destination={globalDestination} />
<div className="flex items-center gap-2">
<Button
variant="outline"
onClick={handleFetchChannels}
disabled={authenticatedAccounts.length === 0}
>
<Download className="mr-2 h-4 w-4" />
Fetch Channels
</Button>
</div>
{channels.length > 0 && ( {channels.length > 0 && (
<p className="text-xs text-muted-foreground"> <p className="text-xs text-muted-foreground">
Source channels are added per-account via the &quot;Fetch Channels&quot; button on the Accounts tab. Channels discovered via &quot;Fetch Channels&quot; are automatically activated as sources.
</p> </p>
)} )}
<DataTable <DataTable
table={table} table={table}
emptyMessage="No channels yet. Use &quot;Fetch Channels&quot; on an account to discover and add source channels." emptyMessage="No channels yet. Click &quot;Fetch Channels&quot; above to discover and add source channels."
/> />
<DeleteDialog <DeleteDialog
@@ -83,6 +126,24 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
onConfirm={handleDelete} onConfirm={handleDelete}
isLoading={isPending} isLoading={isPending}
/> />
<DeleteDialog
open={!!rescanId}
onOpenChange={(open) => !open && setRescanId(null)}
title="Rescan Channel"
description="This will reset all scan progress for this channel. On the next sync the worker will re-process every message from the beginning. Packages that are already in the library will be skipped (deduplication by hash), but any missing files will be re-downloaded and re-uploaded. This may take a long time for large channels."
confirmLabel="Rescan"
onConfirm={handleRescan}
isLoading={isPending}
/>
<ChannelPickerDialog
accountId={fetchChannelsAccountId}
open={!!fetchChannelsAccountId}
onOpenChange={(open) => {
if (!open) setFetchChannelsAccountId(null);
}}
/>
</div> </div>
); );
} }

View File

@@ -16,6 +16,7 @@ interface TelegramAdminProps {
ingestionStatus: IngestionAccountStatus[]; ingestionStatus: IngestionAccountStatus[];
globalDestination: GlobalDestination; globalDestination: GlobalDestination;
sendHistory: SendHistoryRow[]; sendHistory: SendHistoryRow[];
workerIntervalMinutes: number;
} }
export function TelegramAdmin({ export function TelegramAdmin({
@@ -24,6 +25,7 @@ export function TelegramAdmin({
ingestionStatus, ingestionStatus,
globalDestination, globalDestination,
sendHistory, sendHistory,
workerIntervalMinutes,
}: TelegramAdminProps) { }: TelegramAdminProps) {
return ( return (
<div className="space-y-4"> <div className="space-y-4">
@@ -32,7 +34,7 @@ export function TelegramAdmin({
description="Manage Telegram accounts, channels, and ingestion" description="Manage Telegram accounts, channels, and ingestion"
/> />
<WorkerStatusPanel initialStatus={ingestionStatus} /> <WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
<Tabs defaultValue="accounts" className="space-y-4"> <Tabs defaultValue="accounts" className="space-y-4">
<TabsList> <TabsList>
@@ -51,7 +53,7 @@ export function TelegramAdmin({
<AccountsTab accounts={accounts} /> <AccountsTab accounts={accounts} />
</TabsContent> </TabsContent>
<TabsContent value="channels"> <TabsContent value="channels">
<ChannelsTab channels={channels} globalDestination={globalDestination} /> <ChannelsTab channels={channels} globalDestination={globalDestination} accounts={accounts} />
</TabsContent> </TabsContent>
<TabsContent value="sends"> <TabsContent value="sends">
<BotSendsTab history={sendHistory} /> <BotSendsTab history={sendHistory} />

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useEffect, useState, useCallback } from "react"; import { useEffect, useState, useCallback, useTransition } from "react";
import { import {
Loader2, Loader2,
CheckCircle2, CheckCircle2,
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge"; import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils"; import { cn } from "@/lib/utils";
import { toast } from "sonner";
import { triggerIngestion } from "../actions";
import type { IngestionAccountStatus } from "@/lib/telegram/types"; import type { IngestionAccountStatus } from "@/lib/telegram/types";
interface WorkerStatusPanelProps { interface WorkerStatusPanelProps {
initialStatus: IngestionAccountStatus[]; initialStatus: IngestionAccountStatus[];
initialIntervalMinutes?: number;
} }
const AUTH_STATE_CONFIG: Record< const AUTH_STATE_CONFIG: Record<
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" }, EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
}; };
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) { export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
const [accounts, setAccounts] = useState(initialStatus); const [accounts, setAccounts] = useState(initialStatus);
const [error, setError] = useState(false); const [error, setError] = useState(false);
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null); const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
const [isPending, startTransition] = useTransition();
// Find active run // Find active run
const activeRun = accounts.find((a) => a.currentRun); const activeRun = accounts.find((a) => a.currentRun);
const isRunning = !!activeRun; const isRunning = !!activeRun;
const handleSyncNow = useCallback(() => {
startTransition(async () => {
const result = await triggerIngestion();
if (result.success) {
toast.success("Sync triggered — worker will start shortly");
} else {
toast.error(result.error ?? "Failed to trigger sync");
}
});
}, []);
// Poll for status // Poll for status
useEffect(() => { useEffect(() => {
let timer: ReturnType<typeof setTimeout>; let timer: ReturnType<typeof setTimeout>;
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
const data = await res.json(); const data = await res.json();
if (mounted) { if (mounted) {
setAccounts(data.accounts ?? []); setAccounts(data.accounts ?? []);
if (data.workerIntervalMinutes) {
setWorkerIntervalMinutes(data.workerIntervalMinutes);
}
setError(false); setError(false);
} }
} catch { } catch {
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter) // Estimate next run based on last run finish time + configured interval + up to 5 min jitter
const lastFinished = accounts const lastFinished = accounts
.filter((a) => a.lastRun?.finishedAt) .filter((a) => a.lastRun?.finishedAt)
.map((a) => new Date(a.lastRun!.finishedAt!).getTime()) .map((a) => new Date(a.lastRun!.finishedAt!).getTime())
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
return; return;
} }
const intervalMs = 5 * 60 * 1000; // 5 min base const intervalMs = workerIntervalMinutes * 60 * 1000;
const estimatedNext = lastFinished + intervalMs; const estimatedNext = lastFinished + intervalMs;
const tick = () => { const tick = () => {
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
tick(); tick();
const interval = setInterval(tick, 1_000); const interval = setInterval(tick, 1_000);
return () => clearInterval(interval); return () => clearInterval(interval);
}, [isRunning, accounts]); }, [isRunning, accounts, workerIntervalMinutes]);
if (accounts.length === 0 && !error) { if (accounts.length === 0 && !error) {
return ( return (
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
) : isRunning && activeRun?.currentRun ? ( ) : isRunning && activeRun?.currentRun ? (
<RunningStatus run={activeRun.currentRun} /> <RunningStatus run={activeRun.currentRun} />
) : ( ) : (
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} /> <IdleStatus
accounts={accounts}
nextRunCountdown={nextRunCountdown}
onSyncNow={handleSyncNow}
isSyncing={isPending}
/>
)} )}
</CardContent> </CardContent>
</Card> </Card>
@@ -233,6 +257,11 @@ function RunningStatus({
</span> </span>
</span> </span>
)} )}
{run.messagesScanned > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
</span>
)}
{run.zipsIngested > 0 && ( {run.zipsIngested > 0 && (
<span> <span>
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested <span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
@@ -251,9 +280,13 @@ function RunningStatus({
function IdleStatus({ function IdleStatus({
accounts, accounts,
nextRunCountdown, nextRunCountdown,
onSyncNow,
isSyncing,
}: { }: {
accounts: IngestionAccountStatus[]; accounts: IngestionAccountStatus[];
nextRunCountdown: string | null; nextRunCountdown: string | null;
onSyncNow: () => void;
isSyncing: boolean;
}) { }) {
const lastRun = accounts const lastRun = accounts
.filter((a) => a.lastRun) .filter((a) => a.lastRun)
@@ -316,14 +349,32 @@ function IdleStatus({
)} )}
</div> </div>
<div className="flex items-center gap-2 shrink-0">
{nextRunCountdown && hasAuthenticated && ( {nextRunCountdown && hasAuthenticated && (
<div className="flex items-center gap-1.5 shrink-0"> <div className="flex items-center gap-1.5">
<RefreshCw className="h-3 w-3 text-muted-foreground" /> <RefreshCw className="h-3 w-3 text-muted-foreground" />
<span className="text-xs text-muted-foreground tabular-nums"> <span className="text-xs text-muted-foreground tabular-nums">
Next: {nextRunCountdown} Next: {nextRunCountdown}
</span> </span>
</div> </div>
)} )}
{hasAuthenticated && (
<Button
variant="outline"
size="sm"
className="h-7 text-xs px-2"
onClick={onSyncNow}
disabled={isSyncing}
>
{isSyncing ? (
<Loader2 className="h-3 w-3 animate-spin mr-1" />
) : (
<RefreshCw className="h-3 w-3 mr-1" />
)}
Sync Now
</Button>
)}
</div>
</div> </div>
); );
} }

View File

@@ -173,6 +173,7 @@ export async function createChannel(
telegramId: BigInt(parsed.data.telegramId), telegramId: BigInt(parsed.data.telegramId),
title: parsed.data.title, title: parsed.data.title,
type: parsed.data.type, type: parsed.data.type,
isActive: false,
}, },
}); });
revalidatePath(REVALIDATE_PATH); revalidatePath(REVALIDATE_PATH);
@@ -296,6 +297,52 @@ export async function triggerChannelSync(): Promise<ActionResult> {
} }
} }
/**
* Reset all scan progress for a channel so the worker will re-process it
* from the very beginning on the next ingestion cycle.
*
* This clears:
* - `lastProcessedMessageId` on every AccountChannelMap linked to this channel
* - All TopicProgress records for those maps (for forum channels)
*/
export async function rescanChannel(channelId: string): Promise<ActionResult> {
const admin = await requireAdmin();
if (!admin.success) return admin;
const channel = await prisma.telegramChannel.findUnique({
where: { id: channelId },
});
if (!channel) return { success: false, error: "Channel not found" };
try {
// Find all account-channel maps for this channel
const maps = await prisma.accountChannelMap.findMany({
where: { channelId },
select: { id: true },
});
const mapIds = maps.map((m) => m.id);
// Delete all topic progress records for these maps (forum channels)
if (mapIds.length > 0) {
await prisma.topicProgress.deleteMany({
where: { accountChannelMapId: { in: mapIds } },
});
}
// Reset the scan cursor so the worker re-processes from the start
await prisma.accountChannelMap.updateMany({
where: { channelId },
data: { lastProcessedMessageId: null },
});
revalidatePath(REVALIDATE_PATH);
return { success: true, data: undefined };
} catch {
return { success: false, error: "Failed to reset channel scan progress" };
}
}
// ── Account-Channel link actions ── // ── Account-Channel link actions ──
export async function linkChannel( export async function linkChannel(
@@ -371,23 +418,12 @@ export async function triggerIngestion(
return { success: false, error: "No eligible accounts found" }; return { success: false, error: "No eligible accounts found" };
} }
// Create ingestion runs — the worker picks these up // Signal the worker to run an immediate ingestion cycle via pg_notify.
for (const account of accounts) { // The worker will create its own IngestionRun records with proper activity tracking.
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// pg_notify for immediate worker pickup
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",") accounts.map((a: { id: string }) => a.id).join(",")
); );
} catch { } catch {
// Best-effort // Best-effort
@@ -417,7 +453,7 @@ export async function saveChannelSelections(
try { try {
let linked = 0; let linked = 0;
for (const ch of channels) { for (const ch of channels) {
// Upsert the channel record // Upsert the channel record and activate it (user explicitly selected it)
const channel = await prisma.telegramChannel.upsert({ const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(ch.telegramId) }, where: { telegramId: BigInt(ch.telegramId) },
create: { create: {
@@ -425,10 +461,12 @@ export async function saveChannelSelections(
title: ch.title, title: ch.title,
type: "SOURCE", type: "SOURCE",
isForum: ch.isForum, isForum: ch.isForum,
isActive: true,
}, },
update: { update: {
title: ch.title, title: ch.title,
isForum: ch.isForum, isForum: ch.isForum,
isActive: true,
}, },
}); });
@@ -467,10 +505,10 @@ export async function setGlobalDestination(
if (!channel) return { success: false, error: "Channel not found" }; if (!channel) return { success: false, error: "Channel not found" };
try { try {
// Set the channel type to DESTINATION // Set the channel type to DESTINATION and ensure it's active
await prisma.telegramChannel.update({ await prisma.telegramChannel.update({
where: { id: channelId }, where: { id: channelId },
data: { type: "DESTINATION" }, data: { type: "DESTINATION", isActive: true },
}); });
// Save as global destination // Save as global destination
@@ -521,17 +559,19 @@ export async function createDestinationChannel(
if (!admin.success) return admin; if (!admin.success) return admin;
try { try {
// Create the channel as DESTINATION // Create the channel as DESTINATION (active by default — needed for uploads)
const channel = await prisma.telegramChannel.upsert({ const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(telegramId) }, where: { telegramId: BigInt(telegramId) },
create: { create: {
telegramId: BigInt(telegramId), telegramId: BigInt(telegramId),
title, title,
type: "DESTINATION", type: "DESTINATION",
isActive: true,
}, },
update: { update: {
title, title,
type: "DESTINATION", type: "DESTINATION",
isActive: true,
}, },
}); });

View File

@@ -25,7 +25,7 @@ export default async function TelegramPage() {
}), }),
]); ]);
const serializedHistory = sendHistory.map((r) => ({ const serializedHistory = sendHistory.map((r: typeof sendHistory[number]) => ({
id: r.id, id: r.id,
packageName: r.package.fileName, packageName: r.package.fileName,
recipientName: r.telegramLink.telegramName, recipientName: r.telegramLink.telegramName,
@@ -42,6 +42,7 @@ export default async function TelegramPage() {
ingestionStatus={ingestionStatus} ingestionStatus={ingestionStatus}
globalDestination={globalDestination} globalDestination={globalDestination}
sendHistory={serializedHistory} sendHistory={serializedHistory}
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
/> />
); );
} }

View File

@@ -21,17 +21,13 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
const hashedPassword = await bcrypt.hash(parsed.data.password, 10); const hashedPassword = await bcrypt.hash(parsed.data.password, 10);
// First user to register becomes ADMIN (self-hosted owner) // Self-hosted: all users are admins
const user = await prisma.$transaction(async (tx) => { const user = await prisma.user.create({
const userCount = await tx.user.count();
const role = userCount === 0 ? "ADMIN" : "USER";
return tx.user.create({
data: { data: {
name: parsed.data.name, name: parsed.data.name,
email: parsed.data.email, email: parsed.data.email,
hashedPassword, hashedPassword,
role, role: "ADMIN",
settings: { settings: {
create: { create: {
lowStockThreshold: 10, lowStockThreshold: 10,
@@ -42,7 +38,6 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
}, },
}, },
}); });
});
return { success: true, data: { id: user.id } }; return { success: true, data: { id: user.id } };
} }

View File

@@ -9,5 +9,9 @@ export async function GET(request: Request) {
if ("error" in authResult) return authResult.error; if ("error" in authResult) return authResult.error;
const accounts = await getIngestionStatus(); const accounts = await getIngestionStatus();
return NextResponse.json({ accounts }); const workerIntervalMinutes = parseInt(
process.env.WORKER_INTERVAL_MINUTES ?? "60",
10
);
return NextResponse.json({ accounts, workerIntervalMinutes });
} }

View File

@@ -45,33 +45,20 @@ export async function POST(request: Request) {
); );
} }
// Create ingestion runs marked as RUNNING — the worker will pick these up // Send pg_notify for immediate worker pickup.
// when it next polls, or we use pg_notify for immediate pickup // The worker creates its own IngestionRun records with proper activity tracking.
for (const account of accounts) {
// Only create if no run is already RUNNING for this account
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// Send pg_notify for immediate worker pickup
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
accounts.map((a) => a.id).join(",") accounts.map((a: { id: string }) => a.id).join(",")
); );
} catch { } catch {
// pg_notify is best-effort — worker will pick up on next cycle anyway // pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
} }
return NextResponse.json({ return NextResponse.json({
triggered: true, triggered: true,
accountIds: accounts.map((a) => a.id), accountIds: accounts.map((a: { id: string }) => a.id),
message: `Ingestion queued for ${accounts.length} account(s)`, message: `Ingestion triggered for ${accounts.length} account(s)`,
}); });
} }

View File

@@ -18,6 +18,8 @@ interface DeleteDialogProps {
description?: string; description?: string;
onConfirm: () => void; onConfirm: () => void;
isLoading?: boolean; isLoading?: boolean;
confirmLabel?: string;
confirmLoadingLabel?: string;
} }
export function DeleteDialog({ export function DeleteDialog({
@@ -27,6 +29,8 @@ export function DeleteDialog({
description = "This action cannot be undone.", description = "This action cannot be undone.",
onConfirm, onConfirm,
isLoading, isLoading,
confirmLabel = "Delete",
confirmLoadingLabel,
}: DeleteDialogProps) { }: DeleteDialogProps) {
return ( return (
<AlertDialog open={open} onOpenChange={onOpenChange}> <AlertDialog open={open} onOpenChange={onOpenChange}>
@@ -42,7 +46,7 @@ export function DeleteDialog({
disabled={isLoading} disabled={isLoading}
className="bg-destructive text-destructive-foreground hover:bg-destructive/90" className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
> >
{isLoading ? "Deleting..." : "Delete"} {isLoading ? (confirmLoadingLabel ?? `${confirmLabel}...`) : confirmLabel}
</AlertDialogAction> </AlertDialogAction>
</AlertDialogFooter> </AlertDialogFooter>
</AlertDialogContent> </AlertDialogContent>

View File

@@ -18,12 +18,12 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
async jwt({ token, user }) { async jwt({ token, user }) {
if (user) { if (user) {
token.id = user.id!; token.id = user.id!;
// Fetch the role from the database to pick up first-user ADMIN promotion // Fetch the role from the database to ensure token reflects current role
const dbUser = await prisma.user.findUnique({ const dbUser = await prisma.user.findUnique({
where: { id: user.id! }, where: { id: user.id! },
select: { role: true }, select: { role: true },
}); });
token.role = dbUser?.role ?? user.role ?? "USER"; token.role = dbUser?.role ?? user.role ?? "ADMIN";
} }
return token; return token;
}, },
@@ -38,17 +38,11 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
events: { events: {
async createUser({ user }) { async createUser({ user }) {
if (user.id) { if (user.id) {
// First user to register becomes ADMIN (self-hosted owner) // Self-hosted: all users are admins
const adminExists = await prisma.user.findFirst({
where: { role: "ADMIN" },
select: { id: true },
});
if (!adminExists) {
await prisma.user.update({ await prisma.user.update({
where: { id: user.id }, where: { id: user.id },
data: { role: "ADMIN" }, data: { role: "ADMIN" },
}); });
}
await prisma.userSettings.upsert({ await prisma.userSettings.upsert({
where: { userId: user.id }, where: { userId: user.id },

View File

@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
title: string; title: string;
type: "SOURCE" | "DESTINATION"; type: "SOURCE" | "DESTINATION";
isForum: boolean; isForum: boolean;
isActive?: boolean;
} }
/** /**
* Upsert a channel by telegramId. Returns the channel record. * Upsert a channel by telegramId. Returns the channel record.
* If it already exists, update title and forum status. * If it already exists, update title and forum status.
* New channels default to disabled (isActive: false) so the admin must
* explicitly enable them before the worker processes them.
* Pass isActive: true for DESTINATION channels that must be active immediately.
*/ */
export async function upsertChannel(input: UpsertChannelInput) { export async function upsertChannel(input: UpsertChannelInput) {
return db.telegramChannel.upsert({ return db.telegramChannel.upsert({
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
title: input.title, title: input.title,
type: input.type, type: input.type,
isForum: input.isForum, isForum: input.isForum,
isActive: input.isActive ?? false,
}, },
update: { update: {
title: input.title, title: input.title,

View File

@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
import { processFetchRequest } from "./worker.js"; import { processFetchRequest } from "./worker.js";
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { triggerImmediateCycle } from "./scheduler.js";
import { import {
getGlobalDestinationChannel, getGlobalDestinationChannel,
getGlobalSetting, getGlobalSetting,
@@ -17,6 +18,10 @@ import {
const log = childLogger("fetch-listener"); const log = childLogger("fetch-listener");
let pgClient: pg.PoolClient | null = null; let pgClient: pg.PoolClient | null = null;
let stopped = false;
/** Delay (ms) before attempting to reconnect after a connection loss. */
const RECONNECT_DELAY_MS = 5_000;
/** /**
* Start listening for pg_notify signals from the web app. * Start listening for pg_notify signals from the web app.
@@ -25,12 +30,23 @@ let pgClient: pg.PoolClient | null = null;
* - `channel_fetch` — payload = requestId → fetch channels for an account * - `channel_fetch` — payload = requestId → fetch channels for an account
* - `generate_invite` — payload = channelId → generate invite link for destination * - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle
*
* If the underlying connection is lost, the listener automatically reconnects
* so that pg_notify signals are never silently dropped.
*/ */
export async function startFetchListener(): Promise<void> { export async function startFetchListener(): Promise<void> {
stopped = false;
await connectListener();
}
async function connectListener(): Promise<void> {
try {
pgClient = await pool.connect(); pgClient = await pool.connect();
await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN channel_fetch");
await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN generate_invite");
await pgClient.query("LISTEN create_destination"); await pgClient.query("LISTEN create_destination");
await pgClient.query("LISTEN ingestion_trigger");
pgClient.on("notification", (msg) => { pgClient.on("notification", (msg) => {
if (msg.channel === "channel_fetch" && msg.payload) { if (msg.channel === "channel_fetch" && msg.payload) {
@@ -39,13 +55,51 @@ export async function startFetchListener(): Promise<void> {
handleGenerateInvite(msg.payload); handleGenerateInvite(msg.payload);
} else if (msg.channel === "create_destination" && msg.payload) { } else if (msg.channel === "create_destination" && msg.payload) {
handleCreateDestination(msg.payload); handleCreateDestination(msg.payload);
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger();
} }
}); });
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)"); // Reconnect automatically when the connection ends unexpectedly
pgClient.on("end", () => {
if (!stopped) {
log.warn("Fetch listener connection lost — reconnecting");
pgClient = null;
scheduleReconnect();
}
});
pgClient.on("error", (err) => {
log.error({ err }, "Fetch listener connection error");
if (!stopped && pgClient) {
try {
pgClient.release(true);
} catch (releaseErr) {
log.debug({ err: releaseErr }, "Failed to release pg client after error");
}
pgClient = null;
scheduleReconnect();
}
});
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
} catch (err) {
log.error({ err }, "Failed to start fetch listener — retrying");
scheduleReconnect();
}
}
function scheduleReconnect(): void {
if (stopped) return;
setTimeout(() => {
if (!stopped) {
connectListener();
}
}, RECONNECT_DELAY_MS);
} }
export function stopFetchListener(): void { export function stopFetchListener(): void {
stopped = true;
if (pgClient) { if (pgClient) {
pgClient.release(); pgClient.release();
pgClient = null; pgClient = null;
@@ -138,12 +192,13 @@ function handleCreateDestination(payload: string): void {
const result = await createSupergroup(client, parsed.title); const result = await createSupergroup(client, parsed.title);
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created"); log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
// Upsert it as a DESTINATION channel in the DB // Upsert it as a DESTINATION channel in the DB (active by default)
const channel = await upsertChannel({ const channel = await upsertChannel({
telegramId: result.chatId, telegramId: result.chatId,
title: result.title, title: result.title,
type: "DESTINATION", type: "DESTINATION",
isForum: false, isForum: false,
isActive: true,
}); });
// Set as global destination // Set as global destination
@@ -204,3 +259,16 @@ function handleCreateDestination(payload: string): void {
} }
}); });
} }
// ── Ingestion trigger handler ──
function handleIngestionTrigger(): void {
fetchQueue = fetchQueue.then(async () => {
try {
log.info("Ingestion trigger received from UI");
await triggerImmediateCycle();
} catch (err) {
log.error({ err }, "Failed to trigger immediate ingestion cycle");
}
});
}

View File

@@ -10,6 +10,13 @@ let running = false;
let timer: ReturnType<typeof setTimeout> | null = null; let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0; let cycleCount = 0;
/**
* Maximum time for a single ingestion cycle (ms).
* After this, new accounts won't be started (in-progress work finishes).
* Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES.
*/
const CYCLE_TIMEOUT_MS = (parseInt(process.env.WORKER_CYCLE_TIMEOUT_MINUTES ?? "240", 10)) * 60 * 1000;
/** /**
* Run one ingestion cycle: * Run one ingestion cycle:
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels) * 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
@@ -17,6 +24,10 @@ let cycleCount = 0;
* *
* All TDLib operations are wrapped in the mutex to ensure only one client * All TDLib operations are wrapped in the mutex to ensure only one client
* runs at a time (also shared with the fetch listener for on-demand requests). * runs at a time (also shared with the fetch listener for on-demand requests).
*
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
* Once the timeout elapses, no new accounts will be started but any in-progress
* account processing is allowed to finish its current archive set.
*/ */
async function runCycle(): Promise<void> { async function runCycle(): Promise<void> {
if (running) { if (running) {
@@ -26,7 +37,8 @@ async function runCycle(): Promise<void> {
running = true; running = true;
cycleCount++; cycleCount++;
log.info({ cycle: cycleCount }, "Starting ingestion cycle"); const cycleStart = Date.now();
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
try { try {
// ── Phase 1: Authenticate pending accounts ── // ── Phase 1: Authenticate pending accounts ──
@@ -37,6 +49,10 @@ async function runCycle(): Promise<void> {
"Found pending accounts, starting authentication" "Found pending accounts, starting authentication"
); );
for (const account of pendingAccounts) { for (const account of pendingAccounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn("Cycle timeout reached during authentication phase, stopping");
break;
}
await withTdlibMutex(`auth:${account.phone}`, () => await withTdlibMutex(`auth:${account.phone}`, () =>
authenticateAccount(account) authenticateAccount(account)
); );
@@ -54,12 +70,22 @@ async function runCycle(): Promise<void> {
log.info({ accountCount: accounts.length }, "Processing accounts"); log.info({ accountCount: accounts.length }, "Processing accounts");
for (const account of accounts) { for (const account of accounts) {
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
log.warn(
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
"Cycle timeout reached, skipping remaining accounts"
);
break;
}
await withTdlibMutex(`ingest:${account.phone}`, () => await withTdlibMutex(`ingest:${account.phone}`, () =>
runWorkerForAccount(account) runWorkerForAccount(account)
); );
} }
log.info("Ingestion cycle complete"); log.info(
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
"Ingestion cycle complete"
);
} catch (err) { } catch (err) {
log.error({ err }, "Ingestion cycle failed"); log.error({ err }, "Ingestion cycle failed");
} finally { } finally {
@@ -105,6 +131,19 @@ export async function startScheduler(): Promise<void> {
scheduleNext(); scheduleNext();
} }
/**
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
* If a cycle is already running, this is a no-op.
*/
export async function triggerImmediateCycle(): Promise<void> {
if (running) {
log.info("Cycle already running, ignoring trigger");
return;
}
log.info("Immediate cycle triggered via UI");
await runCycle();
}
/** /**
* Stop the scheduler gracefully. * Stop the scheduler gracefully.
*/ */

View File

@@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js";
const log = childLogger("download"); const log = childLogger("download");
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
export const MAX_SCAN_PAGES = 5000;
/** Timeout for a single TDLib API call (ms) */
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
interface TdPhotoSize { interface TdPhotoSize {
type: string; type: string;
photo: { photo: {
@@ -66,6 +72,47 @@ interface TdFile {
export interface ChannelScanResult { export interface ChannelScanResult {
archives: TelegramMessage[]; archives: TelegramMessage[];
photos: TelegramPhoto[]; photos: TelegramPhoto[];
totalScanned: number;
}
export type ScanProgressCallback = (messagesScanned: number) => void;
/**
* Invoke a TDLib method with a timeout to prevent indefinite hangs.
* If TDLib does not respond within the timeout, the promise rejects.
*/
export async function invokeWithTimeout<T>(
client: Client,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
request: Record<string, any>,
timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
return new Promise<T>((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
}
}, timeoutMs);
(client.invoke(request) as Promise<T>)
.then((result) => {
if (!settled) {
settled = true;
clearTimeout(timer);
resolve(result);
}
})
.catch((err) => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(err);
}
});
});
} }
/** /**
@@ -77,32 +124,53 @@ export interface ChannelScanResult {
* When `lastProcessedMessageId` is null (first run), scans everything. * When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets, * The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net. * and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/ */
export async function getChannelMessages( export async function getChannelMessages(
client: Client, client: Client,
chatId: bigint, chatId: bigint,
lastProcessedMessageId?: bigint | null, lastProcessedMessageId?: bigint | null,
limit = 100 limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> { ): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = []; const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = []; const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
const result = (await client.invoke({ if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), pageCount, totalScanned },
"Hit max page limit for channel scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
_: "getChatHistory", _: "getChatHistory",
chat_id: Number(chatId), chat_id: Number(chatId),
from_message_id: currentFromId, from_message_id: currentFromId,
offset: 0, offset: 0,
limit: Math.min(limit, 100), limit: Math.min(limit, 100),
only_local: false, only_local: false,
})) as { messages: TdMessage[] }; });
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) { for (const msg of result.messages) {
// Check for archive documents // Check for archive documents
const doc = msg.content?.document; const doc = msg.content?.document;
@@ -132,19 +200,31 @@ export async function getChannelMessages(
} }
} }
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), currentFromId, totalScanned },
"Pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break; if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break; if (result.messages.length < Math.min(limit, 100)) break;
// Rate limit delay // Rate limit delay
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
log.info( log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length }, { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Channel scan complete" "Channel scan complete"
); );
@@ -152,6 +232,7 @@ export async function getChannelMessages(
return { return {
archives: archives.reverse(), archives: archives.reverse(),
photos: photos.reverse(), photos: photos.reverse(),
totalScanned,
}; };
} }

View File

@@ -4,7 +4,8 @@ import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js"; import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js"; import type { TelegramPhoto } from "../preview/match.js";
import type { ChannelScanResult } from "./download.js"; import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
const log = childLogger("topics"); const log = childLogger("topics");
@@ -21,16 +22,16 @@ export async function isChatForum(
chatId: bigint chatId: bigint
): Promise<boolean> { ): Promise<boolean> {
try { try {
const chat = (await client.invoke({ const chat = await invokeWithTimeout<{
_: "getChat",
chat_id: Number(chatId),
})) as {
type?: { type?: {
_: string; _: string;
supergroup_id?: number; supergroup_id?: number;
is_forum?: boolean; is_forum?: boolean;
}; };
}; }>(client, {
_: "getChat",
chat_id: Number(chatId),
});
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) { if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
return true; return true;
@@ -38,10 +39,10 @@ export async function isChatForum(
// Also check via getSupergroup for older TDLib versions // Also check via getSupergroup for older TDLib versions
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) { if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
const sg = (await client.invoke({ const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
_: "getSupergroup", _: "getSupergroup",
supergroup_id: chat.type.supergroup_id, supergroup_id: chat.type.supergroup_id,
})) as { is_forum?: boolean }; });
return sg.is_forum === true; return sg.is_forum === true;
} }
@@ -54,6 +55,7 @@ export async function isChatForum(
/** /**
* Get all forum topics in a supergroup. * Get all forum topics in a supergroup.
* Includes stuck detection and timeout protection on API calls.
*/ */
export async function getForumTopicList( export async function getForumTopicList(
client: Client, client: Client,
@@ -63,18 +65,24 @@ export async function getForumTopicList(
let offsetDate = 0; let offsetDate = 0;
let offsetMessageId = 0; let offsetMessageId = 0;
let offsetMessageThreadId = 0; let offsetMessageThreadId = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
const result = (await client.invoke({ if (pageCount >= MAX_SCAN_PAGES) {
_: "getForumTopics", log.warn(
chat_id: Number(chatId), { chatId: chatId.toString(), pageCount, topicCount: topics.length },
query: "", "Hit max page limit for topic enumeration, stopping"
offset_date: offsetDate, );
offset_message_id: offsetMessageId, break;
offset_message_thread_id: offsetMessageThreadId, }
limit: 100, pageCount++;
})) as {
const prevOffsetDate = offsetDate;
const prevOffsetMessageId = offsetMessageId;
const prevOffsetMessageThreadId = offsetMessageThreadId;
const result = await invokeWithTimeout<{
topics?: { topics?: {
info?: { info?: {
message_thread_id?: number; message_thread_id?: number;
@@ -85,7 +93,15 @@ export async function getForumTopicList(
next_offset_date?: number; next_offset_date?: number;
next_offset_message_id?: number; next_offset_message_id?: number;
next_offset_message_thread_id?: number; next_offset_message_thread_id?: number;
}; }>(client, {
_: "getForumTopics",
chat_id: Number(chatId),
query: "",
offset_date: offsetDate,
offset_message_id: offsetMessageId,
offset_message_thread_id: offsetMessageThreadId,
limit: 100,
});
if (!result.topics || result.topics.length === 0) break; if (!result.topics || result.topics.length === 0) break;
@@ -113,6 +129,19 @@ export async function getForumTopicList(
offsetMessageId = result.next_offset_message_id ?? 0; offsetMessageId = result.next_offset_message_id ?? 0;
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0; offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
// Stuck detection: if offsets didn't advance, break
if (
offsetDate === prevOffsetDate &&
offsetMessageId === prevOffsetMessageId &&
offsetMessageThreadId === prevOffsetMessageThreadId
) {
log.warn(
{ chatId: chatId.toString(), topicCount: topics.length },
"Topic pagination stuck (offsets not advancing), breaking"
);
break;
}
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
@@ -134,35 +163,43 @@ export async function getForumTopicList(
* When `lastProcessedMessageId` is null (first run), scans everything. * When `lastProcessedMessageId` is null (first run), scans everything.
* The worker applies a post-grouping filter to skip fully-processed sets, * The worker applies a post-grouping filter to skip fully-processed sets,
* and keeps `packageExistsBySourceMessage` as a safety net. * and keeps `packageExistsBySourceMessage` as a safety net.
*
* Safety features:
* - Max page limit to prevent infinite loops
* - Stuck detection: breaks if from_message_id stops advancing
* - Timeout on each TDLib API call
*/ */
export async function getTopicMessages( export async function getTopicMessages(
client: Client, client: Client,
chatId: bigint, chatId: bigint,
topicId: bigint, topicId: bigint,
lastProcessedMessageId?: bigint | null, lastProcessedMessageId?: bigint | null,
limit = 100 limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> { ): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = []; const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = []; const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0;
let pageCount = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
if (pageCount >= MAX_SCAN_PAGES) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
"Hit max page limit for topic scan, stopping"
);
break;
}
pageCount++;
const previousFromId = currentFromId;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await client.invoke({ const result = await invokeWithTimeout<{
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
})) as {
messages?: { messages?: {
id: number; id: number;
date: number; date: number;
@@ -186,10 +223,23 @@ export async function getTopicMessages(
caption?: { text?: string }; caption?: { text?: string };
}; };
}[]; }[];
}; }>(client, {
_: "searchChatMessages",
chat_id: Number(chatId),
query: "",
message_thread_id: Number(topicId),
from_message_id: currentFromId,
offset: 0,
limit: Math.min(limit, 100),
filter: null,
sender_id: null,
saved_messages_topic_id: 0,
});
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) { for (const msg of result.messages) {
// Check for archive documents // Check for archive documents
const doc = msg.content?.document; const doc = msg.content?.document;
@@ -219,18 +269,30 @@ export async function getTopicMessages(
} }
} }
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
if (currentFromId === previousFromId) {
log.warn(
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
"Topic pagination stuck (from_message_id not advancing), breaking"
);
break;
}
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
if (boundary && currentFromId < boundary) break; if (boundary && currentFromId < boundary) break;
if (result.messages.length < 100) break; if (result.messages.length < Math.min(limit, 100)) break;
await sleep(config.apiDelayMs); await sleep(config.apiDelayMs);
} }
log.info( log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length }, { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
"Topic scan complete" "Topic scan complete"
); );
@@ -238,6 +300,7 @@ export async function getTopicMessages(
return { return {
archives: archives.reverse(), archives: archives.reverse(),
photos: photos.reverse(), photos: photos.reverse(),
totalScanned,
}; };
} }

View File

@@ -4,12 +4,21 @@ const log = childLogger("mutex");
let locked = false; let locked = false;
let holder = ""; let holder = "";
const queue: Array<{ resolve: () => void; label: string }> = []; const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
/**
* Maximum time to wait for the TDLib mutex (ms).
* If the mutex is not available within this time, the operation is rejected.
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
*/
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
/** /**
* Ensures only one TDLib client runs at a time across the entire worker process. * Ensures only one TDLib client runs at a time across the entire worker process.
* Both the scheduler (auth, ingestion) and the fetch listener acquire this * Both the scheduler (auth, ingestion) and the fetch listener acquire this
* before creating any TDLib client. * before creating any TDLib client.
*
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
*/ */
export async function withTdlibMutex<T>( export async function withTdlibMutex<T>(
label: string, label: string,
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
): Promise<T> { ): Promise<T> {
if (locked) { if (locked) {
log.info({ waiting: label, holder }, "Waiting for TDLib mutex"); log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
await new Promise<void>((resolve) => queue.push({ resolve, label })); await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
const idx = queue.indexOf(entry);
if (idx !== -1) {
queue.splice(idx, 1);
reject(new Error(
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
`(waiting: ${label}, holder: ${holder})`
));
}
}, MUTEX_WAIT_TIMEOUT_MS);
const entry = {
resolve: () => {
clearTimeout(timer);
resolve();
},
reject,
label,
};
queue.push(entry);
});
} }
locked = true; locked = true;

View File

@@ -349,8 +349,18 @@ export async function runWorkerForAccount(
throw new Error("No global destination channel configured — set one in the admin UI"); throw new Error("No global destination channel configured — set one in the admin UI");
} }
for (const mapping of channelMappings) { const totalChannels = channelMappings.length;
if (totalChannels === 0) {
accountLog.info("No active source channels linked to this account — nothing to ingest");
}
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
const mapping = channelMappings[chIdx];
const channel = mapping.channel; const channel = mapping.channel;
const channelLabel = totalChannels > 1
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
: channel.title;
try { try {
// ── Check if channel is a forum ── // ── Check if channel is a forum ──
@@ -380,15 +390,16 @@ export async function runWorkerForAccount(
if (forum) { if (forum) {
// ── Forum channel: scan per-topic ── // ── Forum channel: scan per-topic ──
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Enumerating topics in "${channel.title}"`, currentActivity: `Enumerating topics in "${channelLabel}"`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: channel.title, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
const topics = await getForumTopicList(client, channel.telegramId); const topics = await getForumTopicList(client, channel.telegramId);
@@ -399,34 +410,53 @@ export async function runWorkerForAccount(
"Scanning forum channel by topic" "Scanning forum channel by topic"
); );
for (const topic of topics) { for (let tIdx = 0; tIdx < topics.length; tIdx++) {
const topic = topics[tIdx];
try { try {
const progress = topicProgressList.find( const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId (tp) => tp.topicId === topic.topicId
); );
const topicLabel = `${channel.title} ${topic.name}`;
const topicProgress = topics.length > 1
? ` (topic ${tIdx + 1}/${topics.length})`
: "";
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`, currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: `${channel.title} ${topic.name}`, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
const scanResult = await getTopicMessages( const scanResult = await getTopicMessages(
client, client,
channel.telegramId, channel.telegramId,
topic.topicId, topic.topicId,
progress?.lastProcessedMessageId progress?.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${topicLabel}"${topicProgress}${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
); );
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug( accountLog.info(
{ channelId: channel.id, topic: topic.name }, { channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
"No new archives in topic" "No new archives in topic"
); );
continue; continue;
@@ -463,15 +493,16 @@ export async function runWorkerForAccount(
} else { } else {
// ── Non-forum channel: flat scan (existing behavior) ── // ── Non-forum channel: flat scan (existing behavior) ──
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Scanning "${channel.title}" for new archives`, currentActivity: `Scanning "${channelLabel}" for new archives`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: channel.title, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
accountLog.info( accountLog.info(
@@ -482,11 +513,23 @@ export async function runWorkerForAccount(
const scanResult = await getChannelMessages( const scanResult = await getChannelMessages(
client, client,
channel.telegramId, channel.telegramId,
mapping.lastProcessedMessageId mapping.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
); );
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives"); accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
continue; continue;
} }
@@ -593,6 +636,7 @@ async function processArchiveSets(
currentChannel: channelTitle, currentChannel: channelTitle,
totalFiles: archiveSets.length, totalFiles: archiveSets.length,
zipsFound: counters.zipsFound, zipsFound: counters.zipsFound,
messagesScanned: counters.messagesScanned,
}); });
// Track the highest message ID that was successfully processed // Track the highest message ID that was successfully processed
@@ -646,7 +690,6 @@ async function processOneArchiveSet(
throttled, counters, topicCreator, sourceTopicId, accountLog, throttled, counters, topicCreator, sourceTopicId, accountLog,
} = ctx; } = ctx;
counters.messagesScanned += archiveSet.parts.length;
const archiveName = archiveSet.parts[0].fileName; const archiveName = archiveSet.parts[0].fileName;
// ── Early skip: check if this archive set was already ingested ── // ── Early skip: check if this archive set was already ingested ──