mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-11 06:11:15 +00:00
Compare commits
5 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22da4dfad2 | ||
|
|
22bcacf3bd | ||
|
|
15da57b8c0 | ||
|
|
8f1a912ccb | ||
|
|
81b65912aa |
@@ -0,0 +1,3 @@
|
|||||||
|
-- Change the default for new channels to disabled (isActive = false).
|
||||||
|
-- Existing channels are not affected — admins can manually enable/disable them.
|
||||||
|
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;
|
||||||
@@ -417,7 +417,7 @@ model TelegramChannel {
|
|||||||
title String
|
title String
|
||||||
type ChannelType
|
type ChannelType
|
||||||
isForum Boolean @default(false)
|
isForum Boolean @default(false)
|
||||||
isActive Boolean @default(true)
|
isActive Boolean @default(false)
|
||||||
createdAt DateTime @default(now())
|
createdAt DateTime @default(now())
|
||||||
updatedAt DateTime @updatedAt
|
updatedAt DateTime @updatedAt
|
||||||
|
|
||||||
|
|||||||
@@ -233,6 +233,11 @@ function RunningStatus({
|
|||||||
</span>
|
</span>
|
||||||
</span>
|
</span>
|
||||||
)}
|
)}
|
||||||
|
{run.messagesScanned > 0 && (
|
||||||
|
<span>
|
||||||
|
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
|
||||||
|
</span>
|
||||||
|
)}
|
||||||
{run.zipsIngested > 0 && (
|
{run.zipsIngested > 0 && (
|
||||||
<span>
|
<span>
|
||||||
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
||||||
|
|||||||
@@ -173,6 +173,7 @@ export async function createChannel(
|
|||||||
telegramId: BigInt(parsed.data.telegramId),
|
telegramId: BigInt(parsed.data.telegramId),
|
||||||
title: parsed.data.title,
|
title: parsed.data.title,
|
||||||
type: parsed.data.type,
|
type: parsed.data.type,
|
||||||
|
isActive: false,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
revalidatePath(REVALIDATE_PATH);
|
revalidatePath(REVALIDATE_PATH);
|
||||||
@@ -371,19 +372,8 @@ export async function triggerIngestion(
|
|||||||
return { success: false, error: "No eligible accounts found" };
|
return { success: false, error: "No eligible accounts found" };
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create ingestion runs — the worker picks these up
|
// Signal the worker to run an immediate ingestion cycle via pg_notify.
|
||||||
for (const account of accounts) {
|
// The worker will create its own IngestionRun records with proper activity tracking.
|
||||||
const existing = await prisma.ingestionRun.findFirst({
|
|
||||||
where: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
if (!existing) {
|
|
||||||
await prisma.ingestionRun.create({
|
|
||||||
data: { accountId: account.id, status: "RUNNING" },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pg_notify for immediate worker pickup
|
|
||||||
try {
|
try {
|
||||||
await prisma.$queryRawUnsafe(
|
await prisma.$queryRawUnsafe(
|
||||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||||
@@ -417,7 +407,7 @@ export async function saveChannelSelections(
|
|||||||
try {
|
try {
|
||||||
let linked = 0;
|
let linked = 0;
|
||||||
for (const ch of channels) {
|
for (const ch of channels) {
|
||||||
// Upsert the channel record
|
// Upsert the channel record (new channels default to disabled)
|
||||||
const channel = await prisma.telegramChannel.upsert({
|
const channel = await prisma.telegramChannel.upsert({
|
||||||
where: { telegramId: BigInt(ch.telegramId) },
|
where: { telegramId: BigInt(ch.telegramId) },
|
||||||
create: {
|
create: {
|
||||||
@@ -425,6 +415,7 @@ export async function saveChannelSelections(
|
|||||||
title: ch.title,
|
title: ch.title,
|
||||||
type: "SOURCE",
|
type: "SOURCE",
|
||||||
isForum: ch.isForum,
|
isForum: ch.isForum,
|
||||||
|
isActive: false,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title: ch.title,
|
title: ch.title,
|
||||||
@@ -467,10 +458,10 @@ export async function setGlobalDestination(
|
|||||||
if (!channel) return { success: false, error: "Channel not found" };
|
if (!channel) return { success: false, error: "Channel not found" };
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Set the channel type to DESTINATION
|
// Set the channel type to DESTINATION and ensure it's active
|
||||||
await prisma.telegramChannel.update({
|
await prisma.telegramChannel.update({
|
||||||
where: { id: channelId },
|
where: { id: channelId },
|
||||||
data: { type: "DESTINATION" },
|
data: { type: "DESTINATION", isActive: true },
|
||||||
});
|
});
|
||||||
|
|
||||||
// Save as global destination
|
// Save as global destination
|
||||||
@@ -521,17 +512,19 @@ export async function createDestinationChannel(
|
|||||||
if (!admin.success) return admin;
|
if (!admin.success) return admin;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Create the channel as DESTINATION
|
// Create the channel as DESTINATION (active by default — needed for uploads)
|
||||||
const channel = await prisma.telegramChannel.upsert({
|
const channel = await prisma.telegramChannel.upsert({
|
||||||
where: { telegramId: BigInt(telegramId) },
|
where: { telegramId: BigInt(telegramId) },
|
||||||
create: {
|
create: {
|
||||||
telegramId: BigInt(telegramId),
|
telegramId: BigInt(telegramId),
|
||||||
title,
|
title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
|
isActive: true,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title,
|
title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
|
isActive: true,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
|
|||||||
title: string;
|
title: string;
|
||||||
type: "SOURCE" | "DESTINATION";
|
type: "SOURCE" | "DESTINATION";
|
||||||
isForum: boolean;
|
isForum: boolean;
|
||||||
|
isActive?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upsert a channel by telegramId. Returns the channel record.
|
* Upsert a channel by telegramId. Returns the channel record.
|
||||||
* If it already exists, update title and forum status.
|
* If it already exists, update title and forum status.
|
||||||
|
* New channels default to disabled (isActive: false) so the admin must
|
||||||
|
* explicitly enable them before the worker processes them.
|
||||||
|
* Pass isActive: true for DESTINATION channels that must be active immediately.
|
||||||
*/
|
*/
|
||||||
export async function upsertChannel(input: UpsertChannelInput) {
|
export async function upsertChannel(input: UpsertChannelInput) {
|
||||||
return db.telegramChannel.upsert({
|
return db.telegramChannel.upsert({
|
||||||
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
|
|||||||
title: input.title,
|
title: input.title,
|
||||||
type: input.type,
|
type: input.type,
|
||||||
isForum: input.isForum,
|
isForum: input.isForum,
|
||||||
|
isActive: input.isActive ?? false,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
title: input.title,
|
title: input.title,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
|
|||||||
import { processFetchRequest } from "./worker.js";
|
import { processFetchRequest } from "./worker.js";
|
||||||
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
||||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||||
|
import { triggerImmediateCycle } from "./scheduler.js";
|
||||||
import {
|
import {
|
||||||
getGlobalDestinationChannel,
|
getGlobalDestinationChannel,
|
||||||
getGlobalSetting,
|
getGlobalSetting,
|
||||||
@@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null;
|
|||||||
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
||||||
* - `generate_invite` — payload = channelId → generate invite link for destination
|
* - `generate_invite` — payload = channelId → generate invite link for destination
|
||||||
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
||||||
|
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
||||||
*/
|
*/
|
||||||
export async function startFetchListener(): Promise<void> {
|
export async function startFetchListener(): Promise<void> {
|
||||||
pgClient = await pool.connect();
|
pgClient = await pool.connect();
|
||||||
await pgClient.query("LISTEN channel_fetch");
|
await pgClient.query("LISTEN channel_fetch");
|
||||||
await pgClient.query("LISTEN generate_invite");
|
await pgClient.query("LISTEN generate_invite");
|
||||||
await pgClient.query("LISTEN create_destination");
|
await pgClient.query("LISTEN create_destination");
|
||||||
|
await pgClient.query("LISTEN ingestion_trigger");
|
||||||
|
|
||||||
pgClient.on("notification", (msg) => {
|
pgClient.on("notification", (msg) => {
|
||||||
if (msg.channel === "channel_fetch" && msg.payload) {
|
if (msg.channel === "channel_fetch" && msg.payload) {
|
||||||
@@ -39,10 +42,12 @@ export async function startFetchListener(): Promise<void> {
|
|||||||
handleGenerateInvite(msg.payload);
|
handleGenerateInvite(msg.payload);
|
||||||
} else if (msg.channel === "create_destination" && msg.payload) {
|
} else if (msg.channel === "create_destination" && msg.payload) {
|
||||||
handleCreateDestination(msg.payload);
|
handleCreateDestination(msg.payload);
|
||||||
|
} else if (msg.channel === "ingestion_trigger") {
|
||||||
|
handleIngestionTrigger();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
|
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
|
||||||
}
|
}
|
||||||
|
|
||||||
export function stopFetchListener(): void {
|
export function stopFetchListener(): void {
|
||||||
@@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void {
|
|||||||
const result = await createSupergroup(client, parsed.title);
|
const result = await createSupergroup(client, parsed.title);
|
||||||
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
||||||
|
|
||||||
// Upsert it as a DESTINATION channel in the DB
|
// Upsert it as a DESTINATION channel in the DB (active by default)
|
||||||
const channel = await upsertChannel({
|
const channel = await upsertChannel({
|
||||||
telegramId: result.chatId,
|
telegramId: result.chatId,
|
||||||
title: result.title,
|
title: result.title,
|
||||||
type: "DESTINATION",
|
type: "DESTINATION",
|
||||||
isForum: false,
|
isForum: false,
|
||||||
|
isActive: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Set as global destination
|
// Set as global destination
|
||||||
@@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Ingestion trigger handler ──
|
||||||
|
|
||||||
|
function handleIngestionTrigger(): void {
|
||||||
|
fetchQueue = fetchQueue.then(async () => {
|
||||||
|
try {
|
||||||
|
log.info("Ingestion trigger received from UI");
|
||||||
|
await triggerImmediateCycle();
|
||||||
|
} catch (err) {
|
||||||
|
log.error({ err }, "Failed to trigger immediate ingestion cycle");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -105,6 +105,19 @@ export async function startScheduler(): Promise<void> {
|
|||||||
scheduleNext();
|
scheduleNext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
|
||||||
|
* If a cycle is already running, this is a no-op.
|
||||||
|
*/
|
||||||
|
export async function triggerImmediateCycle(): Promise<void> {
|
||||||
|
if (running) {
|
||||||
|
log.info("Cycle already running, ignoring trigger");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info("Immediate cycle triggered via UI");
|
||||||
|
await runCycle();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the scheduler gracefully.
|
* Stop the scheduler gracefully.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -66,8 +66,11 @@ interface TdFile {
|
|||||||
export interface ChannelScanResult {
|
export interface ChannelScanResult {
|
||||||
archives: TelegramMessage[];
|
archives: TelegramMessage[];
|
||||||
photos: TelegramPhoto[];
|
photos: TelegramPhoto[];
|
||||||
|
totalScanned: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch messages from a channel, stopping once we've scanned past the
|
* Fetch messages from a channel, stopping once we've scanned past the
|
||||||
* last-processed boundary (with one page of lookback for multipart safety).
|
* last-processed boundary (with one page of lookback for multipart safety).
|
||||||
@@ -82,13 +85,15 @@ export async function getChannelMessages(
|
|||||||
client: Client,
|
client: Client,
|
||||||
chatId: bigint,
|
chatId: bigint,
|
||||||
lastProcessedMessageId?: bigint | null,
|
lastProcessedMessageId?: bigint | null,
|
||||||
limit = 100
|
limit = 100,
|
||||||
|
onProgress?: ScanProgressCallback
|
||||||
): Promise<ChannelScanResult> {
|
): Promise<ChannelScanResult> {
|
||||||
const archives: TelegramMessage[] = [];
|
const archives: TelegramMessage[] = [];
|
||||||
const photos: TelegramPhoto[] = [];
|
const photos: TelegramPhoto[] = [];
|
||||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
|
let totalScanned = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
@@ -103,6 +108,8 @@ export async function getChannelMessages(
|
|||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
|
totalScanned += result.messages.length;
|
||||||
|
|
||||||
for (const msg of result.messages) {
|
for (const msg of result.messages) {
|
||||||
// Check for archive documents
|
// Check for archive documents
|
||||||
const doc = msg.content?.document;
|
const doc = msg.content?.document;
|
||||||
@@ -132,6 +139,9 @@ export async function getChannelMessages(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report scanning progress after each page
|
||||||
|
onProgress?.(totalScanned);
|
||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
@@ -144,7 +154,7 @@ export async function getChannelMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
|
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||||
"Channel scan complete"
|
"Channel scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -152,6 +162,7 @@ export async function getChannelMessages(
|
|||||||
return {
|
return {
|
||||||
archives: archives.reverse(),
|
archives: archives.reverse(),
|
||||||
photos: photos.reverse(),
|
photos: photos.reverse(),
|
||||||
|
totalScanned,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js";
|
|||||||
import { isArchiveAttachment } from "../archive/detect.js";
|
import { isArchiveAttachment } from "../archive/detect.js";
|
||||||
import type { TelegramMessage } from "../archive/multipart.js";
|
import type { TelegramMessage } from "../archive/multipart.js";
|
||||||
import type { TelegramPhoto } from "../preview/match.js";
|
import type { TelegramPhoto } from "../preview/match.js";
|
||||||
import type { ChannelScanResult } from "./download.js";
|
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
||||||
|
|
||||||
const log = childLogger("topics");
|
const log = childLogger("topics");
|
||||||
|
|
||||||
@@ -140,13 +140,15 @@ export async function getTopicMessages(
|
|||||||
chatId: bigint,
|
chatId: bigint,
|
||||||
topicId: bigint,
|
topicId: bigint,
|
||||||
lastProcessedMessageId?: bigint | null,
|
lastProcessedMessageId?: bigint | null,
|
||||||
limit = 100
|
limit = 100,
|
||||||
|
onProgress?: ScanProgressCallback
|
||||||
): Promise<ChannelScanResult> {
|
): Promise<ChannelScanResult> {
|
||||||
const archives: TelegramMessage[] = [];
|
const archives: TelegramMessage[] = [];
|
||||||
const photos: TelegramPhoto[] = [];
|
const photos: TelegramPhoto[] = [];
|
||||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||||
|
|
||||||
let currentFromId = 0;
|
let currentFromId = 0;
|
||||||
|
let totalScanned = 0;
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
@@ -190,6 +192,8 @@ export async function getTopicMessages(
|
|||||||
|
|
||||||
if (!result.messages || result.messages.length === 0) break;
|
if (!result.messages || result.messages.length === 0) break;
|
||||||
|
|
||||||
|
totalScanned += result.messages.length;
|
||||||
|
|
||||||
for (const msg of result.messages) {
|
for (const msg of result.messages) {
|
||||||
// Check for archive documents
|
// Check for archive documents
|
||||||
const doc = msg.content?.document;
|
const doc = msg.content?.document;
|
||||||
@@ -219,6 +223,9 @@ export async function getTopicMessages(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report scanning progress after each page
|
||||||
|
onProgress?.(totalScanned);
|
||||||
|
|
||||||
currentFromId = result.messages[result.messages.length - 1].id;
|
currentFromId = result.messages[result.messages.length - 1].id;
|
||||||
|
|
||||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||||
@@ -230,7 +237,7 @@ export async function getTopicMessages(
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
|
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||||
"Topic scan complete"
|
"Topic scan complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -238,6 +245,7 @@ export async function getTopicMessages(
|
|||||||
return {
|
return {
|
||||||
archives: archives.reverse(),
|
archives: archives.reverse(),
|
||||||
photos: photos.reverse(),
|
photos: photos.reverse(),
|
||||||
|
totalScanned,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
|
|||||||
throw new Error("No global destination channel configured — set one in the admin UI");
|
throw new Error("No global destination channel configured — set one in the admin UI");
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const mapping of channelMappings) {
|
const totalChannels = channelMappings.length;
|
||||||
|
|
||||||
|
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
||||||
|
const mapping = channelMappings[chIdx];
|
||||||
const channel = mapping.channel;
|
const channel = mapping.channel;
|
||||||
|
const channelLabel = totalChannels > 1
|
||||||
|
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
|
||||||
|
: channel.title;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// ── Check if channel is a forum ──
|
// ── Check if channel is a forum ──
|
||||||
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
|
|||||||
if (forum) {
|
if (forum) {
|
||||||
// ── Forum channel: scan per-topic ──
|
// ── Forum channel: scan per-topic ──
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Enumerating topics in "${channel.title}"`,
|
currentActivity: `Enumerating topics in "${channelLabel}"`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: channel.title,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
const topics = await getForumTopicList(client, channel.telegramId);
|
const topics = await getForumTopicList(client, channel.telegramId);
|
||||||
@@ -399,31 +406,50 @@ export async function runWorkerForAccount(
|
|||||||
"Scanning forum channel by topic"
|
"Scanning forum channel by topic"
|
||||||
);
|
);
|
||||||
|
|
||||||
for (const topic of topics) {
|
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
|
||||||
|
const topic = topics[tIdx];
|
||||||
try {
|
try {
|
||||||
const progress = topicProgressList.find(
|
const progress = topicProgressList.find(
|
||||||
(tp) => tp.topicId === topic.topicId
|
(tp) => tp.topicId === topic.topicId
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const topicLabel = `${channel.title} › ${topic.name}`;
|
||||||
|
const topicProgress = topics.length > 1
|
||||||
|
? ` (topic ${tIdx + 1}/${topics.length})`
|
||||||
|
: "";
|
||||||
|
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
|
currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: `${channel.title} › ${topic.name}`,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
const scanResult = await getTopicMessages(
|
const scanResult = await getTopicMessages(
|
||||||
client,
|
client,
|
||||||
channel.telegramId,
|
channel.telegramId,
|
||||||
topic.topicId,
|
topic.topicId,
|
||||||
progress?.lastProcessedMessageId
|
progress?.lastProcessedMessageId,
|
||||||
|
100,
|
||||||
|
(scanned) => {
|
||||||
|
throttled.update({
|
||||||
|
currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
|
||||||
|
currentStep: "scanning",
|
||||||
|
currentChannel: channelLabel,
|
||||||
|
messagesScanned: counters.messagesScanned + scanned,
|
||||||
|
});
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Add scanned messages to global counter
|
||||||
|
counters.messagesScanned += scanResult.totalScanned;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug(
|
accountLog.debug(
|
||||||
{ channelId: channel.id, topic: topic.name },
|
{ channelId: channel.id, topic: topic.name },
|
||||||
@@ -463,15 +489,16 @@ export async function runWorkerForAccount(
|
|||||||
} else {
|
} else {
|
||||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||||
await updateRunActivity(activeRunId, {
|
await updateRunActivity(activeRunId, {
|
||||||
currentActivity: `Scanning "${channel.title}" for new archives`,
|
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||||
currentStep: "scanning",
|
currentStep: "scanning",
|
||||||
currentChannel: channel.title,
|
currentChannel: channelLabel,
|
||||||
currentFile: null,
|
currentFile: null,
|
||||||
currentFileNum: null,
|
currentFileNum: null,
|
||||||
totalFiles: null,
|
totalFiles: null,
|
||||||
downloadedBytes: null,
|
downloadedBytes: null,
|
||||||
totalBytes: null,
|
totalBytes: null,
|
||||||
downloadPercent: null,
|
downloadPercent: null,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
accountLog.info(
|
accountLog.info(
|
||||||
@@ -482,9 +509,21 @@ export async function runWorkerForAccount(
|
|||||||
const scanResult = await getChannelMessages(
|
const scanResult = await getChannelMessages(
|
||||||
client,
|
client,
|
||||||
channel.telegramId,
|
channel.telegramId,
|
||||||
mapping.lastProcessedMessageId
|
mapping.lastProcessedMessageId,
|
||||||
|
100,
|
||||||
|
(scanned) => {
|
||||||
|
throttled.update({
|
||||||
|
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
|
||||||
|
currentStep: "scanning",
|
||||||
|
currentChannel: channelLabel,
|
||||||
|
messagesScanned: counters.messagesScanned + scanned,
|
||||||
|
});
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Add scanned messages to global counter
|
||||||
|
counters.messagesScanned += scanResult.totalScanned;
|
||||||
|
|
||||||
if (scanResult.archives.length === 0) {
|
if (scanResult.archives.length === 0) {
|
||||||
accountLog.debug({ channelId: channel.id }, "No new archives");
|
accountLog.debug({ channelId: channel.id }, "No new archives");
|
||||||
continue;
|
continue;
|
||||||
@@ -593,6 +632,7 @@ async function processArchiveSets(
|
|||||||
currentChannel: channelTitle,
|
currentChannel: channelTitle,
|
||||||
totalFiles: archiveSets.length,
|
totalFiles: archiveSets.length,
|
||||||
zipsFound: counters.zipsFound,
|
zipsFound: counters.zipsFound,
|
||||||
|
messagesScanned: counters.messagesScanned,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Track the highest message ID that was successfully processed
|
// Track the highest message ID that was successfully processed
|
||||||
@@ -646,7 +686,6 @@ async function processOneArchiveSet(
|
|||||||
throttled, counters, topicCreator, sourceTopicId, accountLog,
|
throttled, counters, topicCreator, sourceTopicId, accountLog,
|
||||||
} = ctx;
|
} = ctx;
|
||||||
|
|
||||||
counters.messagesScanned += archiveSet.parts.length;
|
|
||||||
const archiveName = archiveSet.parts[0].fileName;
|
const archiveName = archiveSet.parts[0].fileName;
|
||||||
|
|
||||||
// ── Early skip: check if this archive set was already ingested ──
|
// ── Early skip: check if this archive set was already ingested ──
|
||||||
|
|||||||
Reference in New Issue
Block a user