Merge pull request #248 from h3poteto/iss-125

refs #125 Add websocket client for misskey
This commit is contained in:
AkiraFukushima 2020-03-12 23:46:54 +09:00 committed by GitHub
commit 7591d027ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 489 additions and 26 deletions

View file

@ -79,6 +79,8 @@
"moment": "^2.24.0",
"oauth": "^0.9.15",
"socks-proxy-agent": "github:h3poteto/node-socks-proxy-agent#master",
"typescript": "3.8.2",
"uuid": "^7.0.2",
"ws": "^7.2.1"
},
"dependencies": {
@ -4376,8 +4378,7 @@
"qs": "~6.5.2",
"safe-buffer": "^5.1.2",
"tough-cookie": "~2.4.3",
"tunnel-agent": "^0.6.0",
"uuid": "^3.3.2"
"tunnel-agent": "^0.6.0"
},
"dependencies": {
"punycode": {
@ -5443,8 +5444,7 @@
"minimatch": "^3.0.0",
"progress": "^2.0.3",
"shelljs": "^0.8.3",
"typedoc-default-themes": "^0.7.2",
"typescript": "3.7.x"
"typedoc-default-themes": "^0.7.2"
}
},
"typedoc-default-themes": {
@ -5459,9 +5459,9 @@
}
},
"typescript": {
"version": "3.7.5",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-3.7.5.tgz",
"integrity": "sha512-/P5lkRXkWHNAbcJIiHPfRoKqyd7bsyCma1hZNUGfn20qm64T6ZBlrzprymeu918H+mB/0rIg2gGK/BXkhhYgBw=="
"version": "3.8.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-3.8.2.tgz",
"integrity": "sha512-EgOVgL/4xfVrCMbhYKUQTdF37SQn4Iw73H5BgCrF1Abdun7Kwy/QZsE/ssAy0y4LxBbvua3PIbFsbRczWWnDdQ=="
},
"uglify-js": {
"version": "3.7.2",
@ -5558,9 +5558,9 @@
}
},
"uuid": {
"version": "3.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-3.3.2.tgz",
"integrity": "sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA=="
"version": "7.0.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-7.0.2.tgz",
"integrity": "sha512-vy9V/+pKG+5ZTYKf+VcphF5Oc6EFiu3W8Nv3P3zIh0EqVI80ZxOzuPfe9EHjkFNvf8+xuTHVeei4Drydlx4zjw=="
},
"v8-compile-cache": {
"version": "2.0.3",

View file

@ -0,0 +1,46 @@
import generator, { Entity, WebSocketInterface } from 'megalodon'
import log4js from 'log4js'
declare var process: {
env: {
MISSKEY_ACCESS_TOKEN: string
}
}
const BASE_URL: string = 'wss://misskey.io'
const access_token: string = process.env.MISSKEY_ACCESS_TOKEN
const client = generator('misskey', BASE_URL, access_token)
const stream: WebSocketInterface = client.userSocket()
const logger = log4js.getLogger()
logger.level = 'debug'
stream.on('connect', () => {
logger.debug('connect')
})
stream.on('pong', () => {
logger.debug('pong')
})
stream.on('update', (status: Entity.Status) => {
logger.debug(status)
})
stream.on('notification', (notification: Entity.Notification) => {
logger.debug(notification)
})
stream.on('error', (err: Error) => {
console.error(err)
})
stream.on('close', () => {
logger.debug('close')
})
stream.on('parser-error', (err: Error) => {
console.error(err)
})

View file

@ -60,11 +60,13 @@
"oauth": "^0.9.15",
"socks-proxy-agent": "h3poteto/node-socks-proxy-agent#master",
"typescript": "3.8.2",
"uuid": "^7.0.2",
"ws": "^7.2.1"
},
"devDependencies": {
"@types/core-js": "^2.5.0",
"@types/jest": "^25.1.1",
"@types/uuid": "^7.0.0",
"@typescript-eslint/eslint-plugin": "^2.19.2",
"@typescript-eslint/parser": "^2.19.2",
"eslint": "^6.8.0",

View file

@ -2,7 +2,6 @@ import WS from 'ws'
import moment, { Moment } from 'moment'
import { EventEmitter } from 'events'
import proxyAgent, { ProxyConfig } from '../proxy_config'
import Entity from '../entity'
import { WebSocketInterface } from '../megalodon'
import MastodonAPI from './api_client'
@ -233,16 +232,16 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac
* Set up parser when receive message.
*/
private _setupParser() {
this.parser.on('update', (status: Entity.Status) => {
this.parser.on('update', (status: MastodonAPI.Entity.Status) => {
this.emit('update', MastodonAPI.Converter.status(status))
})
this.parser.on('notification', (notification: Entity.Notification) => {
this.parser.on('notification', (notification: MastodonAPI.Entity.Notification) => {
this.emit('notification', MastodonAPI.Converter.notification(notification))
})
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('conversation', (conversation: Entity.Conversation) => {
this.parser.on('conversation', (conversation: MastodonAPI.Entity.Conversation) => {
this.emit('conversation', MastodonAPI.Converter.conversation(conversation))
})
this.parser.on('error', (err: Error) => {
@ -314,13 +313,13 @@ export class Parser extends EventEmitter {
switch (event) {
case 'update':
this.emit('update', mes as Entity.Status)
this.emit('update', mes as MastodonAPI.Entity.Status)
break
case 'notification':
this.emit('notification', mes as Entity.Notification)
this.emit('notification', mes as MastodonAPI.Entity.Notification)
break
case 'conversation':
this.emit('conversation', mes as Entity.Conversation)
this.emit('conversation', mes as MastodonAPI.Entity.Conversation)
break
case 'delete':
this.emit('delete', payload)

View file

@ -1860,26 +1860,26 @@ export default class Misskey implements MegalodonInterface {
}
public userSocket(): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
return this.client.socket('user')
}
public publicSocket(): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
return this.client.socket('globalTimeline')
}
public localSocket(): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
return this.client.socket('localTimeline')
}
public tagSocket(_tag: string): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
}
public listSocket(_list_id: string): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
public listSocket(list_id: string): WebSocketInterface {
return this.client.socket('list', list_id)
}
public directSocket(): WebSocketInterface {
throw new NoImplementedError('TODO: implement')
return this.client.socket('conversation')
}
}

View file

@ -5,6 +5,7 @@ import proxyAgent, { ProxyConfig } from '../proxy_config'
import Response from '../response'
import MisskeyEntity from './entity'
import MegalodonEntity from '../entity'
import WebSocket from './web_socket'
namespace MisskeyAPI {
export namespace Entity {
@ -59,7 +60,7 @@ namespace MisskeyAPI {
following_count: 0,
statuses_count: 0,
note: '',
url: '',
url: acct,
avatar: u.avatarUrl,
avatar_static: u.avatarColor,
header: '',
@ -186,8 +187,8 @@ namespace MisskeyAPI {
export const note = (n: Entity.Note): MegalodonEntity.Status => {
return {
id: n.id,
uri: '',
url: '',
uri: n.uri ? n.uri : '',
url: n.uri ? n.uri : '',
account: user(n.user),
in_reply_to_id: n.replyId,
in_reply_to_account_id: null,
@ -458,6 +459,27 @@ namespace MisskeyAPI {
public cancel(): void {
return this.cancelTokenSource.cancel('Request is canceled by user')
}
/**
* Get connection and receive websocket connection for Misskey API.
*
* @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
* @param listId This parameter is required only list channel.
*/
public socket(
channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list',
listId?: string | null
): WebSocket {
if (!this.accessToken) {
throw new Error('accessToken is required')
}
const url = this.baseUrl + '/streaming'
const streaming = new WebSocket(url, channel, this.accessToken, listId)
process.nextTick(() => {
streaming.start()
})
return streaming
}
}
}

384
src/misskey/web_socket.ts Normal file
View file

@ -0,0 +1,384 @@
import WS from 'ws'
import moment, { Moment } from 'moment'
import { v4 as uuid } from 'uuid'
import { EventEmitter } from 'events'
import { WebSocketInterface } from '../megalodon'
import MisskeyAPI from './api_client'
/**
* WebSocket
* Misskey is not support http streaming. It supports websocket instead of streaming.
* So this class connect to Misskey server with WebSocket.
*/
export default class WebSocket extends EventEmitter implements WebSocketInterface {
public url: string
public channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list'
public parser: Parser
public listId: string | null = null
private _accessToken: string
private _reconnectInterval: number
private _reconnectMaxAttempts: number
private _reconnectCurrentAttempts: number
private _connectionClosed: boolean
private _client: WS | null = null
private _channelID: string
private _pongReceivedTimestamp: Moment
private _heartbeatInterval: number = 60000
private _pongWaiting: boolean = false
/**
* @param url Full url of websocket: e.g. wss://misskey.io/streaming
* @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
* @param accessToken The access token.
* @param listId This parameter is required when you specify list as channel.
*/
constructor(
url: string,
channel: 'user' | 'localTimeline' | 'hybridTimeline' | 'globalTimeline' | 'conversation' | 'list',
accessToken: string,
listId?: string | null
) {
super()
this.url = url
this.parser = new Parser()
this.channel = channel
if (listId) {
this.listId = listId
}
this._accessToken = accessToken
this._reconnectInterval = 10000
this._reconnectMaxAttempts = Infinity
this._reconnectCurrentAttempts = 0
this._connectionClosed = false
this._channelID = uuid()
this._pongReceivedTimestamp = moment()
}
/**
* Start websocket connection.
*/
public start() {
this._connectionClosed = false
this._resetRetryParams()
this._startWebSocketConnection()
}
/**
* Reset connection and start new websocket connection.
*/
private _startWebSocketConnection() {
this._resetConnection()
this._setupParser()
this._client = this._connect()
this._bindSocket(this._client)
}
/**
* Stop current connection.
*/
public stop() {
this._connectionClosed = true
this._resetConnection()
this._resetRetryParams()
}
/**
* Clean up current connection, and listeners.
*/
private _resetConnection() {
if (this._client) {
this._client.close(100)
this._client.removeAllListeners()
this._client = null
}
if (this.parser) {
this.parser.removeAllListeners()
}
}
/**
* Resets the parameters used in reconnect.
*/
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}
/**
* Connect to the endpoint.
*/
private _connect(): WS {
const cli: WS = new WS(`${this.url}?i=${this._accessToken}`)
return cli
}
/**
* Connect specified channels in websocket.
*/
private _channel() {
if (!this._client) {
return
}
switch (this.channel) {
case 'conversation':
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: 'main',
id: this._channelID
}
})
)
break
case 'user':
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: 'main',
id: this._channelID
}
})
)
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: 'homeTimeline',
id: this._channelID
}
})
)
break
case 'list':
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: 'userList',
id: this._channelID,
params: {
listId: this.listId
}
}
})
)
break
default:
this._client.send(
JSON.stringify({
type: 'connect',
body: {
channel: this.channel,
id: this._channelID
}
})
)
break
}
}
/**
* Reconnects to the same endpoint.
*/
private _reconnect() {
setTimeout(() => {
// Skip reconnect when client is connecting.
// https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
if (this._client && this._client.readyState === WS.CONNECTING) {
return
}
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
this._clearBinding()
if (this._client) {
// In reconnect, we want to close the connection immediately,
// because recoonect is necessary when some problems occur.
this._client.terminate()
}
// Call connect methods
console.log('Reconnecting')
this._client = this._connect()
this._bindSocket(this._client)
}
}, this._reconnectInterval)
}
/**
* Clear binding event for websocket client.
*/
private _clearBinding() {
if (this._client) {
this._client.removeAllListeners('close')
this._client.removeAllListeners('pong')
this._client.removeAllListeners('open')
this._client.removeAllListeners('message')
this._client.removeAllListeners('error')
}
}
/**
* Bind event for web socket client.
* @param client A WebSocket instance.
*/
private _bindSocket(client: WS) {
client.on('close', (code: number, _reason: string) => {
if (code === 1000) {
this.emit('close', {})
} else {
console.log(`Closed connection with ${code}`)
if (!this._connectionClosed) {
this._reconnect()
}
}
})
client.on('pong', () => {
this._pongWaiting = false
this.emit('pong', {})
this._pongReceivedTimestamp = moment()
// It is required to anonymous function since get this scope in checkAlive.
setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
})
client.on('open', () => {
this.emit('connect', {})
this._channel()
// Call first ping event.
setTimeout(() => {
client.ping('')
}, 10000)
})
client.on('message', (data: WS.Data) => {
this.parser.parse(data, this._channelID)
})
client.on('error', (err: Error) => {
this.emit('error', err)
})
}
/**
* Set up parser when receive message.
*/
private _setupParser() {
this.parser.on('update', (note: MisskeyAPI.Entity.Note) => {
this.emit('update', MisskeyAPI.Converter.note(note))
})
this.parser.on('notification', (notification: MisskeyAPI.Entity.Notification) => {
this.emit('notification', MisskeyAPI.Converter.notification(notification))
})
this.parser.on('conversation', (note: MisskeyAPI.Entity.Note) => {
this.emit('conversation', MisskeyAPI.Converter.noteToConversation(note))
})
this.parser.on('error', (err: Error) => {
this.emit('parser-error', err)
})
}
/**
* Call ping and wait to pong.
*/
private _checkAlive(timestamp: Moment) {
const now: Moment = moment()
// Block multiple calling, if multiple pong event occur.
// It the duration is less than interval, through ping.
if (now.diff(timestamp) > this._heartbeatInterval - 1000 && !this._connectionClosed) {
// Skip ping when client is connecting.
// https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
if (this._client && this._client.readyState !== WS.CONNECTING) {
this._pongWaiting = true
this._client.ping('')
setTimeout(() => {
if (this._pongWaiting) {
this._pongWaiting = false
this._reconnect()
}
}, 10000)
}
}
}
}
/**
* Parser
* This class provides parser for websocket message.
*/
export class Parser extends EventEmitter {
/**
* @param message Message body of websocket.
* @param channelID Parse only messages which has same channelID.
*/
public parse(message: WS.Data, channelID: string) {
if (typeof message !== 'string') {
this.emit('heartbeat', {})
return
}
if (message === '') {
this.emit('heartbeat', {})
return
}
let obj: {
type: string
body: {
id: string
type: string
body: any
}
}
let body: {
id: string
type: string
body: any
}
try {
obj = JSON.parse(message)
if (obj.type !== 'channel') {
return
}
if (!obj.body) {
return
}
body = obj.body
if (body.id !== channelID) {
return
}
} catch (err) {
this.emit('error', new Error(`Error parsing websocket reply: ${message}, error message: ${err}`))
return
}
switch (body.type) {
case 'note':
this.emit('update', body.body as MisskeyAPI.Entity.Note)
break
case 'notification':
this.emit('notification', body.body as MisskeyAPI.Entity.Notification)
break
case 'mention': {
const note = body.body as MisskeyAPI.Entity.Note
if (note.visibility === 'specified') {
this.emit('conversation', note)
}
break
}
// When renote and followed event, the same notification will be received.
case 'renote':
case 'followed':
case 'receiveFollowRequest':
case 'meUpdated':
case 'readAllNotifications':
case 'readAllUnreadSpecifiedNotes':
case 'readAllAntennas':
case 'readAllUnreadMentions':
// Ignore these events
break
default:
this.emit('error', new Error(`Unknown event has received: ${JSON.stringify(body)}`))
break
}
}
}

View file

@ -557,6 +557,11 @@
resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-1.0.1.tgz#0a851d3bd96498fa25c33ab7278ed3bd65f06c3e"
integrity sha512-l42BggppR6zLmpfU6fq9HEa2oGPEI8yrSPL3GITjfRInppYFahObbIQOQK3UGxEnyQpltZLaPe75046NOZQikw==
"@types/uuid@^7.0.0":
version "7.0.0"
resolved "https://registry.yarnpkg.com/@types/uuid/-/uuid-7.0.0.tgz#9f6993ccc8210efa90bda7e1afabbb06a9f860cd"
integrity sha512-RiX1I0lK9WFLFqy2xOxke396f0wKIzk5sAll0tL4J4XDYJXURI7JOs96XQb3nP+2gEpQ/LutBb66jgiT5oQshQ==
"@types/ws@^7.2.0":
version "7.2.2"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-7.2.2.tgz#1bd2038bc80aea60f8a20b2dcf08602a72e65063"
@ -4427,6 +4432,11 @@ uuid@^3.3.2:
resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131"
integrity sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==
uuid@^7.0.2:
version "7.0.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-7.0.2.tgz#7ff5c203467e91f5e0d85cfcbaaf7d2ebbca9be6"
integrity sha512-vy9V/+pKG+5ZTYKf+VcphF5Oc6EFiu3W8Nv3P3zIh0EqVI80ZxOzuPfe9EHjkFNvf8+xuTHVeei4Drydlx4zjw==
v8-compile-cache@^2.0.3:
version "2.0.3"
resolved "https://registry.yarnpkg.com/v8-compile-cache/-/v8-compile-cache-2.0.3.tgz#00f7494d2ae2b688cfe2899df6ed2c54bef91dbe"