2025-06-30 18:58:24 +02:00
|
|
|
/*
|
|
|
|
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
|
|
|
|
SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
*/
|
2025-07-09 18:08:39 +02:00
|
|
|
import { readUsers, type ServerSession } from "~/server/database";
|
2025-06-11 21:05:17 +02:00
|
|
|
import type { ApiAccount, ApiEvent } from "~/shared/types/api";
|
2025-07-08 15:43:14 +02:00
|
|
|
import { serverSessionToApi } from "./utils/session";
|
|
|
|
import { H3Event } from "h3";
|
2025-02-27 18:39:04 +01:00
|
|
|
|
|
|
|
function sendMessage(
|
|
|
|
stream: WritableStream<string>,
|
|
|
|
message: string,
|
|
|
|
) {
|
|
|
|
const writer = stream.getWriter();
|
|
|
|
writer.ready
|
|
|
|
.then(() => writer.write(message))
|
|
|
|
.catch(console.error)
|
|
|
|
.finally(() => writer.releaseLock())
|
|
|
|
;
|
|
|
|
}
|
|
|
|
|
2025-03-10 16:26:52 +01:00
|
|
|
function sendMessageAndClose(
|
|
|
|
stream: WritableStream<string>,
|
|
|
|
message: string,
|
|
|
|
) {
|
|
|
|
const writer = stream.getWriter();
|
|
|
|
writer.ready
|
|
|
|
.then(() => {
|
|
|
|
writer.write(message);
|
|
|
|
writer.close();
|
|
|
|
}).catch(console.error)
|
|
|
|
.finally(() => writer.releaseLock())
|
|
|
|
;
|
|
|
|
}
|
|
|
|
|
2025-07-09 14:54:54 +02:00
|
|
|
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number, rotatesAtMs: number }>();
|
2025-02-27 18:39:04 +01:00
|
|
|
|
|
|
|
let keepaliveInterval: ReturnType<typeof setInterval> | null = null
|
2025-07-08 15:43:14 +02:00
|
|
|
export async function addStream(
|
|
|
|
event: H3Event,
|
|
|
|
stream: WritableStream<string>,
|
|
|
|
session?: ServerSession,
|
|
|
|
) {
|
2025-02-27 18:39:04 +01:00
|
|
|
if (streams.size === 0) {
|
|
|
|
console.log("Starting keepalive")
|
|
|
|
keepaliveInterval = setInterval(sendKeepalive, 4000)
|
|
|
|
}
|
2025-07-08 15:43:14 +02:00
|
|
|
const runtimeConfig = useRuntimeConfig(event);
|
|
|
|
streams.set(stream, {
|
|
|
|
sessionId: session?.id,
|
|
|
|
accountId: session?.accountId,
|
2025-07-09 14:54:54 +02:00
|
|
|
rotatesAtMs: session?.rotatesAtMs ?? Date.now() + runtimeConfig.sessionRotatesTimeout * 1000,
|
2025-07-08 15:43:14 +02:00
|
|
|
});
|
|
|
|
// Produce a response immediately to avoid the reply waiting for content.
|
|
|
|
const update: ApiEvent = {
|
|
|
|
type: "connected",
|
|
|
|
session: session ? await serverSessionToApi(event, session) : undefined,
|
|
|
|
};
|
|
|
|
sendMessage(stream, `event: update\ndata: ${JSON.stringify(update)}\n\n`);
|
2025-02-27 18:39:04 +01:00
|
|
|
}
|
|
|
|
export function deleteStream(stream: WritableStream<string>) {
|
|
|
|
streams.delete(stream);
|
|
|
|
if (streams.size === 0) {
|
|
|
|
console.log("Ending keepalive")
|
|
|
|
clearInterval(keepaliveInterval!);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-03-10 16:26:52 +01:00
|
|
|
export function cancelAccountStreams(accountId: number) {
|
|
|
|
for (const [stream, data] of streams) {
|
|
|
|
if (data.accountId === accountId) {
|
|
|
|
sendMessageAndClose(stream, `data: cancelled\n\n`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
export function cancelSessionStreams(sessionId: number) {
|
|
|
|
for (const [stream, data] of streams) {
|
|
|
|
if (data.sessionId === sessionId) {
|
|
|
|
sendMessageAndClose(stream, `data: cancelled\n\n`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-06-11 21:05:17 +02:00
|
|
|
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();
|
2025-06-23 00:17:22 +02:00
|
|
|
function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) {
|
2025-06-11 21:05:17 +02:00
|
|
|
const cache = encodeEventCache.get(event);
|
2025-06-23 00:17:22 +02:00
|
|
|
const cacheEntry = cache?.get(userType);
|
2025-06-11 21:05:17 +02:00
|
|
|
if (cacheEntry) {
|
|
|
|
return cacheEntry;
|
|
|
|
}
|
|
|
|
|
|
|
|
let data: string;
|
|
|
|
if (event.type === "schedule-update") {
|
2025-06-23 00:17:22 +02:00
|
|
|
if (!canSeeCrew(userType)) {
|
2025-06-11 21:05:17 +02:00
|
|
|
event = {
|
|
|
|
type: event.type,
|
|
|
|
updatedFrom: event.updatedFrom,
|
|
|
|
data: filterSchedule(event.data),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
data = JSON.stringify(event);
|
2025-06-23 00:17:22 +02:00
|
|
|
} else if (event.type === "user-update") {
|
|
|
|
if (
|
|
|
|
!canSeeCrew(userType)
|
|
|
|
|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
|
|
|
|
) {
|
|
|
|
event = {
|
|
|
|
type: event.type,
|
|
|
|
data: {
|
|
|
|
id: event.data.id,
|
|
|
|
updatedAt: event.data.updatedAt,
|
|
|
|
deleted: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
data = JSON.stringify(event);
|
2025-06-11 21:05:17 +02:00
|
|
|
} else {
|
|
|
|
throw Error(`encodeEvent cannot encode ${event.type} event`);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (cache) {
|
2025-06-23 00:17:22 +02:00
|
|
|
cache.set(userType, data);
|
2025-06-11 21:05:17 +02:00
|
|
|
} else {
|
2025-06-23 00:17:22 +02:00
|
|
|
encodeEventCache.set(event, new Map([[userType, data]]));
|
2025-06-11 21:05:17 +02:00
|
|
|
}
|
|
|
|
return data;
|
|
|
|
}
|
|
|
|
|
|
|
|
export async function broadcastEvent(event: ApiEvent) {
|
2025-02-27 18:39:04 +01:00
|
|
|
const id = Date.now();
|
2025-03-10 16:26:52 +01:00
|
|
|
console.log(`broadcasting update to ${streams.size} clients`);
|
|
|
|
if (!streams.size) {
|
|
|
|
return;
|
|
|
|
}
|
2025-07-08 15:43:14 +02:00
|
|
|
|
|
|
|
// Session expiry events cause the streams belonging to that session to be terminated
|
|
|
|
if (event.type === "session-expired") {
|
|
|
|
cancelSessionStreams(event.sessionId);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2025-06-23 00:17:22 +02:00
|
|
|
const users = await readUsers();
|
2025-03-10 16:26:52 +01:00
|
|
|
for (const [stream, streamData] of streams) {
|
2025-06-23 00:17:22 +02:00
|
|
|
// Account events are specially handled and only sent to the user they belong to.
|
2025-06-11 21:05:17 +02:00
|
|
|
if (event.type === "account-update") {
|
|
|
|
if (streamData.accountId === event.data.id) {
|
|
|
|
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
2025-06-23 00:17:22 +02:00
|
|
|
let userType: ApiAccount["type"] | undefined;
|
2025-06-11 21:05:17 +02:00
|
|
|
if (streamData.accountId !== undefined) {
|
2025-07-08 16:23:31 +02:00
|
|
|
userType = users.find(a => !a.deleted && a.id === streamData.accountId)?.type
|
2025-06-11 21:05:17 +02:00
|
|
|
}
|
2025-06-23 00:17:22 +02:00
|
|
|
const data = encodeEvent(event, userType)
|
2025-06-11 21:05:17 +02:00
|
|
|
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
|
2025-03-10 16:26:52 +01:00
|
|
|
}
|
2025-02-27 18:39:04 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function sendKeepalive() {
|
2025-07-08 15:43:14 +02:00
|
|
|
const now = Date.now();
|
|
|
|
for (const [stream, streamData] of streams) {
|
2025-07-09 14:54:54 +02:00
|
|
|
if (streamData.rotatesAtMs > now) {
|
2025-07-08 15:43:14 +02:00
|
|
|
sendMessage(stream, ": keepalive\n");
|
|
|
|
} else {
|
|
|
|
sendMessageAndClose(stream, `data: cancelled\n\n`);
|
|
|
|
}
|
2025-02-27 18:39:04 +01:00
|
|
|
}
|
|
|
|
}
|