diff --git a/src/notify.ts b/src/notify.ts index f55dea167..ea7423655 100644 --- a/src/notify.ts +++ b/src/notify.ts @@ -2,7 +2,7 @@ import * as mongo from 'mongodb'; import Notification from './models/notification'; import Mute from './models/mute'; import { pack } from './models/notification'; -import stream from './stream'; +import { publishUserStream } from './stream'; import User from './models/user'; import pushSw from './push-sw'; @@ -30,7 +30,7 @@ export default ( const packed = await pack(notification); // Publish notification event - stream(notifiee, 'notification', packed); + publishUserStream(notifiee, 'notification', packed); // Update flag User.update({ _id: notifiee }, { @@ -54,7 +54,7 @@ export default ( } //#endregion - stream(notifiee, 'unread_notification', packed); + publishUserStream(notifiee, 'unread_notification', packed); pushSw(notifiee, 'notification', packed); } diff --git a/src/server/api/common/read-messaging-message.ts b/src/server/api/common/read-messaging-message.ts index a34fd8a70..005240a37 100644 --- a/src/server/api/common/read-messaging-message.ts +++ b/src/server/api/common/read-messaging-message.ts @@ -1,7 +1,7 @@ import * as mongo from 'mongodb'; import Message from '../../../models/messaging-message'; import { IMessagingMessage as IMessage } from '../../../models/messaging-message'; -import publishUserStream from '../../../stream'; +import { publishUserStream } from '../../../stream'; import { publishMessagingStream } from '../../../stream'; import { publishMessagingIndexStream } from '../../../stream'; import User from '../../../models/user'; diff --git a/src/server/api/common/read-notification.ts b/src/server/api/common/read-notification.ts index 3a1f4cfbd..0b0f3e4e5 100644 --- a/src/server/api/common/read-notification.ts +++ b/src/server/api/common/read-notification.ts @@ -1,6 +1,6 @@ import * as mongo from 'mongodb'; import { default as Notification, INotification } from '../../../models/notification'; -import publishUserStream from '../../../stream'; +import { publishUserStream } from '../../../stream'; import Mute from '../../../models/mute'; import User from '../../../models/user'; diff --git a/src/server/api/endpoints/games/reversi/match.ts b/src/server/api/endpoints/games/reversi/match.ts index 24746170f..aba400af1 100644 --- a/src/server/api/endpoints/games/reversi/match.ts +++ b/src/server/api/endpoints/games/reversi/match.ts @@ -2,7 +2,7 @@ import $ from 'cafy'; import ID from '../../../../../misc/cafy-id'; import Matching, { pack as packMatching } from '../../../../../models/games/reversi/matching'; import ReversiGame, { pack as packGame } from '../../../../../models/games/reversi/game'; import User, { ILocalUser } from '../../../../../models/user'; -import publishUserStream, { publishReversiStream } from '../../../../../stream'; +import { publishUserStream, publishReversiStream } from '../../../../../stream'; import { eighteight } from '../../../../../games/reversi/maps'; export const meta = { diff --git a/src/server/api/endpoints/i/regenerate_token.ts b/src/server/api/endpoints/i/regenerate_token.ts index 374861daa..fe4a5cd11 100644 --- a/src/server/api/endpoints/i/regenerate_token.ts +++ b/src/server/api/endpoints/i/regenerate_token.ts @@ -1,7 +1,7 @@ import $ from 'cafy'; import * as bcrypt from 'bcryptjs'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import generateUserToken from '../../common/generate-native-user-token'; export const meta = { @@ -33,5 +33,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - event(user._id, 'my_token_regenerated'); + publishUserStream(user._id, 'my_token_regenerated'); }); diff --git a/src/server/api/endpoints/i/update.ts b/src/server/api/endpoints/i/update.ts index 019c8281a..aa801b1b0 100644 --- a/src/server/api/endpoints/i/update.ts +++ b/src/server/api/endpoints/i/update.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import ID from '../../../../misc/cafy-id'; import User, { isValidName, isValidDescription, isValidLocation, isValidBirthday, pack, ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import DriveFile from '../../../../models/drive-file'; import acceptAllFollowRequests from '../../../../services/following/requests/accept-all'; import { IApp } from '../../../../models/app'; @@ -133,7 +133,7 @@ export default async (params: any, user: ILocalUser, app: IApp) => new Promise(a res(iObj); // Publish meUpdated event - event(user._id, 'meUpdated', iObj); + publishUserStream(user._id, 'meUpdated', iObj); // 鍵垢を解除したとき、溜まっていたフォローリクエストがあるならすべて承認 if (user.isLocked && isLocked === false) { diff --git a/src/server/api/endpoints/i/update_client_setting.ts b/src/server/api/endpoints/i/update_client_setting.ts index 9342f5dad..aed93c792 100644 --- a/src/server/api/endpoints/i/update_client_setting.ts +++ b/src/server/api/endpoints/i/update_client_setting.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -26,7 +26,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); // Publish event - event(user._id, 'clientSettingUpdated', { + publishUserStream(user._id, 'clientSettingUpdated', { key: name, value }); diff --git a/src/server/api/endpoints/i/update_home.ts b/src/server/api/endpoints/i/update_home.ts index 6f3985429..ffca9b90b 100644 --- a/src/server/api/endpoints/i/update_home.ts +++ b/src/server/api/endpoints/i/update_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -25,5 +25,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - event(user._id, 'home_updated', home); + publishUserStream(user._id, 'home_updated', home); }); diff --git a/src/server/api/endpoints/i/update_mobile_home.ts b/src/server/api/endpoints/i/update_mobile_home.ts index 1babe409e..0b72fbe2c 100644 --- a/src/server/api/endpoints/i/update_mobile_home.ts +++ b/src/server/api/endpoints/i/update_mobile_home.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -24,5 +24,5 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, res(); - event(user._id, 'mobile_home_updated', home); + publishUserStream(user._id, 'mobile_home_updated', home); }); diff --git a/src/server/api/endpoints/i/update_widget.ts b/src/server/api/endpoints/i/update_widget.ts index 5bf9c2305..5cbe7c07a 100644 --- a/src/server/api/endpoints/i/update_widget.ts +++ b/src/server/api/endpoints/i/update_widget.ts @@ -1,6 +1,6 @@ import $ from 'cafy'; import User, { ILocalUser } from '../../../../models/user'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; export const meta = { requireCredential: true, @@ -73,7 +73,7 @@ export default async (params: any, user: ILocalUser) => new Promise(async (res, //#endregion if (widget) { - event(user._id, 'widgetUpdated', { + publishUserStream(user._id, 'widgetUpdated', { id, data }); diff --git a/src/server/api/endpoints/messaging/messages/create.ts b/src/server/api/endpoints/messaging/messages/create.ts index 9b897b45e..8ebf1a2a2 100644 --- a/src/server/api/endpoints/messaging/messages/create.ts +++ b/src/server/api/endpoints/messaging/messages/create.ts @@ -6,7 +6,7 @@ import User, { ILocalUser } from '../../../../../models/user'; import Mute from '../../../../../models/mute'; import DriveFile from '../../../../../models/drive-file'; import { pack } from '../../../../../models/messaging-message'; -import publishUserStream from '../../../../../stream'; +import { publishUserStream } from '../../../../../stream'; import { publishMessagingStream, publishMessagingIndexStream } from '../../../../../stream'; import pushSw from '../../../../../push-sw'; import config from '../../../../../config'; diff --git a/src/server/api/endpoints/notifications/mark_all_as_read.ts b/src/server/api/endpoints/notifications/mark_all_as_read.ts index 91319d055..a9875ebb0 100644 --- a/src/server/api/endpoints/notifications/mark_all_as_read.ts +++ b/src/server/api/endpoints/notifications/mark_all_as_read.ts @@ -1,5 +1,5 @@ import Notification from '../../../../models/notification'; -import event from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; import User, { ILocalUser } from '../../../../models/user'; export const meta = { @@ -40,5 +40,5 @@ export default (params: any, user: ILocalUser) => new Promise(async (res, rej) = }); // 全ての通知を読みましたよというイベントを発行 - event(user._id, 'read_all_notifications'); + publishUserStream(user._id, 'read_all_notifications'); }); diff --git a/src/server/api/private/signin.ts b/src/server/api/private/signin.ts index 9719329dd..65413208d 100644 --- a/src/server/api/private/signin.ts +++ b/src/server/api/private/signin.ts @@ -3,7 +3,7 @@ import * as bcrypt from 'bcryptjs'; import * as speakeasy from 'speakeasy'; import User, { ILocalUser } from '../../../models/user'; import Signin, { pack } from '../../../models/signin'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; import signin from '../common/signin'; import config from '../../../config'; @@ -86,5 +86,5 @@ export default async (ctx: Koa.Context) => { }); // Publish signin event - event(user._id, 'signin', await pack(record)); + publishUserStream(user._id, 'signin', await pack(record)); }; diff --git a/src/server/api/service/twitter.ts b/src/server/api/service/twitter.ts index 080f5879a..8c668e832 100644 --- a/src/server/api/service/twitter.ts +++ b/src/server/api/service/twitter.ts @@ -4,7 +4,7 @@ import * as uuid from 'uuid'; import autwh from 'autwh'; import redis from '../../../db/redis'; import User, { pack, ILocalUser } from '../../../models/user'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; import config from '../../../config'; import signin from '../common/signin'; @@ -49,7 +49,7 @@ router.get('/disconnect/twitter', async ctx => { ctx.body = `Twitterの連携を解除しました :v:`; // Publish i updated event - event(user._id, 'meUpdated', await pack(user, user, { + publishUserStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); @@ -174,7 +174,7 @@ if (config.twitter == null) { ctx.body = `Twitter: @${result.screenName} を、Misskey: @${user.username} に接続しました!`; // Publish i updated event - event(user._id, 'meUpdated', await pack(user, user, { + publishUserStream(user._id, 'meUpdated', await pack(user, user, { detail: true, includeSecrets: true })); diff --git a/src/server/api/stream/drive.ts b/src/server/api/stream/drive.ts index c97ab80dc..28c241e1b 100644 --- a/src/server/api/stream/drive.ts +++ b/src/server/api/stream/drive.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe drive stream - subscriber.subscribe(`misskey:drive-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`drive-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/games/reversi-game.ts b/src/server/api/stream/games/reversi-game.ts index da949e90f..5cbbf42d5 100644 --- a/src/server/api/stream/games/reversi-game.ts +++ b/src/server/api/stream/games/reversi-game.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as CRC32 from 'crc-32'; import ReversiGame, { pack } from '../../../../models/games/reversi/game'; import { publishReversiGameStream } from '../../../../stream'; @@ -7,14 +7,13 @@ import Reversi from '../../../../games/reversi/core'; import * as maps from '../../../../games/reversi/maps'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user?: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user?: any): void { const q = request.resourceURL.query as ParsedUrlQuery; - const gameId = q.game; + const gameId = q.game as string; // Subscribe game stream - subscriber.subscribe(`misskey:reversi-game-stream:${gameId}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-game-stream:${gameId}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/games/reversi.ts b/src/server/api/stream/games/reversi.ts index 3f2346652..f467613b2 100644 --- a/src/server/api/stream/games/reversi.ts +++ b/src/server/api/stream/games/reversi.ts @@ -1,14 +1,13 @@ import * as mongo from 'mongodb'; import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import Matching, { pack } from '../../../../models/games/reversi/matching'; -import publishUserStream from '../../../../stream'; +import { publishUserStream } from '../../../../stream'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe reversi stream - subscriber.subscribe(`misskey:reversi-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`reversi-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/global-timeline.ts b/src/server/api/stream/global-timeline.ts index f31ce1775..4786450cb 100644 --- a/src/server/api/stream/global-timeline.ts +++ b/src/server/api/stream/global-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -7,18 +7,14 @@ import Mute from '../../../models/mute'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe(`misskey:global-timeline`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('global-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/home.ts b/src/server/api/stream/home.ts index d9b8f7fb9..dc3ce9d19 100644 --- a/src/server/api/stream/home.ts +++ b/src/server/api/stream/home.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import * as debug from 'debug'; import User, { IUser } from '../../../models/user'; @@ -14,68 +14,54 @@ const log = debug('misskey'); export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser, app: IApp ) { - // Subscribe Home stream channel - subscriber.subscribe(`misskey:user-stream:${user._id}`); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (channel, data) => { - switch (channel.split(':')[1]) { - case 'user-stream': - try { - const x = JSON.parse(data); + async function onNoteStream(noteId: any) { + const note = await packNote(noteId, user, { + detail: true + }); - //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する - if (x.type == 'note') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { - return; - } - if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { - return; - } - } else if (x.type == 'notification') { - if (mutedUserIds.includes(x.body.userId)) { - return; - } - } - //#endregion + connection.send(JSON.stringify({ + type: 'note-updated', + body: { + note: note + } + })); + } - // Renoteなら再pack - if (x.type == 'note' && x.body.renoteId != null) { - x.body.renote = await pack(x.body.renoteId, user, { - detail: true - }); - data = JSON.stringify(x); - } - - connection.send(data); - } catch (e) { - connection.send(data); - } - break; - - case 'note-stream': - const noteId = channel.split(':')[2]; - log(`RECEIVED: ${noteId} ${data} by @${user.username}`); - const note = await packNote(noteId, user, { - detail: true - }); - connection.send(JSON.stringify({ - type: 'note-updated', - body: { - note: note - } - })); - break; + // Subscribe Home stream channel + subscriber.on(`user-stream:${user._id}`, async x => { + //#region 流れてきたメッセージがミュートしているユーザーが関わるものだったら無視する + if (x.type == 'note') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } + if (x.body.reply != null && mutedUserIds.includes(x.body.reply.userId)) { + return; + } + if (x.body.renote != null && mutedUserIds.includes(x.body.renote.userId)) { + return; + } + } else if (x.type == 'notification') { + if (mutedUserIds.includes(x.body.userId)) { + return; + } } + //#endregion + + // Renoteなら再pack + if (x.type == 'note' && x.body.renoteId != null) { + x.body.renote = await pack(x.body.renoteId, user, { + detail: true + }); + } + + connection.send(JSON.stringify(x)); }); connection.on('message', async data => { @@ -113,9 +99,14 @@ export default async function( case 'capture': if (!msg.id) return; - const noteId = msg.id; - log(`CAPTURE: ${noteId} by @${user.username}`); - subscriber.subscribe(`misskey:note-stream:${noteId}`); + log(`CAPTURE: ${msg.id} by @${user.username}`); + subscriber.on(`note-stream:${msg.id}`, onNoteStream); + break; + + case 'decapture': + if (!msg.id) return; + log(`DECAPTURE: ${msg.id} by @${user.username}`); + subscriber.off(`note-stream:${msg.id}`, onNoteStream); break; } }); diff --git a/src/server/api/stream/hybrid-timeline.ts b/src/server/api/stream/hybrid-timeline.ts index 513af9c1d..5f411317c 100644 --- a/src/server/api/stream/hybrid-timeline.ts +++ b/src/server/api/stream/hybrid-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,17 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { // Subscribe stream - subscriber.subscribe('misskey:hybrid-timeline', `misskey:hybrid-timeline:${user._id}`); + subscriber.on('hybrid-timeline', onEvent); + subscriber.on(`hybrid-timeline:${user._id}`, onEvent); const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + async function onEvent(note: any) { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; @@ -43,5 +42,5 @@ export default async function( type: 'note', body: note })); - }); + } } diff --git a/src/server/api/stream/local-timeline.ts b/src/server/api/stream/local-timeline.ts index 32718810d..82060a7aa 100644 --- a/src/server/api/stream/local-timeline.ts +++ b/src/server/api/stream/local-timeline.ts @@ -1,5 +1,5 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { IUser } from '../../../models/user'; import Mute from '../../../models/mute'; @@ -8,18 +8,14 @@ import { pack } from '../../../models/note'; export default async function( request: websocket.request, connection: websocket.connection, - subscriber: redis.RedisClient, + subscriber: Xev, user: IUser ) { - // Subscribe stream - subscriber.subscribe('misskey:local-timeline'); - const mute = await Mute.find({ muterId: user._id }); const mutedUserIds = mute.map(m => m.muteeId.toString()); - subscriber.on('message', async (_, data) => { - const note = JSON.parse(data); - + // Subscribe stream + subscriber.on('local-timeline', async note => { //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する if (mutedUserIds.indexOf(note.userId) != -1) { return; diff --git a/src/server/api/stream/messaging-index.ts b/src/server/api/stream/messaging-index.ts index c1b2fbc80..9af63f281 100644 --- a/src/server/api/stream/messaging-index.ts +++ b/src/server/api/stream/messaging-index.ts @@ -1,10 +1,9 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { // Subscribe messaging index stream - subscriber.subscribe(`misskey:messaging-index-stream:${user._id}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-index-stream:${user._id}`, data => { + connection.send(JSON.stringify(data)); }); } diff --git a/src/server/api/stream/messaging.ts b/src/server/api/stream/messaging.ts index 3e6c2cd50..8b352cea3 100644 --- a/src/server/api/stream/messaging.ts +++ b/src/server/api/stream/messaging.ts @@ -1,16 +1,15 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import read from '../common/read-messaging-message'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const otherparty = q.otherparty as string; // Subscribe messaging stream - subscriber.subscribe(`misskey:messaging-stream:${user._id}-${otherparty}`); - subscriber.on('message', (_, data) => { - connection.send(data); + subscriber.on(`messaging-stream:${user._id}-${otherparty}`, data => { + connection.send(JSON.stringify(data)); }); connection.on('message', async (data) => { diff --git a/src/server/api/stream/user-list.ts b/src/server/api/stream/user-list.ts index ba03b9786..33cc2a1ee 100644 --- a/src/server/api/stream/user-list.ts +++ b/src/server/api/stream/user-list.ts @@ -1,14 +1,13 @@ import * as websocket from 'websocket'; -import * as redis from 'redis'; +import Xev from 'xev'; import { ParsedUrlQuery } from 'querystring'; -export default function(request: websocket.request, connection: websocket.connection, subscriber: redis.RedisClient, user: any): void { +export default function(request: websocket.request, connection: websocket.connection, subscriber: Xev, user: any): void { const q = request.resourceURL.query as ParsedUrlQuery; const listId = q.listId as string; // Subscribe stream - subscriber.subscribe(`misskey:user-list-stream:${listId}`); - subscriber.on('message', (_, data) => { + subscriber.on(`user-list-stream:${listId}`, data => { connection.send(data); }); } diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index afa0de2ce..c8b2d4e0b 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -1,7 +1,6 @@ import * as http from 'http'; import * as websocket from 'websocket'; -import * as redis from 'redis'; -import config from '../../config'; +import Xev from 'xev'; import homeStream from './stream/home'; import localTimelineStream from './stream/local-timeline'; @@ -39,20 +38,17 @@ module.exports = (server: http.Server) => { return; } - // Connect to Redis - const subscriber = redis.createClient( - config.redis.port, config.redis.host); + const ev = new Xev(); - connection.on('close', () => { - subscriber.unsubscribe(); - subscriber.quit(); + connection.once('close', () => { + ev.removeAllListeners(); }); const q = request.resourceURL.query as ParsedUrlQuery; const [user, app] = await authenticate(q.i as string); if (request.resourceURL.pathname === '/games/reversi-game') { - reversiGameStream(request, connection, subscriber, user); + reversiGameStream(request, connection, ev, user); return; } @@ -75,7 +71,7 @@ module.exports = (server: http.Server) => { null; if (channel !== null) { - channel(request, connection, subscriber, user, app); + channel(request, connection, ev, user, app); } else { connection.close(); } diff --git a/src/services/drive/add-file.ts b/src/services/drive/add-file.ts index 3cab924b4..a04bab9db 100644 --- a/src/services/drive/add-file.ts +++ b/src/services/drive/add-file.ts @@ -13,7 +13,7 @@ import * as sharp from 'sharp'; import DriveFile, { IMetadata, getDriveFileBucket, IDriveFile } from '../../models/drive-file'; import DriveFolder from '../../models/drive-folder'; import { pack } from '../../models/drive-file'; -import event, { publishDriveStream } from '../../stream'; +import { publishUserStream, publishDriveStream } from '../../stream'; import { isLocalUser, IUser, IRemoteUser } from '../../models/user'; import delFile from './delete-file'; import config from '../../config'; @@ -309,7 +309,7 @@ export default async function( pack(driveFile).then(packedFile => { // Publish drive_file_created event - event(user._id, 'drive_file_created', packedFile); + publishUserStream(user._id, 'drive_file_created', packedFile); publishDriveStream(user._id, 'file_created', packedFile); }); diff --git a/src/services/following/create.ts b/src/services/following/create.ts index e1164c0bd..bd39b8e18 100644 --- a/src/services/following/create.ts +++ b/src/services/following/create.ts @@ -2,7 +2,7 @@ import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../ import Following from '../../models/following'; import FollowingLog from '../../models/following-log'; import FollowedLog from '../../models/followed-log'; -import event from '../../stream'; +import { publishUserStream } from '../../stream'; import notify from '../../notify'; import pack from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; @@ -61,12 +61,12 @@ export default async function(follower: IUser, followee: IUser) { // Publish follow event if (isLocalUser(follower)) { - packUser(followee, follower).then(packed => event(follower._id, 'follow', packed)); + packUser(followee, follower).then(packed => publishUserStream(follower._id, 'follow', packed)); } // Publish followed event if (isLocalUser(followee)) { - packUser(follower, followee).then(packed => event(followee._id, 'followed', packed)), + packUser(follower, followee).then(packed => publishUserStream(followee._id, 'followed', packed)), // 通知を作成 notify(followee._id, follower._id, 'follow'); diff --git a/src/services/following/delete.ts b/src/services/following/delete.ts index 75f255751..8a9f739bd 100644 --- a/src/services/following/delete.ts +++ b/src/services/following/delete.ts @@ -2,7 +2,7 @@ import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../ import Following from '../../models/following'; import FollowingLog from '../../models/following-log'; import FollowedLog from '../../models/followed-log'; -import event from '../../stream'; +import { publishUserStream } from '../../stream'; import pack from '../../remote/activitypub/renderer'; import renderFollow from '../../remote/activitypub/renderer/follow'; import renderUndo from '../../remote/activitypub/renderer/undo'; @@ -52,7 +52,7 @@ export default async function(follower: IUser, followee: IUser) { // Publish unfollow event if (isLocalUser(follower)) { - packUser(followee, follower).then(packed => event(follower._id, 'unfollow', packed)); + packUser(followee, follower).then(packed => publishUserStream(follower._id, 'unfollow', packed)); } if (isLocalUser(follower) && isRemoteUser(followee)) { diff --git a/src/services/following/requests/accept.ts b/src/services/following/requests/accept.ts index 817662ab6..bf8ed99e1 100644 --- a/src/services/following/requests/accept.ts +++ b/src/services/following/requests/accept.ts @@ -7,7 +7,7 @@ import { deliver } from '../../../queue'; import Following from '../../../models/following'; import FollowingLog from '../../../models/following-log'; import FollowedLog from '../../../models/followed-log'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; export default async function(followee: IUser, follower: IUser) { const following = await Following.insert({ @@ -74,5 +74,5 @@ export default async function(followee: IUser, follower: IUser) { packUser(followee, followee, { detail: true - }).then(packed => event(followee._id, 'meUpdated', packed)); + }).then(packed => publishUserStream(followee._id, 'meUpdated', packed)); } diff --git a/src/services/following/requests/cancel.ts b/src/services/following/requests/cancel.ts index 1a4c0033d..b0b574da5 100644 --- a/src/services/following/requests/cancel.ts +++ b/src/services/following/requests/cancel.ts @@ -4,7 +4,7 @@ import pack from '../../../remote/activitypub/renderer'; import renderFollow from '../../../remote/activitypub/renderer/follow'; import renderUndo from '../../../remote/activitypub/renderer/undo'; import { deliver } from '../../../queue'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; export default async function(followee: IUser, follower: IUser) { if (isRemoteUser(followee)) { @@ -25,5 +25,5 @@ export default async function(followee: IUser, follower: IUser) { packUser(followee, followee, { detail: true - }).then(packed => event(followee._id, 'meUpdated', packed)); + }).then(packed => publishUserStream(followee._id, 'meUpdated', packed)); } diff --git a/src/services/following/requests/create.ts b/src/services/following/requests/create.ts index 1b474dd8b..4c7c90cc0 100644 --- a/src/services/following/requests/create.ts +++ b/src/services/following/requests/create.ts @@ -1,5 +1,5 @@ import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../../models/user'; -import event from '../../../stream'; +import { publishUserStream } from '../../../stream'; import notify from '../../../notify'; import pack from '../../../remote/activitypub/renderer'; import renderFollow from '../../../remote/activitypub/renderer/follow'; @@ -35,11 +35,11 @@ export default async function(follower: IUser, followee: IUser) { // Publish receiveRequest event if (isLocalUser(followee)) { - packUser(follower, followee).then(packed => event(followee._id, 'receiveFollowRequest', packed)); + packUser(follower, followee).then(packed => publishUserStream(followee._id, 'receiveFollowRequest', packed)); packUser(followee, followee, { detail: true - }).then(packed => event(followee._id, 'meUpdated', packed)); + }).then(packed => publishUserStream(followee._id, 'meUpdated', packed)); // 通知を作成 notify(followee._id, follower._id, 'receiveFollowRequest'); diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 340120053..6e644ef1d 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -1,7 +1,7 @@ import es from '../../db/elasticsearch'; import Note, { pack, INote } from '../../models/note'; import User, { isLocalUser, IUser, isRemoteUser, IRemoteUser, ILocalUser } from '../../models/user'; -import stream, { publishLocalTimelineStream, publishHybridTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../stream'; +import { publishUserStream, publishLocalTimelineStream, publishHybridTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../stream'; import Following from '../../models/following'; import { deliver } from '../../queue'; import renderNote from '../../remote/activitypub/renderer/note'; @@ -13,7 +13,6 @@ import notify from '../../notify'; import NoteWatching from '../../models/note-watching'; import watch from './watch'; import Mute from '../../models/mute'; -import event from '../../stream'; import parse from '../../mfm/parse'; import { IApp } from '../../models/app'; import UserList from '../../models/user-list'; @@ -189,7 +188,7 @@ export default async (user: IUser, data: Option, silent = false) => new Promise< } else { // Publish event if (!user._id.equals(data.renote.userId)) { - event(data.renote.userId, 'renote', noteObj); + publishUserStream(data.renote.userId, 'renote', noteObj); } } } @@ -236,12 +235,12 @@ async function publish(user: IUser, note: INote, noteObj: any, reply: INote, ren if (['private', 'followers', 'specified'].includes(note.visibility)) { // Publish event to myself's stream - stream(note.userId, 'note', await pack(note, user, { + publishUserStream(note.userId, 'note', await pack(note, user, { detail: true })); } else { // Publish event to myself's stream - stream(note.userId, 'note', noteObj); + publishUserStream(note.userId, 'note', noteObj); // Publish note to local and hybrid timeline stream if (note.visibility != 'home') { @@ -264,7 +263,7 @@ async function publish(user: IUser, note: INote, noteObj: any, reply: INote, ren const n = await pack(note, u, { detail: true }); - stream(u._id, 'note', n); + publishUserStream(u._id, 'note', n); publishHybridTimelineStream(u._id, n); }); } @@ -417,7 +416,7 @@ async function publishToFollowers(note: INote, noteObj: any, user: IUser, noteAc } // Publish event to followers stream - stream(following.followerId, 'note', noteObj); + publishUserStream(following.followerId, 'note', noteObj); if (isRemoteUser(user) || note.visibility != 'public') { publishHybridTimelineStream(following.followerId, noteObj); @@ -444,7 +443,7 @@ function deliverNoteToMentionedRemoteUsers(mentionedUsers: IUser[], user: ILocal function createMentionedEvents(mentionedUsers: IUser[], noteObj: any, nm: NotificationManager) { mentionedUsers.filter(u => isLocalUser(u)).forEach(async (u) => { - event(u, 'mention', noteObj); + publishUserStream(u._id, 'mention', noteObj); // Create notification nm.push(u._id, 'mention'); diff --git a/src/stream.ts b/src/stream.ts index cec5fcf0f..6db47ecae 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,82 +1,58 @@ import * as mongo from 'mongodb'; -import * as redis from 'redis'; -import config from './config'; +import Xev from 'xev'; + +const ev = new Xev(); type ID = string | mongo.ObjectID; -class MisskeyEvent { - private redisClient: redis.RedisClient; +function publish(channel: string, type: string, value?: any): void { + const message = type == null ? value : value == null ? + { type: type } : + { type: type, body: value }; - constructor() { - // Connect to Redis - this.redisClient = redis.createClient( - config.redis.port, config.redis.host); - } - - public publishUserStream(userId: ID, type: string, value?: any): void { - this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishDriveStream(userId: ID, type: string, value?: any): void { - this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishNoteStream(noteId: ID, type: string, value?: any): void { - this.publish(`note-stream:${noteId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishUserListStream(listId: ID, type: string, value?: any): void { - this.publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { - this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishMessagingIndexStream(userId: ID, type: string, value?: any): void { - this.publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishReversiStream(userId: ID, type: string, value?: any): void { - this.publish(`reversi-stream:${userId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishReversiGameStream(gameId: ID, type: string, value?: any): void { - this.publish(`reversi-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); - } - - public publishLocalTimelineStream(note: any): void { - this.redisClient.publish('misskey:local-timeline', JSON.stringify(note)); - } - - public publishHybridTimelineStream(userId: ID, note: any): void { - this.redisClient.publish(userId ? `misskey:hybrid-timeline:${userId}` : 'misskey:hybrid-timeline', JSON.stringify(note)); - } - - public publishGlobalTimelineStream(note: any): void { - this.redisClient.publish('misskey:global-timeline', JSON.stringify(note)); - } - - private publish(channel: string, type: string, value?: any): void { - const message = value == null ? - { type: type } : - { type: type, body: value }; - - this.redisClient.publish(`misskey:${channel}`, JSON.stringify(message)); - } + ev.emit(channel, message); } -const ev = new MisskeyEvent(); +export function publishUserStream(userId: ID, type: string, value?: any): void { + publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); +} -export default ev.publishUserStream.bind(ev); +export function publishDriveStream(userId: ID, type: string, value?: any): void { + publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); +} -export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev); -export const publishHybridTimelineStream = ev.publishHybridTimelineStream.bind(ev); -export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev); -export const publishDriveStream = ev.publishDriveStream.bind(ev); -export const publishUserListStream = ev.publishUserListStream.bind(ev); -export const publishNoteStream = ev.publishNoteStream.bind(ev); -export const publishMessagingStream = ev.publishMessagingStream.bind(ev); -export const publishMessagingIndexStream = ev.publishMessagingIndexStream.bind(ev); -export const publishReversiStream = ev.publishReversiStream.bind(ev); -export const publishReversiGameStream = ev.publishReversiGameStream.bind(ev); +export function publishNoteStream(noteId: ID, type: string): void { + publish(`note-stream:${noteId}`, type, noteId); +} + +export function publishUserListStream(listId: ID, type: string, value?: any): void { + publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value); +} + +export function publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { + publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); +} + +export function publishMessagingIndexStream(userId: ID, type: string, value?: any): void { + publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); +} + +export function publishReversiStream(userId: ID, type: string, value?: any): void { + publish(`reversi-stream:${userId}`, type, typeof value === 'undefined' ? null : value); +} + +export function publishReversiGameStream(gameId: ID, type: string, value?: any): void { + publish(`reversi-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); +} + +export function publishLocalTimelineStream(note: any): void { + publish('local-timeline', null, note); +} + +export function publishHybridTimelineStream(userId: ID, note: any): void { + publish(userId ? `hybrid-timeline:${userId}` : 'hybrid-timeline', null, note); +} + +export function publishGlobalTimelineStream(note: any): void { + publish('global-timeline', null, note); +}