refs #125 Add homeTimeline streaming using websocket for misskey

This commit is contained in:
AkiraFukushima 2020-03-11 22:01:50 +09:00
parent 96aa94e18e
commit e3c5db9a24
7 changed files with 286 additions and 8 deletions

View file

@ -0,0 +1,38 @@
import generator, { Entity, WebSocketInterface } from 'megalodon'
import log4js from 'log4js'
declare var process: {
env: {
MISSKEY_ACCESS_TOKEN: string
}
}
const BASE_URL: string = 'wss://misskey.io'
const access_token: string = process.env.MISSKEY_ACCESS_TOKEN
const client = generator('misskey', BASE_URL, access_token)
const stream: WebSocketInterface = client.userSocket()
const logger = log4js.getLogger()
logger.level = 'debug'
stream.on('connect', () => {
logger.debug('connect')
})
stream.on('pong', () => {
logger.debug('pong')
})
stream.on('update', (status: Entity.Status) => {
logger.debug(status)
})
stream.on('error', (err: Error) => {
console.error(err)
})
stream.on('close', () => {
logger.debug('close')
})

View file

@ -60,11 +60,13 @@
"oauth": "^0.9.15",
"socks-proxy-agent": "h3poteto/node-socks-proxy-agent#master",
"typescript": "3.8.2",
"uuid": "^7.0.2",
"ws": "^7.2.1"
},
"devDependencies": {
"@types/core-js": "^2.5.0",
"@types/jest": "^25.1.1",
"@types/uuid": "^7.0.0",
"@typescript-eslint/eslint-plugin": "^2.19.2",
"@typescript-eslint/parser": "^2.19.2",
"eslint": "^6.8.0",

View file

@ -2,7 +2,6 @@ import WS from 'ws'
import moment, { Moment } from 'moment'
import { EventEmitter } from 'events'
import proxyAgent, { ProxyConfig } from '../proxy_config'
import Entity from '../entity'
import { WebSocketInterface } from '../megalodon'
import MastodonAPI from './api_client'
@ -233,16 +232,16 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac
* Set up parser when receive message.
*/
private _setupParser() {
this.parser.on('update', (status: Entity.Status) => {
this.parser.on('update', (status: MastodonAPI.Entity.Status) => {
this.emit('update', MastodonAPI.Converter.status(status))
})
this.parser.on('notification', (notification: Entity.Notification) => {
this.parser.on('notification', (notification: MastodonAPI.Entity.Notification) => {
this.emit('notification', MastodonAPI.Converter.notification(notification))
})
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('conversation', (conversation: Entity.Conversation) => {
this.parser.on('conversation', (conversation: MastodonAPI.Entity.Conversation) => {
this.emit('conversation', MastodonAPI.Converter.conversation(conversation))
})
this.parser.on('error', (err: Error) => {
@ -314,13 +313,13 @@ export class Parser extends EventEmitter {
switch (event) {
case 'update':
this.emit('update', mes as Entity.Status)
this.emit('update', mes as MastodonAPI.Entity.Status)
break
case 'notification':
this.emit('notification', mes as Entity.Notification)
this.emit('notification', mes as MastodonAPI.Entity.Notification)
break
case 'conversation':
this.emit('conversation', mes as Entity.Conversation)
this.emit('conversation', mes as MastodonAPI.Entity.Conversation)
break
case 'delete':
this.emit('delete', payload)

View file

@ -1860,7 +1860,7 @@ export default class Misskey implements MegalodonInterface {
}
public userSocket(): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
return this.client.socket('homeTimeline')
}
public publicSocket(): WebSocketInterface {

View file

@ -5,6 +5,7 @@ import proxyAgent, { ProxyConfig } from '../proxy_config'
import Response from '../response'
import MisskeyEntity from './entity'
import MegalodonEntity from '../entity'
import WebSocket from './web_socket'
namespace MisskeyAPI {
export namespace Entity {
@ -458,6 +459,18 @@ namespace MisskeyAPI {
public cancel(): void {
return this.cancelTokenSource.cancel('Request is canceled by user')
}
public socket(channel: 'homeTimeline'): WebSocket {
if (!this.accessToken) {
throw new Error('accessToken is required')
}
const url = this.baseUrl + '/streaming'
const streaming = new WebSocket(url, channel, this.accessToken)
process.nextTick(() => {
streaming.start()
})
return streaming
}
}
}

216
src/misskey/web_socket.ts Normal file
View file

@ -0,0 +1,216 @@
import WS from 'ws'
import { v4 as uuid } from 'uuid'
import { EventEmitter } from 'events'
import { WebSocketInterface } from '../megalodon'
import MisskeyAPI from './api_client'
export default class WebSocket extends EventEmitter implements WebSocketInterface {
public url: string
public channel: 'homeTimeline'
public parser: Parser
private _accessToken: string
private _reconnectInterval: number
private _reconnectMaxAttempts: number
private _reconnectCurrentAttempts: number
private _connectionClosed: boolean
private _client: WS | null = null
private _channelID: string
constructor(url: string, channel: 'homeTimeline', accessToken: string) {
super()
this.url = url
this.parser = new Parser()
this.channel = channel
this._accessToken = accessToken
this._reconnectInterval = 10000
this._reconnectMaxAttempts = Infinity
this._reconnectCurrentAttempts = 0
this._connectionClosed = false
this._channelID = uuid()
}
public start() {
this._connectionClosed = false
this._resetRetryParams()
this._startWebSocketConnection()
}
private _startWebSocketConnection() {
this._resetConnection()
this._setupParser()
this._client = this._connect()
this._bindSocket(this._client)
}
public stop() {
this._connectionClosed = true
this._resetConnection()
this._resetRetryParams()
}
private _resetConnection() {
if (this._client) {
this._client.close(100)
this._client.removeAllListeners()
this._client = null
}
if (this.parser) {
this.parser.removeAllListeners()
}
}
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}
private _connect(): WS {
const cli: WS = new WS(`${this.url}?i=${this._accessToken}`)
return cli
}
private _channel() {
if (!this._client) {
return
}
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: this.channel,
id: this._channelID
}
})
)
}
private _reconnect() {
setTimeout(() => {
// Skip reconnect when client is connecting.
// https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
if (this._client && this._client.readyState === WS.CONNECTING) {
return
}
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
this._clearBinding()
if (this._client) {
// In reconnect, we want to close the connection immediately,
// because recoonect is necessary when some problems occur.
this._client.terminate()
}
// Call connect methods
console.log('Reconnecting')
this._client = this._connect()
this._bindSocket(this._client)
}
}, this._reconnectInterval)
}
private _clearBinding() {
if (this._client) {
this._client.removeAllListeners('close')
this._client.removeAllListeners('pong')
this._client.removeAllListeners('open')
this._client.removeAllListeners('message')
this._client.removeAllListeners('error')
}
}
private _bindSocket(client: WS) {
client.on('close', (code: number, _reason: string) => {
if (code === 1000) {
this.emit('close', {})
} else {
console.log(`Closed connection with ${code}`)
if (!this._connectionClosed) {
this._reconnect()
}
}
})
client.on('pong', () => {
this.emit('pong', {})
})
client.on('open', () => {
this.emit('connect', {})
this._channel()
setTimeout(() => {
client.pong('')
}, 1000)
})
client.on('message', (data: WS.Data) => {
this.parser.parse(data, this._channelID)
})
client.on('error', (err: Error) => {
this.emit('error', err)
})
}
private _setupParser() {
this.parser.on('update', (note: MisskeyAPI.Entity.Note) => {
this.emit('update', MisskeyAPI.Converter.note(note))
})
}
}
/**
* Parser
* This class provides parser for websocket message.
*/
export class Parser extends EventEmitter {
/**
* @param message Message body of websocket.
*/
public parse(message: WS.Data, channelID: string) {
if (typeof message !== 'string') {
this.emit('heartbeat', {})
return
}
if (message === '') {
this.emit('heartbeat', {})
return
}
let obj: {
type: string
body: {
id: string
type: string
body: any
}
}
let body: {
id: string
type: string
body: any
}
try {
obj = JSON.parse(message)
if (obj.type !== 'channel') {
return
}
if (!obj.body) {
return
}
body = obj.body
if (body.id !== channelID) {
return
}
} catch (err) {
this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`))
return
}
switch (body.type) {
case 'note':
this.emit('update', body.body as MisskeyAPI.Entity.Note)
break
default:
this.emit('error', new Error(`Unknown event has received: ${body}`))
break
}
}
}

View file

@ -557,6 +557,11 @@
resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-1.0.1.tgz#0a851d3bd96498fa25c33ab7278ed3bd65f06c3e"
integrity sha512-l42BggppR6zLmpfU6fq9HEa2oGPEI8yrSPL3GITjfRInppYFahObbIQOQK3UGxEnyQpltZLaPe75046NOZQikw==
"@types/uuid@^7.0.0":
version "7.0.0"
resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-7.0.0.tgz#9f6993ccc8210efa90bda7e1afabbb06a9f860cd"
integrity sha512-RiX1I0lK9WFLFqy2xOxke396f0wKIzk5sAll0tL4J4XDYJXURI7JOs96XQb3nP+2gEpQ/LutBb66jgiT5oQshQ==
"@types/ws@^7.2.0":
version "7.2.2"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-7.2.2.tgz#1bd2038bc80aea60f8a20b2dcf08602a72e65063"
@ -4427,6 +4432,11 @@ uuid@^3.3.2:
resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131"
integrity sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==
uuid@^7.0.2:
version "7.0.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-7.0.2.tgz#7ff5c203467e91f5e0d85cfcbaaf7d2ebbca9be6"
integrity sha512-vy9V/+pKG+5ZTYKf+VcphF5Oc6EFiu3W8Nv3P3zIh0EqVI80ZxOzuPfe9EHjkFNvf8+xuTHVeei4Drydlx4zjw==
v8-compile-cache@^2.0.3:
version "2.0.3"
resolved "https://registry.yarnpkg.com/v8-compile-cache/-/v8-compile-cache-2.0.3.tgz#00f7494d2ae2b688cfe2899df6ed2c54bef91dbe"