Refactor event source and storage from useSchedule
Split up useSchedule into a useEventSource composable and a schedules store for keeping track of the schedule updates.
This commit is contained in:
parent
68f731f094
commit
cf90de1aae
3 changed files with 221 additions and 71 deletions
125
composables/event-source.ts
Normal file
125
composables/event-source.ts
Normal file
|
@ -0,0 +1,125 @@
|
||||||
|
import type { Schedule } from "~/shared/types/schedule";
|
||||||
|
|
||||||
|
interface AppEventMap {
|
||||||
|
"open": Event,
|
||||||
|
"message": MessageEvent<string>,
|
||||||
|
"update": MessageEvent<Schedule>,
|
||||||
|
"error": Event,
|
||||||
|
"close": Event,
|
||||||
|
}
|
||||||
|
|
||||||
|
class AppEventSource extends EventTarget {
|
||||||
|
#source: EventSource | null = null;
|
||||||
|
#sourceSessionId: number | undefined = undefined;
|
||||||
|
|
||||||
|
#forwardEvent(type: string) {
|
||||||
|
this.#source!.addEventListener(type, event => {
|
||||||
|
if (type === "open" || type === "message" || type === "error") {
|
||||||
|
console.log("AppEventSource", event.type, event.data);
|
||||||
|
this.dispatchEvent(new Event(event.type));
|
||||||
|
} else {
|
||||||
|
const data = event.data ? JSON.parse(event.data) : undefined;
|
||||||
|
console.log("AppEventSource", event.type, data);
|
||||||
|
this.dispatchEvent(new MessageEvent(event.type, {
|
||||||
|
data,
|
||||||
|
origin: event.origin,
|
||||||
|
lastEventId: event.lastEventId,
|
||||||
|
source: event.source,
|
||||||
|
ports: [...event.ports],
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
open(sessionId: number | undefined) {
|
||||||
|
console.log("Opening event source sid:", sessionId);
|
||||||
|
this.#sourceSessionId = sessionId;
|
||||||
|
this.#source = new EventSource("/api/events");
|
||||||
|
this.#forwardEvent("open");
|
||||||
|
this.#forwardEvent("message");
|
||||||
|
this.#forwardEvent("update");
|
||||||
|
this.#forwardEvent("error");
|
||||||
|
}
|
||||||
|
|
||||||
|
close() {
|
||||||
|
console.log("Closing event source sid:", this.#sourceSessionId);
|
||||||
|
this.#source!.close();
|
||||||
|
this.#source = null;
|
||||||
|
this.#sourceSessionId = undefined;
|
||||||
|
console.log("AppEventSource", "close");
|
||||||
|
this.dispatchEvent(new Event("close"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#connectRefs = 0;
|
||||||
|
connect(sessionId: number | undefined) {
|
||||||
|
this.#connectRefs += 1;
|
||||||
|
if (this.#source && this.#sourceSessionId !== sessionId) {
|
||||||
|
this.close();
|
||||||
|
}
|
||||||
|
if (!this.#source) {
|
||||||
|
this.open(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reconnect(sessionId: number | undefined) {
|
||||||
|
if (this.#source && this.#sourceSessionId !== sessionId) {
|
||||||
|
this.close();
|
||||||
|
this.open(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect() {
|
||||||
|
if (this.#connectRefs === 0) {
|
||||||
|
throw Error("Connection reference count already zero");
|
||||||
|
}
|
||||||
|
this.#connectRefs -= 1;
|
||||||
|
if (this.#connectRefs === 0 && this.#source) {
|
||||||
|
this.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override addEventListener<K extends keyof AppEventMap>(
|
||||||
|
type: K,
|
||||||
|
listener: (this: AppEventSource, ev: AppEventMap[K]) => any,
|
||||||
|
options?: boolean | AddEventListenerOptions
|
||||||
|
) {
|
||||||
|
super.addEventListener(type, listener as (ev: Event) => void, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
override dispatchEvent<K extends keyof AppEventMap>(
|
||||||
|
event: AppEventMap[K],
|
||||||
|
) {
|
||||||
|
return super.dispatchEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
override removeEventListener<K extends keyof AppEventMap>(
|
||||||
|
type: K,
|
||||||
|
listener: (this: AppEventSource, ev: AppEventMap[K]) => any,
|
||||||
|
options?: boolean | EventListenerOptions
|
||||||
|
) {
|
||||||
|
return super.removeEventListener(type, listener as (ev: Event) => void, options);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The event source exists only on the client.
|
||||||
|
export const appEventSource = import.meta.client ? new AppEventSource() : null;
|
||||||
|
|
||||||
|
export function useEventSource() {
|
||||||
|
const sessionStore = useSessionStore();
|
||||||
|
onMounted(() => {
|
||||||
|
console.log("useEventSource onMounted", sessionStore.id);
|
||||||
|
appEventSource!.connect(sessionStore.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
watch(() => sessionStore.id, () => {
|
||||||
|
console.log("useEventSource sessionStore.id change", sessionStore.id);
|
||||||
|
appEventSource!.reconnect(sessionStore.id);
|
||||||
|
})
|
||||||
|
|
||||||
|
onUnmounted(() => {
|
||||||
|
console.log("useEventSource onUnmounted");
|
||||||
|
appEventSource!.disconnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
return appEventSource;
|
||||||
|
}
|
|
@ -1,75 +1,8 @@
|
||||||
import type { Schedule } from '~/shared/types/schedule';
|
import type { Schedule } from '~/shared/types/schedule';
|
||||||
|
|
||||||
let source: EventSource | null = null;
|
|
||||||
let sourceRefs = 0;
|
|
||||||
let sourceSessionId: number | undefined = undefined;
|
|
||||||
export const useSchedule = () => {
|
export const useSchedule = () => {
|
||||||
const sessionStore = useSessionStore();
|
useEventSource();
|
||||||
const requestFetch = useRequestFetch();
|
const schedulesStore = useSchedulesStore();
|
||||||
const asyncData = useAsyncData<Schedule>(
|
schedulesStore.activeScheduleId = 111;
|
||||||
'schedule',
|
return schedulesStore.fetch(111);
|
||||||
() => requestFetch("/api/schedule"),
|
|
||||||
{ default: () => ({ events: [], locations: [] }) },
|
|
||||||
)
|
|
||||||
const { data: schedule } = asyncData;
|
|
||||||
|
|
||||||
function connect() {
|
|
||||||
console.log("Opening event source sid:", sessionStore.id);
|
|
||||||
sourceSessionId = sessionStore.id;
|
|
||||||
source = new EventSource("/api/events");
|
|
||||||
source.addEventListener("message", (message) => {
|
|
||||||
console.log("Message", message.data);
|
|
||||||
});
|
|
||||||
source.addEventListener("update", (message) => {
|
|
||||||
const updatedSchedule: Schedule = JSON.parse(message.data);
|
|
||||||
console.log("Update", updatedSchedule);
|
|
||||||
schedule.value = updatedSchedule;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function disconnect() {
|
|
||||||
console.log("Closing event source")
|
|
||||||
source!.close();
|
|
||||||
source = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
onMounted(() => {
|
|
||||||
console.log("useSchedule onMounted", sourceRefs);
|
|
||||||
sourceRefs += 1;
|
|
||||||
if (sourceRefs !== 1) {
|
|
||||||
console.log("Event source already open");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
connect();
|
|
||||||
})
|
|
||||||
|
|
||||||
watch(() => sessionStore.id, () => {
|
|
||||||
if (sourceSessionId === sessionStore.id) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sourceSessionId = sessionStore.id;
|
|
||||||
console.log("Session changed, refetching schedule")
|
|
||||||
$fetch("/api/schedule").then(
|
|
||||||
data => { schedule.value = data; },
|
|
||||||
err => { console.error(err); schedule.value = { locations: [], events: []}},
|
|
||||||
)
|
|
||||||
if (source && sourceRefs > 0) {
|
|
||||||
console.log("Restarting event stream")
|
|
||||||
disconnect();
|
|
||||||
connect();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
onUnmounted(() => {
|
|
||||||
console.log("useSchedule onUnmounted", sourceRefs);
|
|
||||||
sourceRefs -= 1;
|
|
||||||
if (source && sourceRefs === 0) {
|
|
||||||
disconnect();
|
|
||||||
}
|
|
||||||
if (sourceRefs < 0) {
|
|
||||||
throw Error("Source reference count below zero");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return asyncData.then(({ data }) => data);
|
|
||||||
}
|
}
|
||||||
|
|
92
stores/schedules.ts
Normal file
92
stores/schedules.ts
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
import type { Schedule } from "~/shared/types/schedule";
|
||||||
|
|
||||||
|
interface SyncOperation {
|
||||||
|
controller: AbortController,
|
||||||
|
promise: Promise<Ref<Schedule>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
export const useSchedulesStore = defineStore("schedules", () => {
|
||||||
|
const sessionStore = useSessionStore();
|
||||||
|
|
||||||
|
const state = {
|
||||||
|
activeScheduleId: ref<number | undefined>(111),
|
||||||
|
schedules: ref<Map<number, Ref<Schedule>>>(new Map()),
|
||||||
|
pendingSyncs: ref<Map<number, SyncOperation>>(new Map()),
|
||||||
|
};
|
||||||
|
|
||||||
|
const getters = {
|
||||||
|
activeSchedule: computed(() => {
|
||||||
|
if (state.activeScheduleId.value === undefined)
|
||||||
|
throw Error("No active schedule");
|
||||||
|
const schedule = state.schedules.value.get(state.activeScheduleId.value);
|
||||||
|
if (!schedule)
|
||||||
|
throw Error("Active schedule has not been fetched");
|
||||||
|
return schedule;
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
const actions = {
|
||||||
|
async fetch(id: number) {
|
||||||
|
if (id !== 111) { throw Error("invalid id"); }
|
||||||
|
console.log("schedules store fetch", id);
|
||||||
|
const schedule = state.schedules.value.get(id);
|
||||||
|
if (schedule) {
|
||||||
|
console.log("return cached");
|
||||||
|
return schedule;
|
||||||
|
}
|
||||||
|
const pending = state.pendingSyncs.value.get(id);
|
||||||
|
if (pending) {
|
||||||
|
console.log("return pending");
|
||||||
|
return pending.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("return new fetch");
|
||||||
|
const requestFetch = useRequestFetch();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const promise = (async () => {
|
||||||
|
try {
|
||||||
|
const schedule = ref(await requestFetch("/api/schedule", { signal: controller.signal }));
|
||||||
|
state.schedules.value.set(id, schedule);
|
||||||
|
state.pendingSyncs.value.delete(id);
|
||||||
|
return schedule;
|
||||||
|
} catch (err: any) {
|
||||||
|
if (err.name !== "AbortError")
|
||||||
|
state.pendingSyncs.value.delete(id);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
state.pendingSyncs.value.set(id, {
|
||||||
|
controller,
|
||||||
|
promise,
|
||||||
|
});
|
||||||
|
return promise;
|
||||||
|
},
|
||||||
|
async resync(id: number) {
|
||||||
|
if (id !== 111) { throw Error("invalid id"); }
|
||||||
|
const pending = state.pendingSyncs.value.get(id);
|
||||||
|
if (pending) {
|
||||||
|
pending.controller.abort();
|
||||||
|
}
|
||||||
|
state.schedules.value.delete(id);
|
||||||
|
state.pendingSyncs.value.delete(id);
|
||||||
|
await actions.fetch(id);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
watch(() => sessionStore.id, (id, oldId) => {
|
||||||
|
for (const [scheduleId, pending] of state.pendingSyncs.value) {
|
||||||
|
console.log("Aborting pending schedule sync", scheduleId, "due session.id change from", oldId, "to", id);
|
||||||
|
pending.controller.abort();
|
||||||
|
state.pendingSyncs.value.delete(scheduleId);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
appEventSource?.addEventListener("update", (event) => {
|
||||||
|
const schedule = state.schedules.value.get(111);
|
||||||
|
if (schedule) {
|
||||||
|
schedule.value = event.data;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return { ...state, ...getters, ...actions };
|
||||||
|
});
|
Loading…
Add table
Add a link
Reference in a new issue