Compare commits

..

No commits in common. "f9d188b2ba36c4253e36c93d2774bfd1e7fdd1c1" and "0a0eb43d78f9cae9c7ea91e4812256519e06b7e3" have entirely different histories.

23 changed files with 147 additions and 329 deletions

View file

@ -11,10 +11,7 @@
import "~/assets/global.css"; import "~/assets/global.css";
const event = useRequestEvent(); const event = useRequestEvent();
const sessionStore = useSessionStore(); const sessionStore = useSessionStore();
const eventsStore = useEventsStore(); await callOnce("fetch-session", async () => {
const nuxtApp = useNuxtApp();
await callOnce("fetch-globals", async () => {
await sessionStore.fetch(event); await sessionStore.fetch(event);
await nuxtApp.runWithContext(eventsStore.fetchLastEventId);
}) })
</script> </script>

View file

@ -61,7 +61,7 @@ const shifts = computed(() => {
}); });
</script> </script>
<style scoped> <style>
h2 { h2 {
margin-block-start: 0.2rem; margin-block-start: 0.2rem;
} }

View file

@ -35,3 +35,7 @@ async function logIn() {
} }
} }
</script> </script>
<style>
</style>

View file

@ -61,3 +61,7 @@
useEventSource(); useEventSource();
const usersStore = useUsersStore(); const usersStore = useUsersStore();
</script> </script>
<style>
</style>

View file

@ -2,12 +2,12 @@
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no> SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import type { ApiEvent, ApiEventStreamMessage } from "~/shared/types/api"; import type { ApiEvent } from "~/shared/types/api";
interface AppEventMap { interface AppEventMap {
"open": Event, "open": Event,
"message": MessageEvent<ApiEventStreamMessage>, "message": MessageEvent<string>,
"event": MessageEvent<ApiEvent>, "update": MessageEvent<ApiEvent>,
"error": Event, "error": Event,
"close": Event, "close": Event,
} }
@ -18,11 +18,12 @@ class AppEventSource extends EventTarget {
#forwardEvent(type: string) { #forwardEvent(type: string) {
this.#source!.addEventListener(type, event => { this.#source!.addEventListener(type, event => {
console.log("AppEventSource", event.type, event.data); if (type === "open" || type === "message" || type === "error") {
if (type === "open" || type === "error") { console.log("AppEventSource", event.type, event.data);
this.dispatchEvent(new Event(event.type)); this.dispatchEvent(new Event(event.type));
} else if (type === "message") { } else {
const data = event.data ? JSON.parse(event.data) as ApiEventStreamMessage : undefined; const data = event.data ? JSON.parse(event.data) as ApiEvent : undefined;
console.log("AppEventSource", event.type, data);
if (data?.type === "connected") { if (data?.type === "connected") {
this.#sourceSessionId = data.session?.id; this.#sourceSessionId = data.session?.id;
} }
@ -33,27 +34,17 @@ class AppEventSource extends EventTarget {
source: event.source, source: event.source,
ports: [...event.ports], ports: [...event.ports],
})); }));
} else {
const data = event.data ? JSON.parse(event.data) as ApiEvent : undefined;
this.dispatchEvent(new MessageEvent(event.type, {
data,
origin: event.origin,
lastEventId: event.lastEventId,
source: event.source,
ports: [...event.ports],
}));
} }
}); });
} }
open(sessionId: number | undefined, lastEventId: number) { open(sessionId: number | undefined) {
console.log("Opening event source sid:", sessionId); console.log("Opening event source sid:", sessionId);
this.#sourceSessionId = sessionId; this.#sourceSessionId = sessionId;
const query = new URLSearchParams({ lastEventId: String(lastEventId) }); this.#source = new EventSource("/api/events");
this.#source = new EventSource(`/api/events?${query}`);
this.#forwardEvent("open"); this.#forwardEvent("open");
this.#forwardEvent("message"); this.#forwardEvent("message");
this.#forwardEvent("event"); this.#forwardEvent("update");
this.#forwardEvent("error"); this.#forwardEvent("error");
} }
@ -67,20 +58,20 @@ class AppEventSource extends EventTarget {
} }
#connectRefs = 0; #connectRefs = 0;
connect(sessionId: number | undefined, lastEventId: number) { connect(sessionId: number | undefined) {
this.#connectRefs += 1; this.#connectRefs += 1;
if (this.#source && this.#sourceSessionId !== sessionId) { if (this.#source && this.#sourceSessionId !== sessionId) {
this.close(); this.close();
} }
if (!this.#source) { if (!this.#source) {
this.open(sessionId, lastEventId); this.open(sessionId);
} }
} }
reconnect(sessionId: number | undefined, lastEventId: number) { reconnect(sessionId: number | undefined) {
if (this.#source && this.#sourceSessionId !== sessionId) { if (this.#source && this.#sourceSessionId !== sessionId) {
this.close(); this.close();
this.open(sessionId, lastEventId); this.open(sessionId);
} }
} }
@ -122,15 +113,14 @@ export const appEventSource = import.meta.client ? new AppEventSource() : null;
export function useEventSource() { export function useEventSource() {
const sessionStore = useSessionStore(); const sessionStore = useSessionStore();
const eventsStore = useEventsStore();
onMounted(() => { onMounted(() => {
console.log("useEventSource onMounted", sessionStore.id); console.log("useEventSource onMounted", sessionStore.id);
appEventSource!.connect(sessionStore.id, eventsStore.lastEventId); appEventSource!.connect(sessionStore.id);
}) })
watch(() => sessionStore.id, () => { watch(() => sessionStore.id, () => {
console.log("useEventSource sessionStore.id change", sessionStore.id); console.log("useEventSource sessionStore.id change", sessionStore.id);
appEventSource!.reconnect(sessionStore.id, eventsStore.lastEventId); appEventSource!.reconnect(sessionStore.id);
}) })
onUnmounted(() => { onUnmounted(() => {

View file

@ -11,9 +11,3 @@ To update in real time this application sends a `text/event-source` stream using
Upon connecting a `"connect"` event is emitted with the session the connection was made under. This is the primary mechanism a user agent discovers its own session having been rotated into a new one, which also happens when the access level of the account associated with the session changes. Upon connecting a `"connect"` event is emitted with the session the connection was made under. This is the primary mechanism a user agent discovers its own session having been rotated into a new one, which also happens when the access level of the account associated with the session changes.
After the `"connect"` event the user agent will start to receive updates to resources it has access to that has changed. There is no filtering for what resoucres the user agent receives updates for at the moment as there's not enough events to justify the complexity of server-side subscriptions and filtering. After the `"connect"` event the user agent will start to receive updates to resources it has access to that has changed. There is no filtering for what resoucres the user agent receives updates for at the moment as there's not enough events to justify the complexity of server-side subscriptions and filtering.
## Id and order
Events are guaranteed to be delivered in order, and to maintain consistency the server provides the following guarantee: Any entities fetched after receiving a response from `/api/last-event-id` will include updates from all events up to and including the `id` received from the response.
This means that a client can fetch an up to date and live representation of any API entity by first fetching the last event from `/api/last-event-id`, and then in parallel fetch any entities as well as opening the `/api/events` stream with the `lastEventId` query param set to the value received from the `/api/last-event-id` endpoint.

View file

@ -109,3 +109,7 @@ const tabs = [
{ id: "database", title: "Database" }, { id: "database", title: "Database" },
]; ];
</script> </script>
<style>
</style>

View file

@ -84,7 +84,7 @@ const { pending, data, error } = await useFetch(() => `/api/users/${id.value}/de
const userDetails = data as Ref<ApiUserDetails | ApiTombstone>; const userDetails = data as Ref<ApiUserDetails | ApiTombstone>;
</script> </script>
<style scoped> <style>
dl { dl {
display: grid; display: grid;
grid-template-columns: auto 1fr; grid-template-columns: auto 1fr;

View file

@ -2,7 +2,7 @@
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no> SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { nextEventId, readSessions, readUsers, writeSessions, writeUsers } from "~/server/database"; import { readSessions, readUsers, writeSessions, writeUsers } from "~/server/database";
import { apiUserPatchSchema } from "~/shared/types/api"; import { apiUserPatchSchema } from "~/shared/types/api";
import { z } from "zod/v4-mini"; import { z } from "zod/v4-mini";
import { broadcastEvent } from "~/server/streams"; import { broadcastEvent } from "~/server/streams";
@ -54,7 +54,6 @@ export default defineEventHandler(async (event) => {
user.updatedAt = new Date().toISOString(); user.updatedAt = new Date().toISOString();
await writeUsers(users); await writeUsers(users);
broadcastEvent({ broadcastEvent({
id: await nextEventId(),
type: "user-update", type: "user-update",
data: serverUserToApi(user), data: serverUserToApi(user),
}); });
@ -67,7 +66,6 @@ export default defineEventHandler(async (event) => {
if (session.accountId === user.id) { if (session.accountId === user.id) {
session.rotatesAtMs = nowMs; session.rotatesAtMs = nowMs;
broadcastEvent({ broadcastEvent({
id: await nextEventId(),
type: "session-expired", type: "session-expired",
sessionId: session.id, sessionId: session.id,
}); });

View file

@ -5,7 +5,6 @@
import { import {
readUsers, readSessions, readSubscriptions, readUsers, readSessions, readSubscriptions,
writeUsers, writeSessions, writeSubscriptions, writeUsers, writeSessions, writeSubscriptions,
nextEventId,
} from "~/server/database"; } from "~/server/database";
import { broadcastEvent, cancelAccountStreams } from "~/server/streams"; import { broadcastEvent, cancelAccountStreams } from "~/server/streams";
@ -25,7 +24,6 @@ export default defineEventHandler(async (event) => {
) { ) {
session.expiresAtMs = nowMs; session.expiresAtMs = nowMs;
broadcastEvent({ broadcastEvent({
id: await nextEventId(),
type: "session-expired", type: "session-expired",
sessionId: session.id, sessionId: session.id,
}); });
@ -50,7 +48,6 @@ export default defineEventHandler(async (event) => {
account.updatedAt = now; account.updatedAt = now;
await writeUsers(users); await writeUsers(users);
await broadcastEvent({ await broadcastEvent({
id: await nextEventId(),
type: "user-update", type: "user-update",
data: { data: {
id: account.id, id: account.id,

View file

@ -2,7 +2,7 @@
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no> SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { readUsers, writeUsers, nextUserId, type ServerUser, readAuthenticationMethods, nextAuthenticationMethodId, writeAuthenticationMethods, nextEventId } from "~/server/database"; import { readUsers, writeUsers, nextUserId, type ServerUser, readAuthenticationMethods, nextAuthenticationMethodId, writeAuthenticationMethods } from "~/server/database";
import { broadcastEvent } from "~/server/streams"; import { broadcastEvent } from "~/server/streams";
import type { ApiSession } from "~/shared/types/api"; import type { ApiSession } from "~/shared/types/api";
@ -88,7 +88,6 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
users.push(user); users.push(user);
await writeUsers(users); await writeUsers(users);
await broadcastEvent({ await broadcastEvent({
id: await nextEventId(),
type: "user-update", type: "user-update",
data: user, data: user,
}); });

View file

@ -3,46 +3,33 @@
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { pipeline } from "node:stream"; import { pipeline } from "node:stream";
import { createEventStream } from "~/server/streams"; import { addStream, deleteStream } from "~/server/streams";
export default defineEventHandler(async (event) => { export default defineEventHandler(async (event) => {
const session = await getServerSession(event, false); const session = await getServerSession(event, false);
let lastEventId: number | undefined; const encoder = new TextEncoder();
const lastEventIdHeader = event.headers.get("Last-Event-ID"); const source = event.headers.get("x-forwarded-for");
const lastEventIdQuery = getQuery(event)["lastEventId"]; console.log(`starting event stream for ${source}`)
if (lastEventIdHeader) { const stream = new TransformStream<string, Uint8Array>({
if (!/^[0-9]{1,15}$/.test(lastEventIdHeader)) { transform(chunk, controller) {
throw createError({ controller.enqueue(encoder.encode(chunk));
statusCode: 400, },
statusMessage: "Bad Request", flush(controller) {
message: "Malformed Last-Event-ID header", console.log(`finished event stream for ${source}`);
}); deleteStream(stream.writable);
},
// @ts-expect-error experimental API
cancel(reason) {
console.log(`cancelled event stream for ${source}`);
deleteStream(stream.writable);
} }
lastEventId = Number.parseInt(lastEventIdHeader, 10); });
} else if (lastEventIdQuery) { addStream(event, stream.writable, session);
if (typeof lastEventIdQuery !== "string" || !/^[0-9]{1,15}$/.test(lastEventIdQuery)) {
throw createError({
statusCode: 400,
statusMessage: "Bad Request",
message: "Malformed lastEventId",
});
}
lastEventId = Number.parseInt(lastEventIdQuery, 10);
} else {
throw createError({
statusCode: 400,
statusMessage: "Bad Request",
message: "lastEventId is required",
});
}
const source = event.headers.get("x-forwarded-for") ?? "";
const stream = await createEventStream(event, source, lastEventId, session);
// Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986 // Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986
setHeader(event, "Access-Control-Allow-Origin", "*"); setHeader(event, "Access-Control-Allow-Origin", "*");
setHeader(event, "Content-Type", "text/event-stream"); setHeader(event, "Content-Type", "text/event-stream");
pipeline(stream as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ }); pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ });
event._handled = true; event._handled = true;
}); });

View file

@ -1,11 +0,0 @@
/*
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later
*/
import { readEvents } from "../database";
export default defineEventHandler(async (event) => {
const events = await readEvents();
return events[events.length - 1]?. id ?? 0;
});

View file

@ -3,7 +3,7 @@
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { z } from "zod/v4-mini"; import { z } from "zod/v4-mini";
import { nextEventId, readSchedule, writeSchedule } from "~/server/database"; import { readSchedule, writeSchedule } from "~/server/database";
import { broadcastEvent } from "~/server/streams"; import { broadcastEvent } from "~/server/streams";
import { apiScheduleSchema } from "~/shared/types/api"; import { apiScheduleSchema } from "~/shared/types/api";
import { applyUpdatesToArray } from "~/shared/utils/update"; import { applyUpdatesToArray } from "~/shared/utils/update";
@ -87,7 +87,6 @@ export default defineEventHandler(async (event) => {
await writeSchedule(schedule); await writeSchedule(schedule);
await broadcastEvent({ await broadcastEvent({
id: await nextEventId(),
type: "schedule-update", type: "schedule-update",
updatedFrom, updatedFrom,
data: update, data: update,

View file

@ -3,7 +3,7 @@
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { readFile, unlink, writeFile } from "node:fs/promises"; import { readFile, unlink, writeFile } from "node:fs/promises";
import type { ApiAuthenticationProvider, ApiEvent, ApiSchedule, ApiSubscription, ApiUserType } from "~/shared/types/api"; import type { ApiAuthenticationProvider, ApiSchedule, ApiSubscription, ApiUserType } from "~/shared/types/api";
import type { Id } from "~/shared/types/common"; import type { Id } from "~/shared/types/common";
export interface ServerSession { export interface ServerSession {
@ -50,8 +50,6 @@ const sessionsPath = "data/sessions.json";
const nextSessionIdPath = "data/next-session-id.json"; const nextSessionIdPath = "data/next-session-id.json";
const authMethodPath = "data/auth-method.json"; const authMethodPath = "data/auth-method.json";
const nextAuthenticationMethodIdPath = "data/auth-method-id.json" const nextAuthenticationMethodIdPath = "data/auth-method-id.json"
const nextEventIdPath = "data/next-event-id.json";
const eventsPath = "data/events.json";
async function remove(path: string) { async function remove(path: string) {
try { try {
@ -170,21 +168,3 @@ export async function readAuthenticationMethods() {
export async function writeAuthenticationMethods(authMethods: ServerAuthenticationMethod[]) { export async function writeAuthenticationMethods(authMethods: ServerAuthenticationMethod[]) {
await writeFile(authMethodPath, JSON.stringify(authMethods, undefined, "\t") + "\n", "utf-8"); await writeFile(authMethodPath, JSON.stringify(authMethods, undefined, "\t") + "\n", "utf-8");
} }
export async function nextEventId() {
const nextId = await readJson(nextEventIdPath, 0);
await writeFile(nextEventIdPath, String(nextId + 1), "utf-8");
return nextId;
}
export async function writeNextEventId(nextId: number) {
await writeFile(nextEventIdPath, String(nextId), "utf-8");
}
export async function readEvents() {
return readJson<ApiEvent[]>(eventsPath, [])
}
export async function writeEvents(events: ApiEvent[]) {
await writeFile(eventsPath, JSON.stringify(events, undefined, "\t") + "\n", "utf-8");
}

View file

@ -2,134 +2,82 @@
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no> SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { readEvents, writeEvents, readUsers, type ServerSession } from "~/server/database"; import { readUsers, type ServerSession } from "~/server/database";
import type { ApiAccount, ApiDisconnected, ApiEvent, ApiEventStreamMessage, ApiUserType } from "~/shared/types/api"; import type { ApiAccount, ApiEvent } from "~/shared/types/api";
import { serverSessionToApi } from "./utils/session"; import { serverSessionToApi } from "./utils/session";
import { H3Event } from "h3"; import { H3Event } from "h3";
const keepaliveTimeoutMs = 45e3; function sendMessage(
const eventUpdateTimeMs = 1e3; stream: WritableStream<string>,
message: string,
class EventStream { ) {
write!: (data: string) => void; const writer = stream.getWriter();
close!: (reason?: string) => void; writer.ready
.then(() => writer.write(message))
constructor( .catch(console.error)
public sessionId: number | undefined, .finally(() => writer.releaseLock())
public accountId: number | undefined, ;
public userType: ApiUserType | undefined,
public rotatesAtMs: number ,
public lastKeepAliveMs: number,
public lastEventId: number,
) {
}
} }
export async function createEventStream( function sendMessageAndClose(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => {
writer.write(message);
writer.close();
}).catch(console.error)
.finally(() => writer.releaseLock())
;
}
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number, rotatesAtMs: number }>();
let keepaliveInterval: ReturnType<typeof setInterval> | null = null
export async function addStream(
event: H3Event, event: H3Event,
source: string, stream: WritableStream<string>,
lastEventId: number,
session?: ServerSession, session?: ServerSession,
) { ) {
const runtimeConfig = useRuntimeConfig(event); if (streams.size === 0) {
const now = Date.now(); console.log("Starting keepalive")
const events = (await readEvents()).filter(e => e.id > lastEventId); keepaliveInterval = setInterval(sendKeepalive, 4000)
const users = await readUsers();
const apiSession = session ? await serverSessionToApi(event, session) : undefined;
let userType: ApiAccount["type"] | undefined;
if (session?.accountId !== undefined) {
userType = users.find(a => !a.deleted && a.id === session.accountId)?.type
} }
const stream = new EventStream( const runtimeConfig = useRuntimeConfig(event);
session?.id, streams.set(stream, {
session?.accountId, sessionId: session?.id,
userType, accountId: session?.accountId,
session?.rotatesAtMs ?? now + runtimeConfig.sessionRotatesTimeout * 1000, rotatesAtMs: session?.rotatesAtMs ?? Date.now() + runtimeConfig.sessionRotatesTimeout * 1000,
now,
events[events.length - 1]?.id ?? lastEventId,
);
const readableStream = new ReadableStream<Uint8Array>({
start(controller) {
const encoder = new TextEncoder();
stream.write = (data: string) => {
controller.enqueue(encoder.encode(data));
}
stream.close = (reason?: string) => {
const data: ApiDisconnected = {
type: "disconnect",
reason,
};
stream.write(`data: ${JSON.stringify(data)}\n\n`);
controller.close();
deleteStream(stream);
},
console.log(`Starting event stream for ${source}`)
addStream(stream);
},
cancel(reason) {
console.log(`Cancelled event stream for ${source}:`, reason);
deleteStream(stream);
}
}); });
// Produce a response immediately to avoid the reply waiting for content. // Produce a response immediately to avoid the reply waiting for content.
const update: ApiEventStreamMessage = { const update: ApiEvent = {
type: "connected", type: "connected",
session: apiSession, session: session ? await serverSessionToApi(event, session) : undefined,
}; };
stream.write(`data: ${JSON.stringify(update)}\n\n`); sendMessage(stream, `event: update\ndata: ${JSON.stringify(update)}\n\n`);
/*
Send events since the provided lastEventId
Warning: This have to happen either before addStream(stream) is
called, or as done here synchronously after it. Otherwise there's a
possibility of events being delivered out of order, which will break
the assumption made by the schedule updating logic.
*/
if (events.length)
console.log(`Sending ${events.length} event(s) to ${source}`);
for (const event of events) {
if (!sendEventToStream(stream, event)) {
break;
}
}
return readableStream;
} }
export function deleteStream(stream: WritableStream<string>) {
let updateInterval: ReturnType<typeof setInterval> | null = null
const streams = new Set<EventStream>();
function addStream(
stream: EventStream,
) {
if (streams.size === 0) {
console.log("Starting event updates")
updateInterval = setInterval(sendEventUpdates, eventUpdateTimeMs)
}
streams.add(stream);
}
function deleteStream(stream: EventStream) {
streams.delete(stream); streams.delete(stream);
if (streams.size === 0) { if (streams.size === 0) {
console.log("Ending event updates") console.log("Ending keepalive")
clearInterval(updateInterval!); clearInterval(keepaliveInterval!);
updateInterval = null;
} }
} }
export function cancelAccountStreams(accountId: number) { export function cancelAccountStreams(accountId: number) {
for (const stream of streams.values()) { for (const [stream, data] of streams) {
if (stream.accountId === accountId) { if (data.accountId === accountId) {
stream.close("cancelled"); sendMessageAndClose(stream, `data: cancelled\n\n`);
} }
} }
} }
export function cancelSessionStreams(sessionId: number) { export function cancelSessionStreams(sessionId: number) {
for (const stream of streams.values()) { for (const [stream, data] of streams) {
if (stream.sessionId === sessionId) { if (data.sessionId === sessionId) {
stream.close("cancelled"); sendMessageAndClose(stream, `data: cancelled\n\n`);
} }
} }
} }
@ -146,7 +94,6 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
if (event.type === "schedule-update") { if (event.type === "schedule-update") {
if (!canSeeCrew(userType)) { if (!canSeeCrew(userType)) {
event = { event = {
id: event.id,
type: event.type, type: event.type,
updatedFrom: event.updatedFrom, updatedFrom: event.updatedFrom,
data: filterSchedule(event.data), data: filterSchedule(event.data),
@ -159,7 +106,6 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
|| !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType) || !event.data.deleted && event.data.type === "anonymous" && !canSeeAnonymous(userType)
) { ) {
event = { event = {
id: event.id,
type: event.type, type: event.type,
data: { data: {
id: event.data.id, id: event.data.id,
@ -182,67 +128,44 @@ function encodeEvent(event: ApiEvent, userType: ApiAccount["type"] | undefined)
} }
export async function broadcastEvent(event: ApiEvent) { export async function broadcastEvent(event: ApiEvent) {
const events = await readEvents(); const id = Date.now();
events.push(event); console.log(`broadcasting update to ${streams.size} clients`);
await writeEvents(events); if (!streams.size) {
} return;
}
function sendEventToStream(stream: EventStream, event: ApiEvent) {
// Session expiry events cause the streams belonging to that session to be terminated // Session expiry events cause the streams belonging to that session to be terminated
if (event.type === "session-expired") { if (event.type === "session-expired") {
if (stream.sessionId === event.sessionId) { cancelSessionStreams(event.sessionId);
stream.close("session expired"); return;
}
return false;
} }
// Account events are specially handled and only sent to the user they belong to. const users = await readUsers();
if (event.type === "account-update") { for (const [stream, streamData] of streams) {
if (stream.accountId === event.data.id) { // Account events are specially handled and only sent to the user they belong to.
stream.write(`id: ${event.id}\nevent: event\ndata: ${JSON.stringify(event)}\n\n`); if (event.type === "account-update") {
} if (streamData.accountId === event.data.id) {
return true; sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
}
// All other events are encoded according to the user access level seeing it.
const data = encodeEvent(event, stream.userType)
stream.write(`id: ${event.id}\nevent: event\ndata: ${data}\n\n`);
return true;
}
async function sendEventUpdates() {
// Cancel streams that need to be rotated.
const now = Date.now();
for (const stream of streams.values()) {
if (stream.rotatesAtMs < now) {
stream.close("session rotation");
continue;
}
}
// Send events.
const skipEventId = Math.min(...[...streams.values()].map(s => s.lastEventId));
const events = (await readEvents()).filter(e => e.id > skipEventId);
if (events.length)
console.log(`broadcasting ${events.length} event(s) to ${streams.size} client(s)`);
for (const stream of streams.values()) {
for (const event of events) {
if (event.id > stream.lastEventId) {
stream.lastEventId = event.id;
stream.lastKeepAliveMs = now;
if (!sendEventToStream(stream, event)) {
break;
}
} }
}
}
// Send Keepalives to streams with no activity. } else {
for (const stream of streams.values()) { let userType: ApiAccount["type"] | undefined;
if (stream.lastKeepAliveMs + keepaliveTimeoutMs < now) { if (streamData.accountId !== undefined) {
stream.write(": keepalive\n"); userType = users.find(a => !a.deleted && a.id === streamData.accountId)?.type
stream.lastKeepAliveMs = now; }
const data = encodeEvent(event, userType)
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
}
}
}
function sendKeepalive() {
const now = Date.now();
for (const [stream, streamData] of streams) {
if (streamData.rotatesAtMs > now) {
sendMessage(stream, ": keepalive\n");
} else {
sendMessageAndClose(stream, `data: cancelled\n\n`);
} }
} }
} }

View file

@ -2,7 +2,7 @@
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no> SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later SPDX-License-Identifier: AGPL-3.0-or-later
*/ */
import { nextEventId, readSchedule, type ServerUser, writeSchedule } from '~/server/database'; import { readSchedule, type ServerUser, writeSchedule } from '~/server/database';
import { broadcastEvent } from '~/server/streams'; import { broadcastEvent } from '~/server/streams';
import type { ApiSchedule, ApiTombstone } from '~/shared/types/api'; import type { ApiSchedule, ApiTombstone } from '~/shared/types/api';
@ -58,7 +58,6 @@ export async function updateScheduleInterestedCounts(users: ServerUser[]) {
schedule.updatedAt = updatedFrom; schedule.updatedAt = updatedFrom;
await writeSchedule(schedule); await writeSchedule(schedule);
await broadcastEvent({ await broadcastEvent({
id: await nextEventId(),
type: "schedule-update", type: "schedule-update",
updatedFrom, updatedFrom,
data: update, data: update,

View file

@ -4,7 +4,6 @@
*/ */
import type { H3Event } from "h3"; import type { H3Event } from "h3";
import { import {
nextEventId,
nextSessionId, nextSessionId,
readSessions, readSessions,
readSubscriptions, readSubscriptions,
@ -35,7 +34,6 @@ async function clearServerSessionInternal(event: H3Event, sessions: ServerSessio
if (session) { if (session) {
session.expiresAtMs = Date.now(); session.expiresAtMs = Date.now();
broadcastEvent({ broadcastEvent({
id: await nextEventId(),
type: "session-expired", type: "session-expired",
sessionId, sessionId,
}); });

View file

@ -155,26 +155,27 @@ export interface ApiUserDetails {
} }
export interface ApiAccountUpdate { export interface ApiAccountUpdate {
id: Id,
type: "account-update", type: "account-update",
data: ApiAccount, data: ApiAccount,
} }
export interface ApiConnected {
type: "connected",
session?: ApiSession,
}
export interface ApiScheduleUpdate { export interface ApiScheduleUpdate {
id: Id,
type: "schedule-update", type: "schedule-update",
updatedFrom?: string, updatedFrom?: string,
data: ApiSchedule | ApiTombstone, data: ApiSchedule | ApiTombstone,
} }
export interface ApiSessionExpired { export interface ApiSessionExpired {
id: Id,
type: "session-expired", type: "session-expired",
sessionId: Id, sessionId: Id,
} }
export interface ApiUserUpdate { export interface ApiUserUpdate {
id: Id,
type: "user-update", type: "user-update",
updatedFrom?: string, updatedFrom?: string,
data: ApiUser | ApiTombstone, data: ApiUser | ApiTombstone,
@ -182,22 +183,8 @@ export interface ApiUserUpdate {
export type ApiEvent = export type ApiEvent =
| ApiAccountUpdate | ApiAccountUpdate
| ApiConnected
| ApiScheduleUpdate | ApiScheduleUpdate
| ApiSessionExpired | ApiSessionExpired
| ApiUserUpdate | ApiUserUpdate
; ;
export interface ApiConnected {
type: "connected",
session?: ApiSession,
}
export interface ApiDisconnected {
type: "disconnect",
reason?: string,
}
export type ApiEventStreamMessage =
| ApiConnected
| ApiDisconnected
;

View file

@ -1,31 +0,0 @@
/*
SPDX-FileCopyrightText: © 2025 Hornwitser <code@hornwitser.no>
SPDX-License-Identifier: AGPL-3.0-or-later
*/
export const useEventsStore = defineStore("events", () => {
const state = {
lastEventId: ref(0),
};
const getters = {
}
const actions = {
async fetchLastEventId() {
const requestFetch = useRequestFetch();
state.lastEventId.value = await requestFetch("/api/last-event-id");
}
}
appEventSource?.addEventListener("event", (event) => {
if (event.data.id !== undefined) {
state.lastEventId.value = event.data.id
return;
}
});
return {
...state,
...getters,
...actions,
};
});

View file

@ -96,7 +96,7 @@ export const useSchedulesStore = defineStore("schedules", () => {
} }
}) })
appEventSource?.addEventListener("event", (event) => { appEventSource?.addEventListener("update", (event) => {
if (event.data.type !== "schedule-update") { if (event.data.type !== "schedule-update") {
return; return;
} }

View file

@ -57,7 +57,7 @@ export const useSessionStore = defineStore("session", () => {
}, },
}; };
appEventSource?.addEventListener("message", (event) => { appEventSource?.addEventListener("update", (event) => {
if (event.data.type !== "connected") { if (event.data.type !== "connected") {
return; return;
} }

View file

@ -75,7 +75,7 @@ export const useUsersStore = defineStore("users", () => {
}, },
} }
appEventSource?.addEventListener("event", (event) => { appEventSource?.addEventListener("update", (event) => {
if (event.data.type !== "user-update") { if (event.data.type !== "user-update") {
return; return;
} }