Update tg issues

This commit is contained in:
xCyanGrizzly
2026-03-16 16:51:30 +01:00
parent 2763de2711
commit d7bbb7587e
13 changed files with 450 additions and 70 deletions

View File

@@ -36,11 +36,13 @@ async function main(): Promise<void> {
// Graceful shutdown
function shutdown(signal: string): void {
log.info({ signal }, "Shutdown signal received");
stopScheduler();
// Stop accepting new work
stopFetchListener();
// Close DB connections
Promise.all([db.$disconnect(), pool.end()])
// Wait for any active cycle to finish before closing DB
stopScheduler()
.then(() => Promise.all([db.$disconnect(), pool.end()]))
.then(() => {
log.info("Shutdown complete");
process.exit(0);

View File

@@ -9,6 +9,7 @@ const log = childLogger("scheduler");
let running = false;
let timer: ReturnType<typeof setTimeout> | null = null;
let cycleCount = 0;
let activeCyclePromise: Promise<void> | null = null;
/**
* Maximum time for a single ingestion cycle (ms).
@@ -107,7 +108,9 @@ function scheduleNext(): void {
);
timer = setTimeout(async () => {
await runCycle();
activeCyclePromise = runCycle();
await activeCyclePromise;
activeCyclePromise = null;
scheduleNext();
}, delay);
}
@@ -125,7 +128,9 @@ export async function startScheduler(): Promise<void> {
);
// Run immediately on start
await runCycle();
activeCyclePromise = runCycle();
await activeCyclePromise;
activeCyclePromise = null;
// Then schedule recurring cycles
scheduleNext();
@@ -146,11 +151,21 @@ export async function triggerImmediateCycle(): Promise<void> {
/**
* Stop the scheduler gracefully.
* Returns a promise that resolves when any active cycle finishes,
* so callers can wait before closing DB connections.
*/
export function stopScheduler(): void {
export function stopScheduler(): Promise<void> {
if (timer) {
clearTimeout(timer);
timer = null;
}
if (activeCyclePromise) {
log.info("Scheduler stopping — waiting for active cycle to finish");
return activeCyclePromise.finally(() => {
activeCyclePromise = null;
log.info("Scheduler stopped");
});
}
log.info("Scheduler stopped");
return Promise.resolve();
}

View File

@@ -107,12 +107,10 @@ export async function getForumTopicList(
for (const t of result.topics) {
if (!t.info?.message_thread_id) continue;
// Skip the "General" topic — it's not creator-specific
if (t.info.is_general) continue;
topics.push({
topicId: BigInt(t.info.message_thread_id),
name: t.info.name ?? "Unnamed",
name: t.info.is_general ? "General" : (t.info.name ?? "Unnamed"),
});
}

View File

@@ -76,6 +76,10 @@ export async function uploadToChannel(
/**
* Send a single file message and wait for Telegram to confirm the upload.
* Returns the final server-assigned message ID.
*
* IMPORTANT: The update listener is attached BEFORE sending the message to
* avoid a race where fast uploads (cached files) complete before the listener
* is registered, which would cause the promise to hang forever.
*/
async function sendAndWaitForUpload(
client: Client,
@@ -85,41 +89,10 @@ async function sendAndWaitForUpload(
fileName: string,
fileSizeMB: number
): Promise<bigint> {
// Send the message — this returns a temporary message immediately.
// Wrapped in withFloodWait to handle Telegram rate limits on upload.
const tempMsg = (await withFloodWait(
() =>
client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
}),
"sendMessage:upload"
)) as { id: number };
const tempMsgId = tempMsg.id;
log.debug(
{ fileName, tempMsgId },
"Message queued, waiting for upload confirmation"
);
// Wait for the actual upload to complete
return new Promise<bigint>((resolve, reject) => {
let settled = false;
let lastLoggedPercent = 0;
let tempMsgId: number | null = null;
// Timeout: 10 minutes per GB, minimum 10 minutes
const timeoutMs = Math.max(
@@ -162,7 +135,7 @@ async function sendAndWaitForUpload(
if (update?._ === "updateMessageSendSucceeded") {
const msg = update.message;
const oldMsgId = update.old_message_id;
if (oldMsgId === tempMsgId) {
if (tempMsgId !== null && oldMsgId === tempMsgId) {
if (!settled) {
settled = true;
cleanup();
@@ -179,7 +152,7 @@ async function sendAndWaitForUpload(
// Upload failed
if (update?._ === "updateMessageSendFailed") {
const oldMsgId = update.old_message_id;
if (oldMsgId === tempMsgId) {
if (tempMsgId !== null && oldMsgId === tempMsgId) {
if (!settled) {
settled = true;
cleanup();
@@ -195,7 +168,47 @@ async function sendAndWaitForUpload(
client.off("update", handleUpdate);
};
// Attach listener BEFORE sending to avoid missing fast completions
client.on("update", handleUpdate);
// Send the message — this returns a temporary message immediately.
// Wrapped in withFloodWait to handle Telegram rate limits on upload.
withFloodWait(
() =>
client.invoke({
_: "sendMessage",
chat_id: Number(chatId),
input_message_content: {
_: "inputMessageDocument",
document: {
_: "inputFileLocal",
path: filePath,
},
caption: caption
? {
_: "formattedText",
text: caption,
}
: undefined,
},
}),
"sendMessage:upload"
)
.then((result) => {
const tempMsg = result as { id: number };
tempMsgId = tempMsg.id;
log.debug(
{ fileName, tempMsgId },
"Message queued, waiting for upload confirmation"
);
})
.catch((err) => {
if (!settled) {
settled = true;
cleanup();
reject(err);
}
});
});
}

View File

@@ -559,9 +559,11 @@ export async function runWorkerForAccount(
}
// ── Done ──
await throttled.flush();
await completeIngestionRun(activeRunId, counters);
accountLog.info({ counters }, "Ingestion run completed");
} finally {
await throttled.flush();
await closeTdlibClient(client);
}
} catch (err) {