Merge pull request #68 from h3poteto/iss-67

closes #67 Auto reconnect when does not receive any messages for a while
This commit is contained in:
AkiraFukushima 2019-10-17 22:10:59 +09:00 committed by GitHub
commit f27cff119c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 159 additions and 11 deletions

View file

@ -6,7 +6,10 @@
"author": "h3poteto",
"license": "MIT",
"dependencies": {
"typescript": "^3.4.5",
"megalodon": "file:../../"
"megalodon": "file:../../",
"typescript": "^3.4.5"
},
"devDependencies": {
"log4js": "^5.2.2"
}
}

View file

@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN
const client = new Mastodon(access_token, BASE_URL + '/api/v1')
const stream: StreamListener = client.stream('/streaming/public')
const stream: StreamListener = client.stream('/streaming/user')
stream.on('connect', _ => {
console.log('connect')
})

View file

@ -1,8 +1,10 @@
import Mastodon, { Status, Notification, WebSocket } from 'megalodon'
import log4js from 'log4js'
declare var process: {
env: {
PLEROMA_ACCESS_TOKEN: string
MASTODON_ACCESS_TOKEN: string
}
}
@ -13,20 +15,27 @@ const access_token: string = process.env.PLEROMA_ACCESS_TOKEN
const client = new Mastodon(access_token, BASE_URL + '/api/v1')
const stream: WebSocket = client.socket('/streaming', 'user')
const logger = log4js.getLogger()
logger.level = 'debug'
stream.on('connect', () => {
console.log('connect')
logger.debug('connect')
})
stream.on('pong', () => {
logger.debug('pong')
})
stream.on('update', (status: Status) => {
console.log(status)
logger.debug(status)
})
stream.on('notification', (notification: Notification) => {
console.log(notification)
logger.debug(notification)
})
stream.on('delete', (id: number) => {
console.log(id)
logger.debug(id)
})
stream.on('error', (err: Error) => {
@ -34,13 +43,13 @@ stream.on('error', (err: Error) => {
})
stream.on('heartbeat', () => {
console.log('thump.')
logger.debug('thump.')
})
stream.on('close', () => {
console.log('close')
logger.debug('close')
})
stream.on('parser-error', (err: Error) => {
console.error(err)
logger.error(err)
})

View file

@ -141,6 +141,11 @@ dashdash@^1.12.0:
dependencies:
assert-plus "^1.0.0"
date-format@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/date-format/-/date-format-2.1.0.tgz#31d5b5ea211cf5fd764cd38baf9d033df7e125cf"
integrity sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA==
debug@=3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/debug/-/debug-3.1.0.tgz#5bb5a0672628b64149566ba16819e61518c67261"
@ -148,6 +153,13 @@ debug@=3.1.0:
dependencies:
ms "2.0.0"
debug@^4.1.1:
version "4.1.1"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.1.1.tgz#3b72260255109c6b589cee050f1d516139664791"
integrity sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==
dependencies:
ms "^2.1.1"
delayed-stream@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/delayed-stream/-/delayed-stream-1.0.0.tgz#df3ae199acadfb7d440aaae0b29e2272b24ec619"
@ -186,6 +198,11 @@ fast-json-stable-stringify@^2.0.0:
resolved "https://registry.yarnpkg.com/fast-json-stable-stringify/-/fast-json-stable-stringify-2.0.0.tgz#d5142c0caee6b1189f87d3a76111064f86c8bbf2"
integrity sha1-1RQsDK7msRifh9OnYREGT4bIu/I=
flatted@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/flatted/-/flatted-2.0.1.tgz#69e57caa8f0eacbc281d2e2cb458d46fdb449e08"
integrity sha512-a1hQMktqW9Nmqr5aktAux3JMNqaucxGcjtjWnZLHX7yyPCmlSV3M54nGYbqT8K+0GhF3NBgmJCc3ma+WOgX8Jg==
follow-redirects@1.5.10:
version "1.5.10"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.5.10.tgz#7b7a9f9aea2fdff36786a94ff643ed07f4ff5e2a"
@ -207,6 +224,15 @@ form-data@~2.3.2:
combined-stream "1.0.6"
mime-types "^2.1.12"
fs-extra@^8.1.0:
version "8.1.0"
resolved "https://registry.yarnpkg.com/fs-extra/-/fs-extra-8.1.0.tgz#49d43c45a88cd9677668cb7be1b46efdb8d2e1c0"
integrity sha512-yhlQgA6mnOJUKOsRUFsgJdQCvkKhcz8tlZG5HBQfReYZy46OwLcY+Zia0mtdHsOo9y/hP+CxMN0TU9QxoOtG4g==
dependencies:
graceful-fs "^4.2.0"
jsonfile "^4.0.0"
universalify "^0.1.0"
getpass@^0.1.1:
version "0.1.7"
resolved "https://registry.yarnpkg.com/getpass/-/getpass-0.1.7.tgz#5eff8e3e684d569ae4cb2b1282604e8ba62149fa"
@ -214,6 +240,11 @@ getpass@^0.1.1:
dependencies:
assert-plus "^1.0.0"
graceful-fs@^4.1.6, graceful-fs@^4.2.0:
version "4.2.2"
resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.2.tgz#6f0952605d0140c1cfdb138ed005775b92d67b02"
integrity sha512-IItsdsea19BoLC7ELy13q1iJFNmd7ofZH5+X/pJr90/nRoPEX0DJo1dHDbgtYWOhJhcCgMDTOw84RZ72q6lB+Q==
har-schema@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/har-schema/-/har-schema-2.0.0.tgz#a94c2224ebcac04782a0d9035521f24735b7ec92"
@ -271,6 +302,13 @@ json-stringify-safe@~5.0.1:
resolved "https://registry.yarnpkg.com/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz#1296a2d58fd45f19a0f6ce01d65701e2c735b6eb"
integrity sha1-Epai1Y/UXxmg9s4B1lcB4sc1tus=
jsonfile@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/jsonfile/-/jsonfile-4.0.0.tgz#8771aae0799b64076b76640fca058f9c10e33ecb"
integrity sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss=
optionalDependencies:
graceful-fs "^4.1.6"
jsprim@^1.2.2:
version "1.4.1"
resolved "https://registry.yarnpkg.com/jsprim/-/jsprim-1.4.1.tgz#313e66bc1e5cc06e438bc1b7499c2e5c56acb6a2"
@ -281,13 +319,25 @@ jsprim@^1.2.2:
json-schema "0.2.3"
verror "1.10.0"
log4js@^5.2.2:
version "5.2.2"
resolved "https://registry.yarnpkg.com/log4js/-/log4js-5.2.2.tgz#35b750416f22913dd6905d49335752d9b240f47c"
integrity sha512-Iw4ZjbYTMxSTh1jnXM2brpRIr+psM8/nkUiOHu2gFfd0saoX2NdRB69buMWJJuoIJfU/eTzqKy9rVBr0zQwSGQ==
dependencies:
date-format "^2.1.0"
debug "^4.1.1"
flatted "^2.0.1"
rfdc "^1.1.4"
streamroller "^2.2.2"
"megalodon@file:../..":
version "0.9.0"
version "1.0.3"
dependencies:
"@types/oauth" "^0.9.0"
"@types/request" "^2.47.0"
"@types/ws" "^6.0.1"
axios "^0.18.1"
moment "^2.24.0"
oauth "^0.9.15"
request "^2.87.0"
typescript "^3.4.5"
@ -305,11 +355,21 @@ mime-types@^2.1.12, mime-types@~2.1.19:
dependencies:
mime-db "~1.36.0"
moment@^2.24.0:
version "2.24.0"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.24.0.tgz#0d055d53f5052aa653c9f6eb68bb5d12bf5c2b5b"
integrity sha512-bV7f+6l2QigeBBZSM/6yTNq4P2fNpSWj/0e7jQcy87A8e7o2nAfP/34/2ky5Vw4B9S446EtIhodAzkFCcR4dQg==
ms@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8"
integrity sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=
ms@^2.1.1:
version "2.1.2"
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009"
integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==
oauth-sign@~0.9.0:
version "0.9.0"
resolved "https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.9.0.tgz#47a7b016baa68b5fa0ecf3dee08a85c679ac6455"
@ -366,6 +426,11 @@ request@^2.87.0:
tunnel-agent "^0.6.0"
uuid "^3.3.2"
rfdc@^1.1.4:
version "1.1.4"
resolved "https://registry.yarnpkg.com/rfdc/-/rfdc-1.1.4.tgz#ba72cc1367a0ccd9cf81a870b3b58bd3ad07f8c2"
integrity sha512-5C9HXdzK8EAqN7JDif30jqsBzavB7wLpaubisuQIGHWf2gUXSpzy6ArX/+Da8RjFpagWsCn+pIgxTMAmKw9Zug==
safe-buffer@^5.0.1, safe-buffer@^5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
@ -392,6 +457,15 @@ sshpk@^1.7.0:
jsbn "~0.1.0"
tweetnacl "~0.14.0"
streamroller@^2.2.2:
version "2.2.2"
resolved "https://registry.yarnpkg.com/streamroller/-/streamroller-2.2.2.tgz#26bea90567f80d8438d251e5603643fe617b7090"
integrity sha512-wizmZ8NNiqeNIYHv8MqBBbSIeNNcsXyoKxbGYBpiFHCjTGlNHqGNGElwrSM3Awg+0j6U96/eFrSnjW+h3aRo0Q==
dependencies:
date-format "^2.1.0"
debug "^4.1.1"
fs-extra "^8.1.0"
tough-cookie@~2.4.3:
version "2.4.3"
resolved "https://registry.yarnpkg.com/tough-cookie/-/tough-cookie-2.4.3.tgz#53f36da3f47783b0925afa06ff9f3b165280f781"
@ -417,6 +491,11 @@ typescript@^3.4.5:
resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977"
integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g==
universalify@^0.1.0:
version "0.1.2"
resolved "https://registry.yarnpkg.com/universalify/-/universalify-0.1.2.tgz#b646f69be3942dabcecc9d6639c80dc105efaa66"
integrity sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg==
uuid@^3.3.2:
version "3.3.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131"

View file

@ -54,6 +54,7 @@
"@types/request": "^2.47.0",
"@types/ws": "^6.0.1",
"axios": "^0.18.1",
"moment": "^2.24.0",
"oauth": "^0.9.15",
"request": "^2.87.0",
"typescript": "^3.4.5",

View file

@ -1,4 +1,5 @@
import WS from 'ws'
import moment, { Moment } from 'moment'
import { EventEmitter } from 'events'
import { Status } from './entities/status'
import { Notification } from './entities/notification'
@ -20,6 +21,9 @@ export default class WebSocket extends EventEmitter {
private _reconnectCurrentAttempts: number
private _connectionClosed: boolean
private _client: WS | null
private _pongReceivedTimestamp: Moment
private _heartbeatInterval: number = 60000
private _pongWaiting: boolean = false
/**
* @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming
@ -41,6 +45,7 @@ export default class WebSocket extends EventEmitter {
this._reconnectCurrentAttempts = 0
this._connectionClosed = false
this._client = null
this._pongReceivedTimestamp = moment()
}
/**
@ -104,6 +109,7 @@ export default class WebSocket extends EventEmitter {
// Call connect methods
console.log('Reconnecting')
this._client = this._connect(this.url, this.stream, this._accessToken, this.headers)
this._clearBinding()
this._bindSocket(this._client)
}
}, this._reconnectInterval)
@ -132,6 +138,19 @@ export default class WebSocket extends EventEmitter {
return cli
}
/**
* Clear binding event for web socket client.
*/
private _clearBinding() {
if (this._client) {
this._client.removeAllListeners('close')
this._client.removeAllListeners('pong')
this._client.removeAllListeners('open')
this._client.removeAllListeners('message')
this._client.removeAllListeners('error')
}
}
/**
* Bind event for web socket client.
* @param client A WebSocket instance.
@ -149,8 +168,19 @@ export default class WebSocket extends EventEmitter {
}
}
})
client.on('pong', () => {
this._pongWaiting = false
this.emit('pong', {})
this._pongReceivedTimestamp = moment()
// It is required to anonymous function since get this scope in checkAlive.
setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
})
client.on('open', () => {
this.emit('connect', {})
// Call first ping event.
setTimeout(() => {
client.ping('')
}, 10000)
})
client.on('message', (data: WS.Data) => {
this.parser.parse(data)
@ -183,6 +213,27 @@ export default class WebSocket extends EventEmitter {
this.emit('heartbeat', 'heartbeat')
})
}
/**
* Call ping and wait to pong.
*/
private _checkAlive(timestamp: Moment) {
const now: Moment = moment()
// Block multiple calling, if multiple pong event occur.
// It the duration is less than interval, through ping.
if (now.diff(timestamp) > this._heartbeatInterval - 1000 && !this._connectionClosed) {
if (this._client) {
this._pongWaiting = true
this._client.ping('')
}
setTimeout(() => {
if (this._pongWaiting) {
this._pongWaiting = false
this._reconnect()
}
}, 10000)
}
}
}
/**

View file

@ -2882,6 +2882,11 @@ mkdirp@0.x, mkdirp@^0.5.0, mkdirp@^0.5.1:
dependencies:
minimist "0.0.8"
moment@^2.24.0:
version "2.24.0"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.24.0.tgz#0d055d53f5052aa653c9f6eb68bb5d12bf5c2b5b"
integrity sha512-bV7f+6l2QigeBBZSM/6yTNq4P2fNpSWj/0e7jQcy87A8e7o2nAfP/34/2ky5Vw4B9S446EtIhodAzkFCcR4dQg==
ms@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8"