From 753da6d3d4e958e818b1aad357a4e040bf3ef17a Mon Sep 17 00:00:00 2001 From: Hornwitser Date: Sat, 20 Sep 2025 20:11:58 +0200 Subject: [PATCH 1/3] Refactor to persist and reliably deliver events Store events that are to be broadcasted in the database, and fetch events to serve in the /api/event stream to the client from the database. This ensures that events are not lost if the operation to open the stream takes longer than usual, or the client was not connected at the time the event was broadcast. To ensure no events are lost in the transition from server generating the page to the client hydrating and establishing a connection with the event stream, the /api/last-event-id endpoint is first queried on the server before any other entities is fetched from the database. The client then passes this id when establishing the event stream, and receives all events greater than that id. --- app.vue | 5 +- composables/event-source.ts | 44 +++--- docs/dev/server-sent-events.md | 6 + server/api/admin/user.patch.ts | 4 +- server/api/auth/account.delete.ts | 3 + server/api/auth/account.post.ts | 3 +- server/api/events.ts | 51 ++++--- server/api/last-event-id.ts | 11 ++ server/api/schedule.patch.ts | 3 +- server/database.ts | 22 ++- server/streams.ts | 239 ++++++++++++++++++++---------- server/utils/schedule.ts | 3 +- server/utils/session.ts | 2 + shared/types/api.ts | 25 +++- stores/events.ts | 31 ++++ stores/schedules.ts | 2 +- stores/session.ts | 2 +- stores/users.ts | 2 +- 18 files changed, 326 insertions(+), 132 deletions(-) create mode 100644 server/api/last-event-id.ts create mode 100644 stores/events.ts diff --git a/app.vue b/app.vue index a8c7190..afa34bb 100644 --- a/app.vue +++ b/app.vue @@ -11,7 +11,10 @@ import "~/assets/global.css"; const event = useRequestEvent(); const sessionStore = useSessionStore(); -await callOnce("fetch-session", async () => { +const eventsStore = useEventsStore(); +const nuxtApp = useNuxtApp(); +await callOnce("fetch-globals", async () => { await sessionStore.fetch(event); + await nuxtApp.runWithContext(eventsStore.fetchLastEventId); }) diff --git a/composables/event-source.ts b/composables/event-source.ts index 838b879..0e06b17 100644 --- a/composables/event-source.ts +++ b/composables/event-source.ts @@ -2,12 +2,12 @@ SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ -import type { ApiEvent } from "~/shared/types/api"; +import type { ApiEvent, ApiEventStreamMessage } from "~/shared/types/api"; interface AppEventMap { "open": Event, - "message": MessageEvent, - "update": MessageEvent, + "message": MessageEvent, + "event": MessageEvent, "error": Event, "close": Event, } @@ -18,12 +18,11 @@ class AppEventSource extends EventTarget { #forwardEvent(type: string) { this.#source!.addEventListener(type, event => { - if (type === "open" || type === "message" || type === "error") { - console.log("AppEventSource", event.type, event.data); + console.log("AppEventSource", event.type, event.data); + if (type === "open" || type === "error") { this.dispatchEvent(new Event(event.type)); - } else { - const data = event.data ? JSON.parse(event.data) as ApiEvent : undefined; - console.log("AppEventSource", event.type, data); + } else if (type === "message") { + const data = event.data ? JSON.parse(event.data) as ApiEventStreamMessage : undefined; if (data?.type === "connected") { this.#sourceSessionId = data.session?.id; } @@ -34,17 +33,27 @@ class AppEventSource extends EventTarget { source: event.source, ports: [...event.ports], })); + } else { + const data = event.data ? JSON.parse(event.data) as ApiEvent : undefined; + this.dispatchEvent(new MessageEvent(event.type, { + data, + origin: event.origin, + lastEventId: event.lastEventId, + source: event.source, + ports: [...event.ports], + })); } }); } - open(sessionId: number | undefined) { + open(sessionId: number | undefined, lastEventId: number) { console.log("Opening event source sid:", sessionId); this.#sourceSessionId = sessionId; - this.#source = new EventSource("/api/events"); + const query = new URLSearchParams({ lastEventId: String(lastEventId) }); + this.#source = new EventSource(`/api/events?${query}`); this.#forwardEvent("open"); this.#forwardEvent("message"); - this.#forwardEvent("update"); + this.#forwardEvent("event"); this.#forwardEvent("error"); } @@ -58,20 +67,20 @@ class AppEventSource extends EventTarget { } #connectRefs = 0; - connect(sessionId: number | undefined) { + connect(sessionId: number | undefined, lastEventId: number) { this.#connectRefs += 1; if (this.#source && this.#sourceSessionId !== sessionId) { this.close(); } if (!this.#source) { - this.open(sessionId); + this.open(sessionId, lastEventId); } } - reconnect(sessionId: number | undefined) { + reconnect(sessionId: number | undefined, lastEventId: number) { if (this.#source && this.#sourceSessionId !== sessionId) { this.close(); - this.open(sessionId); + this.open(sessionId, lastEventId); } } @@ -113,14 +122,15 @@ export const appEventSource = import.meta.client ? new AppEventSource() : null; export function useEventSource() { const sessionStore = useSessionStore(); + const eventsStore = useEventsStore(); onMounted(() => { console.log("useEventSource onMounted", sessionStore.id); - appEventSource!.connect(sessionStore.id); + appEventSource!.connect(sessionStore.id, eventsStore.lastEventId); }) watch(() => sessionStore.id, () => { console.log("useEventSource sessionStore.id change", sessionStore.id); - appEventSource!.reconnect(sessionStore.id); + appEventSource!.reconnect(sessionStore.id, eventsStore.lastEventId); }) onUnmounted(() => { diff --git a/docs/dev/server-sent-events.md b/docs/dev/server-sent-events.md index 82ff99b..d510c1e 100644 --- a/docs/dev/server-sent-events.md +++ b/docs/dev/server-sent-events.md @@ -11,3 +11,9 @@ To update in real time this application sends a `text/event-source` stream using Upon connecting a `"connect"` event is emitted with the session the connection was made under. This is the primary mechanism a user agent discovers its own session having been rotated into a new one, which also happens when the access level of the account associated with the session changes. After the `"connect"` event the user agent will start to receive updates to resources it has access to that has changed. There is no filtering for what resoucres the user agent receives updates for at the moment as there's not enough events to justify the complexity of server-side subscriptions and filtering. + +## Id and order + +Events are guaranteed to be delivered in order, and to maintain consistency the server provides the following guarantee: Any entities fetched after receiving a response from `/api/last-event-id` will include updates from all events up to and including the `id` received from the response. + +This means that a client can fetch an up to date and live representation of any API entity by first fetching the last event from `/api/last-event-id`, and then in parallel fetch any entities as well as opening the `/api/events` stream with the `lastEventId` query param set to the value received from the `/api/last-event-id` endpoint. diff --git a/server/api/admin/user.patch.ts b/server/api/admin/user.patch.ts index 25c559f..6f1c9a7 100644 --- a/server/api/admin/user.patch.ts +++ b/server/api/admin/user.patch.ts @@ -2,7 +2,7 @@ SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ -import { readSessions, readUsers, writeSessions, writeUsers } from "~/server/database"; +import { nextEventId, readSessions, readUsers, writeSessions, writeUsers } from "~/server/database"; import { apiUserPatchSchema } from "~/shared/types/api"; import { z } from "zod/v4-mini"; import { broadcastEvent } from "~/server/streams"; @@ -54,6 +54,7 @@ export default defineEventHandler(async (event) => { user.updatedAt = new Date().toISOString(); await writeUsers(users); broadcastEvent({ + id: await nextEventId(), type: "user-update", data: serverUserToApi(user), }); @@ -66,6 +67,7 @@ export default defineEventHandler(async (event) => { if (session.accountId === user.id) { session.rotatesAtMs = nowMs; broadcastEvent({ + id: await nextEventId(), type: "session-expired", sessionId: session.id, }); diff --git a/server/api/auth/account.delete.ts b/server/api/auth/account.delete.ts index 2e66d14..37a8770 100644 --- a/server/api/auth/account.delete.ts +++ b/server/api/auth/account.delete.ts @@ -5,6 +5,7 @@ import { readUsers, readSessions, readSubscriptions, writeUsers, writeSessions, writeSubscriptions, + nextEventId, } from "~/server/database"; import { broadcastEvent, cancelAccountStreams } from "~/server/streams"; @@ -24,6 +25,7 @@ export default defineEventHandler(async (event) => { ) { session.expiresAtMs = nowMs; broadcastEvent({ + id: await nextEventId(), type: "session-expired", sessionId: session.id, }); @@ -48,6 +50,7 @@ export default defineEventHandler(async (event) => { account.updatedAt = now; await writeUsers(users); await broadcastEvent({ + id: await nextEventId(), type: "user-update", data: { id: account.id, diff --git a/server/api/auth/account.post.ts b/server/api/auth/account.post.ts index e0b3be8..52ef502 100644 --- a/server/api/auth/account.post.ts +++ b/server/api/auth/account.post.ts @@ -2,7 +2,7 @@ SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ -import { readUsers, writeUsers, nextUserId, type ServerUser, readAuthenticationMethods, nextAuthenticationMethodId, writeAuthenticationMethods } from "~/server/database"; +import { readUsers, writeUsers, nextUserId, type ServerUser, readAuthenticationMethods, nextAuthenticationMethodId, writeAuthenticationMethods, nextEventId } from "~/server/database"; import { broadcastEvent } from "~/server/streams"; import type { ApiSession } from "~/shared/types/api"; @@ -88,6 +88,7 @@ export default defineEventHandler(async (event): Promise => { users.push(user); await writeUsers(users); await broadcastEvent({ + id: await nextEventId(), type: "user-update", data: user, }); diff --git a/server/api/events.ts b/server/api/events.ts index 12e1f3b..4415550 100644 --- a/server/api/events.ts +++ b/server/api/events.ts @@ -3,33 +3,46 @@ SPDX-License-Identifier: AGPL-3.0-or-later */ import { pipeline } from "node:stream"; -import { addStream, deleteStream } from "~/server/streams"; +import { createEventStream } from "~/server/streams"; export default defineEventHandler(async (event) => { const session = await getServerSession(event, false); - const encoder = new TextEncoder(); - const source = event.headers.get("x-forwarded-for"); - console.log(`starting event stream for ${source}`) - const stream = new TransformStream({ - transform(chunk, controller) { - controller.enqueue(encoder.encode(chunk)); - }, - flush(controller) { - console.log(`finished event stream for ${source}`); - deleteStream(stream.writable); - }, - // @ts-expect-error experimental API - cancel(reason) { - console.log(`cancelled event stream for ${source}`); - deleteStream(stream.writable); + let lastEventId: number | undefined; + const lastEventIdHeader = event.headers.get("Last-Event-ID"); + const lastEventIdQuery = getQuery(event)["lastEventId"]; + if (lastEventIdHeader) { + if (!/^[0-9]{1,15}$/.test(lastEventIdHeader)) { + throw createError({ + statusCode: 400, + statusMessage: "Bad Request", + message: "Malformed Last-Event-ID header", + }); } - }); - addStream(event, stream.writable, session); + lastEventId = Number.parseInt(lastEventIdHeader, 10); + } else if (lastEventIdQuery) { + if (typeof lastEventIdQuery !== "string" || !/^[0-9]{1,15}$/.test(lastEventIdQuery)) { + throw createError({ + statusCode: 400, + statusMessage: "Bad Request", + message: "Malformed lastEventId", + }); + } + lastEventId = Number.parseInt(lastEventIdQuery, 10); + } else { + throw createError({ + statusCode: 400, + statusMessage: "Bad Request", + message: "lastEventId is required", + }); + } + + const source = event.headers.get("x-forwarded-for") ?? ""; + const stream = await createEventStream(event, source, lastEventId, session); // Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986 setHeader(event, "Access-Control-Allow-Origin", "*"); setHeader(event, "Content-Type", "text/event-stream"); - pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ }); + pipeline(stream as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ }); event._handled = true; }); diff --git a/server/api/last-event-id.ts b/server/api/last-event-id.ts new file mode 100644 index 0000000..6fd39ac --- /dev/null +++ b/server/api/last-event-id.ts @@ -0,0 +1,11 @@ +/* + SPDX-FileCopyrightText: © 2025 Hornwitser + SPDX-License-Identifier: AGPL-3.0-or-later +*/ + +import { readEvents } from "../database"; + +export default defineEventHandler(async (event) => { + const events = await readEvents(); + return events[events.length - 1]?. id ?? 0; +}); diff --git a/server/api/schedule.patch.ts b/server/api/schedule.patch.ts index 49d365b..517b6f0 100644 --- a/server/api/schedule.patch.ts +++ b/server/api/schedule.patch.ts @@ -3,7 +3,7 @@ SPDX-License-Identifier: AGPL-3.0-or-later */ import { z } from "zod/v4-mini"; -import { readSchedule, writeSchedule } from "~/server/database"; +import { nextEventId, readSchedule, writeSchedule } from "~/server/database"; import { broadcastEvent } from "~/server/streams"; import { apiScheduleSchema } from "~/shared/types/api"; import { applyUpdatesToArray } from "~/shared/utils/update"; @@ -87,6 +87,7 @@ export default defineEventHandler(async (event) => { await writeSchedule(schedule); await broadcastEvent({ + id: await nextEventId(), type: "schedule-update", updatedFrom, data: update, diff --git a/server/database.ts b/server/database.ts index db5226a..7bba61b 100644 --- a/server/database.ts +++ b/server/database.ts @@ -3,7 +3,7 @@ SPDX-License-Identifier: AGPL-3.0-or-later */ import { readFile, unlink, writeFile } from "node:fs/promises"; -import type { ApiAuthenticationProvider, ApiSchedule, ApiSubscription, ApiUserType } from "~/shared/types/api"; +import type { ApiAuthenticationProvider, ApiEvent, ApiSchedule, ApiSubscription, ApiUserType } from "~/shared/types/api"; import type { Id } from "~/shared/types/common"; export interface ServerSession { @@ -50,6 +50,8 @@ const sessionsPath = "data/sessions.json"; const nextSessionIdPath = "data/next-session-id.json"; const authMethodPath = "data/auth-method.json"; const nextAuthenticationMethodIdPath = "data/auth-method-id.json" +const nextEventIdPath = "data/next-event-id.json"; +const eventsPath = "data/events.json"; async function remove(path: string) { try { @@ -168,3 +170,21 @@ export async function readAuthenticationMethods() { export async function writeAuthenticationMethods(authMethods: ServerAuthenticationMethod[]) { await writeFile(authMethodPath, JSON.stringify(authMethods, undefined, "\t") + "\n", "utf-8"); } + +export async function nextEventId() { + const nextId = await readJson(nextEventIdPath, 0); + await writeFile(nextEventIdPath, String(nextId + 1), "utf-8"); + return nextId; +} + +export async function writeNextEventId(nextId: number) { + await writeFile(nextEventIdPath, String(nextId), "utf-8"); +} + +export async function readEvents() { + return readJson(eventsPath, []) +} + +export async function writeEvents(events: ApiEvent[]) { + await writeFile(eventsPath, JSON.stringify(events, undefined, "\t") + "\n", "utf-8"); +} diff --git a/server/streams.ts b/server/streams.ts index 111c9a2..bc2dfa0 100644 --- a/server/streams.ts +++ b/server/streams.ts @@ -2,82 +2,134 @@ SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ -import { readUsers, type ServerSession } from "~/server/database"; -import type { ApiAccount, ApiEvent } from "~/shared/types/api"; +import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database"; +import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api"; import { serverSessionToApi } from "./utils/session"; import { H3Event } from "h3"; -function sendMessage( - stream: WritableStream, - message: string, -) { - const writer = stream.getWriter(); - writer.ready - .then(() => writer.write(message)) - .catch(console.error) - .finally(() => writer.releaseLock()) - ; +const keepaliveTimeoutMs = 45e3; +const eventUpdateTimeMs = 1e3; + +class EventStream { + write!: (data: string) => void; + close!: (reason?: string) => void; + + constructor( + public sessionId: number | undefined, + public accountId: number | undefined, + public userType: ApiUserType | undefined, + public rotatesAtMs: number , + public lastKeepAliveMs: number, + public lastEventId: number, + ) { + } } -function sendMessageAndClose( - stream: WritableStream, - message: string, -) { - const writer = stream.getWriter(); - writer.ready - .then(() => { - writer.write(message); - writer.close(); - }).catch(console.error) - .finally(() => writer.releaseLock()) - ; -} - -const streams = new Map, { sessionId?: number, accountId?: number, rotatesAtMs: number }>(); - -let keepaliveInterval: ReturnType | null = null -export async function addStream( +export async function createEventStream( event: H3Event, - stream: WritableStream, + source: string, + lastEventId: number, session?: ServerSession, ) { - if (streams.size === 0) { - console.log("Starting keepalive") - keepaliveInterval = setInterval(sendKeepalive, 4000) - } const runtimeConfig = useRuntimeConfig(event); - streams.set(stream, { - sessionId: session?.id, - accountId: session?.accountId, - rotatesAtMs: session?.rotatesAtMs ?? Date.now() + runtimeConfig.sessionRotatesTimeout * 1000, + const now = Date.now(); + const events = (await readEvents()).filter(e => e.id > lastEventId); + const users = await readUsers(); + const apiSession = session ? await serverSessionToApi(event, session) : undefined; + let userType: ApiAccount["type"] | undefined; + if (session?.accountId !== undefined) { + userType = users.find(a => !a.deleted && a.id === session.accountId)?.type + } + const stream = new EventStream( + session?.id, + session?.accountId, + userType, + session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000, + now, + events[events.length - 1]?.id ?? lastEventId, + ); + + const readableStream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + stream.write = (data: string) => { + controller.enqueue(encoder.encode(data)); + } + stream.close = (reason?: string) => { + const data: ApiDisconnected = { + type: "disconnect", + reason, + }; + stream.write(`data: ${JSON.stringify(data)}\n\n`); + controller.close(); + deleteStream(stream); + }, + console.log(`Starting event stream for ${source}`) + addStream(stream); + }, + cancel(reason) { + console.log(`Cancelled event stream for ${source}:`, reason); + deleteStream(stream); + } }); // Produce a response immediately to avoid the reply waiting for content. - const update: ApiEvent = { + const update: ApiEventStreamMessage = { type: "connected", - session: session ? await serverSessionToApi(event, session) : undefined, + session: apiSession, }; - sendMessage(stream, `event: update\ndata: ${JSON.stringify(update)}\n\n`); + stream.write(`data: ${JSON.stringify(update)}\n\n`); + + /* + Send events since the provided lastEventId + + Warning: This have to happen either before addStream(stream) is + called, or as done here synchronously after it. Otherwise there's a + possibility of events being delivered out of order, which will break + the assumption made by the schedule updating logic. + */ + if (events.length) + console.log(`Sending ${events.length} event(s) to ${source}`); + for (const event of events) { + if (!sendEventToStream(stream, event)) { + break; + } + } + + return readableStream; } -export function deleteStream(stream: WritableStream) { + +let updateInterval: ReturnType | null = null +const streams = new Set(); +function addStream( + stream: EventStream, +) { + if (streams.size === 0) { + console.log("Starting event updates") + updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs) + } + streams.add(stream); +} +function deleteStream(stream: EventStream) { streams.delete(stream); if (streams.size === 0) { - console.log("Ending keepalive") - clearInterval(keepaliveInterval!); + console.log("Ending event updates") + clearInterval(updateInterval!); + updateInterval = null; } } export function cancelAccountStreams(accountId: number) { - for (const [stream, data] of streams) { - if (data.accountId === accountId) { - sendMessageAndClose(stream, `data: cancelled\n\n`); + for (const stream of streams.values()) { + if (stream.accountId === accountId) { + stream.close("cancelled"); } } } export function cancelSessionStreams(sessionId: number) { - for (const [stream, data] of streams) { - if (data.sessionId === sessionId) { - sendMessageAndClose(stream, `data: cancelled\n\n`); + for (const stream of streams.values()) { + if (stream.sessionId === sessionId) { + stream.close("cancelled"); } } } @@ -94,6 +146,7 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) if (event.type === "schedule-update") { if (!canSeeCrew(userType)) { event = { + id: event.id, type: event.type, updatedFrom: event.updatedFrom, data: filterSchedule(event.data), @@ -106,6 +159,7 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) || !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType) ) { event = { + id: event.id, type: event.type, data: { id: event.data.id, @@ -128,44 +182,67 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined) } export async function broadcastEvent(event: ApiEvent) { - const id = Date.now(); - console.log(`broadcasting update to ${streams.size} clients`); - if (!streams.size) { - return; - } + const events = await readEvents(); + events.push(event); + await writeEvents(events); +} +function sendEventToStream(stream: EventStream, event: ApiEvent) { // Session expiry events cause the streams belonging to that session to be terminated if (event.type === "session-expired") { - cancelSessionStreams(event.sessionId); - return; - } - - const users = await readUsers(); - for (const [stream, streamData] of streams) { - // Account events are specially handled and only sent to the user they belong to. - if (event.type === "account-update") { - if (streamData.accountId === event.data.id) { - sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`); - } - - } else { - let userType: ApiAccount["type"] | undefined; - if (streamData.accountId !== undefined) { - userType = users.find(a => !a.deleted && a.id === streamData.accountId)?.type - } - const data = encodeEvent(event, userType) - sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`); + if (stream.sessionId === event.sessionId) { + stream.close("session expired"); } + return false; } + + // Account events are specially handled and only sent to the user they belong to. + if (event.type === "account-update") { + if (stream.accountId === event.data.id) { + stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`); + } + return true; + } + + // All other events are encoded according to the user access level seeing it. + const data = encodeEvent(event, stream.userType) + stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`); + return true; } -function sendKeepalive() { +async function sendEventUpdates() { + // Cancel streams that need to be rotated. const now = Date.now(); - for (const [stream, streamData] of streams) { - if (streamData.rotatesAtMs > now) { - sendMessage(stream, ": keepalive\n"); - } else { - sendMessageAndClose(stream, `data: cancelled\n\n`); + for (const stream of streams.values()) { + if (stream.rotatesAtMs < now) { + stream.close("session rotation"); + continue; + } + } + + // Send events. + const skipEventId = Math.min(...[...streams.values()].map(s => s.lastEventId)); + const events = (await readEvents()).filter(e => e.id > skipEventId); + if (events.length) + console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`); + for (const stream of streams.values()) { + for (const event of events) { + if (event.id > stream.lastEventId) { + stream.lastEventId = event.id; + stream.lastKeepAliveMs = now; + + if (!sendEventToStream(stream, event)) { + break; + } + } + } + } + + // Send Keepalives to streams with no activity. + for (const stream of streams.values()) { + if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) { + stream.write(": keepalive\n"); + stream.lastKeepAliveMs = now; } } } diff --git a/server/utils/schedule.ts b/server/utils/schedule.ts index e0e606a..c42960a 100644 --- a/server/utils/schedule.ts +++ b/server/utils/schedule.ts @@ -2,7 +2,7 @@ SPDX-FileCopyrightText: © 2025 Hornwitser SPDX-License-Identifier: AGPL-3.0-or-later */ -import { readSchedule, type ServerUser, writeSchedule } from '~/server/database'; +import { nextEventId, readSchedule, type ServerUser, writeSchedule } from '~/server/database'; import { broadcastEvent } from '~/server/streams'; import type { ApiSchedule, ApiTombstone } from '~/shared/types/api'; @@ -58,6 +58,7 @@ export async function updateScheduleInterestedCounts(users: ServerUser[]) { schedule.updatedAt = updatedFrom; await writeSchedule(schedule); await broadcastEvent({ + id: await nextEventId(), type: "schedule-update", updatedFrom, data: update, diff --git a/server/utils/session.ts b/server/utils/session.ts index dfba536..c605406 100644 --- a/server/utils/session.ts +++ b/server/utils/session.ts @@ -4,6 +4,7 @@ */ import type { H3Event } from "h3"; import { + nextEventId, nextSessionId, readSessions, readSubscriptions, @@ -34,6 +35,7 @@ async function clearServerSessionInternal(event: H3Event, sessions: ServerSessio if (session) { session.expiresAtMs = Date.now(); broadcastEvent({ + id: await nextEventId(), type: "session-expired", sessionId, }); diff --git a/shared/types/api.ts b/shared/types/api.ts index b72ccb6..89355a9 100644 --- a/shared/types/api.ts +++ b/shared/types/api.ts @@ -155,27 +155,26 @@ export interface ApiUserDetails { } export interface ApiAccountUpdate { + id: Id, type: "account-update", data: ApiAccount, } -export interface ApiConnected { - type: "connected", - session?: ApiSession, -} - export interface ApiScheduleUpdate { + id: Id, type: "schedule-update", updatedFrom?: string, data: ApiSchedule | ApiTombstone, } export interface ApiSessionExpired { + id: Id, type: "session-expired", sessionId: Id, } export interface ApiUserUpdate { + id: Id, type: "user-update", updatedFrom?: string, data: ApiUser | ApiTombstone, @@ -183,8 +182,22 @@ export interface ApiUserUpdate { export type ApiEvent = | ApiAccountUpdate - | ApiConnected | ApiScheduleUpdate | ApiSessionExpired | ApiUserUpdate ; + +export interface ApiConnected { + type: "connected", + session?: ApiSession, +} + +export interface ApiDisconnected { + type: "disconnect", + reason?: string, +} + +export type ApiEventStreamMessage = + | ApiConnected + | ApiDisconnected +; diff --git a/stores/events.ts b/stores/events.ts new file mode 100644 index 0000000..f02ee20 --- /dev/null +++ b/stores/events.ts @@ -0,0 +1,31 @@ +/* + SPDX-FileCopyrightText: © 2025 Hornwitser + SPDX-License-Identifier: AGPL-3.0-or-later +*/ + +export const useEventsStore = defineStore("events", () => { + const state = { + lastEventId: ref(0), + }; + const getters = { + } + const actions = { + async fetchLastEventId() { + const requestFetch = useRequestFetch(); + state.lastEventId.value = await requestFetch("/api/last-event-id"); + } + } + + appEventSource?.addEventListener("event", (event) => { + if (event.data.id !== undefined) { + state.lastEventId.value = event.data.id + return; + } + }); + + return { + ...state, + ...getters, + ...actions, + }; +}); diff --git a/stores/schedules.ts b/stores/schedules.ts index 9d0a097..03c903e 100644 --- a/stores/schedules.ts +++ b/stores/schedules.ts @@ -96,7 +96,7 @@ export const useSchedulesStore = defineStore("schedules", () => { } }) - appEventSource?.addEventListener("update", (event) => { + appEventSource?.addEventListener("event", (event) => { if (event.data.type !== "schedule-update") { return; } diff --git a/stores/session.ts b/stores/session.ts index 380390b..882a6cb 100644 --- a/stores/session.ts +++ b/stores/session.ts @@ -57,7 +57,7 @@ export const useSessionStore = defineStore("session", () => { }, }; - appEventSource?.addEventListener("update", (event) => { + appEventSource?.addEventListener("message", (event) => { if (event.data.type !== "connected") { return; } diff --git a/stores/users.ts b/stores/users.ts index 57ae4f3..fdd78a3 100644 --- a/stores/users.ts +++ b/stores/users.ts @@ -75,7 +75,7 @@ export const useUsersStore = defineStore("users", () => { }, } - appEventSource?.addEventListener("update", (event) => { + appEventSource?.addEventListener("event", (event) => { if (event.data.type !== "user-update") { return; } From 0083696343086f8d64f792511c089e9aeb13fd26 Mon Sep 17 00:00:00 2001 From: Hornwitser Date: Sat, 20 Sep 2025 20:43:11 +0200 Subject: [PATCH 2/3] Fix unscoped CSS leaking out The missing scoped attribute cause h2 headers to no longer have the expected top margin. Fix by adding the intended scope attribute. --- components/DiffSchedule.vue | 2 +- pages/admin/users/[id].vue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/components/DiffSchedule.vue b/components/DiffSchedule.vue index 3175d5a..5bc2541 100644 --- a/components/DiffSchedule.vue +++ b/components/DiffSchedule.vue @@ -61,7 +61,7 @@ const shifts = computed(() => { }); - diff --git a/components/TableUsers.vue b/components/TableUsers.vue index 789f902..7021206 100644 --- a/components/TableUsers.vue +++ b/components/TableUsers.vue @@ -61,7 +61,3 @@ useEventSource(); const usersStore = useUsersStore(); - - diff --git a/pages/admin/index.vue b/pages/admin/index.vue index 116a189..86a1b5f 100644 --- a/pages/admin/index.vue +++ b/pages/admin/index.vue @@ -109,7 +109,3 @@ const tabs = [ { id: "database", title: "Database" }, ]; - -