Replace all asynchronous reads and writes to the JSON database with synchronous reads and writes, to prevent a data-corruption race condition where two requests processed at the same time write to the same file, or one request reads while another writes and observes partially written data.
248 lines
6.7 KiB
TypeScript
/*
|
|
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
|
|
SPDX-License-Identifier: AGPL-3.0-or-later
|
|
*/
|
|
import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database";
|
|
import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api";
|
|
import { serverSessionToApi } from "./utils/session";
|
|
import { H3Event } from "h3";
|
|
|
|
const keepaliveTimeoutMs = 45e3;
|
|
const eventUpdateTimeMs = 1e3;
|
|
|
|
class EventStream {
|
|
write!: (data: string) => void;
|
|
close!: (reason?: string) => void;
|
|
|
|
constructor(
|
|
public sessionId: number | undefined,
|
|
public accountId: number | undefined,
|
|
public userType: ApiUserType | undefined,
|
|
public rotatesAtMs: number ,
|
|
public lastKeepAliveMs: number,
|
|
public lastEventId: number,
|
|
) {
|
|
}
|
|
}
|
|
|
|
export async function createEventStream(
|
|
event: H3Event,
|
|
source: string,
|
|
lastEventId: number,
|
|
session?: ServerSession,
|
|
) {
|
|
const runtimeConfig = useRuntimeConfig(event);
|
|
const now = Date.now();
|
|
const events = (readEvents()).filter(e => e.id > lastEventId);
|
|
const users = readUsers();
|
|
const apiSession = session ? await serverSessionToApi(event, session) : undefined;
|
|
let userType: ApiAccount["type"] | undefined;
|
|
if (session?.accountId !== undefined) {
|
|
userType = users.find(a => !a.deleted && a.id === session.accountId)?.type
|
|
}
|
|
const stream = new EventStream(
|
|
session?.id,
|
|
session?.accountId,
|
|
userType,
|
|
session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000,
|
|
now,
|
|
events[events.length - 1]?.id ?? lastEventId,
|
|
);
|
|
|
|
const readableStream = new ReadableStream<Uint8Array>({
|
|
start(controller) {
|
|
const encoder = new TextEncoder();
|
|
stream.write = (data: string) => {
|
|
controller.enqueue(encoder.encode(data));
|
|
}
|
|
stream.close = (reason?: string) => {
|
|
const data: ApiDisconnected = {
|
|
type: "disconnect",
|
|
reason,
|
|
};
|
|
stream.write(`data: ${JSON.stringify(data)}\n\n`);
|
|
controller.close();
|
|
deleteStream(stream);
|
|
},
|
|
console.log(`Starting event stream for ${source}`)
|
|
addStream(stream);
|
|
},
|
|
cancel(reason) {
|
|
console.log(`Cancelled event stream for ${source}:`, reason);
|
|
deleteStream(stream);
|
|
}
|
|
});
|
|
// Produce a response immediately to avoid the reply waiting for content.
|
|
const update: ApiEventStreamMessage = {
|
|
type: "connected",
|
|
session: apiSession,
|
|
};
|
|
stream.write(`data: ${JSON.stringify(update)}\n\n`);
|
|
|
|
/*
|
|
Send events since the provided lastEventId
|
|
|
|
Warning: This have to happen either before addStream(stream) is
|
|
called, or as done here synchronously after it. Otherwise there's a
|
|
possibility of events being delivered out of order, which will break
|
|
the assumption made by the schedule updating logic.
|
|
*/
|
|
if (events.length)
|
|
console.log(`Sending ${events.length} event(s) to ${source}`);
|
|
for (const event of events) {
|
|
if (!sendEventToStream(stream, event)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return readableStream;
|
|
}
|
|
|
|
let updateInterval: ReturnType<typeof setInterval> | null = null
|
|
const streams = new Set<EventStream>();
|
|
function addStream(
|
|
stream: EventStream,
|
|
) {
|
|
if (streams.size === 0) {
|
|
console.log("Starting event updates")
|
|
updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs)
|
|
}
|
|
streams.add(stream);
|
|
}
|
|
function deleteStream(stream: EventStream) {
|
|
streams.delete(stream);
|
|
if (streams.size === 0) {
|
|
console.log("Ending event updates")
|
|
clearInterval(updateInterval!);
|
|
updateInterval = null;
|
|
}
|
|
}
|
|
|
|
export function cancelAccountStreams(accountId: number) {
|
|
for (const stream of streams.values()) {
|
|
if (stream.accountId === accountId) {
|
|
stream.close("cancelled");
|
|
}
|
|
}
|
|
}
|
|
|
|
export function cancelSessionStreams(sessionId: number) {
|
|
for (const stream of streams.values()) {
|
|
if (stream.sessionId === sessionId) {
|
|
stream.close("cancelled");
|
|
}
|
|
}
|
|
}
|
|
|
|
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();
|
|
function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) {
|
|
const cache = encodeEventCache.get(event);
|
|
const cacheEntry = cache?.get(userType);
|
|
if (cacheEntry) {
|
|
return cacheEntry;
|
|
}
|
|
|
|
let data: string;
|
|
if (event.type === "schedule-update") {
|
|
if (!canSeeCrew(userType)) {
|
|
event = {
|
|
id: event.id,
|
|
type: event.type,
|
|
updatedFrom: event.updatedFrom,
|
|
data: filterSchedule(event.data),
|
|
};
|
|
}
|
|
data = JSON.stringify(event);
|
|
} else if (event.type === "user-update") {
|
|
if (
|
|
!canSeeCrew(userType)
|
|
|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
|
|
) {
|
|
event = {
|
|
id: event.id,
|
|
type: event.type,
|
|
data: {
|
|
id: event.data.id,
|
|
updatedAt: event.data.updatedAt,
|
|
deleted: true,
|
|
}
|
|
}
|
|
}
|
|
data = JSON.stringify(event);
|
|
} else {
|
|
throw Error(`encodeEvent cannot encode ${event.type} event`);
|
|
}
|
|
|
|
if (cache) {
|
|
cache.set(userType, data);
|
|
} else {
|
|
encodeEventCache.set(event, new Map([[userType, data]]));
|
|
}
|
|
return data;
|
|
}
|
|
|
|
export async function broadcastEvent(event: ApiEvent) {
|
|
const events = readEvents();
|
|
events.push(event);
|
|
writeEvents(events);
|
|
}
|
|
|
|
function sendEventToStream(stream: EventStream, event: ApiEvent) {
|
|
// Session expiry events cause the streams belonging to that session to be terminated
|
|
if (event.type === "session-expired") {
|
|
if (stream.sessionId === event.sessionId) {
|
|
stream.close("session expired");
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Account events are specially handled and only sent to the user they belong to.
|
|
if (event.type === "account-update") {
|
|
if (stream.accountId === event.data.id) {
|
|
stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// All other events are encoded according to the user access level seeing it.
|
|
const data = encodeEvent(event, stream.userType)
|
|
stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`);
|
|
return true;
|
|
}
|
|
|
|
async function sendEventUpdates() {
|
|
// Cancel streams that need to be rotated.
|
|
const now = Date.now();
|
|
for (const stream of streams.values()) {
|
|
if (stream.rotatesAtMs < now) {
|
|
stream.close("session rotation");
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Send events.
|
|
const skipEventId = Math.min(...[...streams.values()].map(s => s.lastEventId));
|
|
const events = (readEvents()).filter(e => e.id > skipEventId);
|
|
if (events.length)
|
|
console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`);
|
|
for (const stream of streams.values()) {
|
|
for (const event of events) {
|
|
if (event.id > stream.lastEventId) {
|
|
stream.lastEventId = event.id;
|
|
stream.lastKeepAliveMs = now;
|
|
|
|
if (!sendEventToStream(stream, event)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send Keepalives to streams with no activity.
|
|
for (const stream of streams.values()) {
|
|
if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) {
|
|
stream.write(": keepalive\n");
|
|
stream.lastKeepAliveMs = now;
|
|
}
|
|
}
|
|
}
|