mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-05-10 22:01:16 +00:00
Compare commits
14 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22da4dfad2 | ||
|
|
22bcacf3bd | ||
|
|
15da57b8c0 | ||
|
|
8f1a912ccb | ||
|
|
81b65912aa | ||
|
|
5eb2cf05b9 | ||
|
|
f73d06b3d9 | ||
|
|
cac3d518e1 | ||
|
|
987167de0c | ||
|
|
4f331d5411 | ||
|
|
8088a86feb | ||
|
|
b53934ebf2 | ||
|
|
464c86b32a | ||
|
|
fc00fb6f2e |
12
Dockerfile
12
Dockerfile
@@ -30,19 +30,19 @@ RUN addgroup --system --gid 1001 nodejs && \
|
||||
adduser --system --uid 1001 nextjs
|
||||
|
||||
# Copy public assets
|
||||
COPY --from=builder /app/public ./public
|
||||
|
||||
# Copy prisma schema + migrations for runtime migrate deploy
|
||||
COPY --from=builder /app/prisma ./prisma
|
||||
COPY --from=builder /app/prisma.config.ts ./prisma.config.ts
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/public ./public
|
||||
|
||||
# Copy standalone build output
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
|
||||
|
||||
# Copy prisma schema + migrations for runtime migrate deploy
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/prisma ./prisma
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/prisma.config.ts ./prisma.config.ts
|
||||
|
||||
# Copy node_modules for prisma CLI (needed for migrate deploy at startup).
|
||||
# Copying the full directory ensures all transitive dependencies are present.
|
||||
COPY --from=builder /app/node_modules ./node_modules
|
||||
COPY --from=builder --chown=nextjs:nodejs /app/node_modules ./node_modules
|
||||
# Recreate the .bin/prisma symlink so Node resolves __dirname to prisma/build/,
|
||||
# where the WASM files live (COPY dereferences symlinks, breaking WASM resolution)
|
||||
RUN mkdir -p ./node_modules/.bin && \
|
||||
|
||||
@@ -125,18 +125,15 @@ docker compose up -d
|
||||
|
||||
The app will be available at [http://localhost:3000](http://localhost:3000).
|
||||
|
||||
### Adding Telegram Services
|
||||
### Adding the Telegram Bot
|
||||
|
||||
The worker and bot run as optional profiles so `docker compose up` works with just the app + database:
|
||||
The worker starts by default with `docker compose up`. The bot runs as an optional profile:
|
||||
|
||||
```bash
|
||||
# App + DB + Telegram worker (needs TELEGRAM_API_ID + TELEGRAM_API_HASH in .env)
|
||||
docker compose --profile telegram up -d
|
||||
|
||||
# App + DB + Worker + Bot (also needs BOT_TOKEN in .env)
|
||||
docker compose --profile full up -d
|
||||
|
||||
# Or just the bot (alongside app + db)
|
||||
# Or just the bot (alongside app + db + worker)
|
||||
docker compose --profile bot up -d
|
||||
```
|
||||
|
||||
|
||||
@@ -16,7 +16,6 @@ services:
|
||||
retries: 5
|
||||
|
||||
worker:
|
||||
profiles: ["telegram", "full"]
|
||||
build:
|
||||
context: .
|
||||
dockerfile: worker/Dockerfile
|
||||
|
||||
@@ -10,9 +10,13 @@ services:
|
||||
- DATABASE_URL=postgresql://${POSTGRES_USER:-dragons}:${POSTGRES_PASSWORD:-stash}@db:5432/${POSTGRES_DB:-dragonsstash}
|
||||
- AUTH_SECRET=${AUTH_SECRET:?Set AUTH_SECRET in .env}
|
||||
- AUTH_TRUST_HOST=true
|
||||
- AUTH_GITHUB_ID=${AUTH_GITHUB_ID:-}
|
||||
- AUTH_GITHUB_SECRET=${AUTH_GITHUB_SECRET:-}
|
||||
- NEXT_PUBLIC_APP_URL=${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
|
||||
- TELEGRAM_API_KEY=${TELEGRAM_API_KEY:-}
|
||||
- BOT_TOKEN=${BOT_TOKEN:-}
|
||||
- BOT_USERNAME=${BOT_USERNAME:-}
|
||||
- LOG_LEVEL=${LOG_LEVEL:-info}
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
@@ -21,7 +25,7 @@ services:
|
||||
interval: 30s
|
||||
timeout: 5s
|
||||
retries: 3
|
||||
start_period: 30s
|
||||
start_period: 60s
|
||||
restart: unless-stopped
|
||||
deploy:
|
||||
resources:
|
||||
@@ -31,7 +35,6 @@ services:
|
||||
- frontend
|
||||
|
||||
worker:
|
||||
profiles: ["telegram", "full"]
|
||||
build:
|
||||
context: .
|
||||
dockerfile: worker/Dockerfile
|
||||
|
||||
@@ -10,7 +10,10 @@ if [ "$AUTH_SECRET" = "change-me-to-a-random-secret-in-production" ] || [ -z "$A
|
||||
fi
|
||||
|
||||
echo "Running database migrations..."
|
||||
./node_modules/.bin/prisma migrate deploy
|
||||
if ! ./node_modules/.bin/prisma migrate deploy; then
|
||||
echo "ERROR: Database migration failed. Check DATABASE_URL and database connectivity."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$SEED_DATABASE" = "true" ]; then
|
||||
echo "Seeding database..."
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
-- Promote all existing users to ADMIN (self-hosted: every user is an admin)
|
||||
UPDATE "User" SET "role" = 'ADMIN' WHERE "role" = 'USER';
|
||||
|
||||
-- Change the default role for new users to ADMIN
|
||||
ALTER TABLE "User" ALTER COLUMN "role" SET DEFAULT 'ADMIN';
|
||||
@@ -0,0 +1,3 @@
|
||||
-- Change the default for new channels to disabled (isActive = false).
|
||||
-- Existing channels are not affected — admins can manually enable/disable them.
|
||||
ALTER TABLE "telegram_channels" ALTER COLUMN "isActive" SET DEFAULT false;
|
||||
@@ -22,7 +22,7 @@ model User {
|
||||
emailVerified DateTime?
|
||||
image String?
|
||||
hashedPassword String?
|
||||
role Role @default(USER)
|
||||
role Role @default(ADMIN)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
@@ -417,7 +417,7 @@ model TelegramChannel {
|
||||
title String
|
||||
type ChannelType
|
||||
isForum Boolean @default(false)
|
||||
isActive Boolean @default(true)
|
||||
isActive Boolean @default(false)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
|
||||
@@ -233,6 +233,11 @@ function RunningStatus({
|
||||
</span>
|
||||
</span>
|
||||
)}
|
||||
{run.messagesScanned > 0 && (
|
||||
<span>
|
||||
<span className="text-foreground tabular-nums">{run.messagesScanned}</span> messages
|
||||
</span>
|
||||
)}
|
||||
{run.zipsIngested > 0 && (
|
||||
<span>
|
||||
<span className="text-foreground tabular-nums">{run.zipsIngested}</span> ingested
|
||||
|
||||
@@ -173,6 +173,7 @@ export async function createChannel(
|
||||
telegramId: BigInt(parsed.data.telegramId),
|
||||
title: parsed.data.title,
|
||||
type: parsed.data.type,
|
||||
isActive: false,
|
||||
},
|
||||
});
|
||||
revalidatePath(REVALIDATE_PATH);
|
||||
@@ -371,19 +372,8 @@ export async function triggerIngestion(
|
||||
return { success: false, error: "No eligible accounts found" };
|
||||
}
|
||||
|
||||
// Create ingestion runs — the worker picks these up
|
||||
for (const account of accounts) {
|
||||
const existing = await prisma.ingestionRun.findFirst({
|
||||
where: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
if (!existing) {
|
||||
await prisma.ingestionRun.create({
|
||||
data: { accountId: account.id, status: "RUNNING" },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// pg_notify for immediate worker pickup
|
||||
// Signal the worker to run an immediate ingestion cycle via pg_notify.
|
||||
// The worker will create its own IngestionRun records with proper activity tracking.
|
||||
try {
|
||||
await prisma.$queryRawUnsafe(
|
||||
`SELECT pg_notify('ingestion_trigger', $1)`,
|
||||
@@ -417,7 +407,7 @@ export async function saveChannelSelections(
|
||||
try {
|
||||
let linked = 0;
|
||||
for (const ch of channels) {
|
||||
// Upsert the channel record
|
||||
// Upsert the channel record (new channels default to disabled)
|
||||
const channel = await prisma.telegramChannel.upsert({
|
||||
where: { telegramId: BigInt(ch.telegramId) },
|
||||
create: {
|
||||
@@ -425,6 +415,7 @@ export async function saveChannelSelections(
|
||||
title: ch.title,
|
||||
type: "SOURCE",
|
||||
isForum: ch.isForum,
|
||||
isActive: false,
|
||||
},
|
||||
update: {
|
||||
title: ch.title,
|
||||
@@ -467,10 +458,10 @@ export async function setGlobalDestination(
|
||||
if (!channel) return { success: false, error: "Channel not found" };
|
||||
|
||||
try {
|
||||
// Set the channel type to DESTINATION
|
||||
// Set the channel type to DESTINATION and ensure it's active
|
||||
await prisma.telegramChannel.update({
|
||||
where: { id: channelId },
|
||||
data: { type: "DESTINATION" },
|
||||
data: { type: "DESTINATION", isActive: true },
|
||||
});
|
||||
|
||||
// Save as global destination
|
||||
@@ -521,17 +512,19 @@ export async function createDestinationChannel(
|
||||
if (!admin.success) return admin;
|
||||
|
||||
try {
|
||||
// Create the channel as DESTINATION
|
||||
// Create the channel as DESTINATION (active by default — needed for uploads)
|
||||
const channel = await prisma.telegramChannel.upsert({
|
||||
where: { telegramId: BigInt(telegramId) },
|
||||
create: {
|
||||
telegramId: BigInt(telegramId),
|
||||
title,
|
||||
type: "DESTINATION",
|
||||
isActive: true,
|
||||
},
|
||||
update: {
|
||||
title,
|
||||
type: "DESTINATION",
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
@@ -21,27 +21,22 @@ export async function registerUser(input: unknown): Promise<ActionResult<{ id: s
|
||||
|
||||
const hashedPassword = await bcrypt.hash(parsed.data.password, 10);
|
||||
|
||||
// First user to register becomes ADMIN (self-hosted owner)
|
||||
const user = await prisma.$transaction(async (tx) => {
|
||||
const userCount = await tx.user.count();
|
||||
const role = userCount === 0 ? "ADMIN" : "USER";
|
||||
|
||||
return tx.user.create({
|
||||
data: {
|
||||
name: parsed.data.name,
|
||||
email: parsed.data.email,
|
||||
hashedPassword,
|
||||
role,
|
||||
settings: {
|
||||
create: {
|
||||
lowStockThreshold: 10,
|
||||
currency: "USD",
|
||||
theme: "dark",
|
||||
units: "metric",
|
||||
},
|
||||
// Self-hosted: all users are admins
|
||||
const user = await prisma.user.create({
|
||||
data: {
|
||||
name: parsed.data.name,
|
||||
email: parsed.data.email,
|
||||
hashedPassword,
|
||||
role: "ADMIN",
|
||||
settings: {
|
||||
create: {
|
||||
lowStockThreshold: 10,
|
||||
currency: "USD",
|
||||
theme: "dark",
|
||||
units: "metric",
|
||||
},
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
return { success: true, data: { id: user.id } };
|
||||
|
||||
@@ -18,12 +18,12 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
|
||||
async jwt({ token, user }) {
|
||||
if (user) {
|
||||
token.id = user.id!;
|
||||
// Fetch the role from the database to pick up first-user ADMIN promotion
|
||||
// Fetch the role from the database to ensure token reflects current role
|
||||
const dbUser = await prisma.user.findUnique({
|
||||
where: { id: user.id! },
|
||||
select: { role: true },
|
||||
});
|
||||
token.role = dbUser?.role ?? user.role ?? "USER";
|
||||
token.role = dbUser?.role ?? user.role ?? "ADMIN";
|
||||
}
|
||||
return token;
|
||||
},
|
||||
@@ -38,17 +38,11 @@ export const { auth, handlers, signIn, signOut } = NextAuth({
|
||||
events: {
|
||||
async createUser({ user }) {
|
||||
if (user.id) {
|
||||
// First user to register becomes ADMIN (self-hosted owner)
|
||||
const adminExists = await prisma.user.findFirst({
|
||||
where: { role: "ADMIN" },
|
||||
select: { id: true },
|
||||
// Self-hosted: all users are admins
|
||||
await prisma.user.update({
|
||||
where: { id: user.id },
|
||||
data: { role: "ADMIN" },
|
||||
});
|
||||
if (!adminExists) {
|
||||
await prisma.user.update({
|
||||
where: { id: user.id },
|
||||
data: { role: "ADMIN" },
|
||||
});
|
||||
}
|
||||
|
||||
await prisma.userSettings.upsert({
|
||||
where: { userId: user.id },
|
||||
|
||||
@@ -302,11 +302,15 @@ export interface UpsertChannelInput {
|
||||
title: string;
|
||||
type: "SOURCE" | "DESTINATION";
|
||||
isForum: boolean;
|
||||
isActive?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upsert a channel by telegramId. Returns the channel record.
|
||||
* If it already exists, update title and forum status.
|
||||
* New channels default to disabled (isActive: false) so the admin must
|
||||
* explicitly enable them before the worker processes them.
|
||||
* Pass isActive: true for DESTINATION channels that must be active immediately.
|
||||
*/
|
||||
export async function upsertChannel(input: UpsertChannelInput) {
|
||||
return db.telegramChannel.upsert({
|
||||
@@ -316,6 +320,7 @@ export async function upsertChannel(input: UpsertChannelInput) {
|
||||
title: input.title,
|
||||
type: input.type,
|
||||
isForum: input.isForum,
|
||||
isActive: input.isActive ?? false,
|
||||
},
|
||||
update: {
|
||||
title: input.title,
|
||||
|
||||
@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
|
||||
import { processFetchRequest } from "./worker.js";
|
||||
import { generateInviteLink, createSupergroup } from "./tdlib/chats.js";
|
||||
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
|
||||
import { triggerImmediateCycle } from "./scheduler.js";
|
||||
import {
|
||||
getGlobalDestinationChannel,
|
||||
getGlobalSetting,
|
||||
@@ -25,12 +26,14 @@ let pgClient: pg.PoolClient | null = null;
|
||||
* - `channel_fetch` — payload = requestId → fetch channels for an account
|
||||
* - `generate_invite` — payload = channelId → generate invite link for destination
|
||||
* - `create_destination` — payload = JSON { requestId, title } → create supergroup via TDLib
|
||||
* - `ingestion_trigger` — trigger an immediate ingestion cycle
|
||||
*/
|
||||
export async function startFetchListener(): Promise<void> {
|
||||
pgClient = await pool.connect();
|
||||
await pgClient.query("LISTEN channel_fetch");
|
||||
await pgClient.query("LISTEN generate_invite");
|
||||
await pgClient.query("LISTEN create_destination");
|
||||
await pgClient.query("LISTEN ingestion_trigger");
|
||||
|
||||
pgClient.on("notification", (msg) => {
|
||||
if (msg.channel === "channel_fetch" && msg.payload) {
|
||||
@@ -39,10 +42,12 @@ export async function startFetchListener(): Promise<void> {
|
||||
handleGenerateInvite(msg.payload);
|
||||
} else if (msg.channel === "create_destination" && msg.payload) {
|
||||
handleCreateDestination(msg.payload);
|
||||
} else if (msg.channel === "ingestion_trigger") {
|
||||
handleIngestionTrigger();
|
||||
}
|
||||
});
|
||||
|
||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination)");
|
||||
log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger)");
|
||||
}
|
||||
|
||||
export function stopFetchListener(): void {
|
||||
@@ -138,12 +143,13 @@ function handleCreateDestination(payload: string): void {
|
||||
const result = await createSupergroup(client, parsed.title);
|
||||
log.info({ chatId: result.chatId.toString(), title: result.title }, "Supergroup created");
|
||||
|
||||
// Upsert it as a DESTINATION channel in the DB
|
||||
// Upsert it as a DESTINATION channel in the DB (active by default)
|
||||
const channel = await upsertChannel({
|
||||
telegramId: result.chatId,
|
||||
title: result.title,
|
||||
type: "DESTINATION",
|
||||
isForum: false,
|
||||
isActive: true,
|
||||
});
|
||||
|
||||
// Set as global destination
|
||||
@@ -204,3 +210,16 @@ function handleCreateDestination(payload: string): void {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Ingestion trigger handler ──
|
||||
|
||||
function handleIngestionTrigger(): void {
|
||||
fetchQueue = fetchQueue.then(async () => {
|
||||
try {
|
||||
log.info("Ingestion trigger received from UI");
|
||||
await triggerImmediateCycle();
|
||||
} catch (err) {
|
||||
log.error({ err }, "Failed to trigger immediate ingestion cycle");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -105,6 +105,19 @@ export async function startScheduler(): Promise<void> {
|
||||
scheduleNext();
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger an immediate ingestion cycle (e.g. from the admin UI).
|
||||
* If a cycle is already running, this is a no-op.
|
||||
*/
|
||||
export async function triggerImmediateCycle(): Promise<void> {
|
||||
if (running) {
|
||||
log.info("Cycle already running, ignoring trigger");
|
||||
return;
|
||||
}
|
||||
log.info("Immediate cycle triggered via UI");
|
||||
await runCycle();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the scheduler gracefully.
|
||||
*/
|
||||
|
||||
@@ -66,8 +66,11 @@ interface TdFile {
|
||||
export interface ChannelScanResult {
|
||||
archives: TelegramMessage[];
|
||||
photos: TelegramPhoto[];
|
||||
totalScanned: number;
|
||||
}
|
||||
|
||||
export type ScanProgressCallback = (messagesScanned: number) => void;
|
||||
|
||||
/**
|
||||
* Fetch messages from a channel, stopping once we've scanned past the
|
||||
* last-processed boundary (with one page of lookback for multipart safety).
|
||||
@@ -82,13 +85,15 @@ export async function getChannelMessages(
|
||||
client: Client,
|
||||
chatId: bigint,
|
||||
lastProcessedMessageId?: bigint | null,
|
||||
limit = 100
|
||||
limit = 100,
|
||||
onProgress?: ScanProgressCallback
|
||||
): Promise<ChannelScanResult> {
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
|
||||
let currentFromId = 0;
|
||||
let totalScanned = 0;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
@@ -103,6 +108,8 @@ export async function getChannelMessages(
|
||||
|
||||
if (!result.messages || result.messages.length === 0) break;
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -132,6 +139,9 @@ export async function getChannelMessages(
|
||||
}
|
||||
}
|
||||
|
||||
// Report scanning progress after each page
|
||||
onProgress?.(totalScanned);
|
||||
|
||||
currentFromId = result.messages[result.messages.length - 1].id;
|
||||
|
||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||
@@ -144,7 +154,7 @@ export async function getChannelMessages(
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length },
|
||||
{ chatId: chatId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||
"Channel scan complete"
|
||||
);
|
||||
|
||||
@@ -152,6 +162,7 @@ export async function getChannelMessages(
|
||||
return {
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import { childLogger } from "../util/logger.js";
|
||||
import { isArchiveAttachment } from "../archive/detect.js";
|
||||
import type { TelegramMessage } from "../archive/multipart.js";
|
||||
import type { TelegramPhoto } from "../preview/match.js";
|
||||
import type { ChannelScanResult } from "./download.js";
|
||||
import type { ChannelScanResult, ScanProgressCallback } from "./download.js";
|
||||
|
||||
const log = childLogger("topics");
|
||||
|
||||
@@ -140,13 +140,15 @@ export async function getTopicMessages(
|
||||
chatId: bigint,
|
||||
topicId: bigint,
|
||||
lastProcessedMessageId?: bigint | null,
|
||||
limit = 100
|
||||
limit = 100,
|
||||
onProgress?: ScanProgressCallback
|
||||
): Promise<ChannelScanResult> {
|
||||
const archives: TelegramMessage[] = [];
|
||||
const photos: TelegramPhoto[] = [];
|
||||
const boundary = lastProcessedMessageId ? Number(lastProcessedMessageId) : null;
|
||||
|
||||
let currentFromId = 0;
|
||||
let totalScanned = 0;
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
@@ -190,6 +192,8 @@ export async function getTopicMessages(
|
||||
|
||||
if (!result.messages || result.messages.length === 0) break;
|
||||
|
||||
totalScanned += result.messages.length;
|
||||
|
||||
for (const msg of result.messages) {
|
||||
// Check for archive documents
|
||||
const doc = msg.content?.document;
|
||||
@@ -219,6 +223,9 @@ export async function getTopicMessages(
|
||||
}
|
||||
}
|
||||
|
||||
// Report scanning progress after each page
|
||||
onProgress?.(totalScanned);
|
||||
|
||||
currentFromId = result.messages[result.messages.length - 1].id;
|
||||
|
||||
// Stop scanning once we've gone past the boundary (this page is the lookback)
|
||||
@@ -230,7 +237,7 @@ export async function getTopicMessages(
|
||||
}
|
||||
|
||||
log.info(
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length },
|
||||
{ chatId: chatId.toString(), topicId: topicId.toString(), archives: archives.length, photos: photos.length, totalScanned },
|
||||
"Topic scan complete"
|
||||
);
|
||||
|
||||
@@ -238,6 +245,7 @@ export async function getTopicMessages(
|
||||
return {
|
||||
archives: archives.reverse(),
|
||||
photos: photos.reverse(),
|
||||
totalScanned,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -349,8 +349,14 @@ export async function runWorkerForAccount(
|
||||
throw new Error("No global destination channel configured — set one in the admin UI");
|
||||
}
|
||||
|
||||
for (const mapping of channelMappings) {
|
||||
const totalChannels = channelMappings.length;
|
||||
|
||||
for (let chIdx = 0; chIdx < channelMappings.length; chIdx++) {
|
||||
const mapping = channelMappings[chIdx];
|
||||
const channel = mapping.channel;
|
||||
const channelLabel = totalChannels > 1
|
||||
? `[${chIdx + 1}/${totalChannels}] ${channel.title}`
|
||||
: channel.title;
|
||||
|
||||
try {
|
||||
// ── Check if channel is a forum ──
|
||||
@@ -380,15 +386,16 @@ export async function runWorkerForAccount(
|
||||
if (forum) {
|
||||
// ── Forum channel: scan per-topic ──
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Enumerating topics in "${channel.title}"`,
|
||||
currentActivity: `Enumerating topics in "${channelLabel}"`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channel.title,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
const topics = await getForumTopicList(client, channel.telegramId);
|
||||
@@ -399,31 +406,50 @@ export async function runWorkerForAccount(
|
||||
"Scanning forum channel by topic"
|
||||
);
|
||||
|
||||
for (const topic of topics) {
|
||||
for (let tIdx = 0; tIdx < topics.length; tIdx++) {
|
||||
const topic = topics[tIdx];
|
||||
try {
|
||||
const progress = topicProgressList.find(
|
||||
(tp) => tp.topicId === topic.topicId
|
||||
);
|
||||
|
||||
const topicLabel = `${channel.title} › ${topic.name}`;
|
||||
const topicProgress = topics.length > 1
|
||||
? ` (topic ${tIdx + 1}/${topics.length})`
|
||||
: "";
|
||||
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Scanning topic "${topic.name}" in "${channel.title}"`,
|
||||
currentActivity: `Scanning "${topicLabel}"${topicProgress}`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: `${channel.title} › ${topic.name}`,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
const scanResult = await getTopicMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
topic.topicId,
|
||||
progress?.lastProcessedMessageId
|
||||
progress?.lastProcessedMessageId,
|
||||
100,
|
||||
(scanned) => {
|
||||
throttled.update({
|
||||
currentActivity: `Scanning "${topicLabel}"${topicProgress} — ${scanned} messages scanned`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channelLabel,
|
||||
messagesScanned: counters.messagesScanned + scanned,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Add scanned messages to global counter
|
||||
counters.messagesScanned += scanResult.totalScanned;
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.debug(
|
||||
{ channelId: channel.id, topic: topic.name },
|
||||
@@ -463,15 +489,16 @@ export async function runWorkerForAccount(
|
||||
} else {
|
||||
// ── Non-forum channel: flat scan (existing behavior) ──
|
||||
await updateRunActivity(activeRunId, {
|
||||
currentActivity: `Scanning "${channel.title}" for new archives`,
|
||||
currentActivity: `Scanning "${channelLabel}" for new archives`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channel.title,
|
||||
currentChannel: channelLabel,
|
||||
currentFile: null,
|
||||
currentFileNum: null,
|
||||
totalFiles: null,
|
||||
downloadedBytes: null,
|
||||
totalBytes: null,
|
||||
downloadPercent: null,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
accountLog.info(
|
||||
@@ -482,9 +509,21 @@ export async function runWorkerForAccount(
|
||||
const scanResult = await getChannelMessages(
|
||||
client,
|
||||
channel.telegramId,
|
||||
mapping.lastProcessedMessageId
|
||||
mapping.lastProcessedMessageId,
|
||||
100,
|
||||
(scanned) => {
|
||||
throttled.update({
|
||||
currentActivity: `Scanning "${channelLabel}" — ${scanned} messages scanned`,
|
||||
currentStep: "scanning",
|
||||
currentChannel: channelLabel,
|
||||
messagesScanned: counters.messagesScanned + scanned,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
// Add scanned messages to global counter
|
||||
counters.messagesScanned += scanResult.totalScanned;
|
||||
|
||||
if (scanResult.archives.length === 0) {
|
||||
accountLog.debug({ channelId: channel.id }, "No new archives");
|
||||
continue;
|
||||
@@ -593,6 +632,7 @@ async function processArchiveSets(
|
||||
currentChannel: channelTitle,
|
||||
totalFiles: archiveSets.length,
|
||||
zipsFound: counters.zipsFound,
|
||||
messagesScanned: counters.messagesScanned,
|
||||
});
|
||||
|
||||
// Track the highest message ID that was successfully processed
|
||||
@@ -646,7 +686,6 @@ async function processOneArchiveSet(
|
||||
throttled, counters, topicCreator, sourceTopicId, accountLog,
|
||||
} = ctx;
|
||||
|
||||
counters.messagesScanned += archiveSet.parts.length;
|
||||
const archiveName = archiveSet.parts[0].fileName;
|
||||
|
||||
// ── Early skip: check if this archive set was already ingested ──
|
||||
|
||||
Reference in New Issue
Block a user