Merge pull request #12 from xCyanGrizzly/copilot/fix-worker-functionality-visibility

Fix worker activity tracking, add scan progress, default channels to disabled
This commit is contained in:
xCyanGrizzly
2026-03-05 09:42:48 +01:00
committed by GitHub
10 changed files with 132 additions and 36 deletions

View File

@@ -0,0 +1,3 @@
-- Change the default for new channels to disabled (isActive = false).
-- Existing channels are not affected — admins can manually enable/disable them.
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;

View File

@@ -417,7 +417,7 @@ model TelegramChannel {
title String title String
type ChannelType type ChannelType
isForum Boolean @default(false) isForum Boolean @default(false)
isActive Boolean @default(true) isActive Boolean @default(false)
createdAt DateTime @default(now()) createdAt DateTime @default(now())
updatedAt DateTime @updatedAt updatedAt DateTime @updatedAt

View File

@@ -233,6 +233,11 @@ function RunningStatus({
</span> </span>
</span> </span>
)} )}
{run.messagesScanned > 0 && (
<span>
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
</span>
)}
{run.zipsIngested > 0 && ( {run.zipsIngested > 0 && (
<span> <span>
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested <span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested

View File

@@ -173,6 +173,7 @@ export async function createChannel(
telegramId: BigInt(parsed.data.telegramId), telegramId: BigInt(parsed.data.telegramId),
title: parsed.data.title, title: parsed.data.title,
type: parsed.data.type, type: parsed.data.type,
isActive: false,
}, },
}); });
revalidatePath(REVALIDATE_PATH); revalidatePath(REVALIDATE_PATH);
@@ -371,19 +372,8 @@ export async function triggerIngestion(
return { success: false, error: "No eligible accounts found" }; return { success: false, error: "No eligible accounts found" };
} }
// Create ingestion runs — the worker picks these up // Signal the worker to run an immediate ingestion cycle via pg_notify.
for (const account of accounts) { // The worker will create its own IngestionRun records with proper activity tracking.
const existing = await prisma.ingestionRun.findFirst({
where: { accountId: account.id, status: "RUNNING" },
});
if (!existing) {
await prisma.ingestionRun.create({
data: { accountId: account.id, status: "RUNNING" },
});
}
}
// pg_notify for immediate worker pickup
try { try {
await prisma.$queryRawUnsafe( await prisma.$queryRawUnsafe(
`SELECT pg_notify('ingestion_trigger', $1)`, `SELECT pg_notify('ingestion_trigger', $1)`,
@@ -417,7 +407,7 @@ export async function saveChannelSelections(
try { try {
let linked = 0; let linked = 0;
for (const ch of channels) { for (const ch of channels) {
// Upsert the channel record // Upsert the channel record (new channels default to disabled)
const channel = await prisma.telegramChannel.upsert({ const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(ch.telegramId) }, where: { telegramId: BigInt(ch.telegramId) },
create: { create: {
@@ -425,6 +415,7 @@ export async function saveChannelSelections(
title: ch.title, title: ch.title,
type: "SOURCE", type: "SOURCE",
isForum: ch.isForum, isForum: ch.isForum,
isActive: false,
}, },
update: { update: {
title: ch.title, title: ch.title,
@@ -467,10 +458,10 @@ export async function setGlobalDestination(
if (!channel) return { success: false, error: "Channel not found" }; if (!channel) return { success: false, error: "Channel not found" };
try { try {
// Set the channel type to DESTINATION // Set the channel type to DESTINATION and ensure it's active
await prisma.telegramChannel.update({ await prisma.telegramChannel.update({
where: { id: channelId }, where: { id: channelId },
data: { type: "DESTINATION" }, data: { type: "DESTINATION", isActive: true },
}); });
// Save as global destination // Save as global destination
@@ -521,17 +512,19 @@ export async function createDestinationChannel(
if (!admin.success) return admin; if (!admin.success) return admin;
try { try {
// Create the channel as DESTINATION // Create the channel as DESTINATION (active by default — needed for uploads)
const channel = await prisma.telegramChannel.upsert({ const channel = await prisma.telegramChannel.upsert({
where: { telegramId: BigInt(telegramId) }, where: { telegramId: BigInt(telegramId) },
create: { create: {
telegramId: BigInt(telegramId), telegramId: BigInt(telegramId),
title, title,
type: "DESTINATION", type: "DESTINATION",
isActive: true,
}, },
update: { update: {
title, title,
type: "DESTINATION", type: "DESTINATION",
isActive: true,
}, },
}); });

View File

@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
title: string; title: string;
type: "SOURCE" | "DESTINATION"; type: "SOURCE" | "DESTINATION";
isForum: boolean; isForum: boolean;
isActive?: boolean;
} }
/** /**
* Upsert a channel by telegramId. Returns the channel record. * Upsert a channel by telegramId. Returns the channel record.
* If it already exists, update title and forum status. * If it already exists, update title and forum status.
* New channels default to disabled (isActive: false) so the admin must
* explicitly enable them before the worker processes them.
* Pass isActive: true for DESTINATION channels that must be active immediately.
*/ */
export async function upsertChannel(input: UpsertChannelInput) { export async function upsertChannel(input: UpsertChannelInput) {
return db.telegramChannel.upsert({ return db.telegramChannel.upsert({
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
title: input.title, title: input.title,
type: input.type, type: input.type,
isForum: input.isForum, isForum: input.isForum,
isActive: input.isActive ?? false,
}, },
update: { update: {
title: input.title, title: input.title,

View File

@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
import { processFetchRequest } from "./worker.js"; import { processFetchRequest } from "./worker.js";
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { triggerImmediateCycle } from "./scheduler.js";
import { import {
getGlobalDestinationChannel, getGlobalDestinationChannel,
getGlobalSetting, getGlobalSetting,
@@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null;
* - `channel_fetch` — payload = requestId → fetch channels for an account * - `channel_fetch` — payload = requestId → fetch channels for an account
* - `generate_invite` — payload = channelId → generate invite link for destination * - `generate_invite` — payload = channelId → generate invite link for destination
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
* - `ingestion_trigger` — trigger an immediate ingestion cycle
*/ */
export async function startFetchListener(): Promise<void> { export async function startFetchListener(): Promise<void> {
pgClient = await pool.connect(); pgClient = await pool.connect();
await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN channel_fetch");
await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN generate_invite");
await pgClient.query("LISTEN create_destination"); await pgClient.query("LISTEN create_destination");
await pgClient.query("LISTEN ingestion_trigger");
pgClient.on("notification", (msg) => { pgClient.on("notification", (msg) => {
if (msg.channel === "channel_fetch" && msg.payload) { if (msg.channel === "channel_fetch" && msg.payload) {
@@ -39,10 +42,12 @@ export async function startFetchListener(): Promise<void> {
handleGenerateInvite(msg.payload); handleGenerateInvite(msg.payload);
} else if (msg.channel === "create_destination" && msg.payload) { } else if (msg.channel === "create_destination" && msg.payload) {
handleCreateDestination(msg.payload); handleCreateDestination(msg.payload);
} else if (msg.channel === "ingestion_trigger") {
handleIngestionTrigger();
} }
}); });
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)"); log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
} }
export function stopFetchListener(): void { export function stopFetchListener(): void {
@@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void {
const result = await createSupergroup(client, parsed.title); const result = await createSupergroup(client, parsed.title);
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created"); log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
// Upsert it as a DESTINATION channel in the DB // Upsert it as a DESTINATION channel in the DB (active by default)
const channel = await upsertChannel({ const channel = await upsertChannel({
telegramId: result.chatId, telegramId: result.chatId,
title: result.title, title: result.title,
type: "DESTINATION", type: "DESTINATION",
isForum: false, isForum: false,
isActive: true,
}); });
// Set as global destination // Set as global destination
@@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void {
} }
}); });
} }
// ── Ingestion trigger handler ──
function handleIngestionTrigger(): void {
fetchQueue = fetchQueue.then(async () => {
try {
log.info("Ingestion trigger received from UI");
await triggerImmediateCycle();
} catch (err) {
log.error({ err }, "Failed to trigger immediate ingestion cycle");
}
});
}

View File

@@ -105,6 +105,19 @@ export async function startScheduler(): Promise<void> {
scheduleNext(); scheduleNext();
} }
/**
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
* If a cycle is already running, this is a no-op.
*/
export async function triggerImmediateCycle(): Promise<void> {
if (running) {
log.info("Cycle already running, ignoring trigger");
return;
}
log.info("Immediate cycle triggered via UI");
await runCycle();
}
/** /**
* Stop the scheduler gracefully. * Stop the scheduler gracefully.
*/ */

View File

@@ -66,8 +66,11 @@ interface TdFile {
export interface ChannelScanResult { export interface ChannelScanResult {
archives: TelegramMessage[]; archives: TelegramMessage[];
photos: TelegramPhoto[]; photos: TelegramPhoto[];
totalScanned: number;
} }
export type ScanProgressCallback = (messagesScanned: number) => void;
/** /**
* Fetch messages from a channel, stopping once we've scanned past the * Fetch messages from a channel, stopping once we've scanned past the
* last-processed boundary (with one page of lookback for multipart safety). * last-processed boundary (with one page of lookback for multipart safety).
@@ -82,13 +85,15 @@ export async function getChannelMessages(
client: Client, client: Client,
chatId: bigint, chatId: bigint,
lastProcessedMessageId?: bigint | null, lastProcessedMessageId?: bigint | null,
limit = 100 limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> { ): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = []; const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = []; const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
@@ -103,6 +108,8 @@ export async function getChannelMessages(
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) { for (const msg of result.messages) {
// Check for archive documents // Check for archive documents
const doc = msg.content?.document; const doc = msg.content?.document;
@@ -132,6 +139,9 @@ export async function getChannelMessages(
} }
} }
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
@@ -144,7 +154,7 @@ export async function getChannelMessages(
} }
log.info( log.info(
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length }, { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
"Channel scan complete" "Channel scan complete"
); );
@@ -152,6 +162,7 @@ export async function getChannelMessages(
return { return {
archives: archives.reverse(), archives: archives.reverse(),
photos: photos.reverse(), photos: photos.reverse(),
totalScanned,
}; };
} }

View File

@@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js";
import { isArchiveAttachment } from "../archive/detect.js"; import { isArchiveAttachment } from "../archive/detect.js";
import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramMessage } from "../archive/multipart.js";
import type { TelegramPhoto } from "../preview/match.js"; import type { TelegramPhoto } from "../preview/match.js";
import type { ChannelScanResult } from "./download.js"; import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
const log = childLogger("topics"); const log = childLogger("topics");
@@ -140,13 +140,15 @@ export async function getTopicMessages(
chatId: bigint, chatId: bigint,
topicId: bigint, topicId: bigint,
lastProcessedMessageId?: bigint | null, lastProcessedMessageId?: bigint | null,
limit = 100 limit = 100,
onProgress?: ScanProgressCallback
): Promise<ChannelScanResult> { ): Promise<ChannelScanResult> {
const archives: TelegramMessage[] = []; const archives: TelegramMessage[] = [];
const photos: TelegramPhoto[] = []; const photos: TelegramPhoto[] = [];
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
let currentFromId = 0; let currentFromId = 0;
let totalScanned = 0;
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
@@ -190,6 +192,8 @@ export async function getTopicMessages(
if (!result.messages || result.messages.length === 0) break; if (!result.messages || result.messages.length === 0) break;
totalScanned += result.messages.length;
for (const msg of result.messages) { for (const msg of result.messages) {
// Check for archive documents // Check for archive documents
const doc = msg.content?.document; const doc = msg.content?.document;
@@ -219,6 +223,9 @@ export async function getTopicMessages(
} }
} }
// Report scanning progress after each page
onProgress?.(totalScanned);
currentFromId = result.messages[result.messages.length - 1].id; currentFromId = result.messages[result.messages.length - 1].id;
// Stop scanning once we've gone past the boundary (this page is the lookback) // Stop scanning once we've gone past the boundary (this page is the lookback)
@@ -230,7 +237,7 @@ export async function getTopicMessages(
} }
log.info( log.info(
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length }, { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
"Topic scan complete" "Topic scan complete"
); );
@@ -238,6 +245,7 @@ export async function getTopicMessages(
return { return {
archives: archives.reverse(), archives: archives.reverse(),
photos: photos.reverse(), photos: photos.reverse(),
totalScanned,
}; };
} }

View File

@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
throw new Error("No global destination channel configured — set one in the admin UI"); throw new Error("No global destination channel configured — set one in the admin UI");
} }
for (const mapping of channelMappings) { const totalChannels = channelMappings.length;
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
const mapping = channelMappings[chIdx];
const channel = mapping.channel; const channel = mapping.channel;
const channelLabel = totalChannels > 1
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
: channel.title;
try { try {
// ── Check if channel is a forum ── // ── Check if channel is a forum ──
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
if (forum) { if (forum) {
// ── Forum channel: scan per-topic ── // ── Forum channel: scan per-topic ──
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Enumerating topics in "${channel.title}"`, currentActivity: `Enumerating topics in "${channelLabel}"`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: channel.title, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
const topics = await getForumTopicList(client, channel.telegramId); const topics = await getForumTopicList(client, channel.telegramId);
@@ -399,31 +406,50 @@ export async function runWorkerForAccount(
"Scanning forum channel by topic" "Scanning forum channel by topic"
); );
for (const topic of topics) { for (let tIdx = 0; tIdx < topics.length; tIdx++) {
const topic = topics[tIdx];
try { try {
const progress = topicProgressList.find( const progress = topicProgressList.find(
(tp) => tp.topicId === topic.topicId (tp) => tp.topicId === topic.topicId
); );
const topicLabel = `${channel.title} ${topic.name}`;
const topicProgress = topics.length > 1
? ` (topic ${tIdx + 1}/${topics.length})`
: "";
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`, currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: `${channel.title} ${topic.name}`, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
const scanResult = await getTopicMessages( const scanResult = await getTopicMessages(
client, client,
channel.telegramId, channel.telegramId,
topic.topicId, topic.topicId,
progress?.lastProcessedMessageId progress?.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${topicLabel}"${topicProgress}${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
); );
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug( accountLog.debug(
{ channelId: channel.id, topic: topic.name }, { channelId: channel.id, topic: topic.name },
@@ -463,15 +489,16 @@ export async function runWorkerForAccount(
} else { } else {
// ── Non-forum channel: flat scan (existing behavior) ── // ── Non-forum channel: flat scan (existing behavior) ──
await updateRunActivity(activeRunId, { await updateRunActivity(activeRunId, {
currentActivity: `Scanning "${channel.title}" for new archives`, currentActivity: `Scanning "${channelLabel}" for new archives`,
currentStep: "scanning", currentStep: "scanning",
currentChannel: channel.title, currentChannel: channelLabel,
currentFile: null, currentFile: null,
currentFileNum: null, currentFileNum: null,
totalFiles: null, totalFiles: null,
downloadedBytes: null, downloadedBytes: null,
totalBytes: null, totalBytes: null,
downloadPercent: null, downloadPercent: null,
messagesScanned: counters.messagesScanned,
}); });
accountLog.info( accountLog.info(
@@ -482,9 +509,21 @@ export async function runWorkerForAccount(
const scanResult = await getChannelMessages( const scanResult = await getChannelMessages(
client, client,
channel.telegramId, channel.telegramId,
mapping.lastProcessedMessageId mapping.lastProcessedMessageId,
100,
(scanned) => {
throttled.update({
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
currentStep: "scanning",
currentChannel: channelLabel,
messagesScanned: counters.messagesScanned + scanned,
});
}
); );
// Add scanned messages to global counter
counters.messagesScanned += scanResult.totalScanned;
if (scanResult.archives.length === 0) { if (scanResult.archives.length === 0) {
accountLog.debug({ channelId: channel.id }, "No new archives"); accountLog.debug({ channelId: channel.id }, "No new archives");
continue; continue;
@@ -593,6 +632,7 @@ async function processArchiveSets(
currentChannel: channelTitle, currentChannel: channelTitle,
totalFiles: archiveSets.length, totalFiles: archiveSets.length,
zipsFound: counters.zipsFound, zipsFound: counters.zipsFound,
messagesScanned: counters.messagesScanned,
}); });
// Track the highest message ID that was successfully processed // Track the highest message ID that was successfully processed
@@ -646,7 +686,6 @@ async function processOneArchiveSet(
throttled, counters, topicCreator, sourceTopicId, accountLog, throttled, counters, topicCreator, sourceTopicId, accountLog,
} = ctx; } = ctx;
counters.messagesScanned += archiveSet.parts.length;
const archiveName = archiveSet.parts[0].fileName; const archiveName = archiveSet.parts[0].fileName;
// ── Early skip: check if this archive set was already ingested ── // ── Early skip: check if this archive set was already ingested ──