Use ws instead of websocket to connect web socket

This commit is contained in:
AkiraFukushima 2019-07-06 00:05:43 +09:00
parent 324bee027d
commit 020e678e99
3 changed files with 91 additions and 66 deletions

View file

@ -30,11 +30,13 @@
"@types/oauth": "^0.9.0", "@types/oauth": "^0.9.0",
"@types/request": "^2.47.0", "@types/request": "^2.47.0",
"@types/websocket": "0.0.40", "@types/websocket": "0.0.40",
"@types/ws": "^6.0.1",
"axios": "^0.18.1", "axios": "^0.18.1",
"oauth": "^0.9.15", "oauth": "^0.9.15",
"request": "^2.87.0", "request": "^2.87.0",
"typescript": "^3.4.5", "typescript": "^3.4.5",
"websocket": "^1.0.28" "websocket": "^1.0.28",
"ws": "^7.0.1"
}, },
"devDependencies": { "devDependencies": {
"@types/core-js": "^2.5.0", "@types/core-js": "^2.5.0",

View file

@ -1,4 +1,4 @@
import { client, connection, IMessage } from 'websocket' import ws from 'ws'
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { Status } from './entities/status' import { Status } from './entities/status'
import { Notification } from './entities/notification' import { Notification } from './entities/notification'
@ -13,18 +13,19 @@ export default class WebSocket extends EventEmitter {
public url: string public url: string
public stream: string public stream: string
public parser: Parser public parser: Parser
public headers: {} public headers: { [key: string]: string }
private _accessToken: string private _accessToken: string
private _socketConnection: connection | null
private _reconnectInterval: number private _reconnectInterval: number
private _reconnectMaxAttempts: number private _reconnectMaxAttempts: number
private _reconnectCurrentAttempts: number private _reconnectCurrentAttempts: number
private _connectionClosed: boolean private _connectionClosed: boolean
private _client: ws | null
/** /**
* @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming
* @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28 * @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28
* @param accessToken The access token. * @param accessToken The access token.
* @param userAgent The specified User Agent.
*/ */
constructor(url: string, stream: string, accessToken: string, userAgent: string) { constructor(url: string, stream: string, accessToken: string, userAgent: string) {
super() super()
@ -35,11 +36,11 @@ export default class WebSocket extends EventEmitter {
'User-Agent': userAgent 'User-Agent': userAgent
} }
this._accessToken = accessToken this._accessToken = accessToken
this._socketConnection = null
this._reconnectInterval = 1000 this._reconnectInterval = 1000
this._reconnectMaxAttempts = Infinity this._reconnectMaxAttempts = Infinity
this._reconnectCurrentAttempts = 0 this._reconnectCurrentAttempts = 0
this._connectionClosed = false this._connectionClosed = false
this._client = null
} }
/** /**
@ -57,18 +58,15 @@ export default class WebSocket extends EventEmitter {
private _startWebSocketConnection() { private _startWebSocketConnection() {
this._resetConnection() this._resetConnection()
this._setupParser() this._setupParser()
const cli = this._getClient() this._client = this._connect(this.url, this.stream, this._accessToken, this.headers)
this._connect(cli, this.url, this.stream, this._accessToken, this.headers) this._bindSocket(this._client)
} }
/** /**
* Stop current connection. * Stop current connection.
*/ */
public stop() { public stop() {
if (this._socketConnection) { this._resetConnection()
this._connectionClosed = true
this._socketConnection.close()
}
this._resetRetryParams() this._resetRetryParams()
} }
@ -76,9 +74,10 @@ export default class WebSocket extends EventEmitter {
* Clean up current connection, and listeners. * Clean up current connection, and listeners.
*/ */
private _resetConnection() { private _resetConnection() {
if (this._socketConnection) { if (this._client) {
this._socketConnection.removeAllListeners() this._client.removeAllListeners()
this._socketConnection.close() this._client.close(1000)
this._client = null
} }
if (this.parser) { if (this.parser) {
@ -96,64 +95,68 @@ export default class WebSocket extends EventEmitter {
/** /**
* Reconnects to the same endpoint. * Reconnects to the same endpoint.
*/ */
private _reconnect(cli: client) { private _reconnect() {
setTimeout(() => { if (this._client) {
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { setTimeout(() => {
this._reconnectCurrentAttempts++ if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
// Call connect methods this._reconnectCurrentAttempts++
console.log('Reconnecting') // Call connect methods
this._connect(cli, this.url, this.stream, this._accessToken, this.headers) console.log('Reconnecting')
} this._client = this._connect(this.url, this.stream, this._accessToken, this.headers)
}, this._reconnectInterval) this._bindSocket(this._client)
}
}, this._reconnectInterval)
}
} }
private _connect(cli: client, url: string, stream: string, accessToken: string, headers: {}) { /**
* @param url Base url of streaming endpoint.
* @param stream The specified stream name.
* @param accessToken Access token.
* @param headers The specified headers.
* @return A WebSocket instance.
*/
private _connect(url: string, stream: string, accessToken: string, headers: { [key: string]: string }): ws {
const params: Array<string> = [`stream=${stream}`] const params: Array<string> = [`stream=${stream}`]
if (accessToken !== null) { if (accessToken !== null) {
params.push(`access_token=${accessToken}`) params.push(`access_token=${accessToken}`)
} }
const req_url: string = `${url}/?${params.join('&')}` const requestURL: string = `${url}/?${params.join('&')}`
cli.connect(req_url, undefined, undefined, headers) const options: ws.ClientOptions = {
headers: headers
}
const cli: ws = new ws(requestURL, options)
return cli
} }
/** /**
* Prepare websocket client. * Bind event for web socket client.
* @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming * @param client A WebSocket instance.
* @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28
* @param accessToken The access token.
* @returns A Client instance of websocket.
*/ */
private _getClient(): client { private _bindSocket(client: ws) {
const cli = new client() client.on('close', (code: number, _reason: string) => {
cli.on('connectFailed', err => { // Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4
console.error(err) if (code === 1000) {
this._reconnect(cli) this.emit('close', {})
}) } else {
cli.on('connect', conn => { console.log(`Closed connection with ${code}`)
this._socketConnection = conn // If already called close method, it does not retry.
this.emit('connect', {}) if (!this._connectionClosed) {
conn.on('error', err => { this._reconnect()
this.emit('error', err)
})
conn.on('close', code => {
// Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4
if (code === 1000) {
this.emit('close', {})
} else {
console.log(`Closed connection with ${code}`)
// If already called close method, it does not retry.
if (!this._connectionClosed) {
this._reconnect(cli)
}
} }
}) }
conn.on('message', (message: IMessage) => { })
this.parser.parser(message) client.on('open', () => {
}) this.emit('connect', {})
})
client.on('message', (data: ws.Data) => {
this.parser.parse(data)
})
client.on('error', (err: Error) => {
this.emit('error', err)
}) })
return cli
} }
/** /**
@ -193,13 +196,13 @@ class Parser extends EventEmitter {
/** /**
* @param message Message body of websocket. * @param message Message body of websocket.
*/ */
public parser(message: IMessage) { public parse(message: ws.Data) {
if (!message.utf8Data) { if (typeof message !== 'string') {
this.emit('heartbeat', {}) this.emit('heartbeat', {})
return return
} }
const data = message.utf8Data
if (data === '') { if (message === '') {
this.emit('heartbeat', {}) this.emit('heartbeat', {})
return return
} }
@ -208,14 +211,14 @@ class Parser extends EventEmitter {
let payload = '' let payload = ''
let mes = {} let mes = {}
try { try {
const obj = JSON.parse(data) const obj = JSON.parse(message)
event = obj.event event = obj.event
payload = obj.payload payload = obj.payload
mes = JSON.parse(payload) mes = JSON.parse(payload)
} catch (err) { } catch (err) {
// delete event does not have json object // delete event does not have json object
if (event !== 'delete') { if (event !== 'delete') {
this.emit('error', new Error(`Error parsing websocket reply: ${data}, error message: ${err}`)) this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`))
return return
} }
} }
@ -234,7 +237,7 @@ class Parser extends EventEmitter {
this.emit('delete', payload) this.emit('delete', payload)
break break
default: default:
this.emit('error', new Error(`Unknown event has received: ${data}`)) this.emit('error', new Error(`Unknown event has received: ${message}`))
} }
return return
} }

View file

@ -80,6 +80,14 @@
"@types/events" "*" "@types/events" "*"
"@types/node" "*" "@types/node" "*"
"@types/ws@^6.0.1":
version "6.0.1"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-6.0.1.tgz#ca7a3f3756aa12f62a0a62145ed14c6db25d5a28"
integrity sha512-EzH8k1gyZ4xih/MaZTXwT2xOkPiIMSrhQ9b8wrlX88L0T02eYsddatQlwVFlEPyEqV0ChpdpNnE51QPH6NVT4Q==
dependencies:
"@types/events" "*"
"@types/node" "*"
"@typescript-eslint/eslint-plugin@^1.9.0": "@typescript-eslint/eslint-plugin@^1.9.0":
version "1.9.0" version "1.9.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-1.9.0.tgz#29d73006811bf2563b88891ceeff1c5ea9c8d9c6" resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-1.9.0.tgz#29d73006811bf2563b88891ceeff1c5ea9c8d9c6"
@ -238,6 +246,11 @@ astral-regex@^1.0.0:
resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-1.0.0.tgz#6c8c3fb827dd43ee3918f27b82782ab7658a6fd9" resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-1.0.0.tgz#6c8c3fb827dd43ee3918f27b82782ab7658a6fd9"
integrity sha512-+Ryf6g3BKoRc7jfp7ad8tM4TtMiaWvbF/1/sQcZPkkS7ag3D5nMBCe2UfOTONtAkaG0tO0ij3C5Lwmf1EiyjHg== integrity sha512-+Ryf6g3BKoRc7jfp7ad8tM4TtMiaWvbF/1/sQcZPkkS7ag3D5nMBCe2UfOTONtAkaG0tO0ij3C5Lwmf1EiyjHg==
async-limiter@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.0.tgz#78faed8c3d074ab81f22b4e985d79e8738f720f8"
integrity sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg==
asynckit@^0.4.0: asynckit@^0.4.0:
version "0.4.0" version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
@ -1871,6 +1884,13 @@ write@1.0.3:
dependencies: dependencies:
mkdirp "^0.5.1" mkdirp "^0.5.1"
ws@^7.0.1:
version "7.0.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.0.1.tgz#1a04e86cc3a57c03783f4910fdb090cf31b8e165"
integrity sha512-ILHfMbuqLJvnSgYXLgy4kMntroJpe8hT41dOVWM8bxRuw6TK4mgMp9VJUNsZTEc5Bh+Mbs0DJT4M0N+wBG9l9A==
dependencies:
async-limiter "^1.0.0"
yaeti@^0.0.6: yaeti@^0.0.6:
version "0.0.6" version "0.0.6"
resolved "https://registry.yarnpkg.com/yaeti/-/yaeti-0.0.6.tgz#f26f484d72684cf42bedfb76970aa1608fbf9577" resolved "https://registry.yarnpkg.com/yaeti/-/yaeti-0.0.6.tgz#f26f484d72684cf42bedfb76970aa1608fbf9577"