mirror of
https://github.com/xCyanGrizzly/DragonsStash.git
synced 2026-06-13 04:31:16 +00:00
Diagnosed from production: in 8 hours of main's current run, zero
uploads happened despite the worker being busy 100% of the time. Logs
showed continuous "Downloading archive part" entries with no
corresponding upload activity.
Root cause: the source channel ("Model Printing Emporium") frequently
reposts the same file at new Telegram message IDs. Concrete example
from the DB:
- "(EN) PaintGuides All.zip" → present 6 times, msgIds 44B → 92B
- "00 Welcome Pack.7z" → present 2 times, msgIds 91B and 177B
- "FanteZi April 2022-...zip" → uploaded May 8 at msgId 24,697,110,528;
current run re-downloading at 87,488,987,136
packageExistsBySourceMessage(channelId, msgId) correctly misses because
the msgId is different. We download the (potentially gigabyte-sized)
file, hash it, then packageExistsByHash hits and we discard the
download. ~30 seconds wasted per repost x thousands of reposts = whole
runs spent uploading nothing.
Fix: add findRepostedPackage(sourceChannelId, fileName, fileSize) — a
pre-download check that catches reposts by the strong (channel + name
+ total size) signal. On hit, skip the set entirely. Watermark
advances normally (no minFailedId tracking) so the next cycle sees
the channel as caught up.
False-positive risk: two unrelated files in the same channel with
identical name AND identical total fileSize. Extremely rare in
practice; if it ever happens, the new file is silently treated as a
duplicate. Logged at info level with the existing Package ID and dest
message ID so the user can audit if a file is mysteriously missing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
797 lines
22 KiB
TypeScript
797 lines
22 KiB
TypeScript
import { db } from "./client.js";
|
|
import type { ArchiveType, FetchStatus } from "@prisma/client";
|
|
|
|
export async function getActiveAccounts() {
|
|
return db.telegramAccount.findMany({
|
|
where: { isActive: true, authState: "AUTHENTICATED" },
|
|
});
|
|
}
|
|
|
|
export async function getPendingAccounts() {
|
|
return db.telegramAccount.findMany({
|
|
where: { isActive: true, authState: "PENDING" },
|
|
});
|
|
}
|
|
|
|
export async function hasAnyChannels(): Promise<boolean> {
|
|
const count = await db.telegramChannel.count();
|
|
return count > 0;
|
|
}
|
|
|
|
export async function getSourceChannelMappings(accountId: string) {
|
|
return db.accountChannelMap.findMany({
|
|
where: {
|
|
accountId,
|
|
role: "READER",
|
|
channel: { type: "SOURCE", isActive: true },
|
|
},
|
|
include: { channel: true },
|
|
});
|
|
}
|
|
|
|
// ── Global destination channel ──
|
|
|
|
export async function getGlobalDestinationChannel() {
|
|
const setting = await db.globalSetting.findUnique({
|
|
where: { key: "destination_channel_id" },
|
|
});
|
|
if (!setting) return null;
|
|
return db.telegramChannel.findFirst({
|
|
where: { id: setting.value, type: "DESTINATION", isActive: true },
|
|
});
|
|
}
|
|
|
|
export async function getGlobalSetting(key: string): Promise<string | null> {
|
|
const setting = await db.globalSetting.findUnique({ where: { key } });
|
|
return setting?.value ?? null;
|
|
}
|
|
|
|
export async function setGlobalSetting(key: string, value: string) {
|
|
return db.globalSetting.upsert({
|
|
where: { key },
|
|
create: { key, value },
|
|
update: { value },
|
|
});
|
|
}
|
|
|
|
export async function packageExistsByHash(contentHash: string) {
|
|
const pkg = await db.package.findFirst({
|
|
where: { contentHash, destMessageId: { not: null } },
|
|
select: { id: true },
|
|
});
|
|
return pkg !== null;
|
|
}
|
|
|
|
/**
|
|
* Find an already-uploaded package by content hash.
|
|
* Used to detect orphaned uploads — files that reached Telegram
|
|
* but whose package record was created from a previous successful run.
|
|
*/
|
|
export async function getUploadedPackageByHash(contentHash: string) {
|
|
return db.package.findFirst({
|
|
where: { contentHash, destMessageId: { not: null }, destChannelId: { not: null } },
|
|
select: { destChannelId: true, destMessageId: true, destMessageIds: true },
|
|
});
|
|
}
|
|
|
|
export interface CreatePackageStubInput {
|
|
contentHash: string;
|
|
fileName: string;
|
|
fileSize: bigint;
|
|
archiveType: ArchiveType;
|
|
sourceChannelId: string;
|
|
sourceMessageId: bigint;
|
|
sourceTopicId?: bigint | null;
|
|
destChannelId: string;
|
|
destMessageId: bigint;
|
|
destMessageIds: bigint[];
|
|
isMultipart: boolean;
|
|
partCount: number;
|
|
ingestionRunId: string;
|
|
creator?: string | null;
|
|
tags?: string[];
|
|
}
|
|
|
|
/**
|
|
* Write a minimal Package record immediately after Telegram confirms the upload.
|
|
* Call this before preview/metadata extraction so recoverIncompleteUploads() can
|
|
* detect and verify the package if the worker crashes mid-metadata.
|
|
*
|
|
* Follow with updatePackageWithMetadata() once file entries and preview are ready.
|
|
*/
|
|
export async function createPackageStub(
|
|
input: CreatePackageStubInput
|
|
): Promise<{ id: string }> {
|
|
const pkg = await db.package.create({
|
|
data: {
|
|
contentHash: input.contentHash,
|
|
fileName: input.fileName,
|
|
fileSize: input.fileSize,
|
|
archiveType: input.archiveType,
|
|
sourceChannelId: input.sourceChannelId,
|
|
sourceMessageId: input.sourceMessageId,
|
|
sourceTopicId: input.sourceTopicId ?? undefined,
|
|
destChannelId: input.destChannelId,
|
|
destMessageId: input.destMessageId,
|
|
destMessageIds: input.destMessageIds,
|
|
isMultipart: input.isMultipart,
|
|
partCount: input.partCount,
|
|
fileCount: 0,
|
|
ingestionRunId: input.ingestionRunId,
|
|
creator: input.creator ?? undefined,
|
|
tags: input.tags?.length ? input.tags : undefined,
|
|
},
|
|
select: { id: true },
|
|
});
|
|
|
|
try {
|
|
await db.$queryRawUnsafe(
|
|
`SELECT pg_notify('new_package', $1)`,
|
|
JSON.stringify({
|
|
packageId: pkg.id,
|
|
fileName: input.fileName,
|
|
creator: input.creator ?? null,
|
|
tags: input.tags ?? [],
|
|
})
|
|
);
|
|
} catch {
|
|
// Best-effort
|
|
}
|
|
|
|
return pkg;
|
|
}
|
|
|
|
/**
|
|
* Update a stub Package with file entries and preview after metadata extraction.
|
|
* Called as Phase 2 of the two-phase write after createPackageStub().
|
|
*/
|
|
export async function updatePackageWithMetadata(
|
|
packageId: string,
|
|
input: {
|
|
files: {
|
|
path: string;
|
|
fileName: string;
|
|
extension: string | null;
|
|
compressedSize: bigint;
|
|
uncompressedSize: bigint;
|
|
crc32: string | null;
|
|
}[];
|
|
previewData?: Buffer | null;
|
|
previewMsgId?: bigint | null;
|
|
}
|
|
): Promise<void> {
|
|
await db.package.update({
|
|
where: { id: packageId },
|
|
data: {
|
|
fileCount: input.files.length,
|
|
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
|
|
previewMsgId: input.previewMsgId ?? undefined,
|
|
files: {
|
|
create: input.files,
|
|
},
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Check if a package already exists for a given source message ID
|
|
* AND was successfully uploaded to the destination (destMessageId is set).
|
|
* Used as an early skip before downloading.
|
|
*/
|
|
export async function packageExistsBySourceMessage(
|
|
sourceChannelId: string,
|
|
sourceMessageId: bigint
|
|
): Promise<boolean> {
|
|
const pkg = await db.package.findFirst({
|
|
where: { sourceChannelId, sourceMessageId, destMessageId: { not: null } },
|
|
select: { id: true },
|
|
});
|
|
return pkg !== null;
|
|
}
|
|
|
|
/**
|
|
* Detect a likely repost: same source channel + same fileName + same total
|
|
* fileSize already exists with destMessageId set. Used to skip downloads
|
|
* when the channel admin re-posts the same file under a new message ID
|
|
* (which `packageExistsBySourceMessage` cannot catch because the message ID
|
|
* is different).
|
|
*
|
|
* Returns the existing package's destMessageId for logging/observability,
|
|
* or null if no match. Approximate: same name + same total size is an
|
|
* extremely strong signal that it's the same content, but theoretically
|
|
* two unrelated files could collide. If that ever happens, the new file
|
|
* gets treated as a duplicate and is lost; the user can manually re-link
|
|
* via the UI by removing the existing Package.
|
|
*/
|
|
export async function findRepostedPackage(
|
|
sourceChannelId: string,
|
|
fileName: string,
|
|
fileSize: bigint
|
|
): Promise<{ id: string; destMessageId: bigint | null } | null> {
|
|
return db.package.findFirst({
|
|
where: {
|
|
sourceChannelId,
|
|
fileName,
|
|
fileSize,
|
|
destMessageId: { not: null },
|
|
},
|
|
select: { id: true, destMessageId: true },
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Delete orphaned Package rows that have the same content hash but never
|
|
* completed the upload (destMessageId is null). Called before creating a
|
|
* new complete record to avoid unique constraint violations.
|
|
*/
|
|
export async function deleteOrphanedPackageByHash(contentHash: string): Promise<void> {
|
|
await db.package.deleteMany({
|
|
where: { contentHash, destMessageId: null },
|
|
});
|
|
}
|
|
|
|
export interface CreatePackageInput {
|
|
contentHash: string;
|
|
fileName: string;
|
|
fileSize: bigint;
|
|
archiveType: ArchiveType;
|
|
sourceChannelId: string;
|
|
sourceMessageId: bigint;
|
|
sourceTopicId?: bigint | null;
|
|
destChannelId?: string;
|
|
destMessageId?: bigint;
|
|
destMessageIds?: bigint[];
|
|
isMultipart: boolean;
|
|
partCount: number;
|
|
ingestionRunId: string;
|
|
creator?: string | null;
|
|
tags?: string[];
|
|
previewData?: Buffer | null;
|
|
previewMsgId?: bigint | null;
|
|
sourceCaption?: string | null;
|
|
replyToMessageId?: bigint | null;
|
|
files: {
|
|
path: string;
|
|
fileName: string;
|
|
extension: string | null;
|
|
compressedSize: bigint;
|
|
uncompressedSize: bigint;
|
|
crc32: string | null;
|
|
}[];
|
|
}
|
|
|
|
export async function createPackageWithFiles(input: CreatePackageInput) {
|
|
const pkg = await db.package.create({
|
|
data: {
|
|
contentHash: input.contentHash,
|
|
fileName: input.fileName,
|
|
fileSize: input.fileSize,
|
|
archiveType: input.archiveType,
|
|
sourceChannelId: input.sourceChannelId,
|
|
sourceMessageId: input.sourceMessageId,
|
|
sourceTopicId: input.sourceTopicId ?? undefined,
|
|
destChannelId: input.destChannelId,
|
|
destMessageId: input.destMessageId,
|
|
destMessageIds: input.destMessageIds ?? (input.destMessageId ? [input.destMessageId] : []),
|
|
isMultipart: input.isMultipart,
|
|
partCount: input.partCount,
|
|
fileCount: input.files.length,
|
|
ingestionRunId: input.ingestionRunId,
|
|
creator: input.creator ?? undefined,
|
|
tags: input.tags && input.tags.length > 0 ? input.tags : undefined,
|
|
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
|
|
previewMsgId: input.previewMsgId ?? undefined,
|
|
sourceCaption: input.sourceCaption ?? undefined,
|
|
replyToMessageId: input.replyToMessageId ?? undefined,
|
|
files: {
|
|
create: input.files,
|
|
},
|
|
},
|
|
});
|
|
|
|
// Notify the bot service about the new package (for subscription alerts)
|
|
try {
|
|
await db.$queryRawUnsafe(
|
|
`SELECT pg_notify('new_package', $1)`,
|
|
JSON.stringify({
|
|
packageId: pkg.id,
|
|
fileName: input.fileName,
|
|
creator: input.creator ?? null,
|
|
tags: input.tags ?? [],
|
|
})
|
|
);
|
|
} catch {
|
|
// Best-effort — don't fail the ingestion if notification fails
|
|
}
|
|
|
|
return pkg;
|
|
}
|
|
|
|
export async function createIngestionRun(accountId: string) {
|
|
return db.ingestionRun.create({
|
|
data: {
|
|
accountId,
|
|
status: "RUNNING",
|
|
currentActivity: "Starting ingestion run",
|
|
currentStep: "initializing",
|
|
lastActivityAt: new Date(),
|
|
},
|
|
});
|
|
}
|
|
|
|
export interface ActivityUpdate {
|
|
currentActivity: string;
|
|
currentStep: string;
|
|
currentChannel?: string | null;
|
|
currentFile?: string | null;
|
|
currentFileNum?: number | null;
|
|
totalFiles?: number | null;
|
|
downloadedBytes?: bigint | null;
|
|
totalBytes?: bigint | null;
|
|
downloadPercent?: number | null;
|
|
messagesScanned?: number;
|
|
zipsFound?: number;
|
|
zipsDuplicate?: number;
|
|
zipsIngested?: number;
|
|
}
|
|
|
|
export async function updateRunActivity(
|
|
runId: string,
|
|
activity: ActivityUpdate
|
|
) {
|
|
return db.ingestionRun.update({
|
|
where: { id: runId },
|
|
data: {
|
|
currentActivity: activity.currentActivity,
|
|
currentStep: activity.currentStep,
|
|
currentChannel: activity.currentChannel ?? undefined,
|
|
currentFile: activity.currentFile ?? undefined,
|
|
currentFileNum: activity.currentFileNum ?? undefined,
|
|
totalFiles: activity.totalFiles ?? undefined,
|
|
downloadedBytes: activity.downloadedBytes ?? undefined,
|
|
totalBytes: activity.totalBytes ?? undefined,
|
|
downloadPercent: activity.downloadPercent ?? undefined,
|
|
lastActivityAt: new Date(),
|
|
...(activity.messagesScanned !== undefined && { messagesScanned: activity.messagesScanned }),
|
|
...(activity.zipsFound !== undefined && { zipsFound: activity.zipsFound }),
|
|
...(activity.zipsDuplicate !== undefined && { zipsDuplicate: activity.zipsDuplicate }),
|
|
...(activity.zipsIngested !== undefined && { zipsIngested: activity.zipsIngested }),
|
|
},
|
|
});
|
|
}
|
|
|
|
const CLEAR_ACTIVITY = {
|
|
currentActivity: null,
|
|
currentStep: null,
|
|
currentChannel: null,
|
|
currentFile: null,
|
|
currentFileNum: null,
|
|
totalFiles: null,
|
|
downloadedBytes: null,
|
|
totalBytes: null,
|
|
downloadPercent: null,
|
|
lastActivityAt: new Date(),
|
|
};
|
|
|
|
export async function completeIngestionRun(
|
|
runId: string,
|
|
counters: {
|
|
messagesScanned: number;
|
|
zipsFound: number;
|
|
zipsDuplicate: number;
|
|
zipsIngested: number;
|
|
}
|
|
) {
|
|
return db.ingestionRun.update({
|
|
where: { id: runId },
|
|
data: {
|
|
status: "COMPLETED",
|
|
finishedAt: new Date(),
|
|
...counters,
|
|
...CLEAR_ACTIVITY,
|
|
},
|
|
});
|
|
}
|
|
|
|
export async function failIngestionRun(runId: string, errorMessage: string) {
|
|
return db.ingestionRun.update({
|
|
where: { id: runId },
|
|
data: {
|
|
status: "FAILED",
|
|
finishedAt: new Date(),
|
|
errorMessage,
|
|
...CLEAR_ACTIVITY,
|
|
},
|
|
});
|
|
}
|
|
|
|
export async function updateLastProcessedMessage(
|
|
mappingId: string,
|
|
messageId: bigint
|
|
) {
|
|
return db.accountChannelMap.update({
|
|
where: { id: mappingId },
|
|
data: { lastProcessedMessageId: messageId },
|
|
});
|
|
}
|
|
|
|
export async function markStaleRunsAsFailed() {
|
|
return db.ingestionRun.updateMany({
|
|
where: { status: "RUNNING" },
|
|
data: {
|
|
status: "FAILED",
|
|
finishedAt: new Date(),
|
|
errorMessage: "Worker restarted — run was still marked as RUNNING",
|
|
},
|
|
});
|
|
}
|
|
|
|
export async function updateAccountAuthState(
|
|
accountId: string,
|
|
authState: "PENDING" | "AWAITING_CODE" | "AWAITING_PASSWORD" | "AUTHENTICATED" | "EXPIRED",
|
|
authCode?: string | null
|
|
) {
|
|
return db.telegramAccount.update({
|
|
where: { id: accountId },
|
|
data: { authState, authCode, lastSeenAt: authState === "AUTHENTICATED" ? new Date() : undefined },
|
|
});
|
|
}
|
|
|
|
export async function updateAccountPremiumStatus(
|
|
accountId: string,
|
|
isPremium: boolean
|
|
): Promise<void> {
|
|
await db.telegramAccount.update({
|
|
where: { id: accountId },
|
|
data: { isPremium },
|
|
});
|
|
}
|
|
|
|
export async function getAccountAuthCode(accountId: string) {
|
|
const account = await db.telegramAccount.findUnique({
|
|
where: { id: accountId },
|
|
select: { authCode: true, authState: true },
|
|
});
|
|
return account;
|
|
}
|
|
|
|
// ── Channel sync (auto-discovery from Telegram) ──
|
|
|
|
export interface UpsertChannelInput {
|
|
telegramId: bigint;
|
|
title: string;
|
|
type: "SOURCE" | "DESTINATION";
|
|
isForum: boolean;
|
|
isActive?: boolean;
|
|
}
|
|
|
|
/**
|
|
* Upsert a channel by telegramId. Returns the channel record.
|
|
* If it already exists, update title and forum status.
|
|
* New channels default to disabled (isActive: false) so the admin must
|
|
* explicitly enable them before the worker processes them.
|
|
* Pass isActive: true for DESTINATION channels that must be active immediately.
|
|
*/
|
|
export async function upsertChannel(input: UpsertChannelInput) {
|
|
return db.telegramChannel.upsert({
|
|
where: { telegramId: input.telegramId },
|
|
create: {
|
|
telegramId: input.telegramId,
|
|
title: input.title,
|
|
type: input.type,
|
|
isForum: input.isForum,
|
|
isActive: input.isActive ?? false,
|
|
},
|
|
update: {
|
|
title: input.title,
|
|
isForum: input.isForum,
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Link an account to a channel if not already linked.
|
|
* Uses a try/catch on unique constraint to make it idempotent.
|
|
*/
|
|
export async function ensureAccountChannelLink(
|
|
accountId: string,
|
|
channelId: string,
|
|
role: "READER" | "WRITER"
|
|
) {
|
|
try {
|
|
return await db.accountChannelMap.create({
|
|
data: { accountId, channelId, role },
|
|
});
|
|
} catch (err: unknown) {
|
|
// Already linked — ignore unique constraint violation
|
|
if (err instanceof Error && err.message.includes("Unique constraint")) {
|
|
return null;
|
|
}
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
// ── Forum / Topic progress ──
|
|
|
|
export async function setChannelForum(channelId: string, isForum: boolean) {
|
|
return db.telegramChannel.update({
|
|
where: { id: channelId },
|
|
data: { isForum },
|
|
});
|
|
}
|
|
|
|
export async function getTopicProgress(mappingId: string) {
|
|
return db.topicProgress.findMany({
|
|
where: { accountChannelMapId: mappingId },
|
|
});
|
|
}
|
|
|
|
export async function upsertTopicProgress(
|
|
mappingId: string,
|
|
topicId: bigint,
|
|
topicName: string | null,
|
|
lastProcessedMessageId: bigint
|
|
) {
|
|
return db.topicProgress.upsert({
|
|
where: {
|
|
accountChannelMapId_topicId: {
|
|
accountChannelMapId: mappingId,
|
|
topicId,
|
|
},
|
|
},
|
|
create: {
|
|
accountChannelMapId: mappingId,
|
|
topicId,
|
|
topicName,
|
|
lastProcessedMessageId,
|
|
},
|
|
update: {
|
|
topicName,
|
|
lastProcessedMessageId,
|
|
},
|
|
});
|
|
}
|
|
|
|
// ── Channel fetch requests (DB-mediated communication with web app) ──
|
|
|
|
export async function getChannelFetchRequest(requestId: string) {
|
|
return db.channelFetchRequest.findUnique({
|
|
where: { id: requestId },
|
|
include: { account: true },
|
|
});
|
|
}
|
|
|
|
export async function updateFetchRequestStatus(
|
|
requestId: string,
|
|
status: FetchStatus,
|
|
extra?: { resultJson?: string; error?: string }
|
|
) {
|
|
return db.channelFetchRequest.update({
|
|
where: { id: requestId },
|
|
data: {
|
|
status,
|
|
resultJson: extra?.resultJson ?? undefined,
|
|
error: extra?.error ?? undefined,
|
|
},
|
|
});
|
|
}
|
|
|
|
export async function getAccountLinkedChannelIds(accountId: string): Promise<Set<string>> {
|
|
const links = await db.accountChannelMap.findMany({
|
|
where: { accountId },
|
|
select: { channel: { select: { telegramId: true } } },
|
|
});
|
|
return new Set(links.map((l) => l.channel.telegramId.toString()));
|
|
}
|
|
|
|
export async function getExistingChannelsByTelegramId(): Promise<Map<string, string>> {
|
|
const channels = await db.telegramChannel.findMany({
|
|
select: { id: true, telegramId: true },
|
|
});
|
|
const map = new Map<string, string>();
|
|
for (const ch of channels) {
|
|
map.set(ch.telegramId.toString(), ch.id);
|
|
}
|
|
return map;
|
|
}
|
|
|
|
export async function getAccountById(accountId: string) {
|
|
return db.telegramAccount.findUnique({ where: { id: accountId } });
|
|
}
|
|
|
|
/**
|
|
* Find packages that have a destMessageId set (appear uploaded) but may
|
|
* reference messages that no longer exist in Telegram. These need
|
|
* verification on startup.
|
|
*
|
|
* Groups by destChannelId so the caller can batch-verify per channel.
|
|
*/
|
|
export async function getPackagesWithDestMessage() {
|
|
return db.package.findMany({
|
|
where: { destMessageId: { not: null }, destChannelId: { not: null } },
|
|
select: {
|
|
id: true,
|
|
fileName: true,
|
|
contentHash: true,
|
|
destChannelId: true,
|
|
destMessageId: true,
|
|
sourceChannel: { select: { telegramId: true } },
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Reset a package's destination fields so it will be re-processed
|
|
* on the next ingestion run (treated as not-yet-uploaded).
|
|
*/
|
|
export async function resetPackageDestination(packageId: string) {
|
|
return db.package.update({
|
|
where: { id: packageId },
|
|
data: { destChannelId: null, destMessageId: null },
|
|
});
|
|
}
|
|
|
|
export async function upsertSkippedPackage(data: {
|
|
fileName: string;
|
|
fileSize: bigint;
|
|
reason: "SIZE_LIMIT" | "DOWNLOAD_FAILED" | "EXTRACT_FAILED" | "UPLOAD_FAILED";
|
|
errorMessage?: string;
|
|
sourceChannelId: string;
|
|
sourceMessageId: bigint;
|
|
sourceTopicId?: bigint | null;
|
|
isMultipart: boolean;
|
|
partCount: number;
|
|
accountId: string;
|
|
}) {
|
|
return db.skippedPackage.upsert({
|
|
where: {
|
|
sourceChannelId_sourceMessageId: {
|
|
sourceChannelId: data.sourceChannelId,
|
|
sourceMessageId: data.sourceMessageId,
|
|
},
|
|
},
|
|
update: {
|
|
reason: data.reason,
|
|
errorMessage: data.errorMessage ?? null,
|
|
fileName: data.fileName,
|
|
fileSize: data.fileSize,
|
|
attemptCount: { increment: 1 },
|
|
createdAt: new Date(),
|
|
},
|
|
create: {
|
|
fileName: data.fileName,
|
|
fileSize: data.fileSize,
|
|
reason: data.reason,
|
|
errorMessage: data.errorMessage ?? null,
|
|
sourceChannelId: data.sourceChannelId,
|
|
sourceMessageId: data.sourceMessageId,
|
|
sourceTopicId: data.sourceTopicId ?? null,
|
|
isMultipart: data.isMultipart,
|
|
partCount: data.partCount,
|
|
accountId: data.accountId,
|
|
},
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Return source-message IDs in a channel whose SkippedPackage attemptCount has
|
|
* reached or exceeded the cap — these are treated as "permanently failed for
|
|
* now" so the watermark can advance past them. The user can manually retry via
|
|
* the UI to reset the SkippedPackage record.
|
|
*/
|
|
export async function getCappedSkippedMessageIds(
|
|
sourceChannelId: string,
|
|
cap: number
|
|
): Promise<Set<bigint>> {
|
|
const rows = await db.skippedPackage.findMany({
|
|
where: {
|
|
sourceChannelId,
|
|
attemptCount: { gte: cap },
|
|
},
|
|
select: { sourceMessageId: true },
|
|
});
|
|
return new Set(rows.map((r) => r.sourceMessageId));
|
|
}
|
|
|
|
export async function deleteSkippedPackage(
|
|
sourceChannelId: string,
|
|
sourceMessageId: bigint
|
|
) {
|
|
return db.skippedPackage.deleteMany({
|
|
where: { sourceChannelId, sourceMessageId },
|
|
});
|
|
}
|
|
|
|
export async function createOrFindPackageGroup(input: {
|
|
mediaAlbumId: string;
|
|
sourceChannelId: string;
|
|
name: string;
|
|
previewData?: Buffer | null;
|
|
}): Promise<string> {
|
|
// findFirst + conditional create (Prisma doesn't support upsert on nullable compound unique)
|
|
const existing = await db.packageGroup.findFirst({
|
|
where: {
|
|
mediaAlbumId: input.mediaAlbumId,
|
|
sourceChannelId: input.sourceChannelId,
|
|
},
|
|
select: { id: true },
|
|
});
|
|
|
|
if (existing) return existing.id;
|
|
|
|
try {
|
|
const group = await db.packageGroup.create({
|
|
data: {
|
|
mediaAlbumId: input.mediaAlbumId,
|
|
sourceChannelId: input.sourceChannelId,
|
|
name: input.name,
|
|
previewData: input.previewData ? new Uint8Array(input.previewData) : undefined,
|
|
},
|
|
});
|
|
return group.id;
|
|
} catch (err) {
|
|
// Handle race condition: another process created the group between our findFirst and create
|
|
if (err instanceof Error && err.message.includes("Unique constraint")) {
|
|
const raced = await db.packageGroup.findFirst({
|
|
where: { mediaAlbumId: input.mediaAlbumId, sourceChannelId: input.sourceChannelId },
|
|
select: { id: true },
|
|
});
|
|
if (raced) return raced.id;
|
|
}
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
export async function linkPackagesToGroup(
|
|
packageIds: string[],
|
|
groupId: string
|
|
): Promise<void> {
|
|
await db.package.updateMany({
|
|
where: { id: { in: packageIds } },
|
|
data: { packageGroupId: groupId },
|
|
});
|
|
}
|
|
|
|
export async function createTimeWindowGroup(input: {
|
|
sourceChannelId: string;
|
|
name: string;
|
|
packageIds: string[];
|
|
}): Promise<string> {
|
|
const group = await db.packageGroup.create({
|
|
data: {
|
|
sourceChannelId: input.sourceChannelId,
|
|
name: input.name,
|
|
groupingSource: "AUTO_TIME",
|
|
},
|
|
});
|
|
|
|
await db.package.updateMany({
|
|
where: { id: { in: input.packageIds } },
|
|
data: { packageGroupId: group.id },
|
|
});
|
|
|
|
return group.id;
|
|
}
|
|
|
|
export async function createAutoGroup(input: {
|
|
sourceChannelId: string;
|
|
name: string;
|
|
packageIds: string[];
|
|
groupingSource: "ALBUM" | "MANUAL" | "AUTO_TIME" | "AUTO_PATTERN" | "AUTO_ZIP" | "AUTO_CAPTION" | "AUTO_REPLY";
|
|
}): Promise<string> {
|
|
const group = await db.packageGroup.create({
|
|
data: {
|
|
sourceChannelId: input.sourceChannelId,
|
|
name: input.name,
|
|
groupingSource: input.groupingSource,
|
|
},
|
|
});
|
|
|
|
await db.package.updateMany({
|
|
where: { id: { in: input.packageIds } },
|
|
data: { packageGroupId: group.id },
|
|
});
|
|
|
|
return group.id;
|
|
}
|