mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Fix worker stuck on "Working..." and default channels to disabled
1. Worker trigger: Add ingestion_trigger pg_notify listener so the worker picks up on-demand triggers from the UI and runs an immediate cycle with full activity tracking (currentActivity, currentStep, etc). 2. Remove orphaned IngestionRun creation from triggerIngestion server action. Previously the UI created RUNNING runs without activity fields, causing the UI to show "Working..." with no details. Now only the worker creates runs with proper activity tracking. 3. Default channels to disabled (isActive: false) in schema and all creation paths. Destination channels are explicitly set to active since they must receive uploads. Includes Prisma migration. Co-authored-by: xCyanGrizzly <53275238+xCyanGrizzly@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,3 @@
|
|||||||
|
-- Change the default for new channels to disabled (isActive = false).
|
||||||
|
-- Existing channels are not affected — admins can manually enable/disable them.
|
||||||
|
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;
|
||||||
@@ -417,7 +417,7 @@ model TelegramChannel {
|
|||||||
title String
|
title String
|
||||||
type ChannelType
|
type ChannelType
|
||||||
isForum Boolean @default(false)
|
isForum Boolean @default(false)
|
||||||
isActive Boolean @default(true)
|
isActive Boolean @default(false)
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
|
|
||||||
|
|||||||
@@ -173,6 +173,7 @@ export async function createChannel(
|
|||||||
telegramId: BigInt(parsed.data.telegramId),
|
telegramId: BigInt(parsed.data.telegramId),
|
||||||
title: parsed.data.title,
|
title: parsed.data.title,
|
||||||
type: parsed.data.type,
|
type: parsed.data.type,
|
||||||
|
isActive: false,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
revalidatePath(REVALIDATE_PATH);
|
revalidatePath(REVALIDATE_PATH);
|
||||||
@@ -371,19 +372,8 @@ export async function triggerIngestion(
|
|||||||
return { success: false, error: "No eligible accounts found" };
|
return { success: false, error: "No eligible accounts found" };
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create ingestion runs — the worker picks these up
|
// Signal the worker to run an immediate ingestion cycle via pg_notify.
|
||||||
for (const account of accounts) {
|
// The worker will create its own IngestionRun records with proper activity tracking.
|
||||||
const existing = await prisma.ingestionRun.findFirst({
|
|
||||||
where: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
if (!existing) {
|
|
||||||
await prisma.ingestionRun.create({
|
|
||||||
data: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pg_notify for immediate worker pickup
|
|
||||||
try {
|
try {
|
||||||
await prisma.$queryRawUnsafe(
|
await prisma.$queryRawUnsafe(
|
||||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||||
@@ -417,7 +407,7 @@ export async function saveChannelSelections(
|
|||||||
try {
|
try {
|
||||||
let linked = 0;
|
let linked = 0;
|
||||||
for (const ch of channels) {
|
for (const ch of channels) {
|
||||||
// Upsert the channel record
|
// Upsert the channel record (new channels default to disabled)
|
||||||
const channel = await prisma.telegramChannel.upsert({
|
const channel = await prisma.telegramChannel.upsert({
|
||||||
where: { telegramId: BigInt(ch.telegramId) },
|
where: { telegramId: BigInt(ch.telegramId) },
|
||||||
create: {
|
create: {
|
||||||
@@ -425,6 +415,7 @@ export async function saveChannelSelections(
|
|||||||
title: ch.title,
|
title: ch.title,
|
||||||
type: "SOURCE",
|
type: "SOURCE",
|
||||||
isForum: ch.isForum,
|
isForum: ch.isForum,
|
||||||
|
isActive: false,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title: ch.title,
|
title: ch.title,
|
||||||
@@ -467,10 +458,10 @@ export async function setGlobalDestination(
|
|||||||
if (!channel) return { success: false, error: "Channel not found" };
|
if (!channel) return { success: false, error: "Channel not found" };
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Set the channel type to DESTINATION
|
// Set the channel type to DESTINATION and ensure it's active
|
||||||
await prisma.telegramChannel.update({
|
await prisma.telegramChannel.update({
|
||||||
where: { id: channelId },
|
where: { id: channelId },
|
||||||
data: { type: "DESTINATION" },
|
data: { type: "DESTINATION", isActive: true },
|
||||||
});
|
});
|
||||||
|
|
||||||
// Save as global destination
|
// Save as global destination
|
||||||
@@ -521,17 +512,19 @@ export async function createDestinationChannel(
|
|||||||
if (!admin.success) return admin;
|
if (!admin.success) return admin;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create the channel as DESTINATION
|
// Create the channel as DESTINATION (active by default — needed for uploads)
|
||||||
const channel = await prisma.telegramChannel.upsert({
|
const channel = await prisma.telegramChannel.upsert({
|
||||||
where: { telegramId: BigInt(telegramId) },
|
where: { telegramId: BigInt(telegramId) },
|
||||||
create: {
|
create: {
|
||||||
telegramId: BigInt(telegramId),
|
telegramId: BigInt(telegramId),
|
||||||
title,
|
title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
|
isActive: true,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title,
|
title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
|
isActive: true,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
|
|||||||
title: string;
|
title: string;
|
||||||
type: "SOURCE" | "DESTINATION";
|
type: "SOURCE" | "DESTINATION";
|
||||||
isForum: boolean;
|
isForum: boolean;
|
||||||
|
isActive?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upsert a channel by telegramId. Returns the channel record.
|
* Upsert a channel by telegramId. Returns the channel record.
|
||||||
* If it already exists, update title and forum status.
|
* If it already exists, update title and forum status.
|
||||||
|
* New channels default to disabled (isActive: false) so the admin must
|
||||||
|
* explicitly enable them before the worker processes them.
|
||||||
|
* Pass isActive: true for DESTINATION channels that must be active immediately.
|
||||||
*/
|
*/
|
||||||
export async function upsertChannel(input: UpsertChannelInput) {
|
export async function upsertChannel(input: UpsertChannelInput) {
|
||||||
return db.telegramChannel.upsert({
|
return db.telegramChannel.upsert({
|
||||||
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
|
|||||||
title: input.title,
|
title: input.title,
|
||||||
type: input.type,
|
type: input.type,
|
||||||
isForum: input.isForum,
|
isForum: input.isForum,
|
||||||
|
isActive: input.isActive ?? false,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title: input.title,
|
title: input.title,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
|
|||||||
import { processFetchRequest } from "./worker.js";
|
import { processFetchRequest } from "./worker.js";
|
||||||
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
||||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||||
|
import { triggerImmediateCycle } from "./scheduler.js";
|
||||||
import {
|
import {
|
||||||
getGlobalDestinationChannel,
|
getGlobalDestinationChannel,
|
||||||
getGlobalSetting,
|
getGlobalSetting,
|
||||||
@@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null;
|
|||||||
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
||||||
* - `generate_invite` — payload = channelId → generate invite link for destination
|
* - `generate_invite` — payload = channelId → generate invite link for destination
|
||||||
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
||||||
|
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
||||||
*/
|
*/
|
||||||
export async function startFetchListener(): Promise<void> {
|
export async function startFetchListener(): Promise<void> {
|
||||||
pgClient = await pool.connect();
|
pgClient = await pool.connect();
|
||||||
await pgClient.query("LISTEN channel_fetch");
|
await pgClient.query("LISTEN channel_fetch");
|
||||||
await pgClient.query("LISTEN generate_invite");
|
await pgClient.query("LISTEN generate_invite");
|
||||||
await pgClient.query("LISTEN create_destination");
|
await pgClient.query("LISTEN create_destination");
|
||||||
|
await pgClient.query("LISTEN ingestion_trigger");
|
||||||
|
|
||||||
pgClient.on("notification", (msg) => {
|
pgClient.on("notification", (msg) => {
|
||||||
if (msg.channel === "channel_fetch" && msg.payload) {
|
if (msg.channel === "channel_fetch" && msg.payload) {
|
||||||
@@ -39,10 +42,12 @@ export async function startFetchListener(): Promise<void> {
|
|||||||
handleGenerateInvite(msg.payload);
|
handleGenerateInvite(msg.payload);
|
||||||
} else if (msg.channel === "create_destination" && msg.payload) {
|
} else if (msg.channel === "create_destination" && msg.payload) {
|
||||||
handleCreateDestination(msg.payload);
|
handleCreateDestination(msg.payload);
|
||||||
|
} else if (msg.channel === "ingestion_trigger") {
|
||||||
|
handleIngestionTrigger();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
|
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
|
||||||
}
|
}
|
||||||
|
|
||||||
export function stopFetchListener(): void {
|
export function stopFetchListener(): void {
|
||||||
@@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void {
|
|||||||
const result = await createSupergroup(client, parsed.title);
|
const result = await createSupergroup(client, parsed.title);
|
||||||
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
||||||
|
|
||||||
// Upsert it as a DESTINATION channel in the DB
|
// Upsert it as a DESTINATION channel in the DB (active by default)
|
||||||
const channel = await upsertChannel({
|
const channel = await upsertChannel({
|
||||||
telegramId: result.chatId,
|
telegramId: result.chatId,
|
||||||
title: result.title,
|
title: result.title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
isForum: false,
|
isForum: false,
|
||||||
|
isActive: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Set as global destination
|
// Set as global destination
|
||||||
@@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Ingestion trigger handler ──
|
||||||
|
|
||||||
|
function handleIngestionTrigger(): void {
|
||||||
|
fetchQueue = fetchQueue.then(async () => {
|
||||||
|
try {
|
||||||
|
log.info("Ingestion trigger received from UI");
|
||||||
|
await triggerImmediateCycle();
|
||||||
|
} catch (err) {
|
||||||
|
log.error({ err }, "Failed to trigger immediate ingestion cycle");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -105,6 +105,19 @@ export async function startScheduler(): Promise<void> {
|
|||||||
scheduleNext();
|
scheduleNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
|
||||||
|
* If a cycle is already running, this is a no-op.
|
||||||
|
*/
|
||||||
|
export async function triggerImmediateCycle(): Promise<void> {
|
||||||
|
if (running) {
|
||||||
|
log.info("Cycle already running, ignoring trigger");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("Immediate cycle triggered via UI");
|
||||||
|
await runCycle();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the scheduler gracefully.
|
* Stop the scheduler gracefully.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user