Refactor to persist and reliably deliver events
Store events that are to be broadcast in the database, and serve the /api/event stream to the client from the events stored there. This ensures that events are not lost if the operation to open the stream takes longer than usual, or if the client was not connected at the time the event was broadcast. To ensure no events are lost in the transition from the server generating the page to the client hydrating and establishing a connection to the event stream, the /api/last-event-id endpoint is queried on the server before any other entities are fetched from the database. The client then passes this id when establishing the event stream, and receives all events with an id greater than it.
parent 0a0eb43d78
commit 753da6d3d4
18 changed files with 326 additions and 132 deletions
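For context, the catch-up flow described in the commit message could look roughly like this on the consuming side. This is an illustrative sketch, not part of the diff below; the lastEventId query parameter, the Nuxt-style $fetch call, and the EventSource wiring are assumptions, while the /api/last-event-id and /api/event routes and the "event" SSE message name come from this commit.

	// Sketch only (assumed client/SSR wiring, not from this commit).
	// During server-side rendering, fetch the last persisted event id first:
	const lastEventId = await $fetch<number>("/api/last-event-id");

	// When hydrating, pass that id so the stream replays every stored event
	// with a greater id before delivering live updates:
	const source = new EventSource(`/api/event?lastEventId=${lastEventId}`);
	source.addEventListener("event", (message) => {
		const update = JSON.parse(message.data);
		// apply the update to local state here
	});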
@@ -2,82 +2,134 @@
 SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
 SPDX-License-Identifier: AGPL-3.0-or-later
 */
-import { readUsers, type ServerSession } from "~/server/database";
-import type { ApiAccount, ApiEvent } from "~/shared/types/api";
+import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database";
+import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api";
 import { serverSessionToApi } from "./utils/session";
 import { H3Event } from "h3";
 
-function sendMessage(
-	stream: WritableStream<string>,
-	message: string,
-) {
-	const writer = stream.getWriter();
-	writer.ready
-		.then(() => writer.write(message))
-		.catch(console.error)
-		.finally(() => writer.releaseLock())
-	;
+const keepaliveTimeoutMs = 45e3;
+const eventUpdateTimeMs = 1e3;
+
+class EventStream {
+	write!: (data: string) => void;
+	close!: (reason?: string) => void;
+
+	constructor(
+		public sessionId: number | undefined,
+		public accountId: number | undefined,
+		public userType: ApiUserType | undefined,
+		public rotatesAtMs: number,
+		public lastKeepAliveMs: number,
+		public lastEventId: number,
+	) {
+	}
 }
 
-function sendMessageAndClose(
-	stream: WritableStream<string>,
-	message: string,
-) {
-	const writer = stream.getWriter();
-	writer.ready
-		.then(() => {
-			writer.write(message);
-			writer.close();
-		}).catch(console.error)
-		.finally(() => writer.releaseLock())
-	;
-}
-
-const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number, rotatesAtMs: number }>();
-
-let keepaliveInterval: ReturnType<typeof setInterval> | null = null
-export async function addStream(
+export async function createEventStream(
 	event: H3Event,
-	stream: WritableStream<string>,
 	source: string,
+	lastEventId: number,
 	session?: ServerSession,
 ) {
-	if (streams.size === 0) {
-		console.log("Starting keepalive")
-		keepaliveInterval = setInterval(sendKeepalive, 4000)
-	}
 	const runtimeConfig = useRuntimeConfig(event);
-	streams.set(stream, {
-		sessionId: session?.id,
-		accountId: session?.accountId,
-		rotatesAtMs: session?.rotatesAtMs ?? Date.now() + runtimeConfig.sessionRotatesTimeout * 1000,
+	const now = Date.now();
+	const events = (await readEvents()).filter(e => e.id > lastEventId);
+	const users = await readUsers();
+	const apiSession = session ? await serverSessionToApi(event, session) : undefined;
+	let userType: ApiAccount["type"] | undefined;
+	if (session?.accountId !== undefined) {
+		userType = users.find(a => !a.deleted && a.id === session.accountId)?.type
+	}
+	const stream = new EventStream(
+		session?.id,
+		session?.accountId,
+		userType,
+		session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000,
+		now,
+		events[events.length - 1]?.id ?? lastEventId,
+	);
+
+	const readableStream = new ReadableStream<Uint8Array>({
+		start(controller) {
+			const encoder = new TextEncoder();
+			stream.write = (data: string) => {
+				controller.enqueue(encoder.encode(data));
+			}
+			stream.close = (reason?: string) => {
+				const data: ApiDisconnected = {
+					type: "disconnect",
+					reason,
+				};
+				stream.write(`data: ${JSON.stringify(data)}\n\n`);
+				controller.close();
+				deleteStream(stream);
+			},
+			console.log(`Starting event stream for ${source}`)
+			addStream(stream);
+		},
+		cancel(reason) {
+			console.log(`Cancelled event stream for ${source}:`, reason);
+			deleteStream(stream);
+		}
 	});
+	// Produce a response immediately to avoid the reply waiting for content.
-	const update: ApiEvent = {
+	const update: ApiEventStreamMessage = {
 		type: "connected",
-		session: session ? await serverSessionToApi(event, session) : undefined,
+		session: apiSession,
 	};
-	sendMessage(stream, `event: update\ndata: ${JSON.stringify(update)}\n\n`);
+	stream.write(`data: ${JSON.stringify(update)}\n\n`);
+
+	/*
+		Send events since the provided lastEventId
+
+		Warning: This have to happen either before addStream(stream) is
+		called, or as done here synchronously after it. Otherwise there's a
+		possibility of events being delivered out of order, which will break
+		the assumption made by the schedule updating logic.
+	*/
+	if (events.length)
+		console.log(`Sending ${events.length} event(s) to ${source}`);
+	for (const event of events) {
+		if (!sendEventToStream(stream, event)) {
+			break;
+		}
+	}
+
+	return readableStream;
 }
-export function deleteStream(stream: WritableStream<string>) {
+
+let updateInterval: ReturnType<typeof setInterval> | null = null
+const streams = new Set<EventStream>();
+function addStream(
+	stream: EventStream,
+) {
+	if (streams.size === 0) {
+		console.log("Starting event updates")
+		updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs)
+	}
+	streams.add(stream);
+}
+function deleteStream(stream: EventStream) {
 	streams.delete(stream);
 	if (streams.size === 0) {
-		console.log("Ending keepalive")
-		clearInterval(keepaliveInterval!);
+		console.log("Ending event updates")
+		clearInterval(updateInterval!);
+		updateInterval = null;
 	}
 }
 
 export function cancelAccountStreams(accountId: number) {
-	for (const [stream, data] of streams) {
-		if (data.accountId === accountId) {
-			sendMessageAndClose(stream, `data: cancelled\n\n`);
+	for (const stream of streams.values()) {
+		if (stream.accountId === accountId) {
+			stream.close("cancelled");
 		}
 	}
 }
 
 export function cancelSessionStreams(sessionId: number) {
-	for (const [stream, data] of streams) {
-		if (data.sessionId === sessionId) {
-			sendMessageAndClose(stream, `data: cancelled\n\n`);
+	for (const stream of streams.values()) {
+		if (stream.sessionId === sessionId) {
+			stream.close("cancelled");
 		}
 	}
 }
@@ -94,6 +146,7 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
 	if (event.type === "schedule-update") {
 		if (!canSeeCrew(userType)) {
 			event = {
+				id: event.id,
 				type: event.type,
 				updatedFrom: event.updatedFrom,
 				data: filterSchedule(event.data),
@@ -106,6 +159,7 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
 		|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
 	) {
 		event = {
+			id: event.id,
 			type: event.type,
 			data: {
 				id: event.data.id,
@@ -128,44 +182,67 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
 }
 
 export async function broadcastEvent(event: ApiEvent) {
-	const id = Date.now();
-	console.log(`broadcasting update to ${streams.size} clients`);
-	if (!streams.size) {
-		return;
-	}
+	const events = await readEvents();
+	events.push(event);
+	await writeEvents(events);
+}
+
+function sendEventToStream(stream: EventStream, event: ApiEvent) {
 	// Session expiry events cause the streams belonging to that session to be terminated
 	if (event.type === "session-expired") {
 		cancelSessionStreams(event.sessionId);
-		return;
-	}
-
-	const users = await readUsers();
-	for (const [stream, streamData] of streams) {
-		// Account events are specially handled and only sent to the user they belong to.
-		if (event.type === "account-update") {
-			if (streamData.accountId === event.data.id) {
-				sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
-			}
-
-		} else {
-			let userType: ApiAccount["type"] | undefined;
-			if (streamData.accountId !== undefined) {
-				userType = users.find(a => !a.deleted && a.id === streamData.accountId)?.type
-			}
-			const data = encodeEvent(event, userType)
-			sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
+		if (stream.sessionId === event.sessionId) {
+			stream.close("session expired");
+		}
+		return false;
+	}
+
+	// Account events are specially handled and only sent to the user they belong to.
+	if (event.type === "account-update") {
+		if (stream.accountId === event.data.id) {
+			stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`);
 		}
+		return true;
 	}
+
+	// All other events are encoded according to the user access level seeing it.
+	const data = encodeEvent(event, stream.userType)
+	stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`);
+	return true;
 }
 
-function sendKeepalive() {
+async function sendEventUpdates() {
+	// Cancel streams that need to be rotated.
 	const now = Date.now();
-	for (const [stream, streamData] of streams) {
-		if (streamData.rotatesAtMs > now) {
-			sendMessage(stream, ": keepalive\n");
-		} else {
-			sendMessageAndClose(stream, `data: cancelled\n\n`);
+	for (const stream of streams.values()) {
+		if (stream.rotatesAtMs < now) {
+			stream.close("session rotation");
+			continue;
 		}
 	}
+
+	// Send events.
+	const skipEventId = Math.min(...[...streams.values()].map(s => s.lastEventId));
+	const events = (await readEvents()).filter(e => e.id > skipEventId);
+	if (events.length)
+		console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`);
+	for (const stream of streams.values()) {
+		for (const event of events) {
+			if (event.id > stream.lastEventId) {
+				stream.lastEventId = event.id;
+				stream.lastKeepAliveMs = now;
+
+				if (!sendEventToStream(stream, event)) {
+					break;
+				}
+			}
+		}
+	}
+
+	// Send Keepalives to streams with no activity.
+	for (const stream of streams.values()) {
+		if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) {
+			stream.write(": keepalive\n");
+			stream.lastKeepAliveMs = now;
+		}
+	}
 }
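The /api/last-event-id handler itself is not among the hunks shown here. A minimal sketch of what such a handler could look like, assuming readEvents() returns events in ascending id order and that 0 is an acceptable value when no events are stored yet:

	// Hypothetical handler sketch, not taken from this commit.
	import { readEvents } from "~/server/database";

	export default defineEventHandler(async () => {
		const events = await readEvents();
		return events[events.length - 1]?.id ?? 0;
	});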