/* SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database"; import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api"; import { serverSessionToApi } from "./utils/session"; import { H3Event } from "h3"; const keepaliveTimeoutMs = 45e3; const eventUpdateTimeMs = 1e3; class EventStream { write!: (data: string) => void; close!: (reason?: string) => void; constructor( public sessionId: number | undefined, public accountId: number | undefined, public userType: ApiUserType | undefined, public rotatesAtMs: number , public lastKeepAliveMs: number, public lastEventId: number, ) { } } export async function createEventStream( event: H3Event, source: string, lastEventId: number, session?: ServerSession, ) { const runtimeConfig = useRuntimeConfig(event); const now = Date.now(); const events = (readEvents()).filter(e => e.id > lastEventId); const users = readUsers(); const apiSession = session ? await serverSessionToApi(event, session) : undefined; let userType: ApiAccount["type"] | undefined; if (session?.accountId !== undefined) { userType = users.find(a => !a.deleted && a.id === session.accountId)?.type } const stream = new EventStream( session?.id, session?.accountId, userType, session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000, now, events[events.length - 1]?.id ?? lastEventId, ); const readableStream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); stream.write = (data: string) => { controller.enqueue(encoder.encode(data)); } stream.close = (reason?: string) => { const data: ApiDisconnected = { type: "disconnect", reason, }; stream.write(`data: ${JSON.stringify(data)}\n\n`); controller.close(); deleteStream(stream); }, console.log(`Starting event stream for ${source}`) addStream(stream); }, cancel(reason) { console.log(`Cancelled event stream for ${source}:`, reason); deleteStream(stream); } }); // Produce a response immediately to avoid the reply waiting for content. const update: ApiEventStreamMessage = { type: "connected", session: apiSession, }; stream.write(`data: ${JSON.stringify(update)}\n\n`); /* Send events since the provided lastEventId Warning: This have to happen either before addStream(stream) is called, or as done here synchronously after it. Otherwise there's a possibility of events being delivered out of order, which will break the assumption made by the schedule updating logic. */ if (events.length) console.log(`Sending ${events.length} event(s) to ${source}`); for (const event of events) { if (!sendEventToStream(stream, event)) { break; } } return readableStream; } let updateInterval: ReturnType | null = null const streams = new Set(); function addStream( stream: EventStream, ) { if (streams.size === 0) { console.log("Starting event updates") updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs) } streams.add(stream); } function deleteStream(stream: EventStream) { streams.delete(stream); if (streams.size === 0) { console.log("Ending event updates") clearInterval(updateInterval!); updateInterval = null; } } export function cancelAccountStreams(accountId: number) { for (const stream of streams.values()) { if (stream.accountId === accountId) { stream.close("cancelled"); } } } export function cancelSessionStreams(sessionId: number) { for (const stream of streams.values()) { if (stream.sessionId === sessionId) { stream.close("cancelled"); } } } const encodeEventCache = new WeakMap>(); function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) { const cache = encodeEventCache.get(event); const cacheEntry = cache?.get(userType); if (cacheEntry) { return cacheEntry; } let data: string; if (event.type === "schedule-update") { if (!canSeeCrew(userType)) { event = { id: event.id, type: event.type, updatedFrom: event.updatedFrom, data: filterSchedule(event.data), }; } data = JSON.stringify(event); } else if (event.type === "user-update") { if ( !canSeeCrew(userType) || !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType) ) { event = { id: event.id, type: event.type, data: { id: event.data.id, updatedAt: event.data.updatedAt, deleted: true, } } } data = JSON.stringify(event); } else { throw Error(`encodeEvent cannot encode ${event.type} event`); } if (cache) { cache.set(userType, data); } else { encodeEventCache.set(event, new Map([[userType, data]])); } return data; } export async function broadcastEvent(event: ApiEvent) { const events = readEvents(); events.push(event); writeEvents(events); } function sendEventToStream(stream: EventStream, event: ApiEvent) { // Session expiry events cause the streams belonging to that session to be terminated if (event.type === "session-expired") { if (stream.sessionId === event.sessionId) { stream.close("session expired"); return false; } return true; } // Account events are specially handled and only sent to the user they belong to. if (event.type === "account-update") { if (stream.accountId === event.data.id) { stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`); } return true; } // All other events are encoded according to the user access level seeing it. const data = encodeEvent(event, stream.userType) stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`); return true; } async function sendEventUpdates() { // Cancel streams that need to be rotated. const now = Date.now(); for (const stream of streams.values()) { if (stream.rotatesAtMs < now) { stream.close("session rotation"); continue; } } // Send events. const skipEventId = Math.min(...[...streams.values()].map(s => s.lastEventId)); const events = (readEvents()).filter(e => e.id > skipEventId); if (events.length) console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`); for (const stream of streams.values()) { for (const event of events) { if (event.id > stream.lastEventId) { stream.lastEventId = event.id; stream.lastKeepAliveMs = now; if (!sendEventToStream(stream, event)) { break; } } } } // Send Keepalives to streams with no activity. for (const stream of streams.values()) { if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) { stream.write(": keepalive\n"); stream.lastKeepAliveMs = now; } } }