owltide/server/streams.ts

172 lines
4.6 KiB
TypeScript
Raw Normal View History

/*
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later
*/
import { readUsers, ServerSession } from "~/server/database";
import type { ApiAccount, ApiEvent } from "~/shared/types/api";
import { serverSessionToApi } from "./utils/session";
import { H3Event } from "h3";
function sendMessage(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => writer.write(message))
.catch(console.error)
.finally(() => writer.releaseLock())
;
}
function sendMessageAndClose(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => {
writer.write(message);
writer.close();
}).catch(console.error)
.finally(() => writer.releaseLock())
;
}
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number, expiresAtMs: number }>();
let keepaliveInterval: ReturnType<typeof setInterval> | null = null
export async function addStream(
event: H3Event,
stream: WritableStream<string>,
session?: ServerSession,
) {
if (streams.size === 0) {
console.log("Starting keepalive")
keepaliveInterval = setInterval(sendKeepalive, 4000)
}
const runtimeConfig = useRuntimeConfig(event);
streams.set(stream, {
sessionId: session?.id,
accountId: session?.accountId,
expiresAtMs: session?.expiresAtMs ?? Date.now() + runtimeConfig.sessionExpiresTimeout * 1000,
});
// Produce a response immediately to avoid the reply waiting for content.
const update: ApiEvent = {
type: "connected",
session: session ? await serverSessionToApi(event, session) : undefined,
};
sendMessage(stream, `event: update\ndata: ${JSON.stringify(update)}\n\n`);
}
export function deleteStream(stream: WritableStream<string>) {
streams.delete(stream);
if (streams.size === 0) {
console.log("Ending keepalive")
clearInterval(keepaliveInterval!);
}
}
export function cancelAccountStreams(accountId: number) {
for (const [stream, data] of streams) {
if (data.accountId === accountId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
export function cancelSessionStreams(sessionId: number) {
for (const [stream, data] of streams) {
if (data.sessionId === sessionId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();
function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) {
const cache = encodeEventCache.get(event);
const cacheEntry = cache?.get(userType);
if (cacheEntry) {
return cacheEntry;
}
let data: string;
if (event.type === "schedule-update") {
if (!canSeeCrew(userType)) {
event = {
type: event.type,
updatedFrom: event.updatedFrom,
data: filterSchedule(event.data),
};
}
data = JSON.stringify(event);
} else if (event.type === "user-update") {
if (
!canSeeCrew(userType)
|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
) {
event = {
type: event.type,
data: {
id: event.data.id,
updatedAt: event.data.updatedAt,
deleted: true,
}
}
}
data = JSON.stringify(event);
} else {
throw Error(`encodeEvent cannot encode ${event.type} event`);
}
if (cache) {
cache.set(userType, data);
} else {
encodeEventCache.set(event, new Map([[userType, data]]));
}
return data;
}
export async function broadcastEvent(event: ApiEvent) {
const id = Date.now();
console.log(`broadcasting update to ${streams.size} clients`);
if (!streams.size) {
return;
}
// Session expiry events cause the streams belonging to that session to be terminated
if (event.type === "session-expired") {
cancelSessionStreams(event.sessionId);
return;
}
const users = await readUsers();
for (const [stream, streamData] of streams) {
// Account events are specially handled and only sent to the user they belong to.
if (event.type === "account-update") {
if (streamData.accountId === event.data.id) {
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
}
} else {
let userType: ApiAccount["type"] | undefined;
if (streamData.accountId !== undefined) {
userType = users.find(a => a.id === streamData.accountId)?.type
}
const data = encodeEvent(event, userType)
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
}
}
}
function sendKeepalive() {
const now = Date.now();
for (const [stream, streamData] of streams) {
if (streamData.expiresAtMs > now) {
sendMessage(stream, ": keepalive\n");
} else {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}