diff --git a/server/api/account.delete.ts b/server/api/account.delete.ts index 9530b6c..1968681 100644 --- a/server/api/account.delete.ts +++ b/server/api/account.delete.ts @@ -2,6 +2,7 @@ import { readAccounts, readSessions, readSubscriptions, writeAccounts, writeSessions, writeSubscriptions, } from "~/server/database"; +import { cancelAccountStreams } from "~/server/streams"; export default defineEventHandler(async (event) => { const accountSession = await requireAccountSession(event); @@ -24,6 +25,7 @@ export default defineEventHandler(async (event) => { } return true; }); + cancelAccountStreams(accountSession.accountId); await writeSessions(sessions); await deleteCookie(event, "session"); diff --git a/server/api/auth/session.delete.ts b/server/api/auth/session.delete.ts index 565745c..93a6db2 100644 --- a/server/api/auth/session.delete.ts +++ b/server/api/auth/session.delete.ts @@ -1,4 +1,5 @@ import { readAccounts } from "~/server/database"; +import { cancelSessionStreams } from "~/server/streams"; export default defineEventHandler(async (event) => { const session = await getAccountSession(event); @@ -15,5 +16,8 @@ export default defineEventHandler(async (event) => { } } + if (session) { + cancelSessionStreams(session.id); + } await clearAccountSession(event); }) diff --git a/server/api/create-event.post.ts b/server/api/create-event.post.ts index 079ae06..db3bdbe 100644 --- a/server/api/create-event.post.ts +++ b/server/api/create-event.post.ts @@ -25,7 +25,7 @@ export default defineEventHandler(async (event) => { } ] }); - broadcastUpdate(schedule); + await broadcastUpdate(schedule); await writeSchedule(schedule); await sendPush("New event", `${name} will start at ${start}`); }); diff --git a/server/api/delete-event.post.ts b/server/api/delete-event.post.ts index d6ddc62..193d5d3 100644 --- a/server/api/delete-event.post.ts +++ b/server/api/delete-event.post.ts @@ -11,6 +11,6 @@ export default defineEventHandler(async (event) => { throw Error("No such event"); } schedule.events.splice(index, 1); - broadcastUpdate(schedule); + await broadcastUpdate(schedule); await writeSchedule(schedule); }); diff --git a/server/api/events.ts b/server/api/events.ts index 372cb11..54184b1 100644 --- a/server/api/events.ts +++ b/server/api/events.ts @@ -1,7 +1,15 @@ import { pipeline } from "node:stream"; import { addStream, deleteStream } from "~/server/streams"; +import { readAccounts } from "~/server/database"; export default defineEventHandler(async (event) => { + const session = await getAccountSession(event); + let accountId: number | undefined; + if (session) { + const accounts = await readAccounts() + accountId = accounts.find(account => account.id === session.accountId)?.id; + } + const encoder = new TextEncoder(); const source = event.headers.get("x-forwarded-for"); console.log(`starting event stream for ${source}`) @@ -19,12 +27,11 @@ export default defineEventHandler(async (event) => { deleteStream(stream.writable); } }) - addStream(stream.writable); + addStream(stream.writable, session?.id, accountId); // Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986 setHeader(event, "Access-Control-Allow-Origin", "*"); setHeader(event, "Content-Type", "text/event-stream"); - event.node.res.write("data: connected\n\n"); // Produce a response immediately to avoid the reply waiting for content. pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ }); event._handled = true; }); diff --git a/server/api/modify-event.post.ts b/server/api/modify-event.post.ts index 985f5a5..abceecb 100644 --- a/server/api/modify-event.post.ts +++ b/server/api/modify-event.post.ts @@ -30,7 +30,7 @@ export default defineEventHandler(async (event) => { } ] }; - broadcastUpdate(schedule); + await broadcastUpdate(schedule); await writeSchedule(schedule); if (timeChanged) await sendPush(`New time for ${name}`, `${name} will now start at ${start}`); diff --git a/server/api/schedule.ts b/server/api/schedule.ts index cf4630c..5c73166 100644 --- a/server/api/schedule.ts +++ b/server/api/schedule.ts @@ -1,5 +1,14 @@ -import { readSchedule } from "~/server/database"; +import { readAccounts, readSchedule } from "~/server/database"; +import { Account } from "~/shared/types/account"; +import { canSeeCrew } from "../utils/schedule"; export default defineEventHandler(async (event) => { - return await readSchedule(); + const session = await getAccountSession(event); + let account: Account | undefined; + if (session) { + const accounts = await readAccounts() + account = accounts.find(account => account.id === session.accountId); + } + const schedule = await readSchedule(); + return canSeeCrew(account?.type) ? schedule : filterSchedule(schedule); }) diff --git a/server/streams.ts b/server/streams.ts index aed361e..2d10837 100644 --- a/server/streams.ts +++ b/server/streams.ts @@ -1,4 +1,6 @@ import { Schedule } from "~/shared/types/schedule" +import { readAccounts } from "~/server/database"; +import { canSeeCrew } from "./utils/schedule"; function sendMessage( stream: WritableStream, @@ -12,15 +14,32 @@ function sendMessage( ; } -const streams = new Set>(); +function sendMessageAndClose( + stream: WritableStream, + message: string, +) { + const writer = stream.getWriter(); + writer.ready + .then(() => { + writer.write(message); + writer.close(); + }).catch(console.error) + .finally(() => writer.releaseLock()) + ; +} + +const streams = new Map, { sessionId?: number, accountId?: number }>(); let keepaliveInterval: ReturnType | null = null -export function addStream(stream: WritableStream) { +export function addStream(stream: WritableStream, sessionId?: number, accountId?: number) { if (streams.size === 0) { console.log("Starting keepalive") keepaliveInterval = setInterval(sendKeepalive, 4000) } - streams.add(stream); + streams.set(stream, { sessionId, accountId }); + // Produce a response immediately to avoid the reply waiting for content. + const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`; + sendMessage(stream, message); } export function deleteStream(stream: WritableStream) { streams.delete(stream); @@ -30,18 +49,43 @@ export function deleteStream(stream: WritableStream) { } } -export function broadcastUpdate(schedule: Schedule) { +export function cancelAccountStreams(accountId: number) { + for (const [stream, data] of streams) { + if (data.accountId === accountId) { + sendMessageAndClose(stream, `data: cancelled\n\n`); + } + } +} + +export function cancelSessionStreams(sessionId: number) { + for (const [stream, data] of streams) { + if (data.sessionId === sessionId) { + sendMessageAndClose(stream, `data: cancelled\n\n`); + } + } +} + +export async function broadcastUpdate(schedule: Schedule) { const id = Date.now(); - const data = JSON.stringify(schedule); - const message = `id: ${id}\nevent: update\ndata: ${data}\n\n` - console.log(`broadcasting update from ${process.pid} to ${streams.size} clients`); - for (const stream of streams) { + console.log(`broadcasting update to ${streams.size} clients`); + if (!streams.size) { + return; + } + const accounts = await readAccounts(); + const filteredSchedule = filterSchedule(schedule); + for (const [stream, streamData] of streams) { + let accountType: string | undefined; + if (streamData.accountId !== undefined) { + accountType = accounts.find(a => a.id === streamData.accountId)?.type + } + const data = JSON.stringify(canSeeCrew(accountType) ? schedule : filteredSchedule); + const message = `id: ${id}\nevent: update\ndata: ${data}\n\n` sendMessage(stream, message); } } function sendKeepalive() { - for (const stream of streams) { + for (const stream of streams.keys()) { sendMessage(stream, "data: keepalive\n\n"); } } diff --git a/server/utils/schedule.ts b/server/utils/schedule.ts index 0653c1d..cfc78e7 100644 --- a/server/utils/schedule.ts +++ b/server/utils/schedule.ts @@ -1,4 +1,5 @@ import { Account } from '~/shared/types/account'; +import { Schedule } from '~/shared/types/schedule'; import { readSchedule, writeSchedule } from '~/server/database'; import { broadcastUpdate } from '~/server/streams'; @@ -17,5 +18,17 @@ export async function updateScheduleInterestedCounts(accounts: Account[]) { } } await writeSchedule(schedule); - broadcastUpdate(schedule); + await broadcastUpdate(schedule); +} + +export function canSeeCrew(accountType: string | undefined) { + return accountType === "crew" || accountType === "admin"; +} + +/** Filters out crew visible only parts of schedule */ +export function filterSchedule(schedule: Schedule): Schedule { + return { + locations: schedule.locations, + events: schedule.events.filter(event => !event.crew), + } }