Add sonic full-text search support (#9714)

This pull request adds support for the [sonic](https://github.com/valeriansaliou/sonic) full text indexing server into Calckey.

In addition to this, a stateful endpoint has been added that will completely (re-)index all notes into any (elasticsearch and/or sonic) indexing server defined in your config at `/api/admin/search/index-all`. It can (optionally) take input data to define the starting point, such as:

```
{"cursor": "9beg3lx6ad"}
```

Currently if both sonic and elasticsearch are defined in the config, sonic will take precedence for searching, but both indexes will continue to be updated for new note creations. Future enhancements may include the ability to choose which indexer to use (or combine multiple).

Co-authored-by: Kaitlyn Allan <kaitlyn.allan@enlabs.cloud>
Reviewed-on: https://codeberg.org/calckey/calckey/pulls/9714
Co-authored-by: Kaity A <supakaity@noreply.codeberg.org>
Co-committed-by: Kaity A <supakaity@noreply.codeberg.org>
This commit is contained in:
Kaity A 2023-03-19 08:26:47 +00:00 committed by Kainoa Kanter
parent 3f58232a39
commit 706b4ae602
15 changed files with 328 additions and 14 deletions

View file

@ -72,6 +72,16 @@ redis:
# user:
# pass:
# ┌─────────────────────┐
#───┘ Sonic configuration └─────────────────────────────────────
#sonic:
# host: localhost
# port: 1491
# auth: SecretPassword
# collection: notes
# bucket: default
# ┌───────────────┐
#───┘ ID generation └───────────────────────────────────────────

1
.gitignore vendored
View file

@ -44,6 +44,7 @@ ormconfig.json
packages/backend/assets/instance.css
packages/backend/assets/sounds/None.mp3
!packages/backend/src/db
# blender backups
*.blend1

View file

@ -112,6 +112,7 @@
"seedrandom": "^3.0.5",
"semver": "7.3.8",
"sharp": "0.31.3",
"sonic-channel": "^1.3.1",
"speakeasy": "2.0.0",
"stringz": "2.1.0",
"summaly": "2.7.0",

View file

@ -32,6 +32,13 @@ export type Source = {
pass?: string;
index?: string;
};
sonic: {
host: string;
port: number;
auth?: string;
collection?: string;
bucket?: string;
};
proxy?: string;
proxySmtp?: string;

View file

@ -0,0 +1,51 @@
import * as SonicChannel from "sonic-channel";
import { dbLogger } from "./logger.js";
import config from "@/config/index.js";
const logger = dbLogger.createSubLogger("sonic", "gray", false);
logger.info("Connecting to Sonic");
const handlers = (type: string): SonicChannel.Handlers => (
{
connected: () => {
logger.succ(`Connected to Sonic ${type}`);
},
disconnected: (error) => {
logger.warn(`Disconnected from Sonic ${type}, error: ${error}`);
},
error: (error) => {
logger.warn(`Sonic ${type} error: ${error}`);
},
retrying: () => {
logger.info(`Sonic ${type} retrying`);
},
timeout: () => {
logger.warn(`Sonic ${type} timeout`);
},
}
)
const hasConfig =
config.sonic
&& ( config.sonic.host
|| config.sonic.port
|| config.sonic.auth
)
const host = hasConfig ? config.sonic.host ?? "localhost" : "";
const port = hasConfig ? config.sonic.port ?? 1491 : 0;
const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : "";
const collection = hasConfig ? config.sonic.collection ?? "main" : "";
const bucket = hasConfig ? config.sonic.bucket ?? "default" : "";
export default hasConfig
? {
search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")),
ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")),
collection,
bucket,
}
: null;

View file

@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js";
import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js";
import processBackground from "./processors/background/index.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js";
@ -24,6 +25,7 @@ import {
objectStorageQueue,
endedPollNotificationQueue,
webhookDeliverQueue,
backgroundQueue,
} from "./queues.js";
import type { ThinUser } from "./types.js";
@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() {
);
}
export function createIndexAllNotesJob(data = {}) {
return backgroundQueue.add(
"indexAllNotes",
data,
{
removeOnComplete: true,
removeOnFail: true,
},
);
}
export function webhookDeliver(
webhook: Webhook,
type: typeof webhookEventTypes[number],
@ -454,6 +467,7 @@ export default function () {
webhookDeliverQueue.process(64, processWebhookDeliver);
processDb(dbQueue);
processObjectStorage(objectStorageQueue);
processBackground(backgroundQueue);
systemQueue.add(
"tickCharts",

View file

@ -0,0 +1,76 @@
import type Bull from "bull";
import { queueLogger } from "../../logger.js";
import { Notes } from "@/models/index.js";
import { MoreThan } from "typeorm";
import { index } from "@/services/note/create.js"
import { Note } from "@/models/entities/note.js";
const logger = queueLogger.createSubLogger("index-all-notes");
export default async function indexAllNotes(
job: Bull.Job<Record<string, unknown>>,
done: ()=>void,
): Promise<void> {
logger.info("Indexing all notes...");
let cursor: string|null = job.data.cursor as string ?? null;
let indexedCount: number = job.data.indexedCount as number ?? 0;
let total: number = job.data.total as number ?? 0;
let running = true;
const take = 50000;
const batch = 100;
while (running) {
logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`);
let notes: Note[] = [];
try {
notes = await Notes.find({
where: {
...(cursor ? { id: MoreThan(cursor) } : {}),
},
take: take,
order: {
id: 1,
},
});
} catch (e) {
logger.error(`Failed to query notes ${e}`);
continue;
}
if (notes.length === 0) {
job.progress(100);
running = false;
break;
}
try {
const count = await Notes.count();
total = count;
job.update({ indexedCount, cursor, total })
} catch (e) {
}
for (let i = 0; i < notes.length; i += batch) {
const chunk = notes.slice(i, i + batch);
await Promise.all(chunk.map(note => index(note)));
indexedCount += chunk.length;
const pct = (indexedCount / total)*100;
job.update({ indexedCount, cursor, total })
job.progress(+(pct.toFixed(1)));
logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`);
}
cursor = notes[notes.length - 1].id;
job.update({ indexedCount, cursor, total })
if (notes.length < take) {
running = false;
}
}
done();
logger.info("All notes have been indexed.");
}

View file

@ -0,0 +1,15 @@
import type Bull from "bull";
import indexAllNotes from "./index-all-notes.js";
const jobs = {
indexAllNotes,
} as Record<
string,
Bull.ProcessCallbackFunction<Record<string, unknown>>
>;
export default function (q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
}

View file

@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue<WebhookDeliverJobData>(
"webhookDeliver",
64,
);
export const backgroundQueue = initializeQueue<Record<string, unknown>>("bg");
export const queues = [
systemQueue,
@ -36,4 +37,5 @@ export const queues = [
dbQueue,
objectStorageQueue,
webhookDeliverQueue,
backgroundQueue,
];

View file

@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js";
import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js";
import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js";
import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js";
import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js";
import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js";
import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js";
import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js";
@ -393,6 +394,7 @@ const eps = [
["admin/relays/remove", ep___admin_relays_remove],
["admin/reset-password", ep___admin_resetPassword],
["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport],
["admin/search/index-all", ep___admin_search_indexAll],
["admin/send-email", ep___admin_sendEmail],
["admin/server-info", ep___admin_serverInfo],
["admin/show-moderation-logs", ep___admin_showModerationLogs],

View file

@ -3,6 +3,7 @@ import {
inboxQueue,
dbQueue,
objectStorageQueue,
backgroundQueue,
} from "@/queue/queues.js";
import define from "../../../define.js";
@ -37,6 +38,11 @@ export const meta = {
nullable: false,
ref: "QueueCount",
},
backgroundQueue: {
optional: false,
nullable: false,
ref: "QueueCount",
},
},
},
} as const;
@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => {
const inboxJobCounts = await inboxQueue.getJobCounts();
const dbJobCounts = await dbQueue.getJobCounts();
const objectStorageJobCounts = await objectStorageQueue.getJobCounts();
const backgroundJobCounts = await backgroundQueue.getJobCounts();
return {
deliver: deliverJobCounts,
inbox: inboxJobCounts,
db: dbJobCounts,
objectStorage: objectStorageJobCounts,
backgroundQueue: backgroundJobCounts,
};
});

View file

@ -0,0 +1,28 @@
import define from "../../../define.js";
import { createIndexAllNotesJob } from "@/queue/index.js";
export const meta = {
tags: ["admin"],
requireCredential: true,
requireModerator: true,
} as const;
export const paramDef = {
type: "object",
properties: {
cursor: {
type: "string",
format: "misskey:id",
nullable: true,
default: null,
},
},
required: [],
} as const;
export default define(meta, paramDef, async (ps, _me) => {
createIndexAllNotesJob({
cursor: ps.cursor ?? undefined,
});
});

View file

@ -1,7 +1,9 @@
import { In } from "typeorm";
import { Notes } from "@/models/index.js";
import { Note } from "@/models/entities/note.js";
import config from "@/config/index.js";
import es from "../../../../db/elasticsearch.js";
import sonic from "../../../../db/sonic.js";
import define from "../../define.js";
import { makePaginationQuery } from "../../common/make-pagination-query.js";
import { generateVisibilityQuery } from "../../common/generate-visibility-query.js";
@ -59,7 +61,7 @@ export const paramDef = {
} as const;
export default define(meta, paramDef, async (ps, me) => {
if (es == null) {
if (es == null && sonic == null) {
const query = makePaginationQuery(
Notes.createQueryBuilder("note"),
ps.sinceId,
@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => {
if (me) generateMutedUserQuery(query, me);
if (me) generateBlockedUserQuery(query, me);
const notes = await query.take(ps.limit).getMany();
const notes: Note[] = await query.take(ps.limit).getMany();
return await Notes.packMany(notes, me);
} else if (sonic) {
let start = 0;
const chunkSize = 100;
// Use sonic to fetch and step through all search results that could match the requirements
const ids = [];
while (true) {
const results = await sonic.search.query(
sonic.collection,
sonic.bucket,
ps.query,
{
limit: chunkSize,
offset: start,
},
);
start += chunkSize;
if (results.length === 0) {
break;
}
const res = results
.map((k) => JSON.parse(k))
.filter((key) => {
if (ps.userId && key.userId !== ps.userId) {
return false;
}
if (ps.channelId && key.channelId !== ps.channelId) {
return false;
}
if (ps.sinceId && key.id <= ps.sinceId) {
return false;
}
if (ps.untilId && key.id >= ps.untilId) {
return false;
}
return true;
})
.map((key) => key.id);
ids.push(...res);
}
// Sort all the results by note id DESC (newest first)
ids.sort((a, b) => b - a);
// Fetch the notes from the database until we have enough to satisfy the limit
start = 0;
const found = [];
while (found.length < ps.limit && start < ids.length) {
const chunk = ids.slice(start, start + chunkSize);
const notes: Note[] = await Notes.find({
where: {
id: In(chunk),
},
order: {
id: "DESC",
},
});
// The notes are checked for visibility and muted/blocked users when packed
found.push(...await Notes.packMany(notes, me));
start += chunkSize;
}
// If we have more results than the limit, trim them
if (found.length > ps.limit) {
found.length = ps.limit;
}
return found;
} else {
const userQuery =
ps.userId != null

View file

@ -1,5 +1,6 @@
import * as mfm from "mfm-js";
import es from "../../db/elasticsearch.js";
import sonic from "../../db/sonic.js";
import {
publishMainStream,
publishNotesStream,
@ -588,7 +589,7 @@ export default async (
}
// Register to search database
index(note);
await index(note);
});
async function renderNoteOrRenoteActivity(data: Option, note: Note) {
@ -728,18 +729,34 @@ async function insertNote(
}
}
function index(note: Note) {
if (note.text == null || config.elasticsearch == null) return;
export async function index(note: Note): Promise<void> {
if (!note.text) return;
es!.index({
index: config.elasticsearch.index || "misskey_note",
id: note.id.toString(),
body: {
text: normalizeForSearch(note.text),
userId: note.userId,
userHost: note.userHost,
},
});
if (config.elasticsearch && es) {
es.index({
index: config.elasticsearch.index || "misskey_note",
id: note.id.toString(),
body: {
text: normalizeForSearch(note.text),
userId: note.userId,
userHost: note.userHost,
},
});
}
if (sonic) {
await sonic.ingest.push(
sonic.collection,
sonic.bucket,
JSON.stringify({
id: note.id,
userId: note.userId,
userHost: note.userHost,
channelId: note.channelId,
}),
note.text,
);
}
}
async function notifyToWatchersOfRenotee(

View file

@ -196,6 +196,7 @@ importers:
seedrandom: ^3.0.5
semver: 7.3.8
sharp: 0.31.3
sonic-channel: ^1.3.1
speakeasy: 2.0.0
strict-event-emitter-types: 2.0.0
stringz: 2.1.0
@ -310,6 +311,7 @@ importers:
seedrandom: 3.0.5
semver: 7.3.8
sharp: 0.31.3
sonic-channel: 1.3.1
speakeasy: 2.0.0
stringz: 2.1.0
summaly: 2.7.0
@ -11594,6 +11596,11 @@ packages:
smart-buffer: 4.2.0
dev: false
/sonic-channel/1.3.1:
resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==}
engines: {node: '>= 6.0.0'}
dev: false
/sort-keys-length/1.0.1:
resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==}
engines: {node: '>=0.10.0'}