Rename accounts to users to be consistent with the new naming scheme where account only referes to the logged in user of the session and implement live updates of users via a user store which listens for updates from the event stream.
33 lines
1.2 KiB
TypeScript
33 lines
1.2 KiB
TypeScript
import { pipeline } from "node:stream";
|
|
import { addStream, deleteStream } from "~/server/streams";
|
|
import { readUsers } from "~/server/database";
|
|
|
|
export default defineEventHandler(async (event) => {
|
|
const session = await getServerSession(event);
|
|
const accountId = session?.account.id;
|
|
|
|
const encoder = new TextEncoder();
|
|
const source = event.headers.get("x-forwarded-for");
|
|
console.log(`starting event stream for ${source}`)
|
|
const stream = new TransformStream<string, Uint8Array>({
|
|
transform(chunk, controller) {
|
|
controller.enqueue(encoder.encode(chunk));
|
|
},
|
|
flush(controller) {
|
|
console.log(`finished event stream for ${source}`);
|
|
deleteStream(stream.writable);
|
|
},
|
|
// @ts-expect-error experimental API
|
|
cancel(reason) {
|
|
console.log(`cancelled event stream for ${source}`);
|
|
deleteStream(stream.writable);
|
|
}
|
|
})
|
|
addStream(stream.writable, session?.id, accountId);
|
|
|
|
// Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986
|
|
setHeader(event, "Access-Control-Allow-Origin", "*");
|
|
setHeader(event, "Content-Type", "text/event-stream");
|
|
pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ });
|
|
event._handled = true;
|
|
});
|