Rename and refactor the types passed over the API to be based on an entity that's either living or a tombstone. A living entity has a deleted property that's either undefined or false, while a tombstone has a deleted property set to true. All entities have a numeric id and an updatedAt timestamp. To sync entities, an array of replacements are passed around. Living entities are replaced with tombstones when they're deleted. And tombstones are replaced with living entities when restored.
127 lines
3.5 KiB
TypeScript
127 lines
3.5 KiB
TypeScript
import { readAccounts } from "~/server/database";
|
|
import { canSeeCrew } from "./utils/schedule";
|
|
import type { ApiAccount, ApiEvent } from "~/shared/types/api";
|
|
|
|
function sendMessage(
|
|
stream: WritableStream<string>,
|
|
message: string,
|
|
) {
|
|
const writer = stream.getWriter();
|
|
writer.ready
|
|
.then(() => writer.write(message))
|
|
.catch(console.error)
|
|
.finally(() => writer.releaseLock())
|
|
;
|
|
}
|
|
|
|
function sendMessageAndClose(
|
|
stream: WritableStream<string>,
|
|
message: string,
|
|
) {
|
|
const writer = stream.getWriter();
|
|
writer.ready
|
|
.then(() => {
|
|
writer.write(message);
|
|
writer.close();
|
|
}).catch(console.error)
|
|
.finally(() => writer.releaseLock())
|
|
;
|
|
}
|
|
|
|
const streams = new Map<WritableStream<string>, { sessionId?: number, accountId?: number }>();
|
|
|
|
let keepaliveInterval: ReturnType<typeof setInterval> | null = null
|
|
export function addStream(stream: WritableStream<string>, sessionId?: number, accountId?: number) {
|
|
if (streams.size === 0) {
|
|
console.log("Starting keepalive")
|
|
keepaliveInterval = setInterval(sendKeepalive, 4000)
|
|
}
|
|
streams.set(stream, { sessionId, accountId });
|
|
// Produce a response immediately to avoid the reply waiting for content.
|
|
const message = `data: connected sid:${sessionId ?? '-'} aid:${accountId ?? '-'}\n\n`;
|
|
sendMessage(stream, message);
|
|
}
|
|
export function deleteStream(stream: WritableStream<string>) {
|
|
streams.delete(stream);
|
|
if (streams.size === 0) {
|
|
console.log("Ending keepalive")
|
|
clearInterval(keepaliveInterval!);
|
|
}
|
|
}
|
|
|
|
export function cancelAccountStreams(accountId: number) {
|
|
for (const [stream, data] of streams) {
|
|
if (data.accountId === accountId) {
|
|
sendMessageAndClose(stream, `data: cancelled\n\n`);
|
|
}
|
|
}
|
|
}
|
|
|
|
export function cancelSessionStreams(sessionId: number) {
|
|
for (const [stream, data] of streams) {
|
|
if (data.sessionId === sessionId) {
|
|
sendMessageAndClose(stream, `data: cancelled\n\n`);
|
|
}
|
|
}
|
|
}
|
|
|
|
const encodeEventCache = new WeakMap<ApiEvent, Map<ApiAccount["type"] | undefined, string>>();
|
|
function encodeEvent(event: ApiEvent, accountType: ApiAccount["type"] | undefined) {
|
|
const cache = encodeEventCache.get(event);
|
|
const cacheEntry = cache?.get(accountType);
|
|
if (cacheEntry) {
|
|
return cacheEntry;
|
|
}
|
|
|
|
let data: string;
|
|
if (event.type === "schedule-update") {
|
|
if (!canSeeCrew(accountType)) {
|
|
event = {
|
|
type: event.type,
|
|
updatedFrom: event.updatedFrom,
|
|
data: filterSchedule(event.data),
|
|
};
|
|
}
|
|
data = JSON.stringify(event);
|
|
} else {
|
|
throw Error(`encodeEvent cannot encode ${event.type} event`);
|
|
}
|
|
|
|
if (cache) {
|
|
cache.set(accountType, data);
|
|
} else {
|
|
encodeEventCache.set(event, new Map([[accountType, data]]));
|
|
}
|
|
return data;
|
|
}
|
|
|
|
export async function broadcastEvent(event: ApiEvent) {
|
|
const id = Date.now();
|
|
console.log(`broadcasting update to ${streams.size} clients`);
|
|
if (!streams.size) {
|
|
return;
|
|
}
|
|
const accounts = await readAccounts();
|
|
for (const [stream, streamData] of streams) {
|
|
// Account events are specially handled and only sent to the account they belong to.
|
|
if (event.type === "account-update") {
|
|
if (streamData.accountId === event.data.id) {
|
|
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${JSON.stringify(event)}\n\n`);
|
|
}
|
|
|
|
} else {
|
|
let accountType: ApiAccount["type"] | undefined;
|
|
if (streamData.accountId !== undefined) {
|
|
accountType = accounts.find(a => a.id === streamData.accountId)?.type
|
|
}
|
|
const data = encodeEvent(event, accountType)
|
|
sendMessage(stream, `id: ${id}\nevent: update\ndata: ${data}\n\n`);
|
|
}
|
|
}
|
|
}
|
|
|
|
function sendKeepalive() {
|
|
for (const stream of streams.keys()) {
|
|
sendMessage(stream, ": keepalive\n");
|
|
}
|
|
}
|