From e3c5db9a24331528f93ed9822aa3a1153c72d49d Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Wed, 11 Mar 2020 22:01:50 +0900 Subject: [PATCH] refs #125 Add homeTimeline streaming using websocket for misskey --- example/typescript/src/misskey/web_socket.ts | 38 ++++ package.json | 2 + src/mastodon/web_socket.ts | 13 +- src/misskey.ts | 2 +- src/misskey/api_client.ts | 13 ++ src/misskey/web_socket.ts | 216 +++++++++++++++++++ yarn.lock | 10 + 7 files changed, 286 insertions(+), 8 deletions(-) create mode 100644 example/typescript/src/misskey/web_socket.ts create mode 100644 src/misskey/web_socket.ts diff --git a/example/typescript/src/misskey/web_socket.ts b/example/typescript/src/misskey/web_socket.ts new file mode 100644 index 0000000..9f8b1c6 --- /dev/null +++ b/example/typescript/src/misskey/web_socket.ts @@ -0,0 +1,38 @@ +import generator, { Entity, WebSocketInterface } from 'megalodon' +import log4js from 'log4js' + +declare var process: { + env: { + MISSKEY_ACCESS_TOKEN: string + } +} + +const BASE_URL: string = 'wss://misskey.io' + +const access_token: string = process.env.MISSKEY_ACCESS_TOKEN + +const client = generator('misskey', BASE_URL, access_token) + +const stream: WebSocketInterface = client.userSocket() + +const logger = log4js.getLogger() +logger.level = 'debug' +stream.on('connect', () => { + logger.debug('connect') +}) + +stream.on('pong', () => { + logger.debug('pong') +}) + +stream.on('update', (status: Entity.Status) => { + logger.debug(status) +}) + +stream.on('error', (err: Error) => { + console.error(err) +}) + +stream.on('close', () => { + logger.debug('close') +}) diff --git a/package.json b/package.json index 8813e65..5ff7a9c 100644 --- a/package.json +++ b/package.json @@ -60,11 +60,13 @@ "oauth": "^0.9.15", "socks-proxy-agent": "h3poteto/node-socks-proxy-agent#master", "typescript": "3.8.2", + "uuid": "^7.0.2", "ws": "^7.2.1" }, "devDependencies": { "@types/core-js": "^2.5.0", "@types/jest": "^25.1.1", + "@types/uuid": "^7.0.0", "@typescript-eslint/eslint-plugin": "^2.19.2", "@typescript-eslint/parser": "^2.19.2", "eslint": "^6.8.0", diff --git a/src/mastodon/web_socket.ts b/src/mastodon/web_socket.ts index 07c6f52..ad0df6d 100644 --- a/src/mastodon/web_socket.ts +++ b/src/mastodon/web_socket.ts @@ -2,7 +2,6 @@ import WS from 'ws' import moment, { Moment } from 'moment' import { EventEmitter } from 'events' import proxyAgent, { ProxyConfig } from '../proxy_config' -import Entity from '../entity' import { WebSocketInterface } from '../megalodon' import MastodonAPI from './api_client' @@ -233,16 +232,16 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac * Set up parser when receive message. */ private _setupParser() { - this.parser.on('update', (status: Entity.Status) => { + this.parser.on('update', (status: MastodonAPI.Entity.Status) => { this.emit('update', MastodonAPI.Converter.status(status)) }) - this.parser.on('notification', (notification: Entity.Notification) => { + this.parser.on('notification', (notification: MastodonAPI.Entity.Notification) => { this.emit('notification', MastodonAPI.Converter.notification(notification)) }) this.parser.on('delete', (id: string) => { this.emit('delete', id) }) - this.parser.on('conversation', (conversation: Entity.Conversation) => { + this.parser.on('conversation', (conversation: MastodonAPI.Entity.Conversation) => { this.emit('conversation', MastodonAPI.Converter.conversation(conversation)) }) this.parser.on('error', (err: Error) => { @@ -314,13 +313,13 @@ export class Parser extends EventEmitter { switch (event) { case 'update': - this.emit('update', mes as Entity.Status) + this.emit('update', mes as MastodonAPI.Entity.Status) break case 'notification': - this.emit('notification', mes as Entity.Notification) + this.emit('notification', mes as MastodonAPI.Entity.Notification) break case 'conversation': - this.emit('conversation', mes as Entity.Conversation) + this.emit('conversation', mes as MastodonAPI.Entity.Conversation) break case 'delete': this.emit('delete', payload) diff --git a/src/misskey.ts b/src/misskey.ts index 835c5e3..5719613 100644 --- a/src/misskey.ts +++ b/src/misskey.ts @@ -1860,7 +1860,7 @@ export default class Misskey implements MegalodonInterface { } public userSocket(): WebSocketInterface { - throw new NoImplementedError('TODO: implement') + return this.client.socket('homeTimeline') } public publicSocket(): WebSocketInterface { diff --git a/src/misskey/api_client.ts b/src/misskey/api_client.ts index 82213fb..c636cea 100644 --- a/src/misskey/api_client.ts +++ b/src/misskey/api_client.ts @@ -5,6 +5,7 @@ import proxyAgent, { ProxyConfig } from '../proxy_config' import Response from '../response' import MisskeyEntity from './entity' import MegalodonEntity from '../entity' +import WebSocket from './web_socket' namespace MisskeyAPI { export namespace Entity { @@ -458,6 +459,18 @@ namespace MisskeyAPI { public cancel(): void { return this.cancelTokenSource.cancel('Request is canceled by user') } + + public socket(channel: 'homeTimeline'): WebSocket { + if (!this.accessToken) { + throw new Error('accessToken is required') + } + const url = this.baseUrl + '/streaming' + const streaming = new WebSocket(url, channel, this.accessToken) + process.nextTick(() => { + streaming.start() + }) + return streaming + } } } diff --git a/src/misskey/web_socket.ts b/src/misskey/web_socket.ts new file mode 100644 index 0000000..bc2722b --- /dev/null +++ b/src/misskey/web_socket.ts @@ -0,0 +1,216 @@ +import WS from 'ws' +import { v4 as uuid } from 'uuid' +import { EventEmitter } from 'events' +import { WebSocketInterface } from '../megalodon' +import MisskeyAPI from './api_client' + +export default class WebSocket extends EventEmitter implements WebSocketInterface { + public url: string + public channel: 'homeTimeline' + public parser: Parser + private _accessToken: string + private _reconnectInterval: number + private _reconnectMaxAttempts: number + private _reconnectCurrentAttempts: number + private _connectionClosed: boolean + private _client: WS | null = null + private _channelID: string + + constructor(url: string, channel: 'homeTimeline', accessToken: string) { + super() + this.url = url + this.parser = new Parser() + this.channel = channel + this._accessToken = accessToken + this._reconnectInterval = 10000 + this._reconnectMaxAttempts = Infinity + this._reconnectCurrentAttempts = 0 + this._connectionClosed = false + this._channelID = uuid() + } + + public start() { + this._connectionClosed = false + this._resetRetryParams() + this._startWebSocketConnection() + } + + private _startWebSocketConnection() { + this._resetConnection() + this._setupParser() + this._client = this._connect() + this._bindSocket(this._client) + } + + public stop() { + this._connectionClosed = true + this._resetConnection() + this._resetRetryParams() + } + + private _resetConnection() { + if (this._client) { + this._client.close(100) + this._client.removeAllListeners() + this._client = null + } + + if (this.parser) { + this.parser.removeAllListeners() + } + } + + private _resetRetryParams() { + this._reconnectCurrentAttempts = 0 + } + + private _connect(): WS { + const cli: WS = new WS(`${this.url}?i=${this._accessToken}`) + return cli + } + + private _channel() { + if (!this._client) { + return + } + this._client.send( + JSON.stringify({ + type: 'connect', + body: { + channel: this.channel, + id: this._channelID + } + }) + ) + } + + private _reconnect() { + setTimeout(() => { + // Skip reconnect when client is connecting. + // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365 + if (this._client && this._client.readyState === WS.CONNECTING) { + return + } + + if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { + this._reconnectCurrentAttempts++ + this._clearBinding() + if (this._client) { + // In reconnect, we want to close the connection immediately, + // because recoonect is necessary when some problems occur. + this._client.terminate() + } + // Call connect methods + console.log('Reconnecting') + this._client = this._connect() + this._bindSocket(this._client) + } + }, this._reconnectInterval) + } + + private _clearBinding() { + if (this._client) { + this._client.removeAllListeners('close') + this._client.removeAllListeners('pong') + this._client.removeAllListeners('open') + this._client.removeAllListeners('message') + this._client.removeAllListeners('error') + } + } + + private _bindSocket(client: WS) { + client.on('close', (code: number, _reason: string) => { + if (code === 1000) { + this.emit('close', {}) + } else { + console.log(`Closed connection with ${code}`) + if (!this._connectionClosed) { + this._reconnect() + } + } + }) + client.on('pong', () => { + this.emit('pong', {}) + }) + client.on('open', () => { + this.emit('connect', {}) + this._channel() + setTimeout(() => { + client.pong('') + }, 1000) + }) + client.on('message', (data: WS.Data) => { + this.parser.parse(data, this._channelID) + }) + client.on('error', (err: Error) => { + this.emit('error', err) + }) + } + + private _setupParser() { + this.parser.on('update', (note: MisskeyAPI.Entity.Note) => { + this.emit('update', MisskeyAPI.Converter.note(note)) + }) + } +} + +/** + * Parser + * This class provides parser for websocket message. + */ +export class Parser extends EventEmitter { + /** + * @param message Message body of websocket. + */ + public parse(message: WS.Data, channelID: string) { + if (typeof message !== 'string') { + this.emit('heartbeat', {}) + return + } + + if (message === '') { + this.emit('heartbeat', {}) + return + } + + let obj: { + type: string + body: { + id: string + type: string + body: any + } + } + let body: { + id: string + type: string + body: any + } + + try { + obj = JSON.parse(message) + if (obj.type !== 'channel') { + return + } + if (!obj.body) { + return + } + body = obj.body + if (body.id !== channelID) { + return + } + } catch (err) { + this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`)) + return + } + + switch (body.type) { + case 'note': + this.emit('update', body.body as MisskeyAPI.Entity.Note) + break + default: + this.emit('error', new Error(`Unknown event has received: ${body}`)) + break + } + } +} diff --git a/yarn.lock b/yarn.lock index 3d9c2e9..49386de 100644 --- a/yarn.lock +++ b/yarn.lock @@ -557,6 +557,11 @@ resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-1.0.1.tgz#0a851d3bd96498fa25c33ab7278ed3bd65f06c3e" integrity sha512-l42BggppR6zLmpfU6fq9HEa2oGPEI8yrSPL3GITjfRInppYFahObbIQOQK3UGxEnyQpltZLaPe75046NOZQikw== +"@types/uuid@^7.0.0": + version "7.0.0" + resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-7.0.0.tgz#9f6993ccc8210efa90bda7e1afabbb06a9f860cd" + integrity sha512-RiX1I0lK9WFLFqy2xOxke396f0wKIzk5sAll0tL4J4XDYJXURI7JOs96XQb3nP+2gEpQ/LutBb66jgiT5oQshQ== + "@types/ws@^7.2.0": version "7.2.2" resolved "https://registry.yarnpkg.com/@types/ws/-/ws-7.2.2.tgz#1bd2038bc80aea60f8a20b2dcf08602a72e65063" @@ -4427,6 +4432,11 @@ uuid@^3.3.2: resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131" integrity sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA== +uuid@^7.0.2: + version "7.0.2" + resolved "https://registry.yarnpkg.com/uuid/-/uuid-7.0.2.tgz#7ff5c203467e91f5e0d85cfcbaaf7d2ebbca9be6" + integrity sha512-vy9V/+pKG+5ZTYKf+VcphF5Oc6EFiu3W8Nv3P3zIh0EqVI80ZxOzuPfe9EHjkFNvf8+xuTHVeei4Drydlx4zjw== + v8-compile-cache@^2.0.3: version "2.0.3" resolved "https://registry.yarnpkg.com/v8-compile-cache/-/v8-compile-cache-2.0.3.tgz#00f7494d2ae2b688cfe2899df6ed2c54bef91dbe"