diff --git a/prisma/migrations/20260305200000_default_channels_disabled/migration.sql b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql new file mode 100644 index 0000000..f137fc4 --- /dev/null +++ b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql @@ -0,0 +1,3 @@ +-- Change the default for new channels to disabled (isActive = false). +-- Existing channels are not affected — admins can manually enable/disable them. +ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b78ea48..de85552 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -417,7 +417,7 @@ model TelegramChannel { title String type ChannelType isForum Boolean @default(false) - isActive Boolean @default(true) + isActive Boolean @default(false) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt diff --git a/src/app/(app)/telegram/_components/worker-status-panel.tsx b/src/app/(app)/telegram/_components/worker-status-panel.tsx index 282ebef..ef7601d 100644 --- a/src/app/(app)/telegram/_components/worker-status-panel.tsx +++ b/src/app/(app)/telegram/_components/worker-status-panel.tsx @@ -233,6 +233,11 @@ function RunningStatus({ )} + {run.messagesScanned > 0 && ( + + {run.messagesScanned} messages + + )} {run.zipsIngested > 0 && ( {run.zipsIngested} ingested diff --git a/src/app/(app)/telegram/actions.ts b/src/app/(app)/telegram/actions.ts index 18b0edf..67acf2e 100644 --- a/src/app/(app)/telegram/actions.ts +++ b/src/app/(app)/telegram/actions.ts @@ -173,6 +173,7 @@ export async function createChannel( telegramId: BigInt(parsed.data.telegramId), title: parsed.data.title, type: parsed.data.type, + isActive: false, }, }); revalidatePath(REVALIDATE_PATH); @@ -371,19 +372,8 @@ export async function triggerIngestion( return { success: false, error: "No eligible accounts found" }; } - // Create ingestion runs — the worker picks these up - for (const account of accounts) { - const existing = await prisma.ingestionRun.findFirst({ - where: { accountId: account.id, status: "RUNNING" }, - }); - if (!existing) { - await prisma.ingestionRun.create({ - data: { accountId: account.id, status: "RUNNING" }, - }); - } - } - - // pg_notify for immediate worker pickup + // Signal the worker to run an immediate ingestion cycle via pg_notify. + // The worker will create its own IngestionRun records with proper activity tracking. try { await prisma.$queryRawUnsafe( `SELECT pg_notify('ingestion_trigger', $1)`, @@ -417,7 +407,7 @@ export async function saveChannelSelections( try { let linked = 0; for (const ch of channels) { - // Upsert the channel record + // Upsert the channel record (new channels default to disabled) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(ch.telegramId) }, create: { @@ -425,6 +415,7 @@ export async function saveChannelSelections( title: ch.title, type: "SOURCE", isForum: ch.isForum, + isActive: false, }, update: { title: ch.title, @@ -467,10 +458,10 @@ export async function setGlobalDestination( if (!channel) return { success: false, error: "Channel not found" }; try { - // Set the channel type to DESTINATION + // Set the channel type to DESTINATION and ensure it's active await prisma.telegramChannel.update({ where: { id: channelId }, - data: { type: "DESTINATION" }, + data: { type: "DESTINATION", isActive: true }, }); // Save as global destination @@ -521,17 +512,19 @@ export async function createDestinationChannel( if (!admin.success) return admin; try { - // Create the channel as DESTINATION + // Create the channel as DESTINATION (active by default — needed for uploads) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(telegramId) }, create: { telegramId: BigInt(telegramId), title, type: "DESTINATION", + isActive: true, }, update: { title, type: "DESTINATION", + isActive: true, }, }); diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index f42fbcf..e8058e7 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -302,11 +302,15 @@ export interface UpsertChannelInput { title: string; type: "SOURCE" | "DESTINATION"; isForum: boolean; + isActive?: boolean; } /** * Upsert a channel by telegramId. Returns the channel record. * If it already exists, update title and forum status. + * New channels default to disabled (isActive: false) so the admin must + * explicitly enable them before the worker processes them. + * Pass isActive: true for DESTINATION channels that must be active immediately. */ export async function upsertChannel(input: UpsertChannelInput) { return db.telegramChannel.upsert({ @@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) { title: input.title, type: input.type, isForum: input.isForum, + isActive: input.isActive ?? false, }, update: { title: input.title, diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index f431dc8..681979f 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js"; import { processFetchRequest } from "./worker.js"; import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; +import { triggerImmediateCycle } from "./scheduler.js"; import { getGlobalDestinationChannel, getGlobalSetting, @@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null; * - `channel_fetch` — payload = requestId → fetch channels for an account * - `generate_invite` — payload = channelId → generate invite link for destination * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib + * - `ingestion_trigger` — trigger an immediate ingestion cycle */ export async function startFetchListener(): Promise { pgClient = await pool.connect(); await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN create_destination"); + await pgClient.query("LISTEN ingestion_trigger"); pgClient.on("notification", (msg) => { if (msg.channel === "channel_fetch" && msg.payload) { @@ -39,10 +42,12 @@ export async function startFetchListener(): Promise { handleGenerateInvite(msg.payload); } else if (msg.channel === "create_destination" && msg.payload) { handleCreateDestination(msg.payload); + } else if (msg.channel === "ingestion_trigger") { + handleIngestionTrigger(); } }); - log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)"); + log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); } export function stopFetchListener(): void { @@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void { const result = await createSupergroup(client, parsed.title); log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created"); - // Upsert it as a DESTINATION channel in the DB + // Upsert it as a DESTINATION channel in the DB (active by default) const channel = await upsertChannel({ telegramId: result.chatId, title: result.title, type: "DESTINATION", isForum: false, + isActive: true, }); // Set as global destination @@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void { } }); } + +// ── Ingestion trigger handler ── + +function handleIngestionTrigger(): void { + fetchQueue = fetchQueue.then(async () => { + try { + log.info("Ingestion trigger received from UI"); + await triggerImmediateCycle(); + } catch (err) { + log.error({ err }, "Failed to trigger immediate ingestion cycle"); + } + }); +} diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index f7dbc23..edb02c1 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -105,6 +105,19 @@ export async function startScheduler(): Promise { scheduleNext(); } +/** + * Trigger an immediate ingestion cycle (e.g. from the admin UI). + * If a cycle is already running, this is a no-op. + */ +export async function triggerImmediateCycle(): Promise { + if (running) { + log.info("Cycle already running, ignoring trigger"); + return; + } + log.info("Immediate cycle triggered via UI"); + await runCycle(); +} + /** * Stop the scheduler gracefully. */ diff --git a/worker/src/tdlib/download.ts b/worker/src/tdlib/download.ts index 4f40eca..0597ae3 100644 --- a/worker/src/tdlib/download.ts +++ b/worker/src/tdlib/download.ts @@ -66,8 +66,11 @@ interface TdFile { export interface ChannelScanResult { archives: TelegramMessage[]; photos: TelegramPhoto[]; + totalScanned: number; } +export type ScanProgressCallback = (messagesScanned: number) => void; + /** * Fetch messages from a channel, stopping once we've scanned past the * last-processed boundary (with one page of lookback for multipart safety). @@ -82,13 +85,15 @@ export async function getChannelMessages( client: Client, chatId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -103,6 +108,8 @@ export async function getChannelMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -132,6 +139,9 @@ export async function getChannelMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -144,7 +154,7 @@ export async function getChannelMessages( } log.info( - { chatId: chatId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Channel scan complete" ); @@ -152,6 +162,7 @@ export async function getChannelMessages( return { archives: archives.reverse(), photos: photos.reverse(), + totalScanned, }; } diff --git a/worker/src/tdlib/topics.ts b/worker/src/tdlib/topics.ts index 23f02c5..107bf37 100644 --- a/worker/src/tdlib/topics.ts +++ b/worker/src/tdlib/topics.ts @@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js"; import { isArchiveAttachment } from "../archive/detect.js"; import type { TelegramMessage } from "../archive/multipart.js"; import type { TelegramPhoto } from "../preview/match.js"; -import type { ChannelScanResult } from "./download.js"; +import type { ChannelScanResult, ScanProgressCallback } from "./download.js"; const log = childLogger("topics"); @@ -140,13 +140,15 @@ export async function getTopicMessages( chatId: bigint, topicId: bigint, lastProcessedMessageId?: bigint | null, - limit = 100 + limit = 100, + onProgress?: ScanProgressCallback ): Promise { const archives: TelegramMessage[] = []; const photos: TelegramPhoto[] = []; const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null; let currentFromId = 0; + let totalScanned = 0; // eslint-disable-next-line no-constant-condition while (true) { @@ -190,6 +192,8 @@ export async function getTopicMessages( if (!result.messages || result.messages.length === 0) break; + totalScanned += result.messages.length; + for (const msg of result.messages) { // Check for archive documents const doc = msg.content?.document; @@ -219,6 +223,9 @@ export async function getTopicMessages( } } + // Report scanning progress after each page + onProgress?.(totalScanned); + currentFromId = result.messages[result.messages.length - 1].id; // Stop scanning once we've gone past the boundary (this page is the lookback) @@ -230,7 +237,7 @@ export async function getTopicMessages( } log.info( - { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length }, + { chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned }, "Topic scan complete" ); @@ -238,6 +245,7 @@ export async function getTopicMessages( return { archives: archives.reverse(), photos: photos.reverse(), + totalScanned, }; } diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 945e447..e61c64c 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -349,8 +349,14 @@ export async function runWorkerForAccount( throw new Error("No global destination channel configured — set one in the admin UI"); } - for (const mapping of channelMappings) { + const totalChannels = channelMappings.length; + + for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) { + const mapping = channelMappings[chIdx]; const channel = mapping.channel; + const channelLabel = totalChannels > 1 + ? `[${chIdx + 1}/${totalChannels}] ${channel.title}` + : channel.title; try { // ── Check if channel is a forum ── @@ -380,15 +386,16 @@ export async function runWorkerForAccount( if (forum) { // ── Forum channel: scan per-topic ── await updateRunActivity(activeRunId, { - currentActivity: `Enumerating topics in "${channel.title}"`, + currentActivity: `Enumerating topics in "${channelLabel}"`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const topics = await getForumTopicList(client, channel.telegramId); @@ -399,31 +406,50 @@ export async function runWorkerForAccount( "Scanning forum channel by topic" ); - for (const topic of topics) { + for (let tIdx = 0; tIdx < topics.length; tIdx++) { + const topic = topics[tIdx]; try { const progress = topicProgressList.find( (tp) => tp.topicId === topic.topicId ); + const topicLabel = `${channel.title} › ${topic.name}`; + const topicProgress = topics.length > 1 + ? ` (topic ${tIdx + 1}/${topics.length})` + : ""; + await updateRunActivity(activeRunId, { - currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`, + currentActivity: `Scanning "${topicLabel}"${topicProgress}`, currentStep: "scanning", - currentChannel: `${channel.title} › ${topic.name}`, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); const scanResult = await getTopicMessages( client, channel.telegramId, topic.topicId, - progress?.lastProcessedMessageId + progress?.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + counters.messagesScanned += scanResult.totalScanned; + if (scanResult.archives.length === 0) { accountLog.debug( { channelId: channel.id, topic: topic.name }, @@ -463,15 +489,16 @@ export async function runWorkerForAccount( } else { // ── Non-forum channel: flat scan (existing behavior) ── await updateRunActivity(activeRunId, { - currentActivity: `Scanning "${channel.title}" for new archives`, + currentActivity: `Scanning "${channelLabel}" for new archives`, currentStep: "scanning", - currentChannel: channel.title, + currentChannel: channelLabel, currentFile: null, currentFileNum: null, totalFiles: null, downloadedBytes: null, totalBytes: null, downloadPercent: null, + messagesScanned: counters.messagesScanned, }); accountLog.info( @@ -482,9 +509,21 @@ export async function runWorkerForAccount( const scanResult = await getChannelMessages( client, channel.telegramId, - mapping.lastProcessedMessageId + mapping.lastProcessedMessageId, + 100, + (scanned) => { + throttled.update({ + currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`, + currentStep: "scanning", + currentChannel: channelLabel, + messagesScanned: counters.messagesScanned + scanned, + }); + } ); + // Add scanned messages to global counter + counters.messagesScanned += scanResult.totalScanned; + if (scanResult.archives.length === 0) { accountLog.debug({ channelId: channel.id }, "No new archives"); continue; @@ -593,6 +632,7 @@ async function processArchiveSets( currentChannel: channelTitle, totalFiles: archiveSets.length, zipsFound: counters.zipsFound, + messagesScanned: counters.messagesScanned, }); // Track the highest message ID that was successfully processed @@ -646,7 +686,6 @@ async function processOneArchiveSet( throttled, counters, topicCreator, sourceTopicId, accountLog, } = ctx; - counters.messagesScanned += archiveSet.parts.length; const archiveName = archiveSet.parts[0].fileName; // ── Early skip: check if this archive set was already ingested ──