refactor: use redis-semaphore for mutex across workers

This commit is contained in:
Namekuji 2023-07-01 00:50:46 -04:00
parent f53fa96fc6
commit 38d4d34713
No known key found for this signature in database
GPG key ID: 1D62332C07FBA532
3 changed files with 53 additions and 54 deletions

View file

@ -112,6 +112,7 @@
"ratelimiter": "3.4.1", "ratelimiter": "3.4.1",
"re2": "1.19.0", "re2": "1.19.0",
"redis-lock": "0.1.4", "redis-lock": "0.1.4",
"redis-semaphore": "5.3.1",
"reflect-metadata": "0.1.13", "reflect-metadata": "0.1.13",
"rename": "1.0.4", "rename": "1.0.4",
"rndstr": "1.0.0", "rndstr": "1.0.0",

View file

@ -69,6 +69,7 @@ import { getActiveWebhooks } from "@/misc/webhook-cache.js";
import { shouldSilenceInstance } from "@/misc/should-block-instance.js"; import { shouldSilenceInstance } from "@/misc/should-block-instance.js";
import meilisearch from "../../db/meilisearch.js"; import meilisearch from "../../db/meilisearch.js";
import { redisClient } from "@/db/redis.js"; import { redisClient } from "@/db/redis.js";
import { Mutex } from "redis-semaphore";
const mutedWordsCache = new Cache< const mutedWordsCache = new Cache<
{ userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[] { userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[]
@ -461,63 +462,43 @@ export default async (
} }
if (!dontFederateInitially) { if (!dontFederateInitially) {
if (Users.isLocalUser(user)) { let publishKey: string;
// Publish if the post is local let noteToPublish: Note;
publishNotesStream(note); const relays = await getCachedRelays();
} else {
const relays = await getCachedRelays();
// Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor.
const boostedByRelay =
!!user.inbox &&
relays.map((relay) => relay.inbox).includes(user.inbox);
if (boostedByRelay && data.renote && data.renote.userHost) { // Some relays (e.g., aode-relay) deliver posts by boosting them as
/* A relay boosted a remote post. */ // Announce activities. In that case, user is the relay's actor.
// Use Redis transaction for atomicity const boostedByRelay =
const key = `publishedNote:${data.renote.id}`; !!user.inbox &&
await redisClient.watch(key); relays.map((relay) => relay.inbox).includes(user.inbox);
const exists = await redisClient.exists(key);
if (exists === 0) { if (boostedByRelay && data.renote && data.renote.userHost) {
// Start the transaction publishKey = `publishedNote:${data.renote.id}`;
const transaction = redisClient.multi(); noteToPublish = data.renote;
transaction.set(key, 1, "EX", 30); } else {
// Execute the transaction publishKey = `publishedNote:${note.id}`;
await transaction.exec((err, _replies) => { noteToPublish = note;
// Publish after setting the key in Redis }
if (!err && boostedByRelay && data.renote) {
publishNotesStream(data.renote); const lock = new Mutex(redisClient, "publishedNote:lock");
} await lock.acquire();
}); try {
} else { const exists = (await redisClient.exists(publishKey)) > 0;
// Abort the transaction if (!exists) {
redisClient.unwatch(); await redisClient.set(publishKey, 1, "EX", 30);
} if (noteToPublish.renoteId) {
} else { // Prevents other threads from publishing the boosting post
// Use Redis transaction for atomicity await redisClient.set(
const key = `publishedNote:${note.id}`; `publishedNote:${noteToPublish.renoteId}`,
await redisClient.watch(key); 1,
const exists = await redisClient.exists(key); "EX",
if (exists === 0) { 30,
// Start the transaction );
const transaction = redisClient.multi();
transaction.set(key, 1, "EX", 30);
if (note.renoteId) {
// Prevent other threads from publishing this boosting post
transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30);
}
// Execute the transaction
await transaction.exec((err, _replies) => {
// Publish after setting the key in Redis
if (!err) {
publishNotesStream(note);
}
});
} else {
// Abort the transaction
redisClient.unwatch();
} }
publishNotesStream(noteToPublish);
} }
} finally {
lock.release();
} }
} }
if (note.replyId != null) { if (note.replyId != null) {

View file

@ -339,6 +339,9 @@ importers:
redis-lock: redis-lock:
specifier: 0.1.4 specifier: 0.1.4
version: 0.1.4 version: 0.1.4
redis-semaphore:
specifier: 5.3.1
version: 5.3.1(ioredis@5.3.2)
reflect-metadata: reflect-metadata:
specifier: 0.1.13 specifier: 0.1.13
version: 0.1.13 version: 0.1.13
@ -2666,6 +2669,7 @@ packages:
engines: {node: '>=10'} engines: {node: '>=10'}
cpu: [arm64] cpu: [arm64]
os: [android] os: [android]
requiresBuild: true
dependencies: dependencies:
'@swc/wasm': 1.2.130 '@swc/wasm': 1.2.130
@ -2772,6 +2776,7 @@ packages:
/@swc/wasm@1.2.130: /@swc/wasm@1.2.130:
resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==} resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==}
requiresBuild: true
/@syuilo/aiscript@0.11.1: /@syuilo/aiscript@0.11.1:
resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==} resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==}
@ -12831,6 +12836,18 @@ packages:
dependencies: dependencies:
redis-errors: 1.2.0 redis-errors: 1.2.0
/redis-semaphore@5.3.1(ioredis@5.3.2):
resolution: {integrity: sha512-oUpxxfxSbh5eT0mvVpz2d4Qlg2CsaoQkeo80/v6CU2l97zO0u6NPgc9/zQZa9KGR3/93b0igtSct3hEFh8Ei8w==}
engines: {node: '>= 14.17.0'}
peerDependencies:
ioredis: ^4.1.0 || ^5
dependencies:
debug: 4.3.4(supports-color@8.1.1)
ioredis: 5.3.2
transitivePeerDependencies:
- supports-color
dev: false
/redis@4.6.7: /redis@4.6.7:
resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==} resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==}
dependencies: dependencies: