feat(verify): pre-upload integrity test, post-upload read-back, batched recovery
All checks were successful
continuous-integration/drone/push Build is passing

Three independent verification improvements landing together.

1. Pre-upload archive integrity test (testArchiveIntegrity)
   Before sending an archive to the destination channel, runs the
   appropriate CLI test:
     - unzip -t   for ZIP
     - unrar t    for RAR
     - 7z t       for SEVEN_Z
   Catches truncated downloads, internal CRC errors, bad central
   directories, and password-protected archives BEFORE we burn upload
   bandwidth on a file that can't be extracted. Encrypted archives are
   specifically flagged so the SkippedPackage error message is clear.

2. Post-upload destination read-back
   updateMessageSendSucceeded tells us Telegram accepted the upload,
   but says nothing about whether the destination message actually
   contains the file we sent. After each successful upload, getMessage
   each destMessageId and confirm document.size matches uploadPaths[i]'s
   on-disk size.

   Mismatches don't abort ingestion — they surface as
   HASH_MISMATCH / UPLOAD_FAILED SystemNotifications so the admin can
   see them in the UI and decide whether to recover.

3. Batched recovery (verifyMessagesBatch)
   recoverIncompleteUploads previously called getMessage (singular)
   per Package — at 20k packages that's 20k round-trips. Switched to
   TDLib's getMessages (plural) with batch size 100 → 200 round-trips.
   On 20k packages this is ~100x faster.

   Per-message fallback if a whole batch errors out, so one bad batch
   never loses all verification.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-24 08:56:50 +02:00
parent c4d9be83bd
commit 04effed825
3 changed files with 240 additions and 6 deletions

View File

@@ -0,0 +1,93 @@
import { execFile } from "child_process";
import { promisify } from "util";
import { childLogger } from "../util/logger.js";
const execFileAsync = promisify(execFile);
const log = childLogger("integrity");
export type IntegrityResult =
| { ok: true }
| { ok: false; reason: string };
/**
* Test that the archive can be read end-to-end without errors, BEFORE we
* spend bandwidth uploading it to the destination channel. Catches:
* - Truncated downloads (rare given our size check, but cheap to confirm)
* - CRC errors inside the archive
* - Bad central directories
* - Encrypted archives (we report them as failures rather than upload
* a file users can't extract)
*
* Returns { ok: true } if the archive is intact. Returns
* { ok: false, reason } otherwise. Logs at warn level on failure.
*
* For multipart archives, pass the first part. unzip / unrar / 7z all
* auto-discover sibling parts.
*
* archiveType "DOCUMENT" is a pass-through — there's no container to test.
*/
export async function testArchiveIntegrity(
archiveType: "ZIP" | "RAR" | "SEVEN_Z" | "DOCUMENT",
firstPartPath: string
): Promise<IntegrityResult> {
if (archiveType === "DOCUMENT") {
return { ok: true };
}
try {
if (archiveType === "ZIP") {
// -t = test, -qq = very quiet (errors only)
const { stderr } = await execFileAsync("unzip", ["-tqq", firstPartPath], {
timeout: 300_000, // 5 min for very large archives
maxBuffer: 10 * 1024 * 1024,
});
if (stderr && stderr.trim()) {
return { ok: false, reason: `unzip -t reported: ${stderr.slice(0, 500)}` };
}
return { ok: true };
}
if (archiveType === "RAR") {
const { stdout, stderr } = await execFileAsync("unrar", ["t", firstPartPath], {
timeout: 300_000,
maxBuffer: 10 * 1024 * 1024,
});
// unrar uses non-zero exit code on errors, which becomes a throw.
// If it succeeds, "All OK" is in stdout.
const combined = `${stdout}\n${stderr}`;
if (/All OK/i.test(combined)) {
return { ok: true };
}
return { ok: false, reason: `unrar t did not report "All OK": ${combined.slice(-500)}` };
}
if (archiveType === "SEVEN_Z") {
const { stdout, stderr } = await execFileAsync("7z", ["t", firstPartPath], {
timeout: 300_000,
maxBuffer: 10 * 1024 * 1024,
});
const combined = `${stdout}\n${stderr}`;
if (/Everything is Ok/i.test(combined)) {
return { ok: true };
}
return { ok: false, reason: `7z t did not report "Everything is Ok": ${combined.slice(-500)}` };
}
return { ok: false, reason: `Unknown archive type: ${archiveType}` };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
// execFile throws on non-zero exit. Try to extract the most useful part.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const stderr = (err as any)?.stderr as string | undefined;
const detail = stderr ? `: ${stderr.slice(0, 500)}` : "";
// Specifically flag encrypted archives so the caller can record a more
// specific SkipReason / notification.
if (/password|encrypted|need.*password/i.test(`${msg}${detail}`)) {
return { ok: false, reason: `Archive is encrypted (password protected): ${msg}${detail}` };
}
log.debug({ err, archiveType, firstPartPath }, "Archive integrity test failed");
return { ok: false, reason: `Integrity test failed: ${msg}${detail}` };
}
}

View File

@@ -81,17 +81,28 @@ export async function recoverIncompleteUploads(): Promise<void> {
let unknownCount = 0;
let wrongContentCount = 0;
// Batch size for getMessages. TDLib accepts up to ~100 IDs per call.
// Using 100 means 20k packages → ~200 round-trips instead of 20k.
const BATCH_SIZE = 100;
for (const [, channelPackages] of byChannel) {
for (const pkg of channelPackages) {
const result = await verifyMessageExists(
// Group packages by destChannelId (already done) — within each group,
// process in batches via getMessages (plural).
for (let i = 0; i < channelPackages.length; i += BATCH_SIZE) {
const batch = channelPackages.slice(i, i + BATCH_SIZE);
const batchResults = await verifyMessagesBatch(
client,
destChannel.telegramId,
pkg.destMessageId!
batch.map((p) => p.destMessageId!)
);
if (result.state === "exists") {
verifiedCount++;
} else if (result.state === "deleted") {
for (let j = 0; j < batch.length; j++) {
const pkg = batch[j];
const result = batchResults[j];
if (result.state === "exists") {
verifiedCount++;
} else if (result.state === "deleted") {
log.warn(
{
packageId: pkg.id,
@@ -130,6 +141,7 @@ export async function recoverIncompleteUploads(): Promise<void> {
"Could not verify destination message — will retry on next startup"
);
}
}
}
}
@@ -160,6 +172,55 @@ type VerifyResult =
| { state: "wrong-content"; contentType: string }
| { state: "unknown"; reason: string };
/**
* Batch version of verifyMessageExists. Calls TDLib's getMessages (plural)
* with up to ~100 message IDs at once. Returns one VerifyResult per input
* ID, in input order. Missing messages come back as null in TDLib's response
* — translated to {state: "deleted"} here.
*
* Falls back to per-message verification on any error so that one bad batch
* doesn't lose all verification for that chunk.
*/
async function verifyMessagesBatch(
client: Client,
chatTelegramId: bigint,
messageIds: bigint[]
): Promise<VerifyResult[]> {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const result = (await withFloodWait(
() =>
client.invoke({
_: "getMessages",
chat_id: Number(chatTelegramId),
message_ids: messageIds.map((id) => Number(id)),
}),
"getMessages:verify"
)) as { messages?: (null | { content?: { _: string } })[] };
const messages = result.messages ?? [];
return messageIds.map((_id, i) => {
const m = messages[i];
if (!m || !m.content) return { state: "deleted" };
if (m.content._ !== "messageDocument") {
return { state: "wrong-content", contentType: String(m.content._) };
}
return { state: "exists" };
});
} catch (err) {
// If the whole batch errors out, fall back to per-message verification.
log.warn(
{ err, batchSize: messageIds.length, chatTelegramId: chatTelegramId.toString() },
"getMessages batch failed, falling back to per-message verification"
);
const out: VerifyResult[] = [];
for (const id of messageIds) {
out.push(await verifyMessageExists(client, chatTelegramId, id));
}
return out;
}
}
/**
* Check whether a message exists in a Telegram chat and is the document we
* uploaded. Returns a discriminated result instead of a bare boolean so the

View File

@@ -48,6 +48,7 @@ import { groupArchiveSets } from "./archive/multipart.js";
import type { ArchiveSet } from "./archive/multipart.js";
import { extractCreatorFromFileName, extractCreatorFromChannelTitle } from "./archive/creator.js";
import { extractSlicerTags } from "./archive/slicer-tags.js";
import { testArchiveIntegrity } from "./archive/integrity.js";
import { hashParts } from "./archive/hash.js";
import { readZipCentralDirectory } from "./archive/zip-reader.js";
import { readRarContents } from "./archive/rar-reader.js";
@@ -1653,6 +1654,19 @@ async function processOneArchiveSet(
);
}
// ── Pre-upload integrity test ──
// Catch broken/encrypted archives before we burn upload bandwidth on
// them. Cheap (unzip -t / unrar t / 7z t) compared to a multi-GB upload.
// Skipped when we're reusing an existing upload — no point testing the
// file again.
const integrity = await testArchiveIntegrity(
archiveSet.type === "7Z" ? "SEVEN_Z" : archiveSet.type,
uploadPaths[0]
);
if (!integrity.ok) {
throw new Error(`Archive integrity check failed: ${integrity.reason}`);
}
// ── Uploading ──
// Check if a prior run already uploaded this file (orphaned upload scenario:
// file reached Telegram but DB write failed or worker crashed before indexing)
@@ -1718,6 +1732,72 @@ async function processOneArchiveSet(
}
}
// ── Destination read-back verification ──
// Telegram's updateMessageSendSucceeded fires when TG acknowledges the
// message, but that's separate from "the message is queryable and
// contains the file we sent". Fetch each destination message and
// confirm the document's size matches what we uploaded.
//
// Skipped when reusing an existing upload (we never sent anything).
// Failures here surface as a SystemNotification but DO NOT abort the
// ingestion — the Package will be created with whatever destMessageIds
// Telegram returned, and a future recovery run can reset it if needed.
if (!existingUpload && destResult.messageIds.length > 0) {
try {
const expectedSizes = uploadPaths.length === destResult.messageIds.length
? await Promise.all(
uploadPaths.map(async (p) => (await import("fs/promises")).stat(p).then((s) => s.size))
)
: null;
for (let i = 0; i < destResult.messageIds.length; i++) {
const msgId = Number(destResult.messageIds[i]);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const tdMsg = (await client.invoke({
_: "getMessage",
chat_id: Number(destChannelTelegramId),
message_id: msgId,
}).catch(() => null)) as any;
const doc = tdMsg?.content?.document?.document;
const actualSize = doc?.size;
const expected = expectedSizes?.[i];
if (!actualSize) {
accountLog.warn(
{ fileName: archiveName, destMessageId: msgId },
"Post-upload read-back: destination message has no document content"
);
await db.systemNotification.create({
data: {
type: "UPLOAD_FAILED",
severity: "WARNING",
title: `Read-back failed: ${archiveName}`,
message: `Destination message ${msgId} has no document content after upload. The upload may have failed silently.`,
context: { fileName: archiveName, destMessageId: msgId, sourceChannelId: channel.id },
},
});
} else if (expected !== undefined && actualSize !== expected) {
accountLog.error(
{ fileName: archiveName, destMessageId: msgId, expectedSize: expected, actualSize },
"Post-upload read-back: destination file size mismatch"
);
await db.systemNotification.create({
data: {
type: "HASH_MISMATCH",
severity: "ERROR",
title: `Read-back size mismatch: ${archiveName}`,
message: `Sent ${expected} bytes but destination message ${msgId} contains a ${actualSize}-byte file.`,
context: { fileName: archiveName, destMessageId: msgId, expectedSize: expected, actualSize, sourceChannelId: channel.id },
},
});
}
}
} catch (readBackErr) {
accountLog.warn({ err: readBackErr, fileName: archiveName }, "Post-upload read-back failed (non-fatal)");
}
}
// ── Phase 1: Stub record — persisted immediately after upload ──
await deleteOrphanedPackageByHash(contentHash);