owltide/server/streams.ts
Hornwitser 3be7f8be05 Refactor user storage and update
Rename accounts to users to be consistent with the new naming scheme
where account only referes to the logged in user of the session and
implement live updates of users via a user store which listens for
updates from the event stream.
2025-06-23 00:28:58 +02:00

142 lines
3.8 KiB
TypeScript

import { readUsers } from "~/server/database";
import { canSeeCrew } from "./utils/schedule";
import type { ApiAccount, ApiEvent } from "~/shared/types/api";
function sendMessage(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => writer.write(message))
.catch(console.error)
.finally(() => writer.releaseLock())
;
}
function sendMessageAndClose(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => {
writer.write(message);
writer.close();
}).catch(console.error)
.finally(() => writer.releaseLock())
;
}
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number }>();
let keepaliveInterval: ReturnType<typeof setInterval> | null = null
export function addStream(stream: WritableStream<string>, sessionId?: number, accountId?: number) {
if (streams.size === 0) {
console.log("Starting keepalive")
keepaliveInterval = setInterval(sendKeepalive, 4000)
}
streams.set(stream, { sessionId, accountId });
// Produce a response immediately to avoid the reply waiting for content.
const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`;
sendMessage(stream, message);
}
export function deleteStream(stream: WritableStream<string>) {
streams.delete(stream);
if (streams.size === 0) {
console.log("Ending keepalive")
clearInterval(keepaliveInterval!);
}
}
export function cancelAccountStreams(accountId: number) {
for (const [stream, data] of streams) {
if (data.accountId === accountId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
export function cancelSessionStreams(sessionId: number) {
for (const [stream, data] of streams) {
if (data.sessionId === sessionId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();
function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) {
const cache = encodeEventCache.get(event);
const cacheEntry = cache?.get(userType);
if (cacheEntry) {
return cacheEntry;
}
let data: string;
if (event.type === "schedule-update") {
if (!canSeeCrew(userType)) {
event = {
type: event.type,
updatedFrom: event.updatedFrom,
data: filterSchedule(event.data),
};
}
data = JSON.stringify(event);
} else if (event.type === "user-update") {
if (
!canSeeCrew(userType)
|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
) {
event = {
type: event.type,
data: {
id: event.data.id,
updatedAt: event.data.updatedAt,
deleted: true,
}
}
}
data = JSON.stringify(event);
} else {
throw Error(`encodeEvent cannot encode ${event.type} event`);
}
if (cache) {
cache.set(userType, data);
} else {
encodeEventCache.set(event, new Map([[userType, data]]));
}
return data;
}
export async function broadcastEvent(event: ApiEvent) {
const id = Date.now();
console.log(`broadcasting update to ${streams.size} clients`);
if (!streams.size) {
return;
}
const users = await readUsers();
for (const [stream, streamData] of streams) {
// Account events are specially handled and only sent to the user they belong to.
if (event.type === "account-update") {
if (streamData.accountId === event.data.id) {
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
}
} else {
let userType: ApiAccount["type"] | undefined;
if (streamData.accountId !== undefined) {
userType = users.find(a => a.id === streamData.accountId)?.type
}
const data = encodeEvent(event, userType)
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
}
}
}
function sendKeepalive() {
for (const stream of streams.keys()) {
sendMessage(stream, ": keepalive\n");
}
}