diff --git a/package.json b/package.json index e7e8e59..fd0558f 100644 --- a/package.json +++ b/package.json @@ -30,11 +30,13 @@ "@types/oauth": "^0.9.0", "@types/request": "^2.47.0", "@types/websocket": "0.0.40", + "@types/ws": "^6.0.1", "axios": "^0.18.1", "oauth": "^0.9.15", "request": "^2.87.0", "typescript": "^3.4.5", - "websocket": "^1.0.28" + "websocket": "^1.0.28", + "ws": "^7.0.1" }, "devDependencies": { "@types/core-js": "^2.5.0", diff --git a/src/web_socket.ts b/src/web_socket.ts index ca8ceeb..ccfd202 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -1,4 +1,4 @@ -import { client, connection, IMessage } from 'websocket' +import ws from 'ws' import { EventEmitter } from 'events' import { Status } from './entities/status' import { Notification } from './entities/notification' @@ -13,18 +13,19 @@ export default class WebSocket extends EventEmitter { public url: string public stream: string public parser: Parser - public headers: {} + public headers: { [key: string]: string } private _accessToken: string - private _socketConnection: connection | null private _reconnectInterval: number private _reconnectMaxAttempts: number private _reconnectCurrentAttempts: number private _connectionClosed: boolean + private _client: ws | null /** * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming * @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28 * @param accessToken The access token. + * @param userAgent The specified User Agent. */ constructor(url: string, stream: string, accessToken: string, userAgent: string) { super() @@ -35,11 +36,11 @@ export default class WebSocket extends EventEmitter { 'User-Agent': userAgent } this._accessToken = accessToken - this._socketConnection = null this._reconnectInterval = 1000 this._reconnectMaxAttempts = Infinity this._reconnectCurrentAttempts = 0 this._connectionClosed = false + this._client = null } /** @@ -57,18 +58,15 @@ export default class WebSocket extends EventEmitter { private _startWebSocketConnection() { this._resetConnection() this._setupParser() - const cli = this._getClient() - this._connect(cli, this.url, this.stream, this._accessToken, this.headers) + this._client = this._connect(this.url, this.stream, this._accessToken, this.headers) + this._bindSocket(this._client) } /** * Stop current connection. */ public stop() { - if (this._socketConnection) { - this._connectionClosed = true - this._socketConnection.close() - } + this._resetConnection() this._resetRetryParams() } @@ -76,9 +74,10 @@ export default class WebSocket extends EventEmitter { * Clean up current connection, and listeners. */ private _resetConnection() { - if (this._socketConnection) { - this._socketConnection.removeAllListeners() - this._socketConnection.close() + if (this._client) { + this._client.removeAllListeners() + this._client.close(1000) + this._client = null } if (this.parser) { @@ -96,64 +95,68 @@ export default class WebSocket extends EventEmitter { /** * Reconnects to the same endpoint. */ - private _reconnect(cli: client) { - setTimeout(() => { - if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { - this._reconnectCurrentAttempts++ - // Call connect methods - console.log('Reconnecting') - this._connect(cli, this.url, this.stream, this._accessToken, this.headers) - } - }, this._reconnectInterval) + private _reconnect() { + if (this._client) { + setTimeout(() => { + if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { + this._reconnectCurrentAttempts++ + // Call connect methods + console.log('Reconnecting') + this._client = this._connect(this.url, this.stream, this._accessToken, this.headers) + this._bindSocket(this._client) + } + }, this._reconnectInterval) + } } - private _connect(cli: client, url: string, stream: string, accessToken: string, headers: {}) { + /** + * @param url Base url of streaming endpoint. + * @param stream The specified stream name. + * @param accessToken Access token. + * @param headers The specified headers. + * @return A WebSocket instance. + */ + private _connect(url: string, stream: string, accessToken: string, headers: { [key: string]: string }): ws { const params: Array = [`stream=${stream}`] if (accessToken !== null) { params.push(`access_token=${accessToken}`) } - const req_url: string = `${url}/?${params.join('&')}` - cli.connect(req_url, undefined, undefined, headers) + const requestURL: string = `${url}/?${params.join('&')}` + const options: ws.ClientOptions = { + headers: headers + } + + const cli: ws = new ws(requestURL, options) + return cli } /** - * Prepare websocket client. - * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming - * @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28 - * @param accessToken The access token. - * @returns A Client instance of websocket. + * Bind event for web socket client. + * @param client A WebSocket instance. */ - private _getClient(): client { - const cli = new client() - cli.on('connectFailed', err => { - console.error(err) - this._reconnect(cli) - }) - cli.on('connect', conn => { - this._socketConnection = conn - this.emit('connect', {}) - conn.on('error', err => { - this.emit('error', err) - }) - conn.on('close', code => { - // Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4 - if (code === 1000) { - this.emit('close', {}) - } else { - console.log(`Closed connection with ${code}`) - // If already called close method, it does not retry. - if (!this._connectionClosed) { - this._reconnect(cli) - } + private _bindSocket(client: ws) { + client.on('close', (code: number, _reason: string) => { + // Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4 + if (code === 1000) { + this.emit('close', {}) + } else { + console.log(`Closed connection with ${code}`) + // If already called close method, it does not retry. + if (!this._connectionClosed) { + this._reconnect() } - }) - conn.on('message', (message: IMessage) => { - this.parser.parser(message) - }) + } + }) + client.on('open', () => { + this.emit('connect', {}) + }) + client.on('message', (data: ws.Data) => { + this.parser.parse(data) + }) + client.on('error', (err: Error) => { + this.emit('error', err) }) - - return cli } /** @@ -193,13 +196,13 @@ class Parser extends EventEmitter { /** * @param message Message body of websocket. */ - public parser(message: IMessage) { - if (!message.utf8Data) { + public parse(message: ws.Data) { + if (typeof message !== 'string') { this.emit('heartbeat', {}) return } - const data = message.utf8Data - if (data === '') { + + if (message === '') { this.emit('heartbeat', {}) return } @@ -208,14 +211,14 @@ class Parser extends EventEmitter { let payload = '' let mes = {} try { - const obj = JSON.parse(data) + const obj = JSON.parse(message) event = obj.event payload = obj.payload mes = JSON.parse(payload) } catch (err) { // delete event does not have json object if (event !== 'delete') { - this.emit('error', new Error(`Error parsing websocket reply: ${data}, error message: ${err}`)) + this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`)) return } } @@ -234,7 +237,7 @@ class Parser extends EventEmitter { this.emit('delete', payload) break default: - this.emit('error', new Error(`Unknown event has received: ${data}`)) + this.emit('error', new Error(`Unknown event has received: ${message}`)) } return } diff --git a/yarn.lock b/yarn.lock index 9e1744b..7717e0b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -80,6 +80,14 @@ "@types/events" "*" "@types/node" "*" +"@types/ws@^6.0.1": + version "6.0.1" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-6.0.1.tgz#ca7a3f3756aa12f62a0a62145ed14c6db25d5a28" + integrity sha512-EzH8k1gyZ4xih/MaZTXwT2xOkPiIMSrhQ9b8wrlX88L0T02eYsddatQlwVFlEPyEqV0ChpdpNnE51QPH6NVT4Q== + dependencies: + "@types/events" "*" + "@types/node" "*" + "@typescript-eslint/eslint-plugin@^1.9.0": version "1.9.0" resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-1.9.0.tgz#29d73006811bf2563b88891ceeff1c5ea9c8d9c6" @@ -238,6 +246,11 @@ astral-regex@^1.0.0: resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-1.0.0.tgz#6c8c3fb827dd43ee3918f27b82782ab7658a6fd9" integrity sha512-+Ryf6g3BKoRc7jfp7ad8tM4TtMiaWvbF/1/sQcZPkkS7ag3D5nMBCe2UfOTONtAkaG0tO0ij3C5Lwmf1EiyjHg== +async-limiter@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.0.tgz#78faed8c3d074ab81f22b4e985d79e8738f720f8" + integrity sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg== + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -1871,6 +1884,13 @@ write@1.0.3: dependencies: mkdirp "^0.5.1" +ws@^7.0.1: + version "7.0.1" + resolved "https://registry.yarnpkg.com/ws/-/ws-7.0.1.tgz#1a04e86cc3a57c03783f4910fdb090cf31b8e165" + integrity sha512-ILHfMbuqLJvnSgYXLgy4kMntroJpe8hT41dOVWM8bxRuw6TK4mgMp9VJUNsZTEc5Bh+Mbs0DJT4M0N+wBG9l9A== + dependencies: + async-limiter "^1.0.0" + yaeti@^0.0.6: version "0.0.6" resolved "https://registry.yarnpkg.com/yaeti/-/yaeti-0.0.6.tgz#f26f484d72684cf42bedfb76970aa1608fbf9577"