diff --git a/worker/src/db/locks.ts b/worker/src/db/locks.ts index c5a384c..76d62f5 100644 --- a/worker/src/db/locks.ts +++ b/worker/src/db/locks.ts @@ -79,3 +79,66 @@ export async function releaseLock(accountId: string): Promise { client.release(); } } + +/** + * Derive a lock ID for a content hash. Prefixes with "hash:" so the resulting + * 32-bit integer does not collide with account advisory lock IDs. + */ +function contentHashToLockId(contentHash: string): number { + return hashToLockId(`hash:${contentHash}`); +} + +/** + * Acquire a per-content-hash advisory lock before uploading. + * Prevents two concurrent workers from uploading the same archive + * when both scan a shared source channel. + * + * Returns true if acquired (proceed with upload). + * Returns false if already held (another worker is handling this archive — skip). + * + * MUST be released via releaseHashLock() after createPackageStub() completes, + * including on all error paths (use try/finally). + */ +export async function tryAcquireHashLock(contentHash: string): Promise { + const lockId = contentHashToLockId(contentHash); + const client = await pool.connect(); + try { + const result = await client.query<{ pg_try_advisory_lock: boolean }>( + "SELECT pg_try_advisory_lock($1)", + [lockId] + ); + const acquired = result.rows[0]?.pg_try_advisory_lock ?? false; + if (acquired) { + heldConnections.set(`hash:${contentHash}`, client); + log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock acquired"); + return true; + } else { + client.release(); + log.debug({ hash: contentHash.slice(0, 16), lockId }, "Hash lock held by another worker — skipping"); + return false; + } + } catch (err) { + client.release(); + throw err; + } +} + +/** + * Release the per-content-hash advisory lock. + * Call after createPackageStub() completes (or on any error path). + */ +export async function releaseHashLock(contentHash: string): Promise { + const lockId = contentHashToLockId(contentHash); + const client = heldConnections.get(`hash:${contentHash}`); + if (!client) { + log.warn({ hash: contentHash.slice(0, 16) }, "No held connection for hash lock release"); + return; + } + try { + await client.query("SELECT pg_advisory_unlock($1)", [lockId]); + log.debug({ hash: contentHash.slice(0, 16) }, "Hash lock released"); + } finally { + heldConnections.delete(`hash:${contentHash}`); + client.release(); + } +}