Filter crew events to only be visible for crew

This commit is contained in:
Hornwitser 2025-03-10 16:26:52 +01:00
parent 13f344472e
commit 4806343250
9 changed files with 96 additions and 17 deletions

View file

@ -2,6 +2,7 @@ import {
readAccounts, readSessions, readSubscriptions, readAccounts, readSessions, readSubscriptions,
writeAccounts, writeSessions, writeSubscriptions, writeAccounts, writeSessions, writeSubscriptions,
} from "~/server/database"; } from "~/server/database";
import { cancelAccountStreams } from "~/server/streams";
export default defineEventHandler(async (event) => { export default defineEventHandler(async (event) => {
const accountSession = await requireAccountSession(event); const accountSession = await requireAccountSession(event);
@ -24,6 +25,7 @@ export default defineEventHandler(async (event) => {
} }
return true; return true;
}); });
cancelAccountStreams(accountSession.accountId);
await writeSessions(sessions); await writeSessions(sessions);
await deleteCookie(event, "session"); await deleteCookie(event, "session");

View file

@ -1,4 +1,5 @@
import { readAccounts } from "~/server/database"; import { readAccounts } from "~/server/database";
import { cancelSessionStreams } from "~/server/streams";
export default defineEventHandler(async (event) => { export default defineEventHandler(async (event) => {
const session = await getAccountSession(event); const session = await getAccountSession(event);
@ -15,5 +16,8 @@ export default defineEventHandler(async (event) => {
} }
} }
if (session) {
cancelSessionStreams(session.id);
}
await clearAccountSession(event); await clearAccountSession(event);
}) })

View file

@ -25,7 +25,7 @@ export default defineEventHandler(async (event) => {
} }
] ]
}); });
broadcastUpdate(schedule); await broadcastUpdate(schedule);
await writeSchedule(schedule); await writeSchedule(schedule);
await sendPush("New event", `${name} will start at ${start}`); await sendPush("New event", `${name} will start at ${start}`);
}); });

View file

@ -11,6 +11,6 @@ export default defineEventHandler(async (event) => {
throw Error("No such event"); throw Error("No such event");
} }
schedule.events.splice(index, 1); schedule.events.splice(index, 1);
broadcastUpdate(schedule); await broadcastUpdate(schedule);
await writeSchedule(schedule); await writeSchedule(schedule);
}); });

View file

@ -1,7 +1,15 @@
import { pipeline } from "node:stream"; import { pipeline } from "node:stream";
import { addStream, deleteStream } from "~/server/streams"; import { addStream, deleteStream } from "~/server/streams";
import { readAccounts } from "~/server/database";
export default defineEventHandler(async (event) => { export default defineEventHandler(async (event) => {
const session = await getAccountSession(event);
let accountId: number | undefined;
if (session) {
const accounts = await readAccounts()
accountId = accounts.find(account => account.id === session.accountId)?.id;
}
const encoder = new TextEncoder(); const encoder = new TextEncoder();
const source = event.headers.get("x-forwarded-for"); const source = event.headers.get("x-forwarded-for");
console.log(`starting event stream for ${source}`) console.log(`starting event stream for ${source}`)
@ -19,12 +27,11 @@ export default defineEventHandler(async (event) => {
deleteStream(stream.writable); deleteStream(stream.writable);
} }
}) })
addStream(stream.writable); addStream(stream.writable, session?.id, accountId);
// Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986 // Workaround to properly handle stream errors. See https://github.com/unjs/h3/issues/986
setHeader(event, "Access-Control-Allow-Origin", "*"); setHeader(event, "Access-Control-Allow-Origin", "*");
setHeader(event, "Content-Type", "text/event-stream"); setHeader(event, "Content-Type", "text/event-stream");
event.node.res.write("data: connected\n\n"); // Produce a response immediately to avoid the reply waiting for content.
pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ }); pipeline(stream.readable as unknown as NodeJS.ReadableStream, event.node.res, (err) => { /* ignore */ });
event._handled = true; event._handled = true;
}); });

View file

@ -30,7 +30,7 @@ export default defineEventHandler(async (event) => {
} }
] ]
}; };
broadcastUpdate(schedule); await broadcastUpdate(schedule);
await writeSchedule(schedule); await writeSchedule(schedule);
if (timeChanged) if (timeChanged)
await sendPush(`New time for ${name}`, `${name} will now start at ${start}`); await sendPush(`New time for ${name}`, `${name} will now start at ${start}`);

View file

@ -1,5 +1,14 @@
import { readSchedule } from "~/server/database"; import { readAccounts, readSchedule } from "~/server/database";
import { Account } from "~/shared/types/account";
import { canSeeCrew } from "../utils/schedule";
export default defineEventHandler(async (event) => { export default defineEventHandler(async (event) => {
return await readSchedule(); const session = await getAccountSession(event);
let account: Account | undefined;
if (session) {
const accounts = await readAccounts()
account = accounts.find(account => account.id === session.accountId);
}
const schedule = await readSchedule();
return canSeeCrew(account?.type) ? schedule : filterSchedule(schedule);
}) })

View file

@ -1,4 +1,6 @@
import { Schedule } from "~/shared/types/schedule" import { Schedule } from "~/shared/types/schedule"
import { readAccounts } from "~/server/database";
import { canSeeCrew } from "./utils/schedule";
function sendMessage( function sendMessage(
stream: WritableStream<string>, stream: WritableStream<string>,
@ -12,15 +14,32 @@ function sendMessage(
; ;
} }
const streams = new Set<WritableStream<string>>(); function sendMessageAndClose(
stream: WritableStream<string>,
message: string,
) {
const writer = stream.getWriter();
writer.ready
.then(() => {
writer.write(message);
writer.close();
}).catch(console.error)
.finally(() => writer.releaseLock())
;
}
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number }>();
let keepaliveInterval: ReturnType<typeof setInterval> | null = null let keepaliveInterval: ReturnType<typeof setInterval> | null = null
export function addStream(stream: WritableStream<string>) { export function addStream(stream: WritableStream<string>, sessionId?: number, accountId?: number) {
if (streams.size === 0) { if (streams.size === 0) {
console.log("Starting keepalive") console.log("Starting keepalive")
keepaliveInterval = setInterval(sendKeepalive, 4000) keepaliveInterval = setInterval(sendKeepalive, 4000)
} }
streams.add(stream); streams.set(stream, { sessionId, accountId });
// Produce a response immediately to avoid the reply waiting for content.
const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`;
sendMessage(stream, message);
} }
export function deleteStream(stream: WritableStream<string>) { export function deleteStream(stream: WritableStream<string>) {
streams.delete(stream); streams.delete(stream);
@ -30,18 +49,43 @@ export function deleteStream(stream: WritableStream<string>) {
} }
} }
export function broadcastUpdate(schedule: Schedule) { export function cancelAccountStreams(accountId: number) {
for (const [stream, data] of streams) {
if (data.accountId === accountId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
export function cancelSessionStreams(sessionId: number) {
for (const [stream, data] of streams) {
if (data.sessionId === sessionId) {
sendMessageAndClose(stream, `data: cancelled\n\n`);
}
}
}
export async function broadcastUpdate(schedule: Schedule) {
const id = Date.now(); const id = Date.now();
const data = JSON.stringify(schedule); console.log(`broadcasting update to ${streams.size} clients`);
if (!streams.size) {
return;
}
const accounts = await readAccounts();
const filteredSchedule = filterSchedule(schedule);
for (const [stream, streamData] of streams) {
let accountType: string | undefined;
if (streamData.accountId !== undefined) {
accountType = accounts.find(a => a.id === streamData.accountId)?.type
}
const data = JSON.stringify(canSeeCrew(accountType) ? schedule : filteredSchedule);
const message = `id: ${id}\nevent: update\ndata: ${data}\n\n` const message = `id: ${id}\nevent: update\ndata: ${data}\n\n`
console.log(`broadcasting update from ${process.pid} to ${streams.size} clients`);
for (const stream of streams) {
sendMessage(stream, message); sendMessage(stream, message);
} }
} }
function sendKeepalive() { function sendKeepalive() {
for (const stream of streams) { for (const stream of streams.keys()) {
sendMessage(stream, "data: keepalive\n\n"); sendMessage(stream, "data: keepalive\n\n");
} }
} }

View file

@ -1,4 +1,5 @@
import { Account } from '~/shared/types/account'; import { Account } from '~/shared/types/account';
import { Schedule } from '~/shared/types/schedule';
import { readSchedule, writeSchedule } from '~/server/database'; import { readSchedule, writeSchedule } from '~/server/database';
import { broadcastUpdate } from '~/server/streams'; import { broadcastUpdate } from '~/server/streams';
@ -17,5 +18,17 @@ export async function updateScheduleInterestedCounts(accounts: Account[]) {
} }
} }
await writeSchedule(schedule); await writeSchedule(schedule);
broadcastUpdate(schedule); await broadcastUpdate(schedule);
}
export function canSeeCrew(accountType: string | undefined) {
return accountType === "crew" || accountType === "admin";
}
/** Filters out crew visible only parts of schedule */
export function filterSchedule(schedule: Schedule): Schedule {
return {
locations: schedule.locations,
events: schedule.events.filter(event => !event.crew),
}
} }