owltide/server/streams.ts
Hornwitser 753da6d3d4 Refactor to persist and reliably deliver events
Store events that are to be broadcasted in the database, and fetch
events to serve in the /api/event stream to the client from the
database.  This ensures that events are not lost if the operation to
open the stream takes longer than usual, or the client was not connected
at the time the event was broadcast.

To ensure no events are lost in the transition from the server generating
the page to the client hydrating and establishing a connection with the
event stream, the /api/last-event-id endpoint is queried on the server
first, before any other entity is fetched from the database.  The
client then passes this id when establishing the event stream, and
receives all events with an id greater than it.
2025-09-20 20:36:37 +02:00

248 lines
6.7 KiB
TypeScript

/*
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later
*/
import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database";
import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api";
import { serverSessionToApi } from "./utils/session";
import { H3Event } from "h3";
/** Idle time in milliseconds after which a stream gets a keepalive comment written to it. */
const keepaliveTimeoutMs = 45_000;
/** Period in milliseconds of the timer that looks for new events to deliver. */
const eventUpdateTimeMs = 1_000;
/**
 * Bookkeeping for one open event stream.
 *
 * The constructor only stores the values it is given. `write` and `close`
 * are left unset and must be assigned by the code that connects the
 * instance to an actual output channel.
 */
class EventStream {
	/** Id of the session the stream was opened with, if any. */
	sessionId: number | undefined;
	/** Id of the account the stream was opened with, if any. */
	accountId: number | undefined;
	/** User type used to decide how events are encoded for this stream. */
	userType: ApiUserType | undefined;
	/** Timestamp in milliseconds after which the stream is closed for rotation. */
	rotatesAtMs: number;
	/** Timestamp in milliseconds of the last recorded activity on the stream. */
	lastKeepAliveMs: number;
	/** Id of the newest event already handled for this stream. */
	lastEventId: number;
	/** Sends a chunk of text to the client. */
	write!: (data: string) => void;
	/** Ends the stream, optionally telling the client why. */
	close!: (reason?: string) => void;

	constructor(
		sessionId: number | undefined,
		accountId: number | undefined,
		userType: ApiUserType | undefined,
		rotatesAtMs: number,
		lastKeepAliveMs: number,
		lastEventId: number,
	) {
		this.sessionId = sessionId;
		this.accountId = accountId;
		this.userType = userType;
		this.rotatesAtMs = rotatesAtMs;
		this.lastKeepAliveMs = lastKeepAliveMs;
		this.lastEventId = lastEventId;
	}
}
/**
 * Open an event stream for a single client.
 *
 * The stream first emits a "connected" message carrying the API
 * representation of the session (if one was given), then replays every
 * stored event whose id is greater than `lastEventId`. The stream is
 * registered with addStream() so that the periodic update pass can
 * deliver later events to it.
 *
 * @param event Request the stream is opened for; used to read the runtime
 * config and to convert the session to its API form.
 * @param source Label identifying the client in log messages.
 * @param lastEventId Id of the newest event the client has already seen.
 * @param session Session of the requesting client, if any.
 * @returns A readable byte stream carrying the encoded messages.
 */
export async function createEventStream(
	event: H3Event,
	source: string,
	lastEventId: number,
	session?: ServerSession,
) {
	const runtimeConfig = useRuntimeConfig(event);
	const now = Date.now();
	const events = (await readEvents()).filter(e => e.id > lastEventId);
	const users = await readUsers();
	const apiSession = session ? await serverSessionToApi(event, session) : undefined;
	let userType: ApiAccount["type"] | undefined;
	if (session?.accountId !== undefined) {
		userType = users.find(a => !a.deleted && a.id === session.accountId)?.type;
	}

	const stream = new EventStream(
		session?.id,
		session?.accountId,
		userType,
		session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000,
		now,
		// Starts at what the client has seen and is advanced during the
		// replay below, so it always reflects what was actually handled.
		lastEventId,
	);

	const readableStream = new ReadableStream<Uint8Array>({
		start(controller) {
			const encoder = new TextEncoder();
			stream.write = (data: string) => {
				controller.enqueue(encoder.encode(data));
			};
			stream.close = (reason?: string) => {
				const data: ApiDisconnected = {
					type: "disconnect",
					reason,
				};
				stream.write(`data: ${JSON.stringify(data)}\n\n`);
				controller.close();
				deleteStream(stream);
			};
			console.log(`Starting event stream for ${source}`);
			addStream(stream);
		},
		cancel(reason) {
			console.log(`Cancelled event stream for ${source}:`, reason);
			deleteStream(stream);
		},
	});

	// Produce a response immediately to avoid the reply waiting for content.
	const update: ApiEventStreamMessage = {
		type: "connected",
		session: apiSession,
	};
	stream.write(`data: ${JSON.stringify(update)}\n\n`);

	/*
		Send events since the provided lastEventId.

		Warning: This has to happen either before addStream(stream) is
		called, or as done here synchronously after it. Otherwise there's a
		possibility of events being delivered out of order, which will break
		the assumption made by the schedule updating logic.
	*/
	if (events.length)
		console.log(`Sending ${events.length} event(s) to ${source}`);
	for (const storedEvent of events) {
		// Record progress before sending, so that if the replay stops early
		// the periodic update pass resumes right after this event instead
		// of treating the unsent remainder as already delivered.
		stream.lastEventId = storedEvent.id;
		if (!sendEventToStream(stream, storedEvent)) {
			break;
		}
	}
	return readableStream;
}
/** Handle of the running update timer, or null while no stream is registered. */
let updateInterval: ReturnType<typeof setInterval> | null = null;
/** All currently registered event streams. */
const streams = new Set<EventStream>();

/**
 * Register a stream. Registering into an empty set starts the timer that
 * runs sendEventUpdates every eventUpdateTimeMs milliseconds.
 */
function addStream(
	stream: EventStream,
) {
	const wasEmpty = streams.size === 0;
	if (wasEmpty) {
		console.log("Starting event updates");
		updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs);
	}
	streams.add(stream);
}

/**
 * Unregister a stream. When the set is empty afterwards the update timer
 * is stopped and its handle reset to null.
 */
function deleteStream(stream: EventStream) {
	streams.delete(stream);
	if (streams.size > 0) {
		return;
	}
	console.log("Ending event updates");
	clearInterval(updateInterval!);
	updateInterval = null;
}
/**
 * Close every registered stream that belongs to the given account, using
 * "cancelled" as the reason passed to the stream's close().
 *
 * @param accountId Id of the account whose streams should be closed.
 */
export function cancelAccountStreams(accountId: number) {
	for (const candidate of streams) {
		if (candidate.accountId !== accountId) {
			continue;
		}
		candidate.close("cancelled");
	}
}
/**
 * Close every registered stream that belongs to the given session, using
 * "cancelled" as the reason passed to the stream's close().
 *
 * @param sessionId Id of the session whose streams should be closed.
 */
export function cancelSessionStreams(sessionId: number) {
	for (const candidate of streams) {
		if (candidate.sessionId !== sessionId) {
			continue;
		}
		candidate.close("cancelled");
	}
}
/**
 * Cache of encoded events: for each event object, the JSON string produced
 * for each user type it has been encoded for so far.
 */
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();

/**
 * Serialise an event to JSON as it should appear to a viewer of the given
 * user type, reusing a previously encoded string when there is one.
 *
 * - "schedule-update": the data is passed through filterSchedule() when
 *   canSeeCrew() is false for the viewer.
 * - "user-update": the data is replaced with a bare deletion record (id,
 *   updatedAt, deleted: true) when canSeeCrew() is false for the viewer, or
 *   when the user is a non-deleted anonymous user and canSeeAnonymous() is
 *   false for the viewer.
 *
 * @param event The event to encode. It is never modified and is the key
 * the result is cached under.
 * @param userType User type of the viewer, or undefined if there is none.
 * @returns The JSON encoding of the event as visible to the viewer.
 * @throws Error if the event is of any other type.
 */
function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) {
	const cache = encodeEventCache.get(event);
	const cacheEntry = cache?.get(userType);
	if (cacheEntry) {
		return cacheEntry;
	}

	// The restricted copy is kept in its own variable. Reassigning `event`
	// would make the cache below store the result under the temporary copy,
	// where no later lookup could ever find it.
	let visible: ApiEvent = event;
	if (event.type === "schedule-update") {
		if (!canSeeCrew(userType)) {
			visible = {
				id: event.id,
				type: event.type,
				updatedFrom: event.updatedFrom,
				data: filterSchedule(event.data),
			};
		}
	} else if (event.type === "user-update") {
		if (
			!canSeeCrew(userType)
			|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
		) {
			visible = {
				id: event.id,
				type: event.type,
				data: {
					id: event.data.id,
					updatedAt: event.data.updatedAt,
					deleted: true,
				},
			};
		}
	} else {
		throw Error(`encodeEvent cannot encode ${event.type} event`);
	}
	const data = JSON.stringify(visible);

	if (cache) {
		cache.set(userType, data);
	} else {
		encodeEventCache.set(event, new Map([[userType, data]]));
	}
	return data;
}
/**
 * Append an event to the stored event list.
 *
 * This function does not write to any stream itself; it only reads the
 * stored events, adds the new one at the end and writes the list back.
 *
 * NOTE(review): this is a read-modify-write sequence with awaits in
 * between. Confirm that the database layer prevents two overlapping calls
 * from both reading the same list and overwriting each other's event.
 *
 * @param event The event to store.
 */
export async function broadcastEvent(event: ApiEvent) {
	const stored = await readEvents();
	stored.push(event);
	await writeEvents(stored);
}
/**
 * Deliver a single event to a single stream.
 *
 * - "session-expired" events are never written to the client. If the event
 *   names the stream's own session the stream is closed; otherwise the
 *   event is ignored.
 * - "account-update" events are written only to streams of the account the
 *   event is about.
 * - All other events are encoded with encodeEvent() for the stream's user
 *   type and written.
 *
 * @param stream The stream to deliver to.
 * @param event The event to deliver.
 * @returns `false` if the stream was closed and nothing more should be sent
 * to it, `true` if the caller may continue with the next event.
 */
function sendEventToStream(stream: EventStream, event: ApiEvent) {
	// Session expiry events cause the streams belonging to that session to be terminated
	if (event.type === "session-expired") {
		if (stream.sessionId !== event.sessionId) {
			// Another session expired. This stream remains open, so the
			// caller must keep delivering the events that follow.
			return true;
		}
		stream.close("session expired");
		return false;
	}

	// Account events are specially handled and only sent to the user they belong to.
	if (event.type === "account-update") {
		if (stream.accountId === event.data.id) {
			stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`);
		}
		return true;
	}

	// All other events are encoded according to the user access level seeing it.
	const data = encodeEvent(event, stream.userType);
	stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`);
	return true;
}
/**
 * One pass of the periodic update: close streams that are due for
 * rotation, deliver newly stored events to the remaining streams, and
 * write a keepalive comment to streams that have been idle for longer than
 * keepaliveTimeoutMs.
 */
async function sendEventUpdates() {
	// Cancel streams that need to be rotated.
	const now = Date.now();
	for (const stream of streams.values()) {
		if (stream.rotatesAtMs < now) {
			stream.close("session rotation");
		}
	}

	// Rotation may have closed every stream. There is then nobody to
	// deliver to, so skip reading the stored events altogether.
	if (streams.size === 0) {
		return;
	}

	// Send events. Only events newer than what the furthest-behind stream
	// has handled can be of interest to anyone.
	let skipEventId = Infinity;
	for (const stream of streams.values()) {
		skipEventId = Math.min(skipEventId, stream.lastEventId);
	}
	const events = (await readEvents()).filter(e => e.id > skipEventId);
	if (events.length)
		console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`);
	for (const stream of streams.values()) {
		for (const event of events) {
			if (event.id > stream.lastEventId) {
				stream.lastEventId = event.id;
				stream.lastKeepAliveMs = now;
				if (!sendEventToStream(stream, event)) {
					break;
				}
			}
		}
	}

	// Send Keepalives to streams with no activity.
	for (const stream of streams.values()) {
		if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) {
			stream.write(": keepalive\n");
			stream.lastKeepAliveMs = now;
		}
	}
}