[backend] Refactor database transactions

This moves all code that isn't a direct call to transactionalEntityManager to outside of the transaction blocks, and removes all transaction blocks that were unnecessary
This commit is contained in:
Laura Hausmann 2023-10-25 15:23:57 +02:00
parent 7c56ee348b
commit 4dd8fdbd04
Signed by: zotan
GPG key ID: D044E84C5BE01605
7 changed files with 248 additions and 262 deletions

View file

@ -1,6 +1,7 @@
import { db } from "@/db/postgre.js"; import { db } from "@/db/postgre.js";
import { Meta } from "@/models/entities/meta.js"; import { Meta } from "@/models/entities/meta.js";
import push from 'web-push'; import push from 'web-push';
import { Metas } from "@/models/index.js";
let cache: Meta; let cache: Meta;
@ -33,41 +34,31 @@ export function metaToPugArgs(meta: Meta): object {
export async function fetchMeta(noCache = false): Promise<Meta> { export async function fetchMeta(noCache = false): Promise<Meta> {
if (!noCache && cache) return cache; if (!noCache && cache) return cache;
return await db.transaction(async (transactionalEntityManager) => { // New IDs are prioritized because multiple records may have been created due to past bugs.
// New IDs are prioritized because multiple records may have been created due to past bugs. const meta = await Metas.findOne({
const metas = await transactionalEntityManager.find(Meta, { where: {},
order: { order: {
id: "DESC", id: "DESC",
}, },
});
const meta = metas[0];
if (meta) {
cache = meta;
return meta;
} else {
const { publicKey, privateKey } = push.generateVAPIDKeys();
// If fetchMeta is called at the same time when meta is empty, this part may be called at the same time, so use fail-safe upsert.
const saved = await transactionalEntityManager
.upsert(
Meta,
{
id: "x",
swPublicKey: publicKey,
swPrivateKey: privateKey,
},
["id"],
)
.then((x) =>
transactionalEntityManager.findOneByOrFail(Meta, x.identifiers[0]),
);
cache = saved;
return saved;
}
}); });
if (meta) {
cache = meta;
return meta;
}
const { publicKey, privateKey } = push.generateVAPIDKeys();
const data = {
id: "x",
swPublicKey: publicKey,
swPrivateKey: privateKey,
};
// If fetchMeta is called at the same time when meta is empty, this part may be called at the same time, so use fail-safe upsert.
await Metas.upsert(data, ["id"]);
cache = await Metas.findOneByOrFail({ id: data.id });
return cache;
} }
setInterval(() => { setInterval(() => {

View file

@ -294,80 +294,77 @@ export async function createPerson(
} }
} }
// Create user // Prepare objects
let user: IRemoteUser; let user = new User({
id: genId(),
avatarId: null,
bannerId: null,
createdAt: new Date(),
lastFetchedAt: new Date(),
name: truncate(person.name, nameLength),
isLocked: !!person.manuallyApprovesFollowers,
movedToUri: person.movedTo,
alsoKnownAs: person.alsoKnownAs,
isExplorable: !!person.discoverable,
username: person.preferredUsername,
usernameLower: person.preferredUsername!.toLowerCase(),
host,
inbox: person.inbox,
sharedInbox:
person.sharedInbox ||
(person.endpoints ? person.endpoints.sharedInbox : undefined),
followersUri: person.followers
? getApId(person.followers)
: undefined,
followersCount:
followersCount !== undefined
? followersCount
: person.followers &&
typeof person.followers !== "string" &&
isCollectionOrOrderedCollection(person.followers)
? person.followers.totalItems
: undefined,
followingCount:
followingCount !== undefined
? followingCount
: person.following &&
typeof person.following !== "string" &&
isCollectionOrOrderedCollection(person.following)
? person.following.totalItems
: undefined,
featured: person.featured ? getApId(person.featured) : undefined,
uri: person.id,
tags,
isBot,
isCat: (person as any).isCat === true,
}) as IRemoteUser;
const profile = new UserProfile({
userId: user.id,
description: person.summary
? await htmlToMfm(truncate(person.summary, summaryLength), person.tag)
: null,
url: url,
fields,
birthday: bday ? bday[0] : null,
location: person["vcard:Address"] || null,
userHost: host,
});
const publicKey = person.publicKey
? new UserPublickey({
userId: user.id,
keyId: person.publicKey.id,
keyPem: person.publicKey.publicKeyPem,
})
: null;
try { try {
// Start transaction // Save the objects atomically using a db transaction, note that we should never run any code in a transaction block directly
await db.transaction(async (transactionalEntityManager) => { await db.transaction(async (transactionalEntityManager) => {
user = (await transactionalEntityManager.save( await transactionalEntityManager.save(user);
new User({ await transactionalEntityManager.save(profile);
id: genId(), if (publicKey) await transactionalEntityManager.save(publicKey);
avatarId: null,
bannerId: null,
createdAt: new Date(),
lastFetchedAt: new Date(),
name: truncate(person.name, nameLength),
isLocked: !!person.manuallyApprovesFollowers,
movedToUri: person.movedTo,
alsoKnownAs: person.alsoKnownAs,
isExplorable: !!person.discoverable,
username: person.preferredUsername,
usernameLower: person.preferredUsername!.toLowerCase(),
host,
inbox: person.inbox,
sharedInbox:
person.sharedInbox ||
(person.endpoints ? person.endpoints.sharedInbox : undefined),
followersUri: person.followers
? getApId(person.followers)
: undefined,
followersCount:
followersCount !== undefined
? followersCount
: person.followers &&
typeof person.followers !== "string" &&
isCollectionOrOrderedCollection(person.followers)
? person.followers.totalItems
: undefined,
followingCount:
followingCount !== undefined
? followingCount
: person.following &&
typeof person.following !== "string" &&
isCollectionOrOrderedCollection(person.following)
? person.following.totalItems
: undefined,
featured: person.featured ? getApId(person.featured) : undefined,
uri: person.id,
tags,
isBot,
isCat: (person as any).isCat === true,
}),
)) as IRemoteUser;
await transactionalEntityManager.save(
new UserProfile({
userId: user.id,
description: person.summary
? await htmlToMfm(truncate(person.summary, summaryLength), person.tag)
: null,
url: url,
fields,
birthday: bday ? bday[0] : null,
location: person["vcard:Address"] || null,
userHost: host,
}),
);
if (person.publicKey) {
await transactionalEntityManager.save(
new UserPublickey({
userId: user.id,
keyId: person.publicKey.id,
keyPem: person.publicKey.publicKeyPem,
}),
);
}
}); });
} catch (e) { } catch (e) {
// duplicate key error // duplicate key error
@ -754,21 +751,23 @@ export async function updateFeatured(userId: User["id"], resolver?: Resolver, li
.map((item) => limit(() => resolveNote(item, resolver, limiter))), .map((item) => limit(() => resolveNote(item, resolver, limiter))),
); );
await db.transaction(async (transactionalEntityManager) => { // Prepare the objects
await transactionalEntityManager.delete(UserNotePining, { // For now, generate the id at a different time and maintain the order.
const data: Partial<UserNotePining>[] = [];
let td = 0;
for (const note of featuredNotes.filter((note) => note != null)) {
td -= 1000;
data.push({
id: genId(new Date(Date.now() + td)),
createdAt: new Date(),
userId: user.id, userId: user.id,
noteId: note!.id,
}); });
}
// For now, generate the id at a different time and maintain the order. // Save the objects atomically using a db transaction, note that we should never run any code in a transaction block directly
let td = 0; await db.transaction(async (transactionalEntityManager) => {
for (const note of featuredNotes.filter((note) => note != null)) { await transactionalEntityManager.delete(UserNotePining, { userId: user.id });
td -= 1000; await transactionalEntityManager.insert(UserNotePining, data);
transactionalEntityManager.insert(UserNotePining, {
id: genId(new Date(Date.now() + td)),
createdAt: new Date(),
userId: user.id,
noteId: note!.id,
});
}
}); });
} }

View file

@ -84,58 +84,55 @@ export async function signup(opts: {
), ),
); );
let account!: User; const exist = await Users.findOneBy({
usernameLower: username.toLowerCase(),
// Start transaction host: IsNull(),
await db.transaction(async (transactionalEntityManager) => {
const exist = await transactionalEntityManager.findOneBy(User, {
usernameLower: username.toLowerCase(),
host: IsNull(),
});
if (exist) throw new Error(" the username is already used");
account = await transactionalEntityManager.save(
new User({
id: genId(),
createdAt: new Date(),
username: username,
usernameLower: username.toLowerCase(),
host: toPunyNullable(host),
token: secret,
isAdmin:
(await Users.countBy({
host: IsNull(),
isAdmin: true,
})) === 0,
}),
);
await transactionalEntityManager.save(
new UserKeypair({
publicKey: keyPair[0],
privateKey: keyPair[1],
userId: account.id,
}),
);
await transactionalEntityManager.save(
new UserProfile({
userId: account.id,
autoAcceptFollowed: true,
password: hash,
}),
);
await transactionalEntityManager.save(
new UsedUsername({
createdAt: new Date(),
username: username.toLowerCase(),
}),
);
}); });
usersChart.update(account, true); if (exist) throw new Error("The username is already in use");
// Prepare objects
const user = new User({
id: genId(),
createdAt: new Date(),
username: username,
usernameLower: username.toLowerCase(),
host: toPunyNullable(host),
token: secret,
isAdmin:
(await Users.countBy({
host: IsNull(),
isAdmin: true,
})) === 0,
});
const userKeypair = new UserKeypair({
publicKey: keyPair[0],
privateKey: keyPair[1],
userId: user.id,
});
const userProfile = new UserProfile({
userId: user.id,
autoAcceptFollowed: true,
password: hash,
});
const usedUsername = new UsedUsername({
createdAt: new Date(),
username: username.toLowerCase(),
});
// Save the objects atomically using a db transaction, note that we should never run any code in a transaction block directly
await db.transaction(async (transactionalEntityManager) => {
await transactionalEntityManager.save(user);
await transactionalEntityManager.save(userKeypair);
await transactionalEntityManager.save(userProfile);
await transactionalEntityManager.save(usedUsername);
});
const account = await Users.findOneByOrFail({ id: user.id });
usersChart.update(account, true);
return { account, secret }; return { account, secret };
} }

View file

@ -3,6 +3,7 @@ import { Meta } from "@/models/entities/meta.js";
import { insertModerationLog } from "@/services/insert-moderation-log.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js";
import { db } from "@/db/postgre.js"; import { db } from "@/db/postgre.js";
import define from "../../../define.js"; import define from "../../../define.js";
import { Metas } from "@/models/index.js";
export const meta = { export const meta = {
tags: ["admin"], tags: ["admin"],
@ -106,21 +107,19 @@ export default define(meta, paramDef, async (ps, me) => {
if (config.summalyProxyUrl !== undefined) { if (config.summalyProxyUrl !== undefined) {
set.summalyProxy = config.summalyProxyUrl; set.summalyProxy = config.summalyProxyUrl;
} }
await db.transaction(async (transactionalEntityManager) => {
const metas = await transactionalEntityManager.find(Meta, {
order: {
id: "DESC",
},
});
const meta = metas[0]; const meta = await Metas.findOne({
where: {},
if (meta) { order: {
await transactionalEntityManager.update(Meta, meta.id, set); id: "DESC",
} else { },
await transactionalEntityManager.save(Meta, set);
}
}); });
if (meta)
await Metas.update(meta.id, set);
else
await Metas.save(set);
insertModerationLog(me, "updateMeta"); insertModerationLog(me, "updateMeta");
} }
return hosted; return hosted;

View file

@ -2,6 +2,7 @@ import { Meta } from "@/models/entities/meta.js";
import { insertModerationLog } from "@/services/insert-moderation-log.js"; import { insertModerationLog } from "@/services/insert-moderation-log.js";
import { db } from "@/db/postgre.js"; import { db } from "@/db/postgre.js";
import define from "../../define.js"; import define from "../../define.js";
import { Metas } from "@/models/index.js";
export const meta = { export const meta = {
tags: ["admin"], tags: ["admin"],
@ -546,21 +547,17 @@ export default define(meta, paramDef, async (ps, me) => {
} }
} }
await db.transaction(async (transactionalEntityManager) => { const meta = await Metas.findOne({
const metas = await transactionalEntityManager.find(Meta, { where: {},
order: { order: {
id: "DESC", id: "DESC",
}, },
});
const meta = metas[0];
if (meta) {
await transactionalEntityManager.update(Meta, meta.id, set);
} else {
await transactionalEntityManager.save(Meta, set);
}
}); });
if (meta)
await Metas.update(meta.id, set);
else
await Metas.save(set);
insertModerationLog(me, "updateMeta"); insertModerationLog(me, "updateMeta");
}); });

View file

@ -9,8 +9,9 @@ import { UserKeypair } from "@/models/entities/user-keypair.js";
import { UsedUsername } from "@/models/entities/used-username.js"; import { UsedUsername } from "@/models/entities/used-username.js";
import { db } from "@/db/postgre.js"; import { db } from "@/db/postgre.js";
import { hashPassword } from "@/misc/password.js"; import { hashPassword } from "@/misc/password.js";
import { Users } from "@/models/index.js";
export async function createSystemUser(username: string) { export async function createSystemUser(username: string): Promise<User> {
const password = uuid(); const password = uuid();
// Generate hash of password // Generate hash of password
@ -23,49 +24,51 @@ export async function createSystemUser(username: string) {
let account!: User; let account!: User;
// Start transaction const exist = await Users.findOneBy({
await db.transaction(async (transactionalEntityManager) => { usernameLower: username.toLowerCase(),
const exist = await transactionalEntityManager.findOneBy(User, { host: IsNull(),
usernameLower: username.toLowerCase(),
host: IsNull(),
});
if (exist) throw new Error("the user is already exists");
account = await transactionalEntityManager
.insert(User, {
id: genId(),
createdAt: new Date(),
username: username,
usernameLower: username.toLowerCase(),
host: null,
token: secret,
isAdmin: false,
isLocked: true,
isExplorable: false,
isBot: true,
})
.then((x) =>
transactionalEntityManager.findOneByOrFail(User, x.identifiers[0]),
);
await transactionalEntityManager.insert(UserKeypair, {
publicKey: keyPair.publicKey,
privateKey: keyPair.privateKey,
userId: account.id,
});
await transactionalEntityManager.insert(UserProfile, {
userId: account.id,
autoAcceptFollowed: false,
password: hash,
});
await transactionalEntityManager.insert(UsedUsername, {
createdAt: new Date(),
username: username.toLowerCase(),
});
}); });
return account; if (exist) throw new Error("the user is already exists");
// Prepare objects
const user = {
id: genId(),
createdAt: new Date(),
username: username,
usernameLower: username.toLowerCase(),
host: null,
token: secret,
isAdmin: false,
isLocked: true,
isExplorable: false,
isBot: true,
};
const userKeypair = {
publicKey: keyPair.publicKey,
privateKey: keyPair.privateKey,
userId: user.id,
};
const userProfile = {
userId: user.id,
autoAcceptFollowed: false,
password: hash,
};
const usedUsername = {
createdAt: new Date(),
username: username.toLowerCase(),
}
// Save the objects atomically using a db transaction, note that we should never run any code in a transaction block directly
await db.transaction(async (transactionalEntityManager) => {
await transactionalEntityManager.insert(User, user);
await transactionalEntityManager.insert(UserKeypair, userKeypair);
await transactionalEntityManager.insert(UserProfile, userProfile);
await transactionalEntityManager.insert(UsedUsername, usedUsername);
});
return Users.findOneByOrFail({ id: user.id });
} }

View file

@ -756,30 +756,30 @@ async function insertNote(
// 投稿を作成 // 投稿を作成
try { try {
if (insert.hasPoll) { if (insert.hasPoll) {
// Start transaction // Prepare objects
if (!data.poll) throw new Error("Empty poll data");
let expiresAt: Date | null;
if (!data.poll.expiresAt || isNaN(data.poll.expiresAt.getTime())) {
expiresAt = null;
} else {
expiresAt = data.poll.expiresAt;
}
const poll = new Poll({
noteId: insert.id,
choices: data.poll.choices,
expiresAt,
multiple: data.poll.multiple,
votes: new Array(data.poll.choices.length).fill(0),
noteVisibility: insert.visibility,
userId: user.id,
userHost: user.host,
});
// Save the objects atomically using a db transaction, note that we should never run any code in a transaction block directly
await db.transaction(async (transactionalEntityManager) => { await db.transaction(async (transactionalEntityManager) => {
if (!data.poll) throw new Error("Empty poll data");
await transactionalEntityManager.insert(Note, insert); await transactionalEntityManager.insert(Note, insert);
let expiresAt: Date | null;
if (!data.poll.expiresAt || isNaN(data.poll.expiresAt.getTime())) {
expiresAt = null;
} else {
expiresAt = data.poll.expiresAt;
}
const poll = new Poll({
noteId: insert.id,
choices: data.poll.choices,
expiresAt,
multiple: data.poll.multiple,
votes: new Array(data.poll.choices.length).fill(0),
noteVisibility: insert.visibility,
userId: user.id,
userHost: user.host,
});
await transactionalEntityManager.insert(Poll, poll); await transactionalEntityManager.insert(Poll, poll);
}); });
} else { } else {