From eb392b125105f9949f84a6fd654694238457a18f Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 11 Jul 2018 09:36:30 +0900 Subject: [PATCH] wip --- locales/ja.yml | 3 ++ .../scripts/streaming/hybrid-timeline.ts | 34 ++++++++++++++ src/client/app/mios.ts | 4 ++ src/server/api/stream/hybrid-timeline.ts | 47 +++++++++++++++++++ src/services/note/create.ts | 9 +++- src/stream.ts | 5 ++ 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 src/client/app/common/scripts/streaming/hybrid-timeline.ts create mode 100644 src/server/api/stream/hybrid-timeline.ts diff --git a/locales/ja.yml b/locales/ja.yml index b097d96dc..03a2cceee 100644 --- a/locales/ja.yml +++ b/locales/ja.yml @@ -93,6 +93,7 @@ common: widgets: "ウィジェット" home: "ホーム" local: "ローカル" + hybrid: "ハイブリッド" global: "グローバル" notifications: "通知" list: "リスト" @@ -642,6 +643,7 @@ desktop/views/components/taskmanager.vue: desktop/views/components/timeline.vue: home: "ホーム" local: "ローカル" + hybrid: "ハイブリッド" global: "グローバル" list: "リスト" @@ -965,6 +967,7 @@ mobile/views/pages/following.vue: mobile/views/pages/home.vue: home: "ホーム" local: "ローカル" + hybrid: "ハイブリッド" global: "グローバル" mobile/views/pages/messaging.vue: diff --git a/src/client/app/common/scripts/streaming/hybrid-timeline.ts b/src/client/app/common/scripts/streaming/hybrid-timeline.ts new file mode 100644 index 000000000..cd290797c --- /dev/null +++ b/src/client/app/common/scripts/streaming/hybrid-timeline.ts @@ -0,0 +1,34 @@ +import Stream from './stream'; +import StreamManager from './stream-manager'; +import MiOS from '../../../mios'; + +/** + * Hybrid timeline stream connection + */ +export class HybridTimelineStream extends Stream { + constructor(os: MiOS, me) { + super(os, 'hybrid-timeline', { + i: me.token + }); + } +} + +export class HybridTimelineStreamManager extends StreamManager { + private me; + private os: MiOS; + + constructor(os: MiOS, me) { + super(); + + this.me = me; + this.os = os; + } + + public getConnection() { + if (this.connection == null) { + this.connection = new HybridTimelineStream(this.os, this.me); + } + + return this.connection; + } +} diff --git a/src/client/app/mios.ts b/src/client/app/mios.ts index e61976419..5e8f82589 100644 --- a/src/client/app/mios.ts +++ b/src/client/app/mios.ts @@ -15,6 +15,7 @@ import { ReversiStreamManager } from './common/scripts/streaming/games/reversi'; import Err from './common/views/components/connect-failed.vue'; import { LocalTimelineStreamManager } from './common/scripts/streaming/local-timeline'; +import { HybridTimelineStreamManager } from './common/scripts/streaming/hybrid-timeline'; import { GlobalTimelineStreamManager } from './common/scripts/streaming/global-timeline'; //#region api requests @@ -103,6 +104,7 @@ export default class MiOS extends EventEmitter { */ public streams: { localTimelineStream: LocalTimelineStreamManager; + hybridTimelineStreamManager: HybridTimelineStreamManager; globalTimelineStream: GlobalTimelineStreamManager; driveStream: DriveStreamManager; serverStatsStream: ServerStatsStreamManager; @@ -111,6 +113,7 @@ export default class MiOS extends EventEmitter { reversiStream: ReversiStreamManager; } = { localTimelineStream: null, + hybridTimelineStreamManager: null, globalTimelineStream: null, driveStream: null, serverStatsStream: null, @@ -230,6 +233,7 @@ export default class MiOS extends EventEmitter { // Init other stream manager this.streams.localTimelineStream = new LocalTimelineStreamManager(this, this.store.state.i); + this.streams.hybridTimelineStreamManager = new HybridTimelineStreamManager(this, this.store.state.i); this.streams.globalTimelineStream = new GlobalTimelineStreamManager(this, this.store.state.i); this.streams.driveStream = new DriveStreamManager(this, this.store.state.i); this.streams.messagingIndexStream = new MessagingIndexStreamManager(this, this.store.state.i); diff --git a/src/server/api/stream/hybrid-timeline.ts b/src/server/api/stream/hybrid-timeline.ts new file mode 100644 index 000000000..55f9fbb78 --- /dev/null +++ b/src/server/api/stream/hybrid-timeline.ts @@ -0,0 +1,47 @@ +import * as websocket from 'websocket'; +import * as redis from 'redis'; + +import { IUser } from '../../../models/user'; +import Mute from '../../../models/mute'; +import { pack } from '../../../models/note'; + +export default async function( + request: websocket.request, + connection: websocket.connection, + subscriber: redis.RedisClient, + user: IUser +) { + // Subscribe stream + subscriber.subscribe(`misskey:hybrid-timeline:${user._id}`); + + const mute = await Mute.find({ muterId: user._id }); + const mutedUserIds = mute.map(m => m.muteeId.toString()); + + subscriber.on('message', async (_, data) => { + const note = JSON.parse(data); + + //#region 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する + if (mutedUserIds.indexOf(note.userId) != -1) { + return; + } + if (note.reply != null && mutedUserIds.indexOf(note.reply.userId) != -1) { + return; + } + if (note.renote != null && mutedUserIds.indexOf(note.renote.userId) != -1) { + return; + } + //#endregion + + // Renoteなら再pack + if (note.renoteId != null) { + note.renote = await pack(note.renoteId, user, { + detail: true + }); + } + + connection.send(JSON.stringify({ + type: 'note', + body: note + })); + }); +} diff --git a/src/services/note/create.ts b/src/services/note/create.ts index 25281df01..cc7474292 100644 --- a/src/services/note/create.ts +++ b/src/services/note/create.ts @@ -1,7 +1,7 @@ import es from '../../db/elasticsearch'; import Note, { pack, INote } from '../../models/note'; import User, { isLocalUser, IUser, isRemoteUser, IRemoteUser, ILocalUser } from '../../models/user'; -import stream, { publishLocalTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../stream'; +import stream, { publishLocalTimelineStream, publishHybridTimelineStream, publishGlobalTimelineStream, publishUserListStream } from '../../stream'; import Following from '../../models/following'; import { deliver } from '../../queue'; import renderNote from '../../remote/activitypub/renderer/note'; @@ -266,9 +266,10 @@ export default async (user: IUser, data: { // Publish event to myself's stream stream(note.userId, 'note', noteObj); - // Publish note to local timeline stream + // Publish note to local and hybrid timeline stream if (note.visibility != 'home') { publishLocalTimelineStream(noteObj); + publishHybridTimelineStream(noteObj); } } } @@ -303,6 +304,10 @@ export default async (user: IUser, data: { // Publish event to followers stream stream(following.followerId, 'note', noteObj); + + if (isRemoteUser(user)) { + publishHybridTimelineStream(following.followerId, noteObj); + } } else { //#region AP配送 // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 diff --git a/src/stream.ts b/src/stream.ts index da2f9aecd..b2aa4f0e2 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -49,6 +49,10 @@ class MisskeyEvent { this.redisClient.publish('misskey:local-timeline', JSON.stringify(note)); } + public publishHybridTimelineStream(userId: ID, note: any): void { + this.redisClient.publish(`misskey:hybrid-timeline:${userId}`, JSON.stringify(note)); + } + public publishGlobalTimelineStream(note: any): void { this.redisClient.publish('misskey:global-timeline', JSON.stringify(note)); } @@ -67,6 +71,7 @@ const ev = new MisskeyEvent(); export default ev.publishUserStream.bind(ev); export const publishLocalTimelineStream = ev.publishLocalTimelineStream.bind(ev); +export const publishHybridTimelineStream = ev.publishHybridTimelineStream.bind(ev); export const publishGlobalTimelineStream = ev.publishGlobalTimelineStream.bind(ev); export const publishDriveStream = ev.publishDriveStream.bind(ev); export const publishUserListStream = ev.publishUserListStream.bind(ev);