diff --git a/worker/src/db/queries.ts b/worker/src/db/queries.ts index 89f52ef..4411ef3 100644 --- a/worker/src/db/queries.ts +++ b/worker/src/db/queries.ts @@ -74,6 +74,105 @@ export async function getUploadedPackageByHash(contentHash: string) { }); } +export interface CreatePackageStubInput { + contentHash: string; + fileName: string; + fileSize: bigint; + archiveType: ArchiveType; + sourceChannelId: string; + sourceMessageId: bigint; + sourceTopicId?: bigint | null; + destChannelId: string; + destMessageId: bigint; + destMessageIds: bigint[]; + isMultipart: boolean; + partCount: number; + ingestionRunId: string; + creator?: string | null; + tags?: string[]; +} + +/** + * Write a minimal Package record immediately after Telegram confirms the upload. + * Call this before preview/metadata extraction so recoverIncompleteUploads() can + * detect and verify the package if the worker crashes mid-metadata. + * + * Follow with updatePackageWithMetadata() once file entries and preview are ready. + */ +export async function createPackageStub( + input: CreatePackageStubInput +): Promise<{ id: string }> { + const pkg = await db.package.create({ + data: { + contentHash: input.contentHash, + fileName: input.fileName, + fileSize: input.fileSize, + archiveType: input.archiveType, + sourceChannelId: input.sourceChannelId, + sourceMessageId: input.sourceMessageId, + sourceTopicId: input.sourceTopicId ?? undefined, + destChannelId: input.destChannelId, + destMessageId: input.destMessageId, + destMessageIds: input.destMessageIds, + isMultipart: input.isMultipart, + partCount: input.partCount, + fileCount: 0, + ingestionRunId: input.ingestionRunId, + creator: input.creator ?? undefined, + tags: input.tags?.length ? input.tags : undefined, + }, + select: { id: true }, + }); + + try { + await db.$queryRawUnsafe( + `SELECT pg_notify('new_package', $1)`, + JSON.stringify({ + packageId: pkg.id, + fileName: input.fileName, + creator: input.creator ?? null, + tags: input.tags ?? [], + }) + ); + } catch { + // Best-effort + } + + return pkg; +} + +/** + * Update a stub Package with file entries and preview after metadata extraction. + * Called as Phase 2 of the two-phase write after createPackageStub(). + */ +export async function updatePackageWithMetadata( + packageId: string, + input: { + files: { + path: string; + fileName: string; + extension: string | null; + compressedSize: bigint; + uncompressedSize: bigint; + crc32: string | null; + }[]; + previewData?: Buffer | null; + previewMsgId?: bigint | null; + } +): Promise { + await db.package.update({ + where: { id: packageId }, + data: { + fileCount: input.files.length, + previewData: input.previewData ? new Uint8Array(input.previewData) : undefined, + previewMsgId: input.previewMsgId ?? undefined, + files: { + create: input.files, + }, + }, + }); +} + /** * Check if a package already exists for a given source message ID * AND was successfully uploaded to the destination (destMessageId is set).