mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-10 22:01:16 +00:00
Compare commits
18 Commits
copilot/de
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22419106c1 | ||
|
|
e45de85c69 | ||
|
|
71a2e6a5e8 | ||
|
|
1436b630e2 | ||
|
|
43af23d3be | ||
|
|
49b82a352b | ||
|
|
2e242912af | ||
|
|
9adbdb2a77 | ||
|
|
ad71346468 | ||
|
|
e19a80897d | ||
|
|
22da4dfad2 | ||
|
|
22bcacf3bd | ||
|
|
15da57b8c0 | ||
|
|
8f1a912ccb | ||
|
|
81b65912aa | ||
|
|
5eb2cf05b9 | ||
|
|
f73d06b3d9 | ||
|
|
cac3d518e1 |
@@ -13,6 +13,8 @@ AUTH_GITHUB_ID=""
|
||||
AUTH_GITHUB_SECRET=""
|
||||
|
||||
# App
|
||||
# APP_PORT controls the port the container listens on AND how it is exposed on the host.
|
||||
# If you change APP_PORT, also update NEXT_PUBLIC_APP_URL to match.
|
||||
NEXT_PUBLIC_APP_URL="http://localhost:3000"
|
||||
APP_PORT=3000
|
||||
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -18,6 +18,7 @@ worker/node_modules
|
||||
|
||||
# production
|
||||
/build
|
||||
worker/dist
|
||||
|
||||
# misc
|
||||
.DS_Store
|
||||
|
||||
13
Dockerfile
13
Dockerfile
@@ -30,19 +30,19 @@ RUN addgroup --system --gid 1001 nodejs && \
|
||||
adduser --system --uid 1001 nextjs
|
||||
|
||||
# Copy public assets
|
||||
COPY --from=builder /app/public ./public
|
||||
|
||||
# Copy prisma schema + migrations for runtime migrate deploy
|
||||
COPY --from=builder /app/prisma ./prisma
|
||||
COPY --from=builder /app/prisma.config.ts ./prisma.config.ts
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/public ./public
|
||||
|
||||
# Copy standalone build output
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
|
||||
|
||||
# Copy prisma schema + migrations for runtime migrate deploy
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/prisma ./prisma
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/prisma.config.ts ./prisma.config.ts
|
||||
|
||||
# Copy node_modules for prisma CLI (needed for migrate deploy at startup).
|
||||
# Copying the full directory ensures all transitive dependencies are present.
|
||||
COPY --from=builder /app/node_modules ./node_modules
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/node_modules ./node_modules
|
||||
# Recreate the .bin/prisma symlink so Node resolves __dirname to prisma/build/,
|
||||
# where the WASM files live (COPY dereferences symlinks, breaking WASM resolution)
|
||||
RUN mkdir -p ./node_modules/.bin && \
|
||||
@@ -54,6 +54,7 @@ RUN chmod +x docker-entrypoint.sh
|
||||
|
||||
USER nextjs
|
||||
|
||||
# Default port — overridden at runtime by the PORT env var (set via docker-compose APP_PORT)
|
||||
EXPOSE 3000
|
||||
ENV PORT=3000
|
||||
ENV HOSTNAME="0.0.0.0"
|
||||
|
||||
@@ -5,23 +5,29 @@ services:
|
||||
dockerfile: Dockerfile
|
||||
pull_policy: never
|
||||
ports:
|
||||
- "${APP_PORT:-3000}:3000"
|
||||
- "${APP_PORT:-3000}:${APP_PORT:-3000}"
|
||||
environment:
|
||||
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
|
||||
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
|
||||
- AUTH_TRUST_HOST=true
|
||||
- AUTH_GITHUB_ID=${AUTH_GITHUB_ID:-}
|
||||
- AUTH_GITHUB_SECRET=${AUTH_GITHUB_SECRET:-}
|
||||
- NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
|
||||
- TELEGRAM_API_KEY=${TELEGRAM_API_KEY:-}
|
||||
- BOT_TOKEN=${BOT_TOKEN:-}
|
||||
- BOT_USERNAME=${BOT_USERNAME:-}
|
||||
- LOG_LEVEL=${LOG_LEVEL:-info}
|
||||
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
|
||||
- PORT=${APP_PORT:-3000}
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
healthcheck:
|
||||
test: ["CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health"]
|
||||
test: ["CMD-SHELL", "wget -q --spider http://localhost:$$PORT/api/health || exit 1"]
|
||||
interval: 30s
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
start_period: 30s
|
||||
start_period: 60s
|
||||
restart: unless-stopped
|
||||
deploy:
|
||||
resources:
|
||||
|
||||
@@ -10,7 +10,10 @@ if [ "$AUTH_SECRET" = "change-me-to-a-random-secret-in-production" ] || [ -z "$A
|
||||
fi
|
||||
|
||||
echo "Running database migrations..."
|
||||
./node_modules/.bin/prisma migrate deploy
|
||||
if ! ./node_modules/.bin/prisma migrate deploy; then
|
||||
echo "ERROR: Database migration failed. Check DATABASE_URL and database connectivity."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$SEED_DATABASE" = "true" ]; then
|
||||
echo "Seeding database..."
|
||||
|
||||
2
package-lock.json
generated
2
package-lock.json
generated
@@ -49,7 +49,7 @@
|
||||
"ts-node": "^10.9.2",
|
||||
"tsx": "^4.21.0",
|
||||
"tw-animate-css": "^1.4.0",
|
||||
"typescript": "^5"
|
||||
"typescript": "5.9.3"
|
||||
}
|
||||
},
|
||||
"node_modules/@alloc/quick-lru": {
|
||||
|
||||
@@ -58,6 +58,6 @@
|
||||
"ts-node": "^10.9.2",
|
||||
"tsx": "^4.21.0",
|
||||
"tw-animate-css": "^1.4.0",
|
||||
"typescript": "^5"
|
||||
"typescript": "5.9.3"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
-- Change the default for new channels to disabled (isActive = false).
|
||||
-- Existing channels are not affected — admins can manually enable/disable them.
|
||||
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;
|
||||
@@ -417,7 +417,7 @@ model TelegramChannel {
|
||||
title String
|
||||
type ChannelType
|
||||
isForum Boolean @default(false)
|
||||
isActive Boolean @default(true)
|
||||
isActive Boolean @default(false)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
Power,
|
||||
ArrowDownToLine,
|
||||
ArrowUpFromLine,
|
||||
RefreshCcw,
|
||||
} from "lucide-react";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import { Button } from "@/components/ui/button";
|
||||
@@ -23,12 +24,14 @@ interface ChannelColumnsProps {
|
||||
onToggleActive: (id: string) => void;
|
||||
onDelete: (id: string) => void;
|
||||
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
|
||||
onRescan: (id: string) => void;
|
||||
}
|
||||
|
||||
export function getChannelColumns({
|
||||
onToggleActive,
|
||||
onDelete,
|
||||
onSetType,
|
||||
onRescan,
|
||||
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
|
||||
return [
|
||||
{
|
||||
@@ -121,6 +124,14 @@ export function getChannelColumns({
|
||||
Set as Source
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
{row.original.type === "SOURCE" && (
|
||||
<DropdownMenuItem
|
||||
onClick={() => onRescan(row.original.id)}
|
||||
>
|
||||
<RefreshCcw className="mr-2 h-3.5 w-3.5" />
|
||||
Rescan Channel
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
<DropdownMenuItem
|
||||
onClick={() => onToggleActive(row.original.id)}
|
||||
>
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
deleteChannel,
|
||||
toggleChannelActive,
|
||||
setChannelType,
|
||||
rescanChannel,
|
||||
} from "../actions";
|
||||
import { DataTable } from "@/components/shared/data-table";
|
||||
import { DeleteDialog } from "@/components/shared/delete-dialog";
|
||||
@@ -22,6 +23,7 @@ interface ChannelsTabProps {
|
||||
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
||||
const [isPending, startTransition] = useTransition();
|
||||
const [deleteId, setDeleteId] = useState<string | null>(null);
|
||||
const [rescanId, setRescanId] = useState<string | null>(null);
|
||||
|
||||
const columns = getChannelColumns({
|
||||
onToggleActive: (id) => {
|
||||
@@ -39,6 +41,7 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
||||
else toast.error(result.error);
|
||||
});
|
||||
},
|
||||
onRescan: (id) => setRescanId(id),
|
||||
});
|
||||
|
||||
const { table } = useDataTable({
|
||||
@@ -60,6 +63,19 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
||||
});
|
||||
};
|
||||
|
||||
const handleRescan = () => {
|
||||
if (!rescanId) return;
|
||||
startTransition(async () => {
|
||||
const result = await rescanChannel(rescanId);
|
||||
if (result.success) {
|
||||
toast.success("Channel scan progress reset — it will be fully rescanned on the next sync");
|
||||
setRescanId(null);
|
||||
} else {
|
||||
toast.error(result.error);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="space-y-4">
|
||||
<DestinationCard destination={globalDestination} />
|
||||
@@ -83,6 +99,16 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
||||
onConfirm={handleDelete}
|
||||
isLoading={isPending}
|
||||
/>
|
||||
|
||||
<DeleteDialog
|
||||
open={!!rescanId}
|
||||
onOpenChange={(open) => !open && setRescanId(null)}
|
||||
title="Rescan Channel"
|
||||
description="This will reset all scan progress for this channel. On the next sync the worker will re-process every message from the beginning. Packages that are already in the library will be skipped (deduplication by hash), but any missing files will be re-downloaded and re-uploaded. This may take a long time for large channels."
|
||||
confirmLabel="Rescan"
|
||||
onConfirm={handleRescan}
|
||||
isLoading={isPending}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ interface TelegramAdminProps {
|
||||
ingestionStatus: IngestionAccountStatus[];
|
||||
globalDestination: GlobalDestination;
|
||||
sendHistory: SendHistoryRow[];
|
||||
workerIntervalMinutes: number;
|
||||
}
|
||||
|
||||
export function TelegramAdmin({
|
||||
@@ -24,6 +25,7 @@ export function TelegramAdmin({
|
||||
ingestionStatus,
|
||||
globalDestination,
|
||||
sendHistory,
|
||||
workerIntervalMinutes,
|
||||
}: TelegramAdminProps) {
|
||||
return (
|
||||
<div className="space-y-4">
|
||||
@@ -32,7 +34,7 @@ export function TelegramAdmin({
|
||||
description="Manage Telegram accounts, channels, and ingestion"
|
||||
/>
|
||||
|
||||
<WorkerStatusPanel initialStatus={ingestionStatus} />
|
||||
<WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
|
||||
|
||||
<Tabs defaultValue="accounts" className="space-y-4">
|
||||
<TabsList>
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"use client";
|
||||
|
||||
import { useEffect, useState, useCallback } from "react";
|
||||
import { useEffect, useState, useCallback, useTransition } from "react";
|
||||
import {
|
||||
Loader2,
|
||||
CheckCircle2,
|
||||
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { cn } from "@/lib/utils";
|
||||
import { toast } from "sonner";
|
||||
import { triggerIngestion } from "../actions";
|
||||
import type { IngestionAccountStatus } from "@/lib/telegram/types";
|
||||
|
||||
interface WorkerStatusPanelProps {
|
||||
initialStatus: IngestionAccountStatus[];
|
||||
initialIntervalMinutes?: number;
|
||||
}
|
||||
|
||||
const AUTH_STATE_CONFIG: Record<
|
||||
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
|
||||
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
|
||||
};
|
||||
|
||||
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
|
||||
const [accounts, setAccounts] = useState(initialStatus);
|
||||
const [error, setError] = useState(false);
|
||||
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
|
||||
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
|
||||
const [isPending, startTransition] = useTransition();
|
||||
|
||||
// Find active run
|
||||
const activeRun = accounts.find((a) => a.currentRun);
|
||||
const isRunning = !!activeRun;
|
||||
|
||||
const handleSyncNow = useCallback(() => {
|
||||
startTransition(async () => {
|
||||
const result = await triggerIngestion();
|
||||
if (result.success) {
|
||||
toast.success("Sync triggered — worker will start shortly");
|
||||
} else {
|
||||
toast.error(result.error ?? "Failed to trigger sync");
|
||||
}
|
||||
});
|
||||
}, []);
|
||||
|
||||
// Poll for status
|
||||
useEffect(() => {
|
||||
let timer: ReturnType<typeof setTimeout>;
|
||||
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
const data = await res.json();
|
||||
if (mounted) {
|
||||
setAccounts(data.accounts ?? []);
|
||||
if (data.workerIntervalMinutes) {
|
||||
setWorkerIntervalMinutes(data.workerIntervalMinutes);
|
||||
}
|
||||
setError(false);
|
||||
}
|
||||
} catch {
|
||||
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter)
|
||||
// Estimate next run based on last run finish time + configured interval + up to 5 min jitter
|
||||
const lastFinished = accounts
|
||||
.filter((a) => a.lastRun?.finishedAt)
|
||||
.map((a) => new Date(a.lastRun!.finishedAt!).getTime())
|
||||
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
return;
|
||||
}
|
||||
|
||||
const intervalMs = 5 * 60 * 1000; // 5 min base
|
||||
const intervalMs = workerIntervalMinutes * 60 * 1000;
|
||||
const estimatedNext = lastFinished + intervalMs;
|
||||
|
||||
const tick = () => {
|
||||
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
tick();
|
||||
const interval = setInterval(tick, 1_000);
|
||||
return () => clearInterval(interval);
|
||||
}, [isRunning, accounts]);
|
||||
}, [isRunning, accounts, workerIntervalMinutes]);
|
||||
|
||||
if (accounts.length === 0 && !error) {
|
||||
return (
|
||||
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
||||
) : isRunning && activeRun?.currentRun ? (
|
||||
<RunningStatus run={activeRun.currentRun} />
|
||||
) : (
|
||||
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} />
|
||||
<IdleStatus
|
||||
accounts={accounts}
|
||||
nextRunCountdown={nextRunCountdown}
|
||||
onSyncNow={handleSyncNow}
|
||||
isSyncing={isPending}
|
||||
/>
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
@@ -233,6 +257,11 @@ function RunningStatus({
|
||||
</span>
|
||||
</span>
|
||||
)}
|
||||
{run.messagesScanned > 0 && (
|
||||
<span>
|
||||
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
|
||||
</span>
|
||||
)}
|
||||
{run.zipsIngested > 0 && (
|
||||
<span>
|
||||
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
||||
@@ -251,9 +280,13 @@ function RunningStatus({
|
||||
function IdleStatus({
|
||||
accounts,
|
||||
nextRunCountdown,
|
||||
onSyncNow,
|
||||
isSyncing,
|
||||
}: {
|
||||
accounts: IngestionAccountStatus[];
|
||||
nextRunCountdown: string | null;
|
||||
onSyncNow: () => void;
|
||||
isSyncing: boolean;
|
||||
}) {
|
||||
const lastRun = accounts
|
||||
.filter((a) => a.lastRun)
|
||||
@@ -316,14 +349,32 @@ function IdleStatus({
|
||||
)}
|
||||
</div>
|
||||
|
||||
{nextRunCountdown && hasAuthenticated && (
|
||||
<div className="flex items-center gap-1.5 shrink-0">
|
||||
<RefreshCw className="h-3 w-3 text-muted-foreground" />
|
||||
<span className="text-xs text-muted-foreground tabular-nums">
|
||||
Next: {nextRunCountdown}
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
<div className="flex items-center gap-2 shrink-0">
|
||||
{nextRunCountdown && hasAuthenticated && (
|
||||
<div className="flex items-center gap-1.5">
|
||||
<RefreshCw className="h-3 w-3 text-muted-foreground" />
|
||||
<span className="text-xs text-muted-foreground tabular-nums">
|
||||
Next: {nextRunCountdown}
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
{hasAuthenticated && (
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
className="h-7 text-xs px-2"
|
||||
onClick={onSyncNow}
|
||||
disabled={isSyncing}
|
||||
>
|
||||
{isSyncing ? (
|
||||
<Loader2 className="h-3 w-3 animate-spin mr-1" />
|
||||
) : (
|
||||
<RefreshCw className="h-3 w-3 mr-1" />
|
||||
)}
|
||||
Sync Now
|
||||
</Button>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ export async function createChannel(
|
||||
telegramId: BigInt(parsed.data.telegramId),
|
||||
title: parsed.data.title,
|
||||
type: parsed.data.type,
|
||||
isActive: false,
|
||||
},
|
||||
});
|
||||
revalidatePath(REVALIDATE_PATH);
|
||||
@@ -296,6 +297,52 @@ export async function triggerChannelSync(): Promise<ActionResult> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Reset all scan progress for a channel so the worker will re-process it
 * from the very beginning on the next ingestion cycle.
 *
 * This clears:
 *  - `lastProcessedMessageId` on every AccountChannelMap linked to this channel
 *  - All TopicProgress records for those maps (for forum channels)
 *
 * Both writes run in a single database transaction so a failure can never
 * leave the channel half-reset (topic progress deleted but cursor still set).
 *
 * @param channelId - ID of the TelegramChannel whose progress is reset.
 * @returns `{ success: true }` once the reset is committed; otherwise
 *          `{ success: false, error }` when the caller is not an admin, the
 *          channel does not exist, or any database operation fails.
 */
export async function rescanChannel(channelId: string): Promise<ActionResult> {
  const admin = await requireAdmin();
  if (!admin.success) return admin;

  try {
    // The lookup lives inside the try block so a database error is reported
    // through the ActionResult instead of escaping as an exception.
    const channel = await prisma.telegramChannel.findUnique({
      where: { id: channelId },
      select: { id: true },
    });
    if (!channel) return { success: false, error: "Channel not found" };

    // Find all account-channel maps for this channel
    const maps = await prisma.accountChannelMap.findMany({
      where: { channelId },
      select: { id: true },
    });
    const mapIds = maps.map((m: { id: string }) => m.id);

    // Apply both writes atomically. An empty `in` list matches no rows, so the
    // delete is a harmless no-op for channels without linked maps.
    await prisma.$transaction([
      // Delete all topic progress records for these maps (forum channels)
      prisma.topicProgress.deleteMany({
        where: { accountChannelMapId: { in: mapIds } },
      }),
      // Reset the scan cursor so the worker re-processes from the start
      prisma.accountChannelMap.updateMany({
        where: { channelId },
        data: { lastProcessedMessageId: null },
      }),
    ]);

    revalidatePath(REVALIDATE_PATH);
    return { success: true, data: undefined };
  } catch {
    return { success: false, error: "Failed to reset channel scan progress" };
  }
}
|
||||
|
||||
// ── Account-Channel link actions ──
|
||||
|
||||
export async function linkChannel(
|
||||
@@ -371,23 +418,12 @@ export async function triggerIngestion(
|
||||
return { success: false, error: "No eligible accounts found" };
|
||||
}
|
||||
|
||||
// Create ingestion runs — the worker picks these up
|
||||
for (const account of accounts) {
|
||||
const existing = await prisma.ingestionRun.findFirst({
|
||||
where: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
if (!existing) {
|
||||
await prisma.ingestionRun.create({
|
||||
data: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// pg_notify for immediate worker pickup
|
||||
// Signal the worker to run an immediate ingestion cycle via pg_notify.
|
||||
// The worker will create its own IngestionRun records with proper activity tracking.
|
||||
try {
|
||||
await prisma.$queryRawUnsafe(
|
||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||
accounts.map((a) => a.id).join(",")
|
||||
accounts.map((a: { id: string }) => a.id).join(",")
|
||||
);
|
||||
} catch {
|
||||
// Best-effort
|
||||
@@ -417,7 +453,7 @@ export async function saveChannelSelections(
|
||||
try {
|
||||
let linked = 0;
|
||||
for (const ch of channels) {
|
||||
// Upsert the channel record
|
||||
// Upsert the channel record (new channels default to disabled)
|
||||
const channel = await prisma.telegramChannel.upsert({
|
||||
where: { telegramId: BigInt(ch.telegramId) },
|
||||
create: {
|
||||
@@ -425,6 +461,7 @@ export async function saveChannelSelections(
|
||||
title: ch.title,
|
||||
type: "SOURCE",
|
||||
isForum: ch.isForum,
|
||||
isActive: false,
|
||||
},
|
||||
update: {
|
||||
title: ch.title,
|
||||
@@ -467,10 +504,10 @@ export async function setGlobalDestination(
|
||||
if (!channel) return { success: false, error: "Channel not found" };
|
||||
|
||||
try {
|
||||
// Set the channel type to DESTINATION
|
||||
// Set the channel type to DESTINATION and ensure it's active
|
||||
await prisma.telegramChannel.update({
|
||||
where: { id: channelId },
|
||||
data: { type: "DESTINATION" },
|
||||
data: { type: "DESTINATION", isActive: true },
|
||||
});
|
||||
|
||||
// Save as global destination
|
||||
@@ -521,17 +558,19 @@ export async function createDestinationChannel(
|
||||
if (!admin.success) return admin;
|
||||
|
||||
try {
|
||||
// Create the channel as DESTINATION
|
||||
// Create the channel as DESTINATION (active by default — needed for uploads)
|
||||
const channel = await prisma.telegramChannel.upsert({
|
||||
where: { telegramId: BigInt(telegramId) },
|
||||
create: {
|
||||
telegramId: BigInt(telegramId),
|
||||
title,
|
||||
type: "DESTINATION",
|
||||
isActive: true,
|
||||
},
|
||||
update: {
|
||||
title,
|
||||
type: "DESTINATION",
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ export default async function TelegramPage() {
|
||||
}),
|
||||
]);
|
||||
|
||||
const serializedHistory = sendHistory.map((r) => ({
|
||||
const serializedHistory = sendHistory.map((r: typeof sendHistory[number]) => ({
|
||||
id: r.id,
|
||||
packageName: r.package.fileName,
|
||||
recipientName: r.telegramLink.telegramName,
|
||||
@@ -42,6 +42,7 @@ export default async function TelegramPage() {
|
||||
ingestionStatus={ingestionStatus}
|
||||
globalDestination={globalDestination}
|
||||
sendHistory={serializedHistory}
|
||||
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -9,5 +9,9 @@ export async function GET(request: Request) {
|
||||
if ("error" in authResult) return authResult.error;
|
||||
|
||||
const accounts = await getIngestionStatus();
|
||||
return NextResponse.json({ accounts });
|
||||
const workerIntervalMinutes = parseInt(
|
||||
process.env.WORKER_INTERVAL_MINUTES ?? "60",
|
||||
10
|
||||
);
|
||||
return NextResponse.json({ accounts, workerIntervalMinutes });
|
||||
}
|
||||
|
||||
@@ -45,33 +45,20 @@ export async function POST(request: Request) {
|
||||
);
|
||||
}
|
||||
|
||||
// Create ingestion runs marked as RUNNING — the worker will pick these up
|
||||
// when it next polls, or we use pg_notify for immediate pickup
|
||||
for (const account of accounts) {
|
||||
// Only create if no run is already RUNNING for this account
|
||||
const existing = await prisma.ingestionRun.findFirst({
|
||||
where: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
if (!existing) {
|
||||
await prisma.ingestionRun.create({
|
||||
data: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Send pg_notify for immediate worker pickup
|
||||
// Send pg_notify for immediate worker pickup.
|
||||
// The worker creates its own IngestionRun records with proper activity tracking.
|
||||
try {
|
||||
await prisma.$queryRawUnsafe(
|
||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||
accounts.map((a) => a.id).join(",")
|
||||
accounts.map((a: { id: string }) => a.id).join(",")
|
||||
);
|
||||
} catch {
|
||||
// pg_notify is best-effort — worker will pick up on next cycle anyway
|
||||
// pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
triggered: true,
|
||||
accountIds: accounts.map((a) => a.id),
|
||||
message: `Ingestion queued for ${accounts.length} account(s)`,
|
||||
accountIds: accounts.map((a: { id: string }) => a.id),
|
||||
message: `Ingestion triggered for ${accounts.length} account(s)`,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ interface DeleteDialogProps {
|
||||
description?: string;
|
||||
onConfirm: () => void;
|
||||
isLoading?: boolean;
|
||||
confirmLabel?: string;
|
||||
confirmLoadingLabel?: string;
|
||||
}
|
||||
|
||||
export function DeleteDialog({
|
||||
@@ -27,6 +29,8 @@ export function DeleteDialog({
|
||||
description = "This action cannot be undone.",
|
||||
onConfirm,
|
||||
isLoading,
|
||||
confirmLabel = "Delete",
|
||||
confirmLoadingLabel,
|
||||
}: DeleteDialogProps) {
|
||||
return (
|
||||
<AlertDialog open={open} onOpenChange={onOpenChange}>
|
||||
@@ -42,7 +46,7 @@ export function DeleteDialog({
|
||||
disabled={isLoading}
|
||||
className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
|
||||
>
|
||||
{isLoading ? "Deleting..." : "Delete"}
|
||||
{isLoading ? (confirmLoadingLabel ?? `${confirmLabel}...`) : confirmLabel}
|
||||
</AlertDialogAction>
|
||||
</AlertDialogFooter>
|
||||
</AlertDialogContent>
|
||||
|
||||
@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
|
||||
title: string;
|
||||
type: "SOURCE" | "DESTINATION";
|
||||
isForum: boolean;
|
||||
isActive?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upsert a channel by telegramId. Returns the channel record.
|
||||
* If it already exists, update title and forum status.
|
||||
* New channels default to disabled (isActive: false) so the admin must
|
||||
* explicitly enable them before the worker processes them.
|
||||
* Pass isActive: true for DESTINATION channels that must be active immediately.
|
||||
*/
|
||||
export async function upsertChannel(input: UpsertChannelInput) {
|
||||
return db.telegramChannel.upsert({
|
||||
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
|
||||
title: input.title,
|
||||
type: input.type,
|
||||
isForum: input.isForum,
|
||||
isActive: input.isActive ?? false,
|
||||
},
|
||||
update: {
|
||||
title: input.title,
|
||||
|
||||
@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
|
||||
import { processFetchRequest } from "./worker.js";
|
||||
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||
import { triggerImmediateCycle } from "./scheduler.js";
|
||||
import {
|
||||
getGlobalDestinationChannel,
|
||||
getGlobalSetting,
|
||||
@@ -17,6 +18,10 @@ import {
|
||||
const log = childLogger("fetch-listener");
|
||||
|
||||
let pgClient: pg.PoolClient | null = null;
|
||||
let stopped = false;
|
||||
|
||||
/** Delay (ms) before attempting to reconnect after a connection loss. */
|
||||
const RECONNECT_DELAY_MS = 5_000;
|
||||
|
||||
/**
|
||||
* Start listening for pg_notify signals from the web app.
|
||||
@@ -25,27 +30,76 @@ let pgClient: pg.PoolClient | null = null;
|
||||
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
||||
* - `generate_invite` — payload = channelId → generate invite link for destination
|
||||
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
||||
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
||||
*
|
||||
* If the underlying connection is lost, the listener automatically reconnects
|
||||
* so that pg_notify signals are never silently dropped.
|
||||
*/
|
||||
export async function startFetchListener(): Promise<void> {
|
||||
pgClient = await pool.connect();
|
||||
await pgClient.query("LISTEN channel_fetch");
|
||||
await pgClient.query("LISTEN generate_invite");
|
||||
await pgClient.query("LISTEN create_destination");
|
||||
stopped = false;
|
||||
await connectListener();
|
||||
}
|
||||
|
||||
pgClient.on("notification", (msg) => {
|
||||
if (msg.channel === "channel_fetch" && msg.payload) {
|
||||
handleChannelFetch(msg.payload);
|
||||
} else if (msg.channel === "generate_invite" && msg.payload) {
|
||||
handleGenerateInvite(msg.payload);
|
||||
} else if (msg.channel === "create_destination" && msg.payload) {
|
||||
handleCreateDestination(msg.payload);
|
||||
/**
 * Acquire a dedicated connection from the pool, subscribe it to every
 * notification channel and dispatch incoming notifications to their handlers.
 *
 * Connection loss is handled exactly once per connection: whichever of the
 * `error` / `end` events (or a setup failure) arrives first destroys the
 * client and schedules a single reconnect; later signals for the same
 * connection are ignored. Never rejects — failures are logged and retried
 * via scheduleReconnect().
 */
async function connectListener(): Promise<void> {
  // Connection owned by this invocation; handlers close over it so they never
  // act on a newer connection that has since replaced the module-level pgClient.
  let client: pg.PoolClient | null = null;
  // Set once this connection has been torn down, so `error` followed by `end`
  // (or a failed LISTEN plus an `error` event) schedules only one reconnect.
  let lost = false;

  const dropAndReconnect = (): void => {
    if (lost) return;
    lost = true;
    // Only release while this connection is still the published one;
    // stopFetchListener() may already have released it and cleared pgClient.
    if (client && pgClient === client) {
      try {
        // `true` tells the pool to destroy the connection instead of reusing it.
        client.release(true);
      } catch (releaseErr) {
        log.debug({ err: releaseErr }, "Failed to release pg client after error");
      }
      pgClient = null;
    }
    scheduleReconnect();
  };

  try {
    const conn = await pool.connect();

    // The listener may have been stopped while we were waiting for a connection.
    if (stopped) {
      conn.release();
      return;
    }

    client = conn;
    pgClient = conn;

    // Attach the lifecycle handlers before issuing LISTEN so a connection drop
    // during setup is handled instead of surfacing as an unhandled `error` event.
    conn.on("error", (err) => {
      log.error({ err }, "Fetch listener connection error");
      if (!stopped) dropAndReconnect();
    });

    // Reconnect automatically when the connection ends unexpectedly
    conn.on("end", () => {
      if (stopped || lost) return;
      log.warn("Fetch listener connection lost — reconnecting");
      dropAndReconnect();
    });

    conn.on("notification", (msg) => {
      if (msg.channel === "channel_fetch" && msg.payload) {
        handleChannelFetch(msg.payload);
      } else if (msg.channel === "generate_invite" && msg.payload) {
        handleGenerateInvite(msg.payload);
      } else if (msg.channel === "create_destination" && msg.payload) {
        handleCreateDestination(msg.payload);
      } else if (msg.channel === "ingestion_trigger") {
        // The payload is not needed: the trigger starts a full cycle.
        handleIngestionTrigger();
      }
    });

    await conn.query("LISTEN channel_fetch");
    await conn.query("LISTEN generate_invite");
    await conn.query("LISTEN create_destination");
    await conn.query("LISTEN ingestion_trigger");

    log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
  } catch (err) {
    log.error({ err }, "Failed to start fetch listener — retrying");
    // Releases the client if one was acquired (avoids leaking a pool slot)
    // and schedules the retry unless a handler already did.
    dropAndReconnect();
  }
}
|
||||
|
||||
function scheduleReconnect(): void {
|
||||
if (stopped) return;
|
||||
setTimeout(() => {
|
||||
if (!stopped) {
|
||||
connectListener();
|
||||
}
|
||||
});
|
||||
|
||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
|
||||
}, RECONNECT_DELAY_MS);
|
||||
}
|
||||
|
||||
export function stopFetchListener(): void {
|
||||
stopped = true;
|
||||
if (pgClient) {
|
||||
pgClient.release();
|
||||
pgClient = null;
|
||||
@@ -138,12 +192,13 @@ function handleCreateDestination(payload: string): void {
|
||||
const result = await createSupergroup(client, parsed.title);
|
||||
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
||||
|
||||
// Upsert it as a DESTINATION channel in the DB
|
||||
// Upsert it as a DESTINATION channel in the DB (active by default)
|
||||
const channel = await upsertChannel({
|
||||
telegramId: result.chatId,
|
||||
title: result.title,
|
||||
type: "DESTINATION",
|
||||
isForum: false,
|
||||
isActive: true,
|
||||
});
|
||||
|
||||
// Set as global destination
|
||||
@@ -204,3 +259,16 @@ function handleCreateDestination(payload: string): void {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Ingestion trigger handler ──
|
||||
|
||||
/**
 * Queue an immediate ingestion cycle in response to an `ingestion_trigger`
 * notification. The work is appended to `fetchQueue`, so it starts only after
 * previously queued tasks have settled. Failures are logged, never rethrown,
 * which keeps the queue usable for later tasks.
 *
 * NOTE(review): the queued task awaits triggerImmediateCycle(); if that only
 * resolves when the whole cycle finishes, later queued requests wait behind
 * it — confirm against the scheduler.
 */
function handleIngestionTrigger(): void {
  const runTriggeredCycle = async (): Promise<void> => {
    log.info("Ingestion trigger received from UI");
    await triggerImmediateCycle();
  };

  fetchQueue = fetchQueue.then(() =>
    runTriggeredCycle().catch((err) => {
      log.error({ err }, "Failed to trigger immediate ingestion cycle");
    })
  );
}
|
||||
|
||||
@@ -10,6 +10,13 @@ let running = false;
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
let cycleCount = 0;
|
||||
|
||||
/** Cycle timeout (minutes) used when WORKER_CYCLE_TIMEOUT_MINUTES is unset or invalid. */
const DEFAULT_CYCLE_TIMEOUT_MINUTES = 240;

/**
 * Parse the configured cycle timeout in minutes.
 *
 * Falls back to the default when the value is missing, non-numeric, zero or
 * negative. A NaN timeout would make every `elapsed > CYCLE_TIMEOUT_MS`
 * comparison false (timeout silently disabled), and a value <= 0 would make
 * every cycle time out immediately.
 *
 * @param raw - Raw setting; defaults to the WORKER_CYCLE_TIMEOUT_MINUTES env var.
 * @returns A positive whole number of minutes.
 */
function resolveCycleTimeoutMinutes(raw = process.env.WORKER_CYCLE_TIMEOUT_MINUTES) {
  const minutes = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(minutes) && minutes > 0 ? minutes : DEFAULT_CYCLE_TIMEOUT_MINUTES;
}

/**
 * Maximum time for a single ingestion cycle (ms).
 * After this, new accounts won't be started (in-progress work finishes).
 * Default: 4 hours. Configurable via WORKER_CYCLE_TIMEOUT_MINUTES.
 */
const CYCLE_TIMEOUT_MS = resolveCycleTimeoutMinutes() * 60 * 1000;
|
||||
|
||||
/**
|
||||
* Run one ingestion cycle:
|
||||
* 1. Authenticate any PENDING accounts (triggers SMS code flow + auto-fetch channels)
|
||||
@@ -17,6 +24,10 @@ let cycleCount = 0;
|
||||
*
|
||||
* All TDLib operations are wrapped in the mutex to ensure only one client
|
||||
* runs at a time (also shared with the fetch listener for on-demand requests).
|
||||
*
|
||||
* The cycle has a configurable timeout (WORKER_CYCLE_TIMEOUT_MINUTES, default 4h).
|
||||
* Once the timeout elapses, no new accounts will be started but any in-progress
|
||||
* account processing is allowed to finish its current archive set.
|
||||
*/
|
||||
async function runCycle(): Promise<void> {
|
||||
if (running) {
|
||||
@@ -26,7 +37,8 @@ async function runCycle(): Promise<void> {
|
||||
|
||||
running = true;
|
||||
cycleCount++;
|
||||
log.info({ cycle: cycleCount }, "Starting ingestion cycle");
|
||||
const cycleStart = Date.now();
|
||||
log.info({ cycle: cycleCount, timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 }, "Starting ingestion cycle");
|
||||
|
||||
try {
|
||||
// ── Phase 1: Authenticate pending accounts ──
|
||||
@@ -37,6 +49,10 @@ async function runCycle(): Promise<void> {
|
||||
"Found pending accounts, starting authentication"
|
||||
);
|
||||
for (const account of pendingAccounts) {
|
||||
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
|
||||
log.warn("Cycle timeout reached during authentication phase, stopping");
|
||||
break;
|
||||
}
|
||||
await withTdlibMutex(`auth:${account.phone}`, () =>
|
||||
authenticateAccount(account)
|
||||
);
|
||||
@@ -54,12 +70,22 @@ async function runCycle(): Promise<void> {
|
||||
log.info({ accountCount: accounts.length }, "Processing accounts");
|
||||
|
||||
for (const account of accounts) {
|
||||
if (Date.now() - cycleStart > CYCLE_TIMEOUT_MS) {
|
||||
log.warn(
|
||||
{ elapsed: Math.round((Date.now() - cycleStart) / 60_000), timeoutMinutes: CYCLE_TIMEOUT_MS / 60_000 },
|
||||
"Cycle timeout reached, skipping remaining accounts"
|
||||
);
|
||||
break;
|
||||
}
|
||||
await withTdlibMutex(`ingest:${account.phone}`, () =>
|
||||
runWorkerForAccount(account)
|
||||
);
|
||||
}
|
||||
|
||||
log.info("Ingestion cycle complete");
|
||||
log.info(
|
||||
{ elapsed: Math.round((Date.now() - cycleStart) / 1000) },
|
||||
"Ingestion cycle complete"
|
||||
);
|
||||
} catch (err) {
|
||||
log.error({ err }, "Ingestion cycle failed");
|
||||
} finally {
|
||||
@@ -105,6 +131,19 @@ export async function startScheduler(): Promise<void> {
|
||||
scheduleNext();
|
||||
}
|
||||
|
||||
/**
 * Start an ingestion cycle right away instead of waiting for the next
 * scheduled one (e.g. when requested from the admin UI).
 *
 * When the `running` flag is already set the request is only logged and
 * nothing else happens; otherwise the returned promise settles once
 * `runCycle()` has finished.
 */
export async function triggerImmediateCycle(): Promise<void> {
  if (!running) {
    log.info("Immediate cycle triggered via UI");
    await runCycle();
    return;
  }

  log.info("Cycle already running, ignoring trigger");
}
|
||||
|
||||
/**
|
||||
* Stop the scheduler gracefully.
|
||||
*/
|
||||
|
||||
@@ -8,6 +8,12 @@ import type { TelegramPhoto } from "../preview/match.js";
|
||||
|
||||
const log = childLogger("download");
|
||||
|
||||
/** Maximum number of pages to scan per channel/topic to prevent infinite loops */
|
||||
export const MAX_SCAN_PAGES = 5000;
|
||||
|
||||
/** Timeout for a single TDLib API call (ms) */
|
||||
export const INVOKE_TIMEOUT_MS = 120_000; // 2 minutes
|
||||
|
||||
interface TdPhotoSize {
|
||||
type: string;
|
||||
photo: {
|
||||
@@ -66,6 +72,47 @@ interface TdFile {
|
||||
export interface ChannelScanResult {
|
||||
archives: TelegramMessage[];
|
||||
photos: TelegramPhoto[];
|
||||
totalScanned: number;
|
||||
}
|
||||
|
||||
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||
|
||||
/**
 * Call `client.invoke(request)` but give up waiting after `timeoutMs`.
 *
 * The request is raced against a timer: whichever settles first decides the
 * outcome. A timeout rejects with an error naming the request's `_` field.
 * The underlying invoke call is not cancelled; its late result is ignored.
 *
 * @param client    TDLib client whose `invoke` method is called.
 * @param request   Request object passed to `invoke` unchanged.
 * @param timeoutMs Maximum wait in milliseconds (defaults to INVOKE_TIMEOUT_MS).
 * @returns The value `invoke` resolved with.
 * @throws The error `invoke` rejected with, or a timeout Error.
 */
export async function invokeWithTimeout<T>(
  client: Client,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  request: Record<string, any>,
  timeoutMs = INVOKE_TIMEOUT_MS
): Promise<T> {
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;

  const deadline = new Promise<never>((_resolve, reject) => {
    timeoutHandle = setTimeout(() => {
      reject(new Error(`TDLib invoke timed out after ${timeoutMs}ms for ${request._}`));
    }, timeoutMs);
  });

  try {
    return await Promise.race([client.invoke(request) as Promise<T>, deadline]);
  } finally {
    // Always drop the timer so a finished call leaves nothing pending.
    clearTimeout(timeoutHandle);
  }
}
|
||||
|
||||
/**
|
||||
@@ -77,32 +124,53 @@ export interface ChannelScanResult {
|
||||
* When `lastProcessedMessageId` is null (first run), scans everything.
|
||||
* The worker applies a post-grouping filter to skip fully-processed sets,
|
||||
* and keeps `packageExistsBySourceMessage` as a safety net.
|
||||
*
|
||||
* Safety features:
|
||||
* - Max page limit to prevent infinite loops
|
||||
* - Stuck detection: breaks if from_message_id stops advancing
|
||||
* - Timeout on each TDLib API call
|
||||
*/
|
||||
export async function getChannelMessages(
|
||||
client: Client,
|
||||
chatId: bigint,
|
||||
lastProcessedMessageId?: bigint | null,
|
||||
limit = 100
|
||||
limit = 100,
|
||||
onProgress?: ScanProgressCallback
|
||||
): Promise<ChannelScanResult> {
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
|
||||
let currentFromId = 0;
|
||||
let totalScanned = 0;
|
||||
let pageCount = 0;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const result = (await client.invoke({
|
||||
if (pageCount >= MAX_SCAN_PAGES) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), pageCount, totalScanned },
|
||||
"Hit max page limit for channel scan, stopping"
|
||||
);
|
||||
break;
|
||||
}
|
||||
pageCount++;
|
||||
|
||||
const previousFromId = currentFromId;
|
||||
|
||||
const result = await invokeWithTimeout<{ messages: TdMessage[] }>(client, {
|
||||
_: "getChatHistory",
|
||||
chat_id: Number(chatId),
|
||||
from_message_id: currentFromId,
|
||||
offset: 0,
|
||||
limit: Math.min(limit, 100),
|
||||
only_local: false,
|
||||
})) as { messages: TdMessage[] };
|
||||
});
|
||||
|
||||
if (!result.messages || result.messages.length === 0) break;
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -132,19 +200,31 @@ export async function getChannelMessages(
|
||||
}
|
||||
}
|
||||
|
||||
// Report scanning progress after each page
|
||||
onProgress?.(totalScanned);
|
||||
|
||||
currentFromId = result.messages[result.messages.length - 1].id;
|
||||
|
||||
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
|
||||
if (currentFromId === previousFromId) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), currentFromId, totalScanned },
|
||||
"Pagination stuck (from_message_id not advancing), breaking"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||
if (boundary && currentFromId < boundary) break;
|
||||
|
||||
if (result.messages.length < 100) break;
|
||||
if (result.messages.length < Math.min(limit, 100)) break;
|
||||
|
||||
// Rate limit delay
|
||||
await sleep(config.apiDelayMs);
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
|
||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
|
||||
"Channel scan complete"
|
||||
);
|
||||
|
||||
@@ -152,6 +232,7 @@ export async function getChannelMessages(
|
||||
return {
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,8 @@ import { childLogger } from "../util/logger.js";
|
||||
import { isArchiveAttachment } from "../archive/detect.js";
|
||||
import type { TelegramMessage } from "../archive/multipart.js";
|
||||
import type { TelegramPhoto } from "../preview/match.js";
|
||||
import type { ChannelScanResult } from "./download.js";
|
||||
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
||||
import { invokeWithTimeout, MAX_SCAN_PAGES, INVOKE_TIMEOUT_MS } from "./download.js";
|
||||
|
||||
const log = childLogger("topics");
|
||||
|
||||
@@ -21,16 +22,16 @@ export async function isChatForum(
|
||||
chatId: bigint
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const chat = (await client.invoke({
|
||||
_: "getChat",
|
||||
chat_id: Number(chatId),
|
||||
})) as {
|
||||
const chat = await invokeWithTimeout<{
|
||||
type?: {
|
||||
_: string;
|
||||
supergroup_id?: number;
|
||||
is_forum?: boolean;
|
||||
};
|
||||
};
|
||||
}>(client, {
|
||||
_: "getChat",
|
||||
chat_id: Number(chatId),
|
||||
});
|
||||
|
||||
if (chat.type?._ === "chatTypeSupergroup" && chat.type.is_forum) {
|
||||
return true;
|
||||
@@ -38,10 +39,10 @@ export async function isChatForum(
|
||||
|
||||
// Also check via getSupergroup for older TDLib versions
|
||||
if (chat.type?._ === "chatTypeSupergroup" && chat.type.supergroup_id) {
|
||||
const sg = (await client.invoke({
|
||||
const sg = await invokeWithTimeout<{ is_forum?: boolean }>(client, {
|
||||
_: "getSupergroup",
|
||||
supergroup_id: chat.type.supergroup_id,
|
||||
})) as { is_forum?: boolean };
|
||||
});
|
||||
return sg.is_forum === true;
|
||||
}
|
||||
|
||||
@@ -54,6 +55,7 @@ export async function isChatForum(
|
||||
|
||||
/**
|
||||
* Get all forum topics in a supergroup.
|
||||
* Includes stuck detection and timeout protection on API calls.
|
||||
*/
|
||||
export async function getForumTopicList(
|
||||
client: Client,
|
||||
@@ -63,18 +65,24 @@ export async function getForumTopicList(
|
||||
let offsetDate = 0;
|
||||
let offsetMessageId = 0;
|
||||
let offsetMessageThreadId = 0;
|
||||
let pageCount = 0;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const result = (await client.invoke({
|
||||
_: "getForumTopics",
|
||||
chat_id: Number(chatId),
|
||||
query: "",
|
||||
offset_date: offsetDate,
|
||||
offset_message_id: offsetMessageId,
|
||||
offset_message_thread_id: offsetMessageThreadId,
|
||||
limit: 100,
|
||||
})) as {
|
||||
if (pageCount >= MAX_SCAN_PAGES) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), pageCount, topicCount: topics.length },
|
||||
"Hit max page limit for topic enumeration, stopping"
|
||||
);
|
||||
break;
|
||||
}
|
||||
pageCount++;
|
||||
|
||||
const prevOffsetDate = offsetDate;
|
||||
const prevOffsetMessageId = offsetMessageId;
|
||||
const prevOffsetMessageThreadId = offsetMessageThreadId;
|
||||
|
||||
const result = await invokeWithTimeout<{
|
||||
topics?: {
|
||||
info?: {
|
||||
message_thread_id?: number;
|
||||
@@ -85,7 +93,15 @@ export async function getForumTopicList(
|
||||
next_offset_date?: number;
|
||||
next_offset_message_id?: number;
|
||||
next_offset_message_thread_id?: number;
|
||||
};
|
||||
}>(client, {
|
||||
_: "getForumTopics",
|
||||
chat_id: Number(chatId),
|
||||
query: "",
|
||||
offset_date: offsetDate,
|
||||
offset_message_id: offsetMessageId,
|
||||
offset_message_thread_id: offsetMessageThreadId,
|
||||
limit: 100,
|
||||
});
|
||||
|
||||
if (!result.topics || result.topics.length === 0) break;
|
||||
|
||||
@@ -113,6 +129,19 @@ export async function getForumTopicList(
|
||||
offsetMessageId = result.next_offset_message_id ?? 0;
|
||||
offsetMessageThreadId = result.next_offset_message_thread_id ?? 0;
|
||||
|
||||
// Stuck detection: if offsets didn't advance, break
|
||||
if (
|
||||
offsetDate === prevOffsetDate &&
|
||||
offsetMessageId === prevOffsetMessageId &&
|
||||
offsetMessageThreadId === prevOffsetMessageThreadId
|
||||
) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), topicCount: topics.length },
|
||||
"Topic pagination stuck (offsets not advancing), breaking"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
await sleep(config.apiDelayMs);
|
||||
}
|
||||
|
||||
@@ -134,35 +163,43 @@ export async function getForumTopicList(
|
||||
* When `lastProcessedMessageId` is null (first run), scans everything.
|
||||
* The worker applies a post-grouping filter to skip fully-processed sets,
|
||||
* and keeps `packageExistsBySourceMessage` as a safety net.
|
||||
*
|
||||
* Safety features:
|
||||
* - Max page limit to prevent infinite loops
|
||||
* - Stuck detection: breaks if from_message_id stops advancing
|
||||
* - Timeout on each TDLib API call
|
||||
*/
|
||||
export async function getTopicMessages(
|
||||
client: Client,
|
||||
chatId: bigint,
|
||||
topicId: bigint,
|
||||
lastProcessedMessageId?: bigint | null,
|
||||
limit = 100
|
||||
limit = 100,
|
||||
onProgress?: ScanProgressCallback
|
||||
): Promise<ChannelScanResult> {
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
|
||||
let currentFromId = 0;
|
||||
let totalScanned = 0;
|
||||
let pageCount = 0;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
if (pageCount >= MAX_SCAN_PAGES) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), pageCount, totalScanned },
|
||||
"Hit max page limit for topic scan, stopping"
|
||||
);
|
||||
break;
|
||||
}
|
||||
pageCount++;
|
||||
|
||||
const previousFromId = currentFromId;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const result = (await client.invoke({
|
||||
_: "searchChatMessages",
|
||||
chat_id: Number(chatId),
|
||||
query: "",
|
||||
message_thread_id: Number(topicId),
|
||||
from_message_id: currentFromId,
|
||||
offset: 0,
|
||||
limit: Math.min(limit, 100),
|
||||
filter: null,
|
||||
sender_id: null,
|
||||
saved_messages_topic_id: 0,
|
||||
})) as {
|
||||
const result = await invokeWithTimeout<{
|
||||
messages?: {
|
||||
id: number;
|
||||
date: number;
|
||||
@@ -186,10 +223,23 @@ export async function getTopicMessages(
|
||||
caption?: { text?: string };
|
||||
};
|
||||
}[];
|
||||
};
|
||||
}>(client, {
|
||||
_: "searchChatMessages",
|
||||
chat_id: Number(chatId),
|
||||
query: "",
|
||||
message_thread_id: Number(topicId),
|
||||
from_message_id: currentFromId,
|
||||
offset: 0,
|
||||
limit: Math.min(limit, 100),
|
||||
filter: null,
|
||||
sender_id: null,
|
||||
saved_messages_topic_id: 0,
|
||||
});
|
||||
|
||||
if (!result.messages || result.messages.length === 0) break;
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -219,18 +269,30 @@ export async function getTopicMessages(
|
||||
}
|
||||
}
|
||||
|
||||
// Report scanning progress after each page
|
||||
onProgress?.(totalScanned);
|
||||
|
||||
currentFromId = result.messages[result.messages.length - 1].id;
|
||||
|
||||
// Stuck detection: if from_message_id didn't advance, break to prevent infinite loop
|
||||
if (currentFromId === previousFromId) {
|
||||
log.warn(
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), currentFromId, totalScanned },
|
||||
"Topic pagination stuck (from_message_id not advancing), breaking"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||
if (boundary && currentFromId < boundary) break;
|
||||
|
||||
if (result.messages.length < 100) break;
|
||||
if (result.messages.length < Math.min(limit, 100)) break;
|
||||
|
||||
await sleep(config.apiDelayMs);
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned, pages: pageCount },
|
||||
"Topic scan complete"
|
||||
);
|
||||
|
||||
@@ -238,6 +300,7 @@ export async function getTopicMessages(
|
||||
return {
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -4,12 +4,21 @@ const log = childLogger("mutex");
|
||||
|
||||
let locked = false;
|
||||
let holder = "";
|
||||
const queue: Array<{ resolve: () => void; label: string }> = [];
|
||||
const queue: Array<{ resolve: () => void; reject: (err: Error) => void; label: string }> = [];
|
||||
|
||||
/**
|
||||
* Maximum time to wait for the TDLib mutex (ms).
|
||||
* If the mutex is not available within this time, the operation is rejected.
|
||||
* Default: 30 minutes (long enough for large downloads, short enough to detect hangs).
|
||||
*/
|
||||
const MUTEX_WAIT_TIMEOUT_MS = 30 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* Ensures only one TDLib client runs at a time across the entire worker process.
|
||||
* Both the scheduler (auth, ingestion) and the fetch listener acquire this
|
||||
* before creating any TDLib client.
|
||||
*
|
||||
* Includes a wait timeout to prevent indefinite blocking if the current holder hangs.
|
||||
*/
|
||||
export async function withTdlibMutex<T>(
|
||||
label: string,
|
||||
@@ -17,7 +26,28 @@ export async function withTdlibMutex<T>(
|
||||
): Promise<T> {
|
||||
if (locked) {
|
||||
log.info({ waiting: label, holder }, "Waiting for TDLib mutex");
|
||||
await new Promise<void>((resolve) => queue.push({ resolve, label }));
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
const idx = queue.indexOf(entry);
|
||||
if (idx !== -1) {
|
||||
queue.splice(idx, 1);
|
||||
reject(new Error(
|
||||
`TDLib mutex wait timeout after ${MUTEX_WAIT_TIMEOUT_MS / 60_000}min ` +
|
||||
`(waiting: ${label}, holder: ${holder})`
|
||||
));
|
||||
}
|
||||
}, MUTEX_WAIT_TIMEOUT_MS);
|
||||
|
||||
const entry = {
|
||||
resolve: () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
},
|
||||
reject,
|
||||
label,
|
||||
};
|
||||
queue.push(entry);
|
||||
});
|
||||
}
|
||||
|
||||
locked = true;
|
||||
|
||||
@@ -349,8 +349,18 @@ export async function runWorkerForAccount(
|
||||
throw new Error("No global destination channel configured — set one in the admin UI");
|
||||
}
|
||||
|
||||
for (const mapping of channelMappings) {
|
||||
const totalChannels = channelMappings.length;
|
||||
|
||||
if (totalChannels === 0) {
|
||||
accountLog.info("No active source channels linked to this account — nothing to ingest");
|
||||
}
|
||||
|
||||
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
||||
const mapping = channelMappings[chIdx];
|
||||
const channel = mapping.channel;
|
||||
const channelLabel = totalChannels > 1
|
||||
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
|
||||
: channel.title;
|
||||
|
||||
try {
|
||||
// ── Check if channel is a forum ──
|
||||
@@ -380,15 +390,16 @@ export async function runWorkerForAccount(
|
||||
if (forum) {
|
||||
// ── Forum channel: scan per-topic ──
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Enumerating topics in "${channel.title}"`,
|
||||
currentActivity: `Enumerating topics in "${channelLabel}"`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channel.title,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
const topics = await getForumTopicList(client, channel.telegramId);
|
||||
@@ -399,34 +410,53 @@ export async function runWorkerForAccount(
|
||||
"Scanning forum channel by topic"
|
||||
);
|
||||
|
||||
for (const topic of topics) {
|
||||
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
|
||||
const topic = topics[tIdx];
|
||||
try {
|
||||
const progress = topicProgressList.find(
|
||||
(tp) => tp.topicId === topic.topicId
|
||||
);
|
||||
|
||||
const topicLabel = `${channel.title} › ${topic.name}`;
|
||||
const topicProgress = topics.length > 1
|
||||
? ` (topic ${tIdx + 1}/${topics.length})`
|
||||
: "";
|
||||
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
|
||||
currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: `${channel.title} › ${topic.name}`,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
const scanResult = await getTopicMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
topic.topicId,
|
||||
progress?.lastProcessedMessageId
|
||||
progress?.lastProcessedMessageId,
|
||||
100,
|
||||
(scanned) => {
|
||||
throttled.update({
|
||||
currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channelLabel,
|
||||
messagesScanned: counters.messagesScanned + scanned,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Add scanned messages to global counter
|
||||
counters.messagesScanned += scanResult.totalScanned;
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.debug(
|
||||
{ channelId: channel.id, topic: topic.name },
|
||||
accountLog.info(
|
||||
{ channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
|
||||
"No new archives in topic"
|
||||
);
|
||||
continue;
|
||||
@@ -463,15 +493,16 @@ export async function runWorkerForAccount(
|
||||
} else {
|
||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Scanning "${channel.title}" for new archives`,
|
||||
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channel.title,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
accountLog.info(
|
||||
@@ -482,11 +513,23 @@ export async function runWorkerForAccount(
|
||||
const scanResult = await getChannelMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
mapping.lastProcessedMessageId
|
||||
mapping.lastProcessedMessageId,
|
||||
100,
|
||||
(scanned) => {
|
||||
throttled.update({
|
||||
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channelLabel,
|
||||
messagesScanned: counters.messagesScanned + scanned,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Add scanned messages to global counter
|
||||
counters.messagesScanned += scanResult.totalScanned;
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.debug({ channelId: channel.id }, "No new archives");
|
||||
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -593,6 +636,7 @@ async function processArchiveSets(
|
||||
currentChannel: channelTitle,
|
||||
totalFiles: archiveSets.length,
|
||||
zipsFound: counters.zipsFound,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
// Track the highest message ID that was successfully processed
|
||||
@@ -646,7 +690,6 @@ async function processOneArchiveSet(
|
||||
throttled, counters, topicCreator, sourceTopicId, accountLog,
|
||||
} = ctx;
|
||||
|
||||
counters.messagesScanned += archiveSet.parts.length;
|
||||
const archiveName = archiveSet.parts[0].fileName;
|
||||
|
||||
// ── Early skip: check if this archive set was already ingested ──
|
||||
|
||||
Reference in New Issue
Block a user