mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Merge pull request #14 from xCyanGrizzly/copilot/fix-telegram-worker-issue
Fix APP_PORT: align container listen port, port mapping, and healthcheck
This commit is contained in:
@@ -13,6 +13,8 @@ AUTH_GITHUB_ID=""
|
|||||||
AUTH_GITHUB_SECRET=""
|
AUTH_GITHUB_SECRET=""
|
||||||
|
|
||||||
# App
|
# App
|
||||||
|
# APP_PORT controls the port the container listens on AND how it is exposed on the host.
|
||||||
|
# If you change APP_PORT, also update NEXT_PUBLIC_APP_URL to match.
|
||||||
NEXT_PUBLIC_APP_URL="http://localhost:3000"
|
NEXT_PUBLIC_APP_URL="http://localhost:3000"
|
||||||
APP_PORT=3000
|
APP_PORT=3000
|
||||||
|
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ RUN chmod +x docker-entrypoint.sh
|
|||||||
|
|
||||||
USER nextjs
|
USER nextjs
|
||||||
|
|
||||||
|
# Default port — overridden at runtime by the PORT env var (set via docker-compose APP_PORT)
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
ENV PORT=3000
|
ENV PORT=3000
|
||||||
ENV HOSTNAME="0.0.0.0"
|
ENV HOSTNAME="0.0.0.0"
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ services:
|
|||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
pull_policy: never
|
pull_policy: never
|
||||||
ports:
|
ports:
|
||||||
- "${APP_PORT:-3000}:3000"
|
- "${APP_PORT:-3000}:${APP_PORT:-3000}"
|
||||||
environment:
|
environment:
|
||||||
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
|
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
|
||||||
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
|
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
|
||||||
@@ -17,11 +17,13 @@ services:
|
|||||||
- BOT_TOKEN=${BOT_TOKEN:-}
|
- BOT_TOKEN=${BOT_TOKEN:-}
|
||||||
- BOT_USERNAME=${BOT_USERNAME:-}
|
- BOT_USERNAME=${BOT_USERNAME:-}
|
||||||
- LOG_LEVEL=${LOG_LEVEL:-info}
|
- LOG_LEVEL=${LOG_LEVEL:-info}
|
||||||
|
- WORKER_INTERVAL_MINUTES=${WORKER_INTERVAL_MINUTES:-60}
|
||||||
|
- PORT=${APP_PORT:-3000}
|
||||||
depends_on:
|
depends_on:
|
||||||
db:
|
db:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "wget", "-q", "--spider", "http://localhost:3000/api/health"]
|
test: ["CMD-SHELL", "wget -q --spider http://localhost:$$PORT/api/health || exit 1"]
|
||||||
interval: 30s
|
interval: 30s
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 3
|
retries: 3
|
||||||
|
|||||||
2
package-lock.json
generated
2
package-lock.json
generated
@@ -49,7 +49,7 @@
|
|||||||
"ts-node": "^10.9.2",
|
"ts-node": "^10.9.2",
|
||||||
"tsx": "^4.21.0",
|
"tsx": "^4.21.0",
|
||||||
"tw-animate-css": "^1.4.0",
|
"tw-animate-css": "^1.4.0",
|
||||||
"typescript": "^5"
|
"typescript": "5.9.3"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/@alloc/quick-lru": {
|
"node_modules/@alloc/quick-lru": {
|
||||||
|
|||||||
@@ -58,6 +58,6 @@
|
|||||||
"ts-node": "^10.9.2",
|
"ts-node": "^10.9.2",
|
||||||
"tsx": "^4.21.0",
|
"tsx": "^4.21.0",
|
||||||
"tw-animate-css": "^1.4.0",
|
"tw-animate-css": "^1.4.0",
|
||||||
"typescript": "^5"
|
"typescript": "5.9.3"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import {
|
|||||||
Power,
|
Power,
|
||||||
ArrowDownToLine,
|
ArrowDownToLine,
|
||||||
ArrowUpFromLine,
|
ArrowUpFromLine,
|
||||||
|
RefreshCcw,
|
||||||
} from "lucide-react";
|
} from "lucide-react";
|
||||||
import { Badge } from "@/components/ui/badge";
|
import { Badge } from "@/components/ui/badge";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
@@ -23,12 +24,14 @@ interface ChannelColumnsProps {
|
|||||||
onToggleActive: (id: string) => void;
|
onToggleActive: (id: string) => void;
|
||||||
onDelete: (id: string) => void;
|
onDelete: (id: string) => void;
|
||||||
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
|
onSetType: (id: string, type: "SOURCE" | "DESTINATION") => void;
|
||||||
|
onRescan: (id: string) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getChannelColumns({
|
export function getChannelColumns({
|
||||||
onToggleActive,
|
onToggleActive,
|
||||||
onDelete,
|
onDelete,
|
||||||
onSetType,
|
onSetType,
|
||||||
|
onRescan,
|
||||||
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
|
}: ChannelColumnsProps): ColumnDef<ChannelRow, unknown>[] {
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
@@ -121,6 +124,14 @@ export function getChannelColumns({
|
|||||||
Set as Source
|
Set as Source
|
||||||
</DropdownMenuItem>
|
</DropdownMenuItem>
|
||||||
)}
|
)}
|
||||||
|
{row.original.type === "SOURCE" && (
|
||||||
|
<DropdownMenuItem
|
||||||
|
onClick={() => onRescan(row.original.id)}
|
||||||
|
>
|
||||||
|
<RefreshCcw className="mr-2 h-3.5 w-3.5" />
|
||||||
|
Rescan Channel
|
||||||
|
</DropdownMenuItem>
|
||||||
|
)}
|
||||||
<DropdownMenuItem
|
<DropdownMenuItem
|
||||||
onClick={() => onToggleActive(row.original.id)}
|
onClick={() => onToggleActive(row.original.id)}
|
||||||
>
|
>
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import {
|
|||||||
deleteChannel,
|
deleteChannel,
|
||||||
toggleChannelActive,
|
toggleChannelActive,
|
||||||
setChannelType,
|
setChannelType,
|
||||||
|
rescanChannel,
|
||||||
} from "../actions";
|
} from "../actions";
|
||||||
import { DataTable } from "@/components/shared/data-table";
|
import { DataTable } from "@/components/shared/data-table";
|
||||||
import { DeleteDialog } from "@/components/shared/delete-dialog";
|
import { DeleteDialog } from "@/components/shared/delete-dialog";
|
||||||
@@ -22,6 +23,7 @@ interface ChannelsTabProps {
|
|||||||
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
||||||
const [isPending, startTransition] = useTransition();
|
const [isPending, startTransition] = useTransition();
|
||||||
const [deleteId, setDeleteId] = useState<string | null>(null);
|
const [deleteId, setDeleteId] = useState<string | null>(null);
|
||||||
|
const [rescanId, setRescanId] = useState<string | null>(null);
|
||||||
|
|
||||||
const columns = getChannelColumns({
|
const columns = getChannelColumns({
|
||||||
onToggleActive: (id) => {
|
onToggleActive: (id) => {
|
||||||
@@ -39,6 +41,7 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
|||||||
else toast.error(result.error);
|
else toast.error(result.error);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
onRescan: (id) => setRescanId(id),
|
||||||
});
|
});
|
||||||
|
|
||||||
const { table } = useDataTable({
|
const { table } = useDataTable({
|
||||||
@@ -60,6 +63,19 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const handleRescan = () => {
|
||||||
|
if (!rescanId) return;
|
||||||
|
startTransition(async () => {
|
||||||
|
const result = await rescanChannel(rescanId);
|
||||||
|
if (result.success) {
|
||||||
|
toast.success("Channel scan progress reset — it will be fully rescanned on the next sync");
|
||||||
|
setRescanId(null);
|
||||||
|
} else {
|
||||||
|
toast.error(result.error);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="space-y-4">
|
<div className="space-y-4">
|
||||||
<DestinationCard destination={globalDestination} />
|
<DestinationCard destination={globalDestination} />
|
||||||
@@ -83,6 +99,16 @@ export function ChannelsTab({ channels, globalDestination }: ChannelsTabProps) {
|
|||||||
onConfirm={handleDelete}
|
onConfirm={handleDelete}
|
||||||
isLoading={isPending}
|
isLoading={isPending}
|
||||||
/>
|
/>
|
||||||
|
|
||||||
|
<DeleteDialog
|
||||||
|
open={!!rescanId}
|
||||||
|
onOpenChange={(open) => !open && setRescanId(null)}
|
||||||
|
title="Rescan Channel"
|
||||||
|
description="This will reset all scan progress for this channel. On the next sync the worker will re-process every message from the beginning. Packages that are already in the library will be skipped (deduplication by hash), but any missing files will be re-downloaded and re-uploaded. This may take a long time for large channels."
|
||||||
|
confirmLabel="Rescan"
|
||||||
|
onConfirm={handleRescan}
|
||||||
|
isLoading={isPending}
|
||||||
|
/>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ interface TelegramAdminProps {
|
|||||||
ingestionStatus: IngestionAccountStatus[];
|
ingestionStatus: IngestionAccountStatus[];
|
||||||
globalDestination: GlobalDestination;
|
globalDestination: GlobalDestination;
|
||||||
sendHistory: SendHistoryRow[];
|
sendHistory: SendHistoryRow[];
|
||||||
|
workerIntervalMinutes: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function TelegramAdmin({
|
export function TelegramAdmin({
|
||||||
@@ -24,6 +25,7 @@ export function TelegramAdmin({
|
|||||||
ingestionStatus,
|
ingestionStatus,
|
||||||
globalDestination,
|
globalDestination,
|
||||||
sendHistory,
|
sendHistory,
|
||||||
|
workerIntervalMinutes,
|
||||||
}: TelegramAdminProps) {
|
}: TelegramAdminProps) {
|
||||||
return (
|
return (
|
||||||
<div className="space-y-4">
|
<div className="space-y-4">
|
||||||
@@ -32,7 +34,7 @@ export function TelegramAdmin({
|
|||||||
description="Manage Telegram accounts, channels, and ingestion"
|
description="Manage Telegram accounts, channels, and ingestion"
|
||||||
/>
|
/>
|
||||||
|
|
||||||
<WorkerStatusPanel initialStatus={ingestionStatus} />
|
<WorkerStatusPanel initialStatus={ingestionStatus} initialIntervalMinutes={workerIntervalMinutes} />
|
||||||
|
|
||||||
<Tabs defaultValue="accounts" className="space-y-4">
|
<Tabs defaultValue="accounts" className="space-y-4">
|
||||||
<TabsList>
|
<TabsList>
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { useEffect, useState, useCallback } from "react";
|
import { useEffect, useState, useCallback, useTransition } from "react";
|
||||||
import {
|
import {
|
||||||
Loader2,
|
Loader2,
|
||||||
CheckCircle2,
|
CheckCircle2,
|
||||||
@@ -14,10 +14,13 @@ import { Card, CardContent } from "@/components/ui/card";
|
|||||||
import { Badge } from "@/components/ui/badge";
|
import { Badge } from "@/components/ui/badge";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
import { cn } from "@/lib/utils";
|
import { cn } from "@/lib/utils";
|
||||||
|
import { toast } from "sonner";
|
||||||
|
import { triggerIngestion } from "../actions";
|
||||||
import type { IngestionAccountStatus } from "@/lib/telegram/types";
|
import type { IngestionAccountStatus } from "@/lib/telegram/types";
|
||||||
|
|
||||||
interface WorkerStatusPanelProps {
|
interface WorkerStatusPanelProps {
|
||||||
initialStatus: IngestionAccountStatus[];
|
initialStatus: IngestionAccountStatus[];
|
||||||
|
initialIntervalMinutes?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
const AUTH_STATE_CONFIG: Record<
|
const AUTH_STATE_CONFIG: Record<
|
||||||
@@ -39,15 +42,28 @@ const AUTH_STATE_CONFIG: Record<
|
|||||||
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
|
EXPIRED: { label: "Expired", color: "text-red-500", icon: "x" },
|
||||||
};
|
};
|
||||||
|
|
||||||
export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
export function WorkerStatusPanel({ initialStatus, initialIntervalMinutes = 60 }: WorkerStatusPanelProps) {
|
||||||
const [accounts, setAccounts] = useState(initialStatus);
|
const [accounts, setAccounts] = useState(initialStatus);
|
||||||
const [error, setError] = useState(false);
|
const [error, setError] = useState(false);
|
||||||
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
|
const [nextRunCountdown, setNextRunCountdown] = useState<string | null>(null);
|
||||||
|
const [workerIntervalMinutes, setWorkerIntervalMinutes] = useState(initialIntervalMinutes);
|
||||||
|
const [isPending, startTransition] = useTransition();
|
||||||
|
|
||||||
// Find active run
|
// Find active run
|
||||||
const activeRun = accounts.find((a) => a.currentRun);
|
const activeRun = accounts.find((a) => a.currentRun);
|
||||||
const isRunning = !!activeRun;
|
const isRunning = !!activeRun;
|
||||||
|
|
||||||
|
const handleSyncNow = useCallback(() => {
|
||||||
|
startTransition(async () => {
|
||||||
|
const result = await triggerIngestion();
|
||||||
|
if (result.success) {
|
||||||
|
toast.success("Sync triggered — worker will start shortly");
|
||||||
|
} else {
|
||||||
|
toast.error(result.error ?? "Failed to trigger sync");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, []);
|
||||||
|
|
||||||
// Poll for status
|
// Poll for status
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
let timer: ReturnType<typeof setTimeout>;
|
let timer: ReturnType<typeof setTimeout>;
|
||||||
@@ -60,6 +76,9 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
|||||||
const data = await res.json();
|
const data = await res.json();
|
||||||
if (mounted) {
|
if (mounted) {
|
||||||
setAccounts(data.accounts ?? []);
|
setAccounts(data.accounts ?? []);
|
||||||
|
if (data.workerIntervalMinutes) {
|
||||||
|
setWorkerIntervalMinutes(data.workerIntervalMinutes);
|
||||||
|
}
|
||||||
setError(false);
|
setError(false);
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
@@ -86,7 +105,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Estimate next run based on last run finish time + interval (5 min + up to 5 min jitter)
|
// Estimate next run based on last run finish time + configured interval + up to 5 min jitter
|
||||||
const lastFinished = accounts
|
const lastFinished = accounts
|
||||||
.filter((a) => a.lastRun?.finishedAt)
|
.filter((a) => a.lastRun?.finishedAt)
|
||||||
.map((a) => new Date(a.lastRun!.finishedAt!).getTime())
|
.map((a) => new Date(a.lastRun!.finishedAt!).getTime())
|
||||||
@@ -97,7 +116,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const intervalMs = 5 * 60 * 1000; // 5 min base
|
const intervalMs = workerIntervalMinutes * 60 * 1000;
|
||||||
const estimatedNext = lastFinished + intervalMs;
|
const estimatedNext = lastFinished + intervalMs;
|
||||||
|
|
||||||
const tick = () => {
|
const tick = () => {
|
||||||
@@ -116,7 +135,7 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
|||||||
tick();
|
tick();
|
||||||
const interval = setInterval(tick, 1_000);
|
const interval = setInterval(tick, 1_000);
|
||||||
return () => clearInterval(interval);
|
return () => clearInterval(interval);
|
||||||
}, [isRunning, accounts]);
|
}, [isRunning, accounts, workerIntervalMinutes]);
|
||||||
|
|
||||||
if (accounts.length === 0 && !error) {
|
if (accounts.length === 0 && !error) {
|
||||||
return (
|
return (
|
||||||
@@ -182,7 +201,12 @@ export function WorkerStatusPanel({ initialStatus }: WorkerStatusPanelProps) {
|
|||||||
) : isRunning && activeRun?.currentRun ? (
|
) : isRunning && activeRun?.currentRun ? (
|
||||||
<RunningStatus run={activeRun.currentRun} />
|
<RunningStatus run={activeRun.currentRun} />
|
||||||
) : (
|
) : (
|
||||||
<IdleStatus accounts={accounts} nextRunCountdown={nextRunCountdown} />
|
<IdleStatus
|
||||||
|
accounts={accounts}
|
||||||
|
nextRunCountdown={nextRunCountdown}
|
||||||
|
onSyncNow={handleSyncNow}
|
||||||
|
isSyncing={isPending}
|
||||||
|
/>
|
||||||
)}
|
)}
|
||||||
</CardContent>
|
</CardContent>
|
||||||
</Card>
|
</Card>
|
||||||
@@ -256,9 +280,13 @@ function RunningStatus({
|
|||||||
function IdleStatus({
|
function IdleStatus({
|
||||||
accounts,
|
accounts,
|
||||||
nextRunCountdown,
|
nextRunCountdown,
|
||||||
|
onSyncNow,
|
||||||
|
isSyncing,
|
||||||
}: {
|
}: {
|
||||||
accounts: IngestionAccountStatus[];
|
accounts: IngestionAccountStatus[];
|
||||||
nextRunCountdown: string | null;
|
nextRunCountdown: string | null;
|
||||||
|
onSyncNow: () => void;
|
||||||
|
isSyncing: boolean;
|
||||||
}) {
|
}) {
|
||||||
const lastRun = accounts
|
const lastRun = accounts
|
||||||
.filter((a) => a.lastRun)
|
.filter((a) => a.lastRun)
|
||||||
@@ -321,14 +349,32 @@ function IdleStatus({
|
|||||||
)}
|
)}
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
<div className="flex items-center gap-2 shrink-0">
|
||||||
{nextRunCountdown && hasAuthenticated && (
|
{nextRunCountdown && hasAuthenticated && (
|
||||||
<div className="flex items-center gap-1.5 shrink-0">
|
<div className="flex items-center gap-1.5">
|
||||||
<RefreshCw className="h-3 w-3 text-muted-foreground" />
|
<RefreshCw className="h-3 w-3 text-muted-foreground" />
|
||||||
<span className="text-xs text-muted-foreground tabular-nums">
|
<span className="text-xs text-muted-foreground tabular-nums">
|
||||||
Next: {nextRunCountdown}
|
Next: {nextRunCountdown}
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
)}
|
)}
|
||||||
|
{hasAuthenticated && (
|
||||||
|
<Button
|
||||||
|
variant="outline"
|
||||||
|
size="sm"
|
||||||
|
className="h-7 text-xs px-2"
|
||||||
|
onClick={onSyncNow}
|
||||||
|
disabled={isSyncing}
|
||||||
|
>
|
||||||
|
{isSyncing ? (
|
||||||
|
<Loader2 className="h-3 w-3 animate-spin mr-1" />
|
||||||
|
) : (
|
||||||
|
<RefreshCw className="h-3 w-3 mr-1" />
|
||||||
|
)}
|
||||||
|
Sync Now
|
||||||
|
</Button>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -297,6 +297,52 @@ export async function triggerChannelSync(): Promise<ActionResult> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset all scan progress for a channel so the worker will re-process it
|
||||||
|
* from the very beginning on the next ingestion cycle.
|
||||||
|
*
|
||||||
|
* This clears:
|
||||||
|
* - `lastProcessedMessageId` on every AccountChannelMap linked to this channel
|
||||||
|
* - All TopicProgress records for those maps (for forum channels)
|
||||||
|
*/
|
||||||
|
export async function rescanChannel(channelId: string): Promise<ActionResult> {
|
||||||
|
const admin = await requireAdmin();
|
||||||
|
if (!admin.success) return admin;
|
||||||
|
|
||||||
|
const channel = await prisma.telegramChannel.findUnique({
|
||||||
|
where: { id: channelId },
|
||||||
|
});
|
||||||
|
if (!channel) return { success: false, error: "Channel not found" };
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Find all account-channel maps for this channel
|
||||||
|
const maps = await prisma.accountChannelMap.findMany({
|
||||||
|
where: { channelId },
|
||||||
|
select: { id: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
const mapIds = maps.map((m) => m.id);
|
||||||
|
|
||||||
|
// Delete all topic progress records for these maps (forum channels)
|
||||||
|
if (mapIds.length > 0) {
|
||||||
|
await prisma.topicProgress.deleteMany({
|
||||||
|
where: { accountChannelMapId: { in: mapIds } },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the scan cursor so the worker re-processes from the start
|
||||||
|
await prisma.accountChannelMap.updateMany({
|
||||||
|
where: { channelId },
|
||||||
|
data: { lastProcessedMessageId: null },
|
||||||
|
});
|
||||||
|
|
||||||
|
revalidatePath(REVALIDATE_PATH);
|
||||||
|
return { success: true, data: undefined };
|
||||||
|
} catch {
|
||||||
|
return { success: false, error: "Failed to reset channel scan progress" };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Account-Channel link actions ──
|
// ── Account-Channel link actions ──
|
||||||
|
|
||||||
export async function linkChannel(
|
export async function linkChannel(
|
||||||
@@ -377,7 +423,7 @@ export async function triggerIngestion(
|
|||||||
try {
|
try {
|
||||||
await prisma.$queryRawUnsafe(
|
await prisma.$queryRawUnsafe(
|
||||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||||
accounts.map((a) => a.id).join(",")
|
accounts.map((a: { id: string }) => a.id).join(",")
|
||||||
);
|
);
|
||||||
} catch {
|
} catch {
|
||||||
// Best-effort
|
// Best-effort
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ export default async function TelegramPage() {
|
|||||||
}),
|
}),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
const serializedHistory = sendHistory.map((r) => ({
|
const serializedHistory = sendHistory.map((r: typeof sendHistory[number]) => ({
|
||||||
id: r.id,
|
id: r.id,
|
||||||
packageName: r.package.fileName,
|
packageName: r.package.fileName,
|
||||||
recipientName: r.telegramLink.telegramName,
|
recipientName: r.telegramLink.telegramName,
|
||||||
@@ -42,6 +42,7 @@ export default async function TelegramPage() {
|
|||||||
ingestionStatus={ingestionStatus}
|
ingestionStatus={ingestionStatus}
|
||||||
globalDestination={globalDestination}
|
globalDestination={globalDestination}
|
||||||
sendHistory={serializedHistory}
|
sendHistory={serializedHistory}
|
||||||
|
workerIntervalMinutes={parseInt(process.env.WORKER_INTERVAL_MINUTES ?? "60", 10)}
|
||||||
/>
|
/>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,5 +9,9 @@ export async function GET(request: Request) {
|
|||||||
if ("error" in authResult) return authResult.error;
|
if ("error" in authResult) return authResult.error;
|
||||||
|
|
||||||
const accounts = await getIngestionStatus();
|
const accounts = await getIngestionStatus();
|
||||||
return NextResponse.json({ accounts });
|
const workerIntervalMinutes = parseInt(
|
||||||
|
process.env.WORKER_INTERVAL_MINUTES ?? "60",
|
||||||
|
10
|
||||||
|
);
|
||||||
|
return NextResponse.json({ accounts, workerIntervalMinutes });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,33 +45,20 @@ export async function POST(request: Request) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create ingestion runs marked as RUNNING — the worker will pick these up
|
// Send pg_notify for immediate worker pickup.
|
||||||
// when it next polls, or we use pg_notify for immediate pickup
|
// The worker creates its own IngestionRun records with proper activity tracking.
|
||||||
for (const account of accounts) {
|
|
||||||
// Only create if no run is already RUNNING for this account
|
|
||||||
const existing = await prisma.ingestionRun.findFirst({
|
|
||||||
where: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
if (!existing) {
|
|
||||||
await prisma.ingestionRun.create({
|
|
||||||
data: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send pg_notify for immediate worker pickup
|
|
||||||
try {
|
try {
|
||||||
await prisma.$queryRawUnsafe(
|
await prisma.$queryRawUnsafe(
|
||||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||||
accounts.map((a) => a.id).join(",")
|
accounts.map((a: { id: string }) => a.id).join(",")
|
||||||
);
|
);
|
||||||
} catch {
|
} catch {
|
||||||
// pg_notify is best-effort — worker will pick up on next cycle anyway
|
// pg_notify is best-effort — worker will pick up on next scheduled cycle anyway
|
||||||
}
|
}
|
||||||
|
|
||||||
return NextResponse.json({
|
return NextResponse.json({
|
||||||
triggered: true,
|
triggered: true,
|
||||||
accountIds: accounts.map((a) => a.id),
|
accountIds: accounts.map((a: { id: string }) => a.id),
|
||||||
message: `Ingestion queued for ${accounts.length} account(s)`,
|
message: `Ingestion triggered for ${accounts.length} account(s)`,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ interface DeleteDialogProps {
|
|||||||
description?: string;
|
description?: string;
|
||||||
onConfirm: () => void;
|
onConfirm: () => void;
|
||||||
isLoading?: boolean;
|
isLoading?: boolean;
|
||||||
|
confirmLabel?: string;
|
||||||
|
confirmLoadingLabel?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function DeleteDialog({
|
export function DeleteDialog({
|
||||||
@@ -27,6 +29,8 @@ export function DeleteDialog({
|
|||||||
description = "This action cannot be undone.",
|
description = "This action cannot be undone.",
|
||||||
onConfirm,
|
onConfirm,
|
||||||
isLoading,
|
isLoading,
|
||||||
|
confirmLabel = "Delete",
|
||||||
|
confirmLoadingLabel,
|
||||||
}: DeleteDialogProps) {
|
}: DeleteDialogProps) {
|
||||||
return (
|
return (
|
||||||
<AlertDialog open={open} onOpenChange={onOpenChange}>
|
<AlertDialog open={open} onOpenChange={onOpenChange}>
|
||||||
@@ -42,7 +46,7 @@ export function DeleteDialog({
|
|||||||
disabled={isLoading}
|
disabled={isLoading}
|
||||||
className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
|
className="bg-destructive text-destructive-foreground hover:bg-destructive/90"
|
||||||
>
|
>
|
||||||
{isLoading ? "Deleting..." : "Delete"}
|
{isLoading ? (confirmLoadingLabel ?? `${confirmLabel}...`) : confirmLabel}
|
||||||
</AlertDialogAction>
|
</AlertDialogAction>
|
||||||
</AlertDialogFooter>
|
</AlertDialogFooter>
|
||||||
</AlertDialogContent>
|
</AlertDialogContent>
|
||||||
|
|||||||
@@ -18,6 +18,10 @@ import {
|
|||||||
const log = childLogger("fetch-listener");
|
const log = childLogger("fetch-listener");
|
||||||
|
|
||||||
let pgClient: pg.PoolClient | null = null;
|
let pgClient: pg.PoolClient | null = null;
|
||||||
|
let stopped = false;
|
||||||
|
|
||||||
|
/** Delay (ms) before attempting to reconnect after a connection loss. */
|
||||||
|
const RECONNECT_DELAY_MS = 5_000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start listening for pg_notify signals from the web app.
|
* Start listening for pg_notify signals from the web app.
|
||||||
@@ -27,8 +31,17 @@ let pgClient: pg.PoolClient | null = null;
|
|||||||
* - `generate_invite` — payload = channelId → generate invite link for destination
|
* - `generate_invite` — payload = channelId → generate invite link for destination
|
||||||
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
||||||
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
||||||
|
*
|
||||||
|
* If the underlying connection is lost, the listener automatically reconnects
|
||||||
|
* so that pg_notify signals are never silently dropped.
|
||||||
*/
|
*/
|
||||||
export async function startFetchListener(): Promise<void> {
|
export async function startFetchListener(): Promise<void> {
|
||||||
|
stopped = false;
|
||||||
|
await connectListener();
|
||||||
|
}
|
||||||
|
|
||||||
|
async function connectListener(): Promise<void> {
|
||||||
|
try {
|
||||||
pgClient = await pool.connect();
|
pgClient = await pool.connect();
|
||||||
await pgClient.query("LISTEN channel_fetch");
|
await pgClient.query("LISTEN channel_fetch");
|
||||||
await pgClient.query("LISTEN generate_invite");
|
await pgClient.query("LISTEN generate_invite");
|
||||||
@@ -47,10 +60,46 @@ export async function startFetchListener(): Promise<void> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Reconnect automatically when the connection ends unexpectedly
|
||||||
|
pgClient.on("end", () => {
|
||||||
|
if (!stopped) {
|
||||||
|
log.warn("Fetch listener connection lost — reconnecting");
|
||||||
|
pgClient = null;
|
||||||
|
scheduleReconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
pgClient.on("error", (err) => {
|
||||||
|
log.error({ err }, "Fetch listener connection error");
|
||||||
|
if (!stopped && pgClient) {
|
||||||
|
try {
|
||||||
|
pgClient.release(true);
|
||||||
|
} catch (releaseErr) {
|
||||||
|
log.debug({ err: releaseErr }, "Failed to release pg client after error");
|
||||||
|
}
|
||||||
|
pgClient = null;
|
||||||
|
scheduleReconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
|
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
|
||||||
|
} catch (err) {
|
||||||
|
log.error({ err }, "Failed to start fetch listener — retrying");
|
||||||
|
scheduleReconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function scheduleReconnect(): void {
|
||||||
|
if (stopped) return;
|
||||||
|
setTimeout(() => {
|
||||||
|
if (!stopped) {
|
||||||
|
connectListener();
|
||||||
|
}
|
||||||
|
}, RECONNECT_DELAY_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function stopFetchListener(): void {
|
export function stopFetchListener(): void {
|
||||||
|
stopped = true;
|
||||||
if (pgClient) {
|
if (pgClient) {
|
||||||
pgClient.release();
|
pgClient.release();
|
||||||
pgClient = null;
|
pgClient = null;
|
||||||
|
|||||||
@@ -351,6 +351,10 @@ export async function runWorkerForAccount(
|
|||||||
|
|
||||||
const totalChannels = channelMappings.length;
|
const totalChannels = channelMappings.length;
|
||||||
|
|
||||||
|
if (totalChannels === 0) {
|
||||||
|
accountLog.info("No active source channels linked to this account — nothing to ingest");
|
||||||
|
}
|
||||||
|
|
||||||
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
||||||
const mapping = channelMappings[chIdx];
|
const mapping = channelMappings[chIdx];
|
||||||
const channel = mapping.channel;
|
const channel = mapping.channel;
|
||||||
@@ -451,8 +455,8 @@ export async function runWorkerForAccount(
|
|||||||
counters.messagesScanned += scanResult.totalScanned;
|
counters.messagesScanned += scanResult.totalScanned;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug(
|
accountLog.info(
|
||||||
{ channelId: channel.id, topic: topic.name },
|
{ channelId: channel.id, topic: topic.name, totalScanned: scanResult.totalScanned },
|
||||||
"No new archives in topic"
|
"No new archives in topic"
|
||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
@@ -525,7 +529,7 @@ export async function runWorkerForAccount(
|
|||||||
counters.messagesScanned += scanResult.totalScanned;
|
counters.messagesScanned += scanResult.totalScanned;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug({ channelId: channel.id }, "No new archives");
|
accountLog.info({ channelId: channel.id, title: channel.title, totalScanned: scanResult.totalScanned }, "No new archives in channel");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user