import { pipeline } from "node:stream";
import { addStream, deleteStream } from "~/server/streams";
import { readAccounts } from "~/server/database";

/**
 * Opens a `text/event-stream` response for the caller.
 *
 * A `TransformStream` is created whose writable side is registered through
 * `addStream` (together with the session id and the resolved account id, when
 * a session exists) and whose readable side is piped into the raw Node
 * response. The writable side is unregistered through `deleteStream` exactly
 * once, as soon as the stream is flushed, cancelled, or the pipeline ends.
 */
export default defineEventHandler(async (event) => {
  const session = await getServerSession(event);

  // Only keep the account id if the session's account is among the stored accounts.
  let accountId: number | undefined;
  if (session) {
    const accounts = await readAccounts();
    accountId = accounts.find((account) => account.id === session.accountId)?.id;
  }

  const encoder = new TextEncoder();
  // Used for log messages only.
  const source = event.headers.get("x-forwarded-for");
  console.log(`starting event stream for ${source}`);

  // Unregisters the writable side at most once. Several teardown paths can
  // fire for the same connection (flush, cancel, pipeline completion), so the
  // guard keeps `deleteStream` from being called repeatedly.
  let released = false;
  const release = (): void => {
    if (released) {
      return;
    }
    released = true;
    deleteStream(stream.writable);
  };

  const stream = new TransformStream({
    // Chunks written to the writable side are encoded to bytes for the response.
    transform(chunk, controller) {
      controller.enqueue(encoder.encode(chunk));
    },
    flush() {
      console.log(`finished event stream for ${source}`);
      release();
    },
    // The transformer `cancel` hook is not available on every runtime; the
    // pipeline callback below covers the case where it is never invoked.
    // @ts-expect-error experimental API
    cancel() {
      console.log(`cancelled event stream for ${source}`);
      release();
    },
  });

  addStream(stream.writable, session?.id, accountId);

  setHeader(event, "Access-Control-Allow-Origin", "*");
  setHeader(event, "Content-Type", "text/event-stream");

  // Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986
  // The readable side is piped straight into the Node response and the event
  // is flagged as handled instead of returning the stream from the handler.
  pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, () => {
    // Pipeline errors are deliberately ignored, but the writable side must
    // still be unregistered when the pipeline ends without `flush`/`cancel`
    // having run, otherwise it would stay registered.
    release();
  });
  event._handled = true;
});