diff --git a/prisma/migrations/20260305200000_default_channels_disabled/migration.sql b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql new file mode 100644 index 0000000..f137fc4 --- /dev/null +++ b/prisma/migrations/20260305200000_default_channels_disabled/migration.sql @@ -0,0 +1,3 @@ +-- Change the default for new channels to disabled (isActive = false). +-- Existing channels are not affected — admins can manually enable/disable them. +ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b78ea48..de85552 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -417,7 +417,7 @@ model TelegramChannel { title String type ChannelType isForum Boolean @default(false) - isActive Boolean @default(true) + isActive Boolean @default(false) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt diff --git a/src/app/(app)/telegram/actions.ts b/src/app/(app)/telegram/actions.ts index 18b0edf..67acf2e 100644 --- a/src/app/(app)/telegram/actions.ts +++ b/src/app/(app)/telegram/actions.ts @@ -173,6 +173,7 @@ export async function createChannel( telegramId: BigInt(parsed.data.telegramId), title: parsed.data.title, type: parsed.data.type, + isActive: false, }, }); revalidatePath(REVALIDATE_PATH); @@ -371,19 +372,8 @@ export async function triggerIngestion( return { success: false, error: "No eligible accounts found" }; } - // Create ingestion runs — the worker picks these up - for (const account of accounts) { - const existing = await prisma.ingestionRun.findFirst({ - where: { accountId: account.id, status: "RUNNING" }, - }); - if (!existing) { - await prisma.ingestionRun.create({ - data: { accountId: account.id, status: "RUNNING" }, - }); - } - } - - // pg_notify for immediate worker pickup + // Signal the worker to run an immediate ingestion cycle via pg_notify. + // The worker will create its own IngestionRun records with proper activity tracking. try { await prisma.$queryRawUnsafe( `SELECT pg_notify('ingestion_trigger', $1)`, @@ -417,7 +407,7 @@ export async function saveChannelSelections( try { let linked = 0; for (const ch of channels) { - // Upsert the channel record + // Upsert the channel record (new channels default to disabled) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(ch.telegramId) }, create: { @@ -425,6 +415,7 @@ export async function saveChannelSelections( title: ch.title, type: "SOURCE", isForum: ch.isForum, + isActive: false, }, update: { title: ch.title, @@ -467,10 +458,10 @@ export async function setGlobalDestination( if (!channel) return { success: false, error: "Channel not found" }; try { - // Set the channel type to DESTINATION + // Set the channel type to DESTINATION and ensure it's active await prisma.telegramChannel.update({ where: { id: channelId }, - data: { type: "DESTINATION" }, + data: { type: "DESTINATION", isActive: true }, }); // Save as global destination @@ -521,17 +512,19 @@ export async function createDestinationChannel( if (!admin.success) return admin; try { - // Create the channel as DESTINATION + // Create the channel as DESTINATION (active by default — needed for uploads) const channel = await prisma.telegramChannel.upsert({ where: { telegramId: BigInt(telegramId) }, create: { telegramId: BigInt(telegramId), title, type: "DESTINATION", + isActive: true, }, update: { title, type: "DESTINATION", + isActive: true, }, }); diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index f42fbcf..e8058e7 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -302,11 +302,15 @@ export interface UpsertChannelInput { title: string; type: "SOURCE" | "DESTINATION"; isForum: boolean; + isActive?: boolean; } /** * Upsert a channel by telegramId. Returns the channel record. * If it already exists, update title and forum status. + * New channels default to disabled (isActive: false) so the admin must + * explicitly enable them before the worker processes them. + * Pass isActive: true for DESTINATION channels that must be active immediately. */ export async function upsertChannel(input: UpsertChannelInput) { return db.telegramChannel.upsert({ @@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) { title: input.title, type: input.type, isForum: input.isForum, + isActive: input.isActive ?? false, }, update: { title: input.title, diff --git a/worker/src/fetch-listener.ts b/worker/src/fetch-listener.ts index f431dc8..681979f 100644 --- a/worker/src/fetch-listener.ts +++ b/worker/src/fetch-listener.ts @@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js"; import { processFetchRequest } from "./worker.js"; import { generateInviteLink, createSupergroup } from "./tdlib/chats.js"; import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js"; +import { triggerImmediateCycle } from "./scheduler.js"; import { getGlobalDestinationChannel, getGlobalSetting, @@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null; * - `channel_fetch` — payload = requestId → fetch channels for an account * - `generate_invite` — payload = channelId → generate invite link for destination * - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib + * - `ingestion_trigger` — trigger an immediate ingestion cycle */ export async function startFetchListener(): Promise { pgClient = await pool.connect(); await pgClient.query("LISTEN channel_fetch"); await pgClient.query("LISTEN generate_invite"); await pgClient.query("LISTEN create_destination"); + await pgClient.query("LISTEN ingestion_trigger"); pgClient.on("notification", (msg) => { if (msg.channel === "channel_fetch" && msg.payload) { @@ -39,10 +42,12 @@ export async function startFetchListener(): Promise { handleGenerateInvite(msg.payload); } else if (msg.channel === "create_destination" && msg.payload) { handleCreateDestination(msg.payload); + } else if (msg.channel === "ingestion_trigger") { + handleIngestionTrigger(); } }); - log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)"); + log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)"); } export function stopFetchListener(): void { @@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void { const result = await createSupergroup(client, parsed.title); log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created"); - // Upsert it as a DESTINATION channel in the DB + // Upsert it as a DESTINATION channel in the DB (active by default) const channel = await upsertChannel({ telegramId: result.chatId, title: result.title, type: "DESTINATION", isForum: false, + isActive: true, }); // Set as global destination @@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void { } }); } + +// ── Ingestion trigger handler ── + +function handleIngestionTrigger(): void { + fetchQueue = fetchQueue.then(async () => { + try { + log.info("Ingestion trigger received from UI"); + await triggerImmediateCycle(); + } catch (err) { + log.error({ err }, "Failed to trigger immediate ingestion cycle"); + } + }); +} diff --git a/worker/src/scheduler.ts b/worker/src/scheduler.ts index f7dbc23..edb02c1 100644 --- a/worker/src/scheduler.ts +++ b/worker/src/scheduler.ts @@ -105,6 +105,19 @@ export async function startScheduler(): Promise { scheduleNext(); } +/** + * Trigger an immediate ingestion cycle (e.g. from the admin UI). + * If a cycle is already running, this is a no-op. + */ +export async function triggerImmediateCycle(): Promise { + if (running) { + log.info("Cycle already running, ignoring trigger"); + return; + } + log.info("Immediate cycle triggered via UI"); + await runCycle(); +} + /** * Stop the scheduler gracefully. */