Mirror of https://github.com/xCyanGrizzly/DragonsStash.git (synced 2026-05-11 06:11:15 +00:00)
feat: file upload from UI, notification dismiss, audit false positive fix
Manual file upload:
- Upload dialog in STL page with drag-and-drop file picker
- Files saved to shared Docker volume (/data/uploads)
- Worker processes via pg_notify('manual_upload') channel
- Hashes each file, reads archive metadata, splits files >2GB, uploads to Telegram
- Multiple files in one upload are automatically grouped
- Status polling shows upload/processing/complete states (see the polling sketch below)
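A minimal sketch of how the STL page could poll that status. The GET /api/uploads/[id] route and the PENDING/PROCESSING/COMPLETED/FAILED states come from this commit; the response shape and the pollUploadStatus helper are assumptions for illustration, not code in this diff.

// Hypothetical client-side polling helper; only the route and status values are taken from the commit.
type UploadStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "FAILED";

async function pollUploadStatus(
  uploadId: string,
  onUpdate: (status: UploadStatus) => void,
  intervalMs = 2000,
): Promise<UploadStatus> {
  for (;;) {
    const res = await fetch(`/api/uploads/${uploadId}`);
    if (!res.ok) throw new Error(`Status request failed: ${res.status}`);
    const { status } = (await res.json()) as { status: UploadStatus };
    onUpdate(status);
    if (status === "COMPLETED" || status === "FAILED") return status;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}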
Notification fixes:
- Add dismiss (X) button on each notification
- Add "Clear" button to remove all notifications
- Fix false positive MISSING_PART alerts from legacy packages
  (only flag when more than one destMessageId is stored but the count is wrong,
  not when the single stored ID was backfilled from destMessageId)
Infrastructure:
- ManualUpload + ManualUploadFile schema + migration
- Shared manual_uploads Docker volume between app and worker
- Upload API routes (POST /api/uploads, GET /api/uploads/[id]); the POST handler is sketched below
- Worker manual-upload processor with full pipeline
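A minimal sketch of what the POST /api/uploads route could look like, assuming a Next.js route handler in the app container. The /data/uploads volume, the manual_upload channel, and the PENDING status come from this commit; the "@/lib/db" import path, the relation field names, and the overall handler shape are assumptions.

// Hypothetical POST /api/uploads route: saves files to the shared volume and wakes the worker.
import path from "path";
import { mkdir, writeFile } from "fs/promises";
import { db } from "@/lib/db"; // assumed location of the app's Prisma client

export async function POST(req: Request): Promise<Response> {
  const form = await req.formData();
  const files = form.getAll("files").filter((f): f is File => f instanceof File);
  if (files.length === 0) {
    return Response.json({ error: "No files provided" }, { status: 400 });
  }

  // Create the ManualUpload record first so its id can name the directory on the shared volume.
  const upload = await db.manualUpload.create({ data: { status: "PENDING" } });
  const dir = path.join("/data/uploads", upload.id);
  await mkdir(dir, { recursive: true });

  for (const file of files) {
    const filePath = path.join(dir, file.name);
    await writeFile(filePath, Buffer.from(await file.arrayBuffer()));
    await db.manualUploadFile.create({
      data: { uploadId: upload.id, fileName: file.name, filePath, fileSize: BigInt(file.size) },
    });
  }

  // Wake the worker listening on the manual_upload channel.
  await db.$executeRaw`SELECT pg_notify('manual_upload', ${upload.id})`;
  return Response.json({ id: upload.id, status: "PENDING" });
}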
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@@ -38,7 +38,9 @@ export async function runIntegrityAudit(): Promise<{ checked: number; issues: nu
   for (const pkg of multipartPackages) {
     const actualParts = pkg.destMessageIds.length;
-    if (actualParts > 0 && actualParts !== pkg.partCount) {
+    // Only flag when we have >1 stored IDs but count doesn't match.
+    // Packages with exactly 1 ID are legacy (backfilled from single destMessageId) — not actionable.
+    if (actualParts > 1 && actualParts !== pkg.partCount) {
       issues++;

       // Check if we already have a notification for this

@@ -5,6 +5,7 @@ import { withTdlibMutex } from "./util/mutex.js";
 import { processFetchRequest } from "./worker.js";
 import { processExtractRequest } from "./extract-listener.js";
 import { rebuildPackageDatabase } from "./rebuild.js";
+import { processManualUpload } from "./manual-upload.js";
 import { generateInviteLink, createSupergroup, searchPublicChat } from "./tdlib/chats.js";
 import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
 import { triggerImmediateCycle } from "./scheduler.js";
@@ -55,6 +56,7 @@ async function connectListener(): Promise<void> {
     await pgClient.query("LISTEN join_channel");
     await pgClient.query("LISTEN archive_extract");
     await pgClient.query("LISTEN rebuild_packages");
+    await pgClient.query("LISTEN manual_upload");

     pgClient.on("notification", (msg) => {
       if (msg.channel === "channel_fetch" && msg.payload) {
@@ -71,6 +73,8 @@ async function connectListener(): Promise<void> {
         handleArchiveExtract(msg.payload);
       } else if (msg.channel === "rebuild_packages" && msg.payload) {
         handleRebuildPackages(msg.payload);
+      } else if (msg.channel === "manual_upload" && msg.payload) {
+        handleManualUpload(msg.payload);
       }
     });

@@ -96,7 +100,7 @@ async function connectListener(): Promise<void> {
       }
     });

-    log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger, join_channel, archive_extract, rebuild_packages)");
+    log.info("Fetch listener started (channel_fetch, generate_invite, create_destination, ingestion_trigger, join_channel, archive_extract, rebuild_packages, manual_upload)");
   } catch (err) {
     log.error({ err }, "Failed to start fetch listener — retrying");
     scheduleReconnect();
@@ -511,3 +515,11 @@ function handleRebuildPackages(requestId: string): void {
     }
   });
 }
+
+// ── Manual upload handler ──
+
+function handleManualUpload(uploadId: string): void {
+  fetchQueue = fetchQueue
+    .then(() => processManualUpload(uploadId))
+    .catch((err) => log.error({ err, uploadId }, "Manual upload processing failed"));
+}
worker/src/manual-upload.ts (new file, 211 lines)
@@ -0,0 +1,211 @@
import path from "path";
import { rm } from "fs/promises";
import { db } from "./db/client.js";
import { childLogger } from "./util/logger.js";
import { config } from "./util/config.js";
import { hashParts } from "./archive/hash.js";
import { byteLevelSplit } from "./archive/split.js";
import { uploadToChannel } from "./upload/channel.js";
import { createTdlibClient, closeTdlibClient } from "./tdlib/client.js";
import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
import { read7zContents } from "./archive/sevenz-reader.js";
import { getActiveAccounts } from "./db/queries.js";

const log = childLogger("manual-upload");

export async function processManualUpload(uploadId: string): Promise<void> {
  log.info({ uploadId }, "Processing manual upload");

  const upload = await db.manualUpload.findUnique({
    where: { id: uploadId },
    include: { files: true },
  });

  if (!upload || upload.status !== "PENDING") {
    log.warn({ uploadId }, "Manual upload not found or not pending");
    return;
  }

  await db.manualUpload.update({
    where: { id: uploadId },
    data: { status: "PROCESSING" },
  });

  try {
    // Get destination channel
    const destSetting = await db.globalSetting.findUnique({
      where: { key: "destination_channel_id" },
    });
    if (!destSetting) throw new Error("No destination channel configured");

    const destChannel = await db.telegramChannel.findFirst({
      where: { id: destSetting.value, type: "DESTINATION", isActive: true },
    });
    if (!destChannel) throw new Error("Destination channel not found or inactive");

    // Get a TDLib client (use first active account)
    const accounts = await getActiveAccounts();
    const account = accounts[0];
    if (!account) throw new Error("No authenticated Telegram account available");

    const client = await createTdlibClient({ id: account.id, phone: account.phone });

    try {
      const packageIds: string[] = [];

      for (const file of upload.files) {
        try {
          const filePath = file.filePath;
          const fileName = file.fileName;
          const fileSize = file.fileSize;

          log.info({ fileName, fileSize: Number(fileSize) }, "Processing file");

          // Determine archive type
          let archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT" = "DOCUMENT";
          const ext = fileName.toLowerCase();
          if (ext.endsWith(".zip")) archiveType = "ZIP";
          else if (ext.endsWith(".rar")) archiveType = "RAR";
          else if (ext.endsWith(".7z")) archiveType = "SEVEN_Z";

          // Hash the file
          const contentHash = await hashParts([filePath]);

          // Check for duplicates
          const existing = await db.package.findFirst({
            where: { contentHash, destMessageId: { not: null } },
            select: { id: true },
          });

          if (existing) {
            log.info({ fileName, contentHash }, "Duplicate file, skipping upload");
            await db.manualUploadFile.update({
              where: { id: file.id },
              data: { packageId: existing.id },
            });
            packageIds.push(existing.id);
            continue;
          }

          // Read archive metadata
          let entries: {
            path: string;
            fileName: string;
            extension: string | null;
            compressedSize: bigint;
            uncompressedSize: bigint;
            crc32: string | null;
          }[] = [];
          try {
            if (archiveType === "ZIP") entries = await readZipCentralDirectory([filePath]);
            else if (archiveType === "RAR") entries = await readRarContents(filePath);
            else if (archiveType === "SEVEN_Z") entries = await read7zContents(filePath);
          } catch {
            log.debug({ fileName }, "Could not read archive metadata");
          }

          // Split if needed
          const MAX_UPLOAD_SIZE = BigInt(config.maxPartSizeMB) * 1024n * 1024n;
          let uploadPaths = [filePath];
          if (fileSize > MAX_UPLOAD_SIZE) {
            uploadPaths = await byteLevelSplit(filePath);
          }

          // Upload to Telegram
          const destResult = await uploadToChannel(
            client,
            destChannel.telegramId,
            uploadPaths
          );

          // Create package record
          const pkg = await db.package.create({
            data: {
              contentHash,
              fileName,
              fileSize,
              archiveType,
              sourceChannelId: destChannel.id,
              sourceMessageId: destResult.messageId,
              destChannelId: destChannel.id,
              destMessageId: destResult.messageId,
              destMessageIds: destResult.messageIds,
              isMultipart: uploadPaths.length > 1,
              partCount: uploadPaths.length,
              fileCount: entries.length,
              files: entries.length > 0 ? { create: entries } : undefined,
            },
          });

          await db.manualUploadFile.update({
            where: { id: file.id },
            data: { packageId: pkg.id },
          });

          packageIds.push(pkg.id);
          log.info({ fileName, packageId: pkg.id }, "File processed and uploaded");

          // Clean up split files (but not the original)
          if (uploadPaths.length > 1) {
            for (const splitPath of uploadPaths) {
              if (splitPath !== filePath) {
                await rm(splitPath, { force: true }).catch(() => {});
              }
            }
          }
        } catch (fileErr) {
          log.error({ err: fileErr, fileName: file.fileName }, "Failed to process file");
        }
      }

      // Group packages if multiple files
      if (packageIds.length >= 2) {
        const groupName =
          upload.groupName ?? upload.files[0].fileName.replace(/\.[^.]+$/, "");
        const group = await db.packageGroup.create({
          data: {
            name: groupName,
            sourceChannelId: destChannel.id,
            groupingSource: "MANUAL",
          },
        });
        await db.package.updateMany({
          where: { id: { in: packageIds } },
          data: { packageGroupId: group.id },
        });
        log.info(
          { groupId: group.id, groupName, packageCount: packageIds.length },
          "Created group for uploaded files"
        );
      }

      await db.manualUpload.update({
        where: { id: uploadId },
        data: { status: "COMPLETED", completedAt: new Date() },
      });

      log.info(
        { uploadId, fileCount: upload.files.length, packageCount: packageIds.length },
        "Manual upload completed"
      );
    } finally {
      await closeTdlibClient(client);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.error({ err, uploadId }, "Manual upload failed");
    await db.manualUpload.update({
      where: { id: uploadId },
      data: { status: "FAILED", errorMessage: message },
    });
  }

  // Clean up uploaded files
  try {
    const uploadDir = path.join("/data/uploads", uploadId);
    await rm(uploadDir, { recursive: true, force: true });
  } catch {
    // Best-effort cleanup
  }
}