diff --git a/composables/event-source.ts b/composables/event-source.ts new file mode 100644 index 0000000..924a99c --- /dev/null +++ b/composables/event-source.ts @@ -0,0 +1,125 @@ +import type { Schedule } from "~/shared/types/schedule"; + +interface AppEventMap { + "open": Event, + "message": MessageEvent, + "update": MessageEvent, + "error": Event, + "close": Event, +} + +class AppEventSource extends EventTarget { + #source: EventSource | null = null; + #sourceSessionId: number | undefined = undefined; + + #forwardEvent(type: string) { + this.#source!.addEventListener(type, event => { + if (type === "open" || type === "message" || type === "error") { + console.log("AppEventSource", event.type, event.data); + this.dispatchEvent(new Event(event.type)); + } else { + const data = event.data ? JSON.parse(event.data) : undefined; + console.log("AppEventSource", event.type, data); + this.dispatchEvent(new MessageEvent(event.type, { + data, + origin: event.origin, + lastEventId: event.lastEventId, + source: event.source, + ports: [...event.ports], + })); + } + }); + } + + open(sessionId: number | undefined) { + console.log("Opening event source sid:", sessionId); + this.#sourceSessionId = sessionId; + this.#source = new EventSource("/api/events"); + this.#forwardEvent("open"); + this.#forwardEvent("message"); + this.#forwardEvent("update"); + this.#forwardEvent("error"); + } + + close() { + console.log("Closing event source sid:", this.#sourceSessionId); + this.#source!.close(); + this.#source = null; + this.#sourceSessionId = undefined; + console.log("AppEventSource", "close"); + this.dispatchEvent(new Event("close")); + } + + #connectRefs = 0; + connect(sessionId: number | undefined) { + this.#connectRefs += 1; + if (this.#source && this.#sourceSessionId !== sessionId) { + this.close(); + } + if (!this.#source) { + this.open(sessionId); + } + } + + reconnect(sessionId: number | undefined) { + if (this.#source && this.#sourceSessionId !== sessionId) { + this.close(); + this.open(sessionId); + } + } + + disconnect() { + if (this.#connectRefs === 0) { + throw Error("Connection reference count already zero"); + } + this.#connectRefs -= 1; + if (this.#connectRefs === 0 && this.#source) { + this.close(); + } + } + + override addEventListener( + type: K, + listener: (this: AppEventSource, ev: AppEventMap[K]) => any, + options?: boolean | AddEventListenerOptions + ) { + super.addEventListener(type, listener as (ev: Event) => void, options); + } + + override dispatchEvent( + event: AppEventMap[K], + ) { + return super.dispatchEvent(event); + } + + override removeEventListener( + type: K, + listener: (this: AppEventSource, ev: AppEventMap[K]) => any, + options?: boolean | EventListenerOptions + ) { + return super.removeEventListener(type, listener as (ev: Event) => void, options); + } +} + +// The event source exists only on the client. +export const appEventSource = import.meta.client ? new AppEventSource() : null; + +export function useEventSource() { + const sessionStore = useSessionStore(); + onMounted(() => { + console.log("useEventSource onMounted", sessionStore.id); + appEventSource!.connect(sessionStore.id); + }) + + watch(() => sessionStore.id, () => { + console.log("useEventSource sessionStore.id change", sessionStore.id); + appEventSource!.reconnect(sessionStore.id); + }) + + onUnmounted(() => { + console.log("useEventSource onUnmounted"); + appEventSource!.disconnect(); + }); + + return appEventSource; +} diff --git a/composables/schedule.ts b/composables/schedule.ts index 813a45c..497eb88 100644 --- a/composables/schedule.ts +++ b/composables/schedule.ts @@ -1,75 +1,8 @@ import type { Schedule } from '~/shared/types/schedule'; -let source: EventSource | null = null; -let sourceRefs = 0; -let sourceSessionId: number | undefined = undefined; export const useSchedule = () => { - const sessionStore = useSessionStore(); - const requestFetch = useRequestFetch(); - const asyncData = useAsyncData( - 'schedule', - () => requestFetch("/api/schedule"), - { default: () => ({ events: [], locations: [] }) }, - ) - const { data: schedule } = asyncData; - - function connect() { - console.log("Opening event source sid:", sessionStore.id); - sourceSessionId = sessionStore.id; - source = new EventSource("/api/events"); - source.addEventListener("message", (message) => { - console.log("Message", message.data); - }); - source.addEventListener("update", (message) => { - const updatedSchedule: Schedule = JSON.parse(message.data); - console.log("Update", updatedSchedule); - schedule.value = updatedSchedule; - }); - } - - function disconnect() { - console.log("Closing event source") - source!.close(); - source = null; - } - - onMounted(() => { - console.log("useSchedule onMounted", sourceRefs); - sourceRefs += 1; - if (sourceRefs !== 1) { - console.log("Event source already open"); - return; - } - connect(); - }) - - watch(() => sessionStore.id, () => { - if (sourceSessionId === sessionStore.id) { - return; - } - sourceSessionId = sessionStore.id; - console.log("Session changed, refetching schedule") - $fetch("/api/schedule").then( - data => { schedule.value = data; }, - err => { console.error(err); schedule.value = { locations: [], events: []}}, - ) - if (source && sourceRefs > 0) { - console.log("Restarting event stream") - disconnect(); - connect(); - } - }) - - onUnmounted(() => { - console.log("useSchedule onUnmounted", sourceRefs); - sourceRefs -= 1; - if (source && sourceRefs === 0) { - disconnect(); - } - if (sourceRefs < 0) { - throw Error("Source reference count below zero"); - } - }); - - return asyncData.then(({ data }) => data); + useEventSource(); + const schedulesStore = useSchedulesStore(); + schedulesStore.activeScheduleId = 111; + return schedulesStore.fetch(111); } diff --git a/stores/schedules.ts b/stores/schedules.ts new file mode 100644 index 0000000..f7d8ab1 --- /dev/null +++ b/stores/schedules.ts @@ -0,0 +1,92 @@ +import type { Schedule } from "~/shared/types/schedule"; + +interface SyncOperation { + controller: AbortController, + promise: Promise>, +} + +export const useSchedulesStore = defineStore("schedules", () => { + const sessionStore = useSessionStore(); + + const state = { + activeScheduleId: ref(111), + schedules: ref>>(new Map()), + pendingSyncs: ref>(new Map()), + }; + + const getters = { + activeSchedule: computed(() => { + if (state.activeScheduleId.value === undefined) + throw Error("No active schedule"); + const schedule = state.schedules.value.get(state.activeScheduleId.value); + if (!schedule) + throw Error("Active schedule has not been fetched"); + return schedule; + }), + }; + + const actions = { + async fetch(id: number) { + if (id !== 111) { throw Error("invalid id"); } + console.log("schedules store fetch", id); + const schedule = state.schedules.value.get(id); + if (schedule) { + console.log("return cached"); + return schedule; + } + const pending = state.pendingSyncs.value.get(id); + if (pending) { + console.log("return pending"); + return pending.promise; + } + + console.log("return new fetch"); + const requestFetch = useRequestFetch(); + const controller = new AbortController(); + const promise = (async () => { + try { + const schedule = ref(await requestFetch("/api/schedule", { signal: controller.signal })); + state.schedules.value.set(id, schedule); + state.pendingSyncs.value.delete(id); + return schedule; + } catch (err: any) { + if (err.name !== "AbortError") + state.pendingSyncs.value.delete(id); + throw err; + } + })(); + state.pendingSyncs.value.set(id, { + controller, + promise, + }); + return promise; + }, + async resync(id: number) { + if (id !== 111) { throw Error("invalid id"); } + const pending = state.pendingSyncs.value.get(id); + if (pending) { + pending.controller.abort(); + } + state.schedules.value.delete(id); + state.pendingSyncs.value.delete(id); + await actions.fetch(id); + }, + } + + watch(() => sessionStore.id, (id, oldId) => { + for (const [scheduleId, pending] of state.pendingSyncs.value) { + console.log("Aborting pending schedule sync", scheduleId, "due session.id change from", oldId, "to", id); + pending.controller.abort(); + state.pendingSyncs.value.delete(scheduleId); + } + }) + + appEventSource?.addEventListener("update", (event) => { + const schedule = state.schedules.value.get(111); + if (schedule) { + schedule.value = event.data; + } + }); + + return { ...state, ...getters, ...actions }; +});