Use sync access for the temp JSON file database
Replace all async reads and writes to the JSON database with the sync reads and writes to prevent a data corruption race condition where two requests are processed at the same time and write to the same file, or one reads while the other writes causing read of partially written data.
This commit is contained in:
parent
f9d188b2ba
commit
80cec71308
23 changed files with 138 additions and 138 deletions
|
@ -9,11 +9,11 @@ export default defineEventHandler(async (event) => {
|
|||
setHeader(event, "Content-Disposition", 'attachment; filename="database-dump.json"');
|
||||
setHeader(event, "Content-Type", "application/json; charset=utf-8");
|
||||
return {
|
||||
nextUserId: await readNextUserId(),
|
||||
users: await readUsers(),
|
||||
nextSessionId: await readNextSessionId(),
|
||||
sessions: await readSessions(),
|
||||
subscriptions: await readSubscriptions(),
|
||||
schedule: await readSchedule(),
|
||||
nextUserId: readNextUserId(),
|
||||
users: readUsers(),
|
||||
nextSessionId: readNextSessionId(),
|
||||
sessions: readSessions(),
|
||||
subscriptions: readSubscriptions(),
|
||||
schedule: readSchedule(),
|
||||
};
|
||||
})
|
||||
|
|
|
@ -7,14 +7,14 @@ import { generateDemoSchedule, generateDemoAccounts } from "~/server/generate-de
|
|||
export default defineEventHandler(async (event) => {
|
||||
await requireServerSessionWithAdmin(event);
|
||||
const accounts = generateDemoAccounts();
|
||||
await writeUsers(accounts);
|
||||
await writeSchedule(generateDemoSchedule());
|
||||
await writeAuthenticationMethods(accounts.map((user, index) => ({
|
||||
writeUsers(accounts);
|
||||
writeSchedule(generateDemoSchedule());
|
||||
writeAuthenticationMethods(accounts.map((user, index) => ({
|
||||
id: index,
|
||||
userId: user.id,
|
||||
provider: "demo",
|
||||
slug: user.name!,
|
||||
name: user.name!,
|
||||
})));
|
||||
await writeNextAuthenticationMethodId(Math.max(await nextAuthenticationMethodId(), accounts.length));
|
||||
writeNextAuthenticationMethodId(Math.max(nextAuthenticationMethodId(), accounts.length));
|
||||
})
|
||||
|
|
|
@ -29,20 +29,20 @@ export default defineEventHandler(async (event) => {
|
|||
});
|
||||
}
|
||||
|
||||
const currentNextUserId = await readNextUserId();
|
||||
await writeNextUserId(Math.max(currentNextUserId, snapshot.nextUserId));
|
||||
await writeUsers(snapshot.users);
|
||||
const currentNextSessionId = await readNextSessionId();
|
||||
await writeNextSessionId(Math.max(currentNextSessionId, snapshot.nextSessionId));
|
||||
const currentSessions = new Map((await readSessions()).map(session => [session.id, session]));
|
||||
await writeSessions(snapshot.sessions.filter(session => {
|
||||
const currentNextUserId = readNextUserId();
|
||||
writeNextUserId(Math.max(currentNextUserId, snapshot.nextUserId));
|
||||
writeUsers(snapshot.users);
|
||||
const currentNextSessionId = readNextSessionId();
|
||||
writeNextSessionId(Math.max(currentNextSessionId, snapshot.nextSessionId));
|
||||
const currentSessions = new Map((readSessions()).map(session => [session.id, session]));
|
||||
writeSessions(snapshot.sessions.filter(session => {
|
||||
const current = currentSessions.get(session.id);
|
||||
// Only keep sessions that match the account id in both sets to avoid
|
||||
// resurrecting deleted sessions. This will still cause session cross
|
||||
// pollution if a snapshot from another instance is loaded here.
|
||||
return current?.accountId !== undefined && current.accountId === session.accountId;
|
||||
}));
|
||||
await writeSubscriptions(snapshot.subscriptions);
|
||||
await writeSchedule(snapshot.schedule);
|
||||
writeSubscriptions(snapshot.subscriptions);
|
||||
writeSchedule(snapshot.schedule);
|
||||
await sendRedirect(event, "/");
|
||||
})
|
||||
|
|
|
@ -6,5 +6,5 @@ import { deleteDatabase } from "~/server/database";
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
await requireServerSessionWithAdmin(event);
|
||||
await deleteDatabase();
|
||||
deleteDatabase();
|
||||
})
|
||||
|
|
|
@ -18,7 +18,7 @@ export default defineEventHandler(async (event) => {
|
|||
});
|
||||
}
|
||||
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const user = users.find(user => user.id === patch.id);
|
||||
if (!user || user.deleted) {
|
||||
throw createError({
|
||||
|
@ -52,28 +52,28 @@ export default defineEventHandler(async (event) => {
|
|||
user.name = patch.name;
|
||||
}
|
||||
user.updatedAt = new Date().toISOString();
|
||||
await writeUsers(users);
|
||||
writeUsers(users);
|
||||
broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "user-update",
|
||||
data: serverUserToApi(user),
|
||||
});
|
||||
|
||||
// Rotate sessions with the user in it if the access changed
|
||||
if (accessChanged) {
|
||||
const sessions = await readSessions();
|
||||
const sessions = readSessions();
|
||||
const nowMs = Date.now();
|
||||
for (const session of sessions) {
|
||||
if (session.accountId === user.id) {
|
||||
session.rotatesAtMs = nowMs;
|
||||
broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "session-expired",
|
||||
sessionId: session.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
await writeSessions(sessions);
|
||||
writeSessions(sessions);
|
||||
}
|
||||
|
||||
// Update Schedule counts.
|
||||
|
|
|
@ -11,11 +11,11 @@ import { broadcastEvent, cancelAccountStreams } from "~/server/streams";
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const serverSession = await requireServerSessionWithUser(event);
|
||||
let users = await readUsers();
|
||||
let users = readUsers();
|
||||
|
||||
// Expire sessions for this user
|
||||
const expiredSessionIds = new Set<number>();
|
||||
let sessions = await readSessions();
|
||||
let sessions = readSessions();
|
||||
const nowMs = Date.now();
|
||||
for (const session of sessions) {
|
||||
if (
|
||||
|
@ -25,7 +25,7 @@ export default defineEventHandler(async (event) => {
|
|||
) {
|
||||
session.expiresAtMs = nowMs;
|
||||
broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "session-expired",
|
||||
sessionId: session.id,
|
||||
});
|
||||
|
@ -33,24 +33,24 @@ export default defineEventHandler(async (event) => {
|
|||
}
|
||||
}
|
||||
cancelAccountStreams(serverSession.accountId);
|
||||
await writeSessions(sessions);
|
||||
writeSessions(sessions);
|
||||
await deleteCookie(event, "session");
|
||||
|
||||
// Remove subscriptions for this user
|
||||
let subscriptions = await readSubscriptions();
|
||||
let subscriptions = readSubscriptions();
|
||||
subscriptions = subscriptions.filter(
|
||||
subscription => !expiredSessionIds.has(subscription.sessionId)
|
||||
);
|
||||
await writeSubscriptions(subscriptions);
|
||||
writeSubscriptions(subscriptions);
|
||||
|
||||
// Remove the user
|
||||
const account = users.find(user => user.id === serverSession.accountId)!;
|
||||
const now = new Date(nowMs).toISOString();
|
||||
account.deleted = true;
|
||||
account.updatedAt = now;
|
||||
await writeUsers(users);
|
||||
writeUsers(users);
|
||||
await broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "user-update",
|
||||
data: {
|
||||
id: account.id,
|
||||
|
|
|
@ -38,7 +38,7 @@ export default defineEventHandler(async (event) => {
|
|||
}
|
||||
}
|
||||
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const account = users.find(user => user.id === session.accountId);
|
||||
if (!account) {
|
||||
throw Error("Account does not exist");
|
||||
|
@ -70,7 +70,7 @@ export default defineEventHandler(async (event) => {
|
|||
else
|
||||
delete account.locale;
|
||||
}
|
||||
await writeUsers(users);
|
||||
writeUsers(users);
|
||||
|
||||
// Update Schedule counts.
|
||||
await updateScheduleInterestedCounts(users);
|
||||
|
|
|
@ -18,7 +18,7 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
|
|||
const body = await readBody(event);
|
||||
const name = body?.name;
|
||||
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
let user: ServerUser;
|
||||
if (typeof name === "string") {
|
||||
if (name === "") {
|
||||
|
@ -36,7 +36,7 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
|
|||
|
||||
const firstUser = users.every(user => user.type === "anonymous");
|
||||
user = {
|
||||
id: await nextUserId(),
|
||||
id: nextUserId(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
type: firstUser ? "admin" : "regular",
|
||||
name,
|
||||
|
@ -44,7 +44,7 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
|
|||
|
||||
} else if (name === undefined) {
|
||||
user = {
|
||||
id: await nextUserId(),
|
||||
id: nextUserId(),
|
||||
updatedAt: new Date().toISOString(),
|
||||
type: "anonymous",
|
||||
};
|
||||
|
@ -76,19 +76,19 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
|
|||
});
|
||||
}
|
||||
authMethods.push({
|
||||
id: await nextAuthenticationMethodId(),
|
||||
id: nextAuthenticationMethodId(),
|
||||
userId: user.id,
|
||||
provider: session.authenticationProvider,
|
||||
slug: session.authenticationSlug!,
|
||||
name: session.authenticationName!,
|
||||
})
|
||||
await writeAuthenticationMethods(authMethods);
|
||||
writeAuthenticationMethods(authMethods);
|
||||
}
|
||||
|
||||
users.push(user);
|
||||
await writeUsers(users);
|
||||
writeUsers(users);
|
||||
await broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "user-update",
|
||||
data: user,
|
||||
});
|
||||
|
|
|
@ -24,11 +24,11 @@ export default defineEventHandler(async (event) => {
|
|||
});
|
||||
}
|
||||
|
||||
const authMethods = await readAuthenticationMethods();
|
||||
const authMethods = readAuthenticationMethods();
|
||||
const method = authMethods.find(method => method.provider === "demo" && method.slug === slug);
|
||||
let session;
|
||||
if (method) {
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const account = users.find(user => !user.deleted && user.id === method.userId);
|
||||
session = await setServerSession(event, account);
|
||||
} else {
|
||||
|
|
|
@ -84,11 +84,11 @@ export default defineEventHandler(async (event): Promise<ApiSession> => {
|
|||
}
|
||||
|
||||
const slug = String(data.authData.id);
|
||||
const authMethods = await readAuthenticationMethods();
|
||||
const authMethods = readAuthenticationMethods();
|
||||
const method = authMethods.find(method => method.provider === "telegram" && method.slug === slug);
|
||||
let session;
|
||||
if (method) {
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const account = users.find(user => !user.deleted && user.id === method.userId);
|
||||
session = await setServerSession(event, account);
|
||||
} else {
|
||||
|
|
|
@ -8,7 +8,7 @@ import { cancelSessionStreams } from "~/server/streams";
|
|||
export default defineEventHandler(async (event) => {
|
||||
const session = await getServerSession(event, true);
|
||||
if (session) {
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const account = users.find(user => user.id === session.accountId);
|
||||
if (account?.type === "anonymous") {
|
||||
throw createError({
|
||||
|
|
|
@ -6,6 +6,6 @@
|
|||
import { readEvents } from "../database";
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const events = await readEvents();
|
||||
const events = readEvents();
|
||||
return events[events.length - 1]?. id ?? 0;
|
||||
});
|
||||
|
|
|
@ -35,7 +35,7 @@ export default defineEventHandler(async (event) => {
|
|||
});
|
||||
}
|
||||
|
||||
const schedule = await readSchedule();
|
||||
const schedule = readSchedule();
|
||||
|
||||
if (schedule.deleted) {
|
||||
throw createError({
|
||||
|
@ -85,9 +85,9 @@ export default defineEventHandler(async (event) => {
|
|||
applyUpdatesToArray(update.shifts, schedule.shifts = schedule.shifts ?? []);
|
||||
}
|
||||
|
||||
await writeSchedule(schedule);
|
||||
writeSchedule(schedule);
|
||||
await broadcastEvent({
|
||||
id: await nextEventId(),
|
||||
id: nextEventId(),
|
||||
type: "schedule-update",
|
||||
updatedFrom,
|
||||
data: update,
|
||||
|
|
|
@ -6,6 +6,6 @@ import { readSchedule } from "~/server/database";
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const session = await getServerSession(event, false);
|
||||
const schedule = await readSchedule();
|
||||
const schedule = readSchedule();
|
||||
return canSeeCrew(session?.access) ? schedule : filterSchedule(schedule);
|
||||
});
|
||||
|
|
|
@ -20,7 +20,7 @@ export default defineEventHandler(async (event) => {
|
|||
message: z.prettifyError(error),
|
||||
});
|
||||
}
|
||||
const subscriptions = await readSubscriptions();
|
||||
const subscriptions = readSubscriptions();
|
||||
const existingIndex = subscriptions.findIndex(
|
||||
sub => sub.type === "push" && sub.sessionId === session.id
|
||||
);
|
||||
|
@ -34,7 +34,7 @@ export default defineEventHandler(async (event) => {
|
|||
} else {
|
||||
subscriptions.push(subscription);
|
||||
}
|
||||
await writeSubscriptions(subscriptions);
|
||||
writeSubscriptions(subscriptions);
|
||||
if (existingIndex !== -1) {
|
||||
return { message: "Existing subscription refreshed."};
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import { readSubscriptions, writeSubscriptions } from "~/server/database";
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const session = await requireServerSessionWithUser(event);
|
||||
const subscriptions = await readSubscriptions();
|
||||
const subscriptions = readSubscriptions();
|
||||
const existingIndex = subscriptions.findIndex(
|
||||
sub => sub.type === "push" && sub.sessionId === session.id
|
||||
);
|
||||
|
@ -15,6 +15,6 @@ export default defineEventHandler(async (event) => {
|
|||
} else {
|
||||
return { message: "No subscription registered."};
|
||||
}
|
||||
await writeSubscriptions(subscriptions);
|
||||
writeSubscriptions(subscriptions);
|
||||
return { message: "Existing subscription removed."};
|
||||
});
|
||||
|
|
|
@ -13,7 +13,7 @@ const detailsSchema = z.object({
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
await requireServerSessionWithAdmin(event);
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
const { success, error, data: params } = detailsSchema.safeParse(getRouterParams(event));
|
||||
if (!success) {
|
||||
throw createError({
|
||||
|
|
|
@ -6,7 +6,7 @@ import { readUsers } from "~/server/database"
|
|||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const session = await requireServerSessionWithUser(event);
|
||||
const users = await readUsers();
|
||||
const users = readUsers();
|
||||
|
||||
if (session.access === "admin") {
|
||||
return users.map(serverUserToApi);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue