From 8f1a912ccb5ffc0b842bc63e3fae1091f25b879e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:11:20 +0000 Subject: [PATCH 1/4] Initial plan From 15da57b8c0c6560306e1e64960c16c919340d8e1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:27:37 +0000 Subject: [PATCH 2/4] Fix worker stuck on "Working..." and default channels to disabled 1. Worker trigger: Add ingestion_trigger pg_notify listener so the worker picks up on-demand triggers from the UI and runs an immediate cycle with full activity tracking (currentActivity, currentStep, etc). 2. Remove orphaned IngestionRun creation from triggerIngestion server action. Previously the UI created RUNNING runs without activity fields, causing the UI to show "Working..." with no details. Now only the worker creates runs with proper activity tracking. 3. Default channels to disabled (isActive: false) in schema and all creation paths. Destination channels are explicitly set to active since they must receive uploads. Includes Prisma migration. Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com> --- .../migration.sql | 3 +++ prisma/schema.prisma | 2 +- src/app/(app)/telegram/actions.ts | 27 +++++++------------ worker/src/db/queries.ts | 5 ++++ worker/src/fetch-listener.ts | 23 ++++++++++++++-- worker/src/scheduler.ts | 13 +++++++++ 6 files changed, 53 insertions(+), 20 deletions(-) create mode 100644 prisma/migrations/20260305200000_default_channels_disabled/migration.sql diff --git a/prisma/migrations/20260305200000_default_channels_disabled/migration.sql b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql new file mode 100644 index 0000000..f137fc4 --- /dev/null +++ b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql @@ -0,0 +1,3 @@ +-- Change the default for new channels to disabled (isActive = false). +-- Existing channels are not affected — admins can manually enable/disable them. +ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b78ea48..de85552 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -417,7 +417,7 @@ model TelegramChannel { title String type ChannelType isForum Boolean @default(false) - isActive Boolean @default(true) + isActive Boolean @default(false) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt diff --git a/src/app/(app)/telegram/actions.ts b/src/app/(app)/telegram/actions.ts index 18b0edf..67acf2e 100644 --- a/src/app/(app)/telegram/actions.ts +++ b/src/app/(app)/telegram/actions.ts @@ -173,6 +173,7 @@ export async function createChannel( telegramId: BigInt(parsed.data.telegramId), title: parsed.data.title, type: parsed.data.type, + isActive: false, }, }); revalidatePath(REVALIDATE_PATH); @@ -371,19 +372,8 @@ export async function triggerIngestion( return { success: false, error: "No eligible accounts found" }; } - // Create ingestion runs — the worker picks these up - for (const account of accounts) { - const existing = await prisma.ingestionRun.findFirst({ - where: { accountId: account.id, status: "RUNNING" }, - }); - if (!existing) { - await prisma.ingestionRun.create({ - data: { accountId: account.id, status: "RUNNING" }, - }); - } - } - - // pg_notify for immediate worker pickup + // Signal the worker to run an immediate ingestion cycle via pg_notify. + // The worker will create its own IngestionRun records with proper activity tracking. try { await prisma.$queryRawUnsafe( `SELECT pg_notify('ingestion_trigger', $1)`, @@ -417,7 +407,7 @@ export async function saveChannelSelections( try { let linked = 0; for (const ch of channels) { - // Upsert the channel record + // Upsert the channel record (new channels default to disabled) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(ch.telegramId) }, create: { @@ -425,6 +415,7 @@ export async function saveChannelSelections( title: ch.title, type: "SOURCE", isForum: ch.isForum, + isActive: false, }, update: { title: ch.title, @@ -467,10 +458,10 @@ export async function setGlobalDestination( if (!channel) return { success: false, error: "Channel not found" }; try { - // Set the channel type to DESTINATION + // Set the channel type to DESTINATION and ensure it's active await prisma.telegramChannel.update({ where: { id: channelId }, - data: { type: "DESTINATION" }, + data: { type: "DESTINATION", isActive: true }, }); // Save as global destination @@ -521,17 +512,19 @@ export async function createDestinationChannel( if (!admin.success) return admin; try { - // Create the channel as DESTINATION + // Create the channel as DESTINATION (active by default — needed for uploads) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(telegramId) }, create: { telegramId: BigInt(telegramId), title, type: "DESTINATION", + isActive: true, }, update: { title, type: "DESTINATION", + isActive: true, }, }); diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index f42fbcf..e8058e7 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -302,11 +302,15 @@ export interface UpsertChannelInput { title: string; type: "SOURCE" | "DESTINATION"; isForum: boolean; + isActive?: boolean; } /** * Upsert a channel by telegramId. Returns the channel record. * If it already exists, update title and forum status. + * New channels default to disabled (isActive: false) so the admin must + * explicitly enable them before the worker processes them. + * Pass isActive: true for DESTINATION channels that must be active immediately. */ export async function upsertChannel(input: UpsertChannelInput) { return db.telegramChannel.upsert({ @@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) { title: input.title, type: input.type, isForum: input.isForum, + isActive: input.isActive ?? false, }, update: { title: input.title, diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index f431dc8..681979f 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js"; import { processFetchRequest } from "./worker.js"; import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; +import { triggerImmediateCycle } from "./scheduler.js"; import { getGlobalDestinationChannel, getGlobalSetting, @@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null; * - `channel_fetch` — payload = requestId → fetch channels for an account * - `generate_invite` — payload = channelId → generate invite link for destination * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib + * - `ingestion_trigger` — trigger an immediate ingestion cycle */ export async function startFetchListener(): Promise { pgClient = await pool.connect(); await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN create_destination"); + await pgClient.query("LISTEN ingestion_trigger"); pgClient.on("notification", (msg) => { if (msg.channel === "channel_fetch" && msg.payload) { @@ -39,10 +42,12 @@ export async function startFetchListener(): Promise { handleGenerateInvite(msg.payload); } else if (msg.channel === "create_destination" && msg.payload) { handleCreateDestination(msg.payload); + } else if (msg.channel === "ingestion_trigger") { + handleIngestionTrigger(); } }); - log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)"); + log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); } export function stopFetchListener(): void { @@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void { const result = await createSupergroup(client, parsed.title); log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created"); - // Upsert it as a DESTINATION channel in the DB + // Upsert it as a DESTINATION channel in the DB (active by default) const channel = await upsertChannel({ telegramId: result.chatId, title: result.title, type: "DESTINATION", isForum: false, + isActive: true, }); // Set as global destination @@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void { } }); } + +// ── Ingestion trigger handler ── + +function handleIngestionTrigger(): void { + fetchQueue = fetchQueue.then(async () => { + try { + log.info("Ingestion trigger received from UI"); + await triggerImmediateCycle(); + } catch (err) { + log.error({ err }, "Failed to trigger immediate ingestion cycle"); + } + }); +} diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index f7dbc23..edb02c1 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -105,6 +105,19 @@ export async function startScheduler(): Promise { scheduleNext(); } +/** + * Trigger an immediate ingestion cycle (e.g. from the admin UI). + * If a cycle is already running, this is a no-op. + */ +export async function triggerImmediateCycle(): Promise { + if (running) { + log.info("Cycle already running, ignoring trigger"); + return; + } + log.info("Immediate cycle triggered via UI"); + await runCycle(); +} + /** * Stop the scheduler gracefully. */ From 22bcacf3bdfcb2adc28712368a330668fb28da07 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:33:26 +0000 Subject: [PATCH 3/4] Add live message scanning progress, channel/topic counters to worker activity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add progress callbacks to getChannelMessages and getTopicMessages that fire after each page of messages is fetched - Worker now shows channel progress (e.g. "[2/5] Channel Name") when processing multiple source channels - Worker now shows topic progress (e.g. "topic 3/12") when scanning forums - Worker now shows live message scanning count during channel/topic scans (e.g. "Scanning Channel — 300 messages scanned") - UI stats line now always shows messagesScanned count - messagesScanned counter now increments during the scanning phase, not just during archive processing Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com> --- .../_components/worker-status-panel.tsx | 5 ++ worker/src/tdlib/download.ts | 13 +++- worker/src/tdlib/topics.ts | 13 +++- worker/src/worker.ts | 62 ++++++++++++++++--- 4 files changed, 78 insertions(+), 15 deletions(-) diff --git a/src/app/(app)/telegram/_components/worker-status-panel.tsx b/src/app/(app)/telegram/_components/worker-status-panel.tsx index 282ebef..ef7601d 100644 --- a/src/app/(app)/telegram/_components/worker-status-panel.tsx +++ b/src/app/(app)/telegram/_components/worker-status-panel.tsx @@ -233,6 +233,11 @@ function RunningStatus({ )} + {run.messagesScanned > 0 && ( + + {run.messagesScanned} messages + + )} {run.zipsIngested > 0 && ( {run.zipsIngested} ingested diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 4f40eca..c405c55 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -68,6 +68,8 @@ export interface ChannelScanResult { photos: TelegramPhoto[]; } +export type ScanProgressCallback = (messagesScanned: number) => void; + /** * Fetch messages from a channel, stopping once we've scanned past the * last-processed boundary (with one page of lookback for multipart safety). @@ -82,13 +84,15 @@ export async function getChannelMessages( client: Client, chatId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -103,6 +107,8 @@ export async function getChannelMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -132,6 +138,9 @@ export async function getChannelMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -144,7 +153,7 @@ export async function getChannelMessages( } log.info( - { chatId: chatId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Channel scan complete" ); diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 23f02c5..1c8e7ce 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js"; import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; -import type { ChannelScanResult } from "./download.js"; +import type { ChannelScanResult, ScanProgressCallback } from "./download.js"; const log = childLogger("topics"); @@ -140,13 +140,15 @@ export async function getTopicMessages( chatId: bigint, topicId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -190,6 +192,8 @@ export async function getTopicMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -219,6 +223,9 @@ export async function getTopicMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -230,7 +237,7 @@ export async function getTopicMessages( } log.info( - { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Topic scan complete" ); diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 945e447..e77c4e4 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -349,8 +349,14 @@ export async function runWorkerForAccount( throw new Error("No global destination channel configured — set one in the admin UI"); } - for (const mapping of channelMappings) { + const totalChannels = channelMappings.length; + + for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { + const mapping = channelMappings[chIdx]; const channel = mapping.channel; + const channelLabel = totalChannels > 1 + ? `[${chIdx + 1}/${totalChannels}] ${channel.title}` + : channel.title; try { // ── Check if channel is a forum ── @@ -380,15 +386,16 @@ export async function runWorkerForAccount( if (forum) { // ── Forum channel: scan per-topic ── await updateRunActivity(activeRunId, { - currentActivity: `Enumerating topics in "${channel.title}"`, + currentActivity: `Enumerating topics in "${channelLabel}"`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const topics = await getForumTopicList(client, channel.telegramId); @@ -399,31 +406,51 @@ export async function runWorkerForAccount( "Scanning forum channel by topic" ); - for (const topic of topics) { + for (let tIdx = 0; tIdx < topics.length; tIdx++) { + const topic = topics[tIdx]; try { const progress = topicProgressList.find( (tp) => tp.topicId === topic.topicId ); + const topicLabel = `${channel.title} › ${topic.name}`; + const topicProgress = topics.length > 1 + ? ` (topic ${tIdx + 1}/${topics.length})` + : ""; + await updateRunActivity(activeRunId, { - currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`, + currentActivity: `Scanning "${topicLabel}"${topicProgress}`, currentStep: "scanning", - currentChannel: `${channel.title} › ${topic.name}`, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const scanResult = await getTopicMessages( client, channel.telegramId, topic.topicId, - progress?.lastProcessedMessageId + progress?.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + const topicMsgCount = scanResult.archives.length + scanResult.photos.length; + counters.messagesScanned += topicMsgCount; + if (scanResult.archives.length === 0) { accountLog.debug( { channelId: channel.id, topic: topic.name }, @@ -463,15 +490,16 @@ export async function runWorkerForAccount( } else { // ── Non-forum channel: flat scan (existing behavior) ── await updateRunActivity(activeRunId, { - currentActivity: `Scanning "${channel.title}" for new archives`, + currentActivity: `Scanning "${channelLabel}" for new archives`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); accountLog.info( @@ -482,9 +510,22 @@ export async function runWorkerForAccount( const scanResult = await getChannelMessages( client, channel.telegramId, - mapping.lastProcessedMessageId + mapping.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + const channelMsgCount = scanResult.archives.length + scanResult.photos.length; + counters.messagesScanned += channelMsgCount; + if (scanResult.archives.length === 0) { accountLog.debug({ channelId: channel.id }, "No new archives"); continue; @@ -593,6 +634,7 @@ async function processArchiveSets( currentChannel: channelTitle, totalFiles: archiveSets.length, zipsFound: counters.zipsFound, + messagesScanned: counters.messagesScanned, }); // Track the highest message ID that was successfully processed From 22da4dfad29d8b06849ac86fa1eff1df8045f80f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Mar 2026 08:36:40 +0000 Subject: [PATCH 4/4] Fix messagesScanned consistency: use totalScanned from scan results, remove double-counting Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com> --- worker/src/tdlib/download.ts | 2 ++ worker/src/tdlib/topics.ts | 1 + worker/src/worker.ts | 7 ++----- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index c405c55..0597ae3 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -66,6 +66,7 @@ interface TdFile { export interface ChannelScanResult { archives: TelegramMessage[]; photos: TelegramPhoto[]; + totalScanned: number; } export type ScanProgressCallback = (messagesScanned: number) => void; @@ -161,6 +162,7 @@ export async function getChannelMessages( return { archives: archives.reverse(), photos: photos.reverse(), + totalScanned, }; } diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 1c8e7ce..107bf37 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -245,6 +245,7 @@ export async function getTopicMessages( return { archives: archives.reverse(), photos: photos.reverse(), + totalScanned, }; } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index e77c4e4..e61c64c 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -448,8 +448,7 @@ export async function runWorkerForAccount( ); // Add scanned messages to global counter - const topicMsgCount = scanResult.archives.length + scanResult.photos.length; - counters.messagesScanned += topicMsgCount; + counters.messagesScanned += scanResult.totalScanned; if (scanResult.archives.length === 0) { accountLog.debug( @@ -523,8 +522,7 @@ export async function runWorkerForAccount( ); // Add scanned messages to global counter - const channelMsgCount = scanResult.archives.length + scanResult.photos.length; - counters.messagesScanned += channelMsgCount; + counters.messagesScanned += scanResult.totalScanned; if (scanResult.archives.length === 0) { accountLog.debug({ channelId: channel.id }, "No new archives"); @@ -688,7 +686,6 @@ async function processOneArchiveSet( throttled, counters, topicCreator, sourceTopicId, accountLog, } = ctx; - counters.messagesScanned += archiveSet.parts.length; const archiveName = archiveSet.parts[0].fileName; // ── Early skip: check if this archive set was already ingested ──