diff --git a/worker/src/archive/integrity.ts b/worker/src/archive/integrity.ts new file mode 100644 index 0000000..7bb9a6e --- /dev/null +++ b/worker/src/archive/integrity.ts @@ -0,0 +1,93 @@ +import { execFile } from "child_process"; +import { promisify } from "util"; +import { childLogger } from "../util/logger.js"; + +const execFileAsync = promisify(execFile); +const log = childLogger("integrity"); + +export type IntegrityResult = + | { ok: true } + | { ok: false; reason: string }; + +/** + * Test that the archive can be read end-to-end without errors, BEFORE we + * spend bandwidth uploading it to the destination channel. Catches: + * - Truncated downloads (rare given our size check, but cheap to confirm) + * - CRC errors inside the archive + * - Bad central directories + * - Encrypted archives (we report them as failures rather than upload + * a file users can't extract) + * + * Returns { ok: true } if the archive is intact. Returns + * { ok: false, reason } otherwise. Logs at warn level on failure. + * + * For multipart archives, pass the first part. unzip / unrar / 7z all + * auto-discover sibling parts. + * + * archiveType "DOCUMENT" is a pass-through — there's no container to test. + */ +export async function testArchiveIntegrity( + archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT", + firstPartPath: string +): Promise { + if (archiveType === "DOCUMENT") { + return { ok: true }; + } + + try { + if (archiveType === "ZIP") { + // -t = test, -qq = very quiet (errors only) + const { stderr } = await execFileAsync("unzip", ["-tqq", firstPartPath], { + timeout: 300_000, // 5 min for very large archives + maxBuffer: 10 * 1024 * 1024, + }); + if (stderr && stderr.trim()) { + return { ok: false, reason: `unzip -t reported: ${stderr.slice(0, 500)}` }; + } + return { ok: true }; + } + + if (archiveType === "RAR") { + const { stdout, stderr } = await execFileAsync("unrar", ["t", firstPartPath], { + timeout: 300_000, + maxBuffer: 10 * 1024 * 1024, + }); + // unrar uses non-zero exit code on errors, which becomes a throw. + // If it succeeds, "All OK" is in stdout. + const combined = `${stdout}\n${stderr}`; + if (/All OK/i.test(combined)) { + return { ok: true }; + } + return { ok: false, reason: `unrar t did not report "All OK": ${combined.slice(-500)}` }; + } + + if (archiveType === "SEVEN_Z") { + const { stdout, stderr } = await execFileAsync("7z", ["t", firstPartPath], { + timeout: 300_000, + maxBuffer: 10 * 1024 * 1024, + }); + const combined = `${stdout}\n${stderr}`; + if (/Everything is Ok/i.test(combined)) { + return { ok: true }; + } + return { ok: false, reason: `7z t did not report "Everything is Ok": ${combined.slice(-500)}` }; + } + + return { ok: false, reason: `Unknown archive type: ${archiveType}` }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + // execFile throws on non-zero exit. Try to extract the most useful part. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const stderr = (err as any)?.stderr as string | undefined; + const detail = stderr ? `: ${stderr.slice(0, 500)}` : ""; + + // Specifically flag encrypted archives so the caller can record a more + // specific SkipReason / notification. + if (/password|encrypted|need.*password/i.test(`${msg}${detail}`)) { + return { ok: false, reason: `Archive is encrypted (password protected): ${msg}${detail}` }; + } + + log.debug({ err, archiveType, firstPartPath }, "Archive integrity test failed"); + return { ok: false, reason: `Integrity test failed: ${msg}${detail}` }; + } +} diff --git a/worker/src/recovery.ts b/worker/src/recovery.ts index cb8c262..cf688bf 100644 --- a/worker/src/recovery.ts +++ b/worker/src/recovery.ts @@ -81,17 +81,28 @@ export async function recoverIncompleteUploads(): Promise { let unknownCount = 0; let wrongContentCount = 0; + // Batch size for getMessages. TDLib accepts up to ~100 IDs per call. + // Using 100 means 20k packages → ~200 round-trips instead of 20k. + const BATCH_SIZE = 100; + for (const [, channelPackages] of byChannel) { - for (const pkg of channelPackages) { - const result = await verifyMessageExists( + // Group packages by destChannelId (already done) — within each group, + // process in batches via getMessages (plural). + for (let i = 0; i < channelPackages.length; i += BATCH_SIZE) { + const batch = channelPackages.slice(i, i + BATCH_SIZE); + const batchResults = await verifyMessagesBatch( client, destChannel.telegramId, - pkg.destMessageId! + batch.map((p) => p.destMessageId!) ); - if (result.state === "exists") { - verifiedCount++; - } else if (result.state === "deleted") { + for (let j = 0; j < batch.length; j++) { + const pkg = batch[j]; + const result = batchResults[j]; + + if (result.state === "exists") { + verifiedCount++; + } else if (result.state === "deleted") { log.warn( { packageId: pkg.id, @@ -130,6 +141,7 @@ export async function recoverIncompleteUploads(): Promise { "Could not verify destination message — will retry on next startup" ); } + } } } @@ -160,6 +172,55 @@ type VerifyResult = | { state: "wrong-content"; contentType: string } | { state: "unknown"; reason: string }; +/** + * Batch version of verifyMessageExists. Calls TDLib's getMessages (plural) + * with up to ~100 message IDs at once. Returns one VerifyResult per input + * ID, in input order. Missing messages come back as null in TDLib's response + * — translated to {state: "deleted"} here. + * + * Falls back to per-message verification on any error so that one bad batch + * doesn't lose all verification for that chunk. + */ +async function verifyMessagesBatch( + client: Client, + chatTelegramId: bigint, + messageIds: bigint[] +): Promise { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const result = (await withFloodWait( + () => + client.invoke({ + _: "getMessages", + chat_id: Number(chatTelegramId), + message_ids: messageIds.map((id) => Number(id)), + }), + "getMessages:verify" + )) as { messages?: (null | { content?: { _: string } })[] }; + + const messages = result.messages ?? []; + return messageIds.map((_id, i) => { + const m = messages[i]; + if (!m || !m.content) return { state: "deleted" }; + if (m.content._ !== "messageDocument") { + return { state: "wrong-content", contentType: String(m.content._) }; + } + return { state: "exists" }; + }); + } catch (err) { + // If the whole batch errors out, fall back to per-message verification. + log.warn( + { err, batchSize: messageIds.length, chatTelegramId: chatTelegramId.toString() }, + "getMessages batch failed, falling back to per-message verification" + ); + const out: VerifyResult[] = []; + for (const id of messageIds) { + out.push(await verifyMessageExists(client, chatTelegramId, id)); + } + return out; + } +} + /** * Check whether a message exists in a Telegram chat and is the document we * uploaded. Returns a discriminated result instead of a bare boolean so the diff --git a/worker/src/worker.ts b/worker/src/worker.ts index 4a314fe..c0d2b7b 100644 --- a/worker/src/worker.ts +++ b/worker/src/worker.ts @@ -48,6 +48,7 @@ import { groupArchiveSets } from "./archive/multipart.js"; import type { ArchiveSet } from "./archive/multipart.js"; import { extractCreatorFromFileName, extractCreatorFromChannelTitle } from "./archive/creator.js"; import { extractSlicerTags } from "./archive/slicer-tags.js"; +import { testArchiveIntegrity } from "./archive/integrity.js"; import { hashParts } from "./archive/hash.js"; import { readZipCentralDirectory } from "./archive/zip-reader.js"; import { readRarContents } from "./archive/rar-reader.js"; @@ -1653,6 +1654,19 @@ async function processOneArchiveSet( ); } + // ── Pre-upload integrity test ── + // Catch broken/encrypted archives before we burn upload bandwidth on + // them. Cheap (unzip -t / unrar t / 7z t) compared to a multi-GB upload. + // Skipped when we're reusing an existing upload — no point testing the + // file again. + const integrity = await testArchiveIntegrity( + archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type, + uploadPaths[0] + ); + if (!integrity.ok) { + throw new Error(`Archive integrity check failed: ${integrity.reason}`); + } + // ── Uploading ── // Check if a prior run already uploaded this file (orphaned upload scenario: // file reached Telegram but DB write failed or worker crashed before indexing) @@ -1718,6 +1732,72 @@ async function processOneArchiveSet( } } + // ── Destination read-back verification ── + // Telegram's updateMessageSendSucceeded fires when TG acknowledges the + // message, but that's separate from "the message is queryable and + // contains the file we sent". Fetch each destination message and + // confirm the document's size matches what we uploaded. + // + // Skipped when reusing an existing upload (we never sent anything). + // Failures here surface as a SystemNotification but DO NOT abort the + // ingestion — the Package will be created with whatever destMessageIds + // Telegram returned, and a future recovery run can reset it if needed. + if (!existingUpload && destResult.messageIds.length > 0) { + try { + const expectedSizes = uploadPaths.length === destResult.messageIds.length + ? await Promise.all( + uploadPaths.map(async (p) => (await import("fs/promises")).stat(p).then((s) => s.size)) + ) + : null; + + for (let i = 0; i < destResult.messageIds.length; i++) { + const msgId = Number(destResult.messageIds[i]); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const tdMsg = (await client.invoke({ + _: "getMessage", + chat_id: Number(destChannelTelegramId), + message_id: msgId, + }).catch(() => null)) as any; + + const doc = tdMsg?.content?.document?.document; + const actualSize = doc?.size; + const expected = expectedSizes?.[i]; + + if (!actualSize) { + accountLog.warn( + { fileName: archiveName, destMessageId: msgId }, + "Post-upload read-back: destination message has no document content" + ); + await db.systemNotification.create({ + data: { + type: "UPLOAD_FAILED", + severity: "WARNING", + title: `Read-back failed: ${archiveName}`, + message: `Destination message ${msgId} has no document content after upload. The upload may have failed silently.`, + context: { fileName: archiveName, destMessageId: msgId, sourceChannelId: channel.id }, + }, + }); + } else if (expected !== undefined && actualSize !== expected) { + accountLog.error( + { fileName: archiveName, destMessageId: msgId, expectedSize: expected, actualSize }, + "Post-upload read-back: destination file size mismatch" + ); + await db.systemNotification.create({ + data: { + type: "HASH_MISMATCH", + severity: "ERROR", + title: `Read-back size mismatch: ${archiveName}`, + message: `Sent ${expected} bytes but destination message ${msgId} contains a ${actualSize}-byte file.`, + context: { fileName: archiveName, destMessageId: msgId, expectedSize: expected, actualSize, sourceChannelId: channel.id }, + }, + }); + } + } + } catch (readBackErr) { + accountLog.warn({ err: readBackErr, fileName: archiveName }, "Post-upload read-back failed (non-fatal)"); + } + } + // ── Phase 1: Stub record — persisted immediately after upload ── await deleteOrphanedPackageByHash(contentHash);