refs #41 Handle delete message from streaming

This commit is contained in:
AkiraFukushima 2019-05-24 01:37:02 +09:00
parent 8db1d4940b
commit dc59c88b0a
7 changed files with 114 additions and 112 deletions

6
.prettierrc Normal file
View file

@ -0,0 +1,6 @@
{
"tabWidth": 2,
"semi": false,
"singleQuote": true,
"printWidth": 140
}

View file

@ -1,13 +1,10 @@
const Mastodon = require( '../../lib/mastodon')
const Mastodon = require('../../lib/mastodon')
const BASE_URL = 'https://mastodon.social'
const BASE_URL = 'https://mstdn.jp'
const access_token = '...'
const access_token = process.env.MASTODON_ACCESS_TOKEN
const client = new Mastodon(
access_token,
BASE_URL + '/api/v1'
)
const client = new Mastodon(access_token, BASE_URL + '/api/v1')
const stream = client.stream('/streaming/public')
stream.on('connect', event => {
@ -18,27 +15,26 @@ stream.on('not-event-stream', mes => {
console.log(mes)
})
stream.on('update', (status) => {
stream.on('update', status => {
console.log(status)
})
stream.on('notification', (notification) => {
stream.on('notification', notification => {
console.log(notification)
})
stream.on('delete', (id) => {
console.log(id)
stream.on('delete', id => {
console.log(`delete: ${id}`)
})
stream.on('error', (err) => {
stream.on('error', err => {
console.error(err)
})
stream.on('heartbeat', (msg) => {
stream.on('heartbeat', msg => {
console.log('thump.')
})
stream.on('connection-limit-exceeded', err => {
console.error(err)
})

View file

@ -2,38 +2,32 @@ const Mastodon = require('../../lib/mastodon')
const BASE_URL = 'wss://pleroma.io'
const access_token = '...'
const access_token = process.env.PLEROMA_ACCESS_TOKEN
const client = new Mastodon(
access_token,
BASE_URL + '/api/v1'
)
const client = new Mastodon(access_token, BASE_URL + '/api/v1')
const stream = client.socket(
'/streaming',
'user'
)
const stream = client.socket('/streaming', 'user')
stream.on('connect', event => {
console.log('connect')
})
stream.on('update', (status) => {
stream.on('update', status => {
console.log(status)
})
stream.on('notification', (notification) => {
stream.on('notification', notification => {
console.log(notification)
})
stream.on('delete', (id) => {
console.log(id)
stream.on('delete', id => {
console.log(`delete: ${id}`)
})
stream.on('error', (err) => {
stream.on('error', err => {
console.error(err)
})
stream.on('heartbeat', (msg) => {
stream.on('heartbeat', msg => {
console.log('thump.')
})
@ -41,10 +35,6 @@ stream.on('close', () => {
console.log('close')
})
stream.on('parser-error', (err) => {
stream.on('parser-error', err => {
console.error(err)
})
setTimeout(() => {
stream.stop()
}, 10000)

View file

@ -7,30 +7,30 @@ import Emoji from './emoji'
import Card from './card'
export default interface Status {
id: number,
uri: string,
url: string,
account: Account,
in_reply_to_id: number | null,
in_reply_to_account_id: number | null,
reblog: Status | null,
content: string,
created_at: string,
emojis: Emoji[],
replies_count: number,
reblogs_count: number,
favourites_count: number,
reblogged: boolean | null,
favourited: boolean | null,
muted: boolean | null,
sensitive: boolean,
spoiler_text: string,
visibility: 'public' | 'unlisted' | 'private' | 'direct',
media_attachments: Attachment[],
mentions: Mention[],
tags: Tag[],
card: Card | null,
application: Application,
language: string | null,
id: string
uri: string
url: string
account: Account
in_reply_to_id: number | null
in_reply_to_account_id: number | null
reblog: Status | null
content: string
created_at: string
emojis: Emoji[]
replies_count: number
reblogs_count: number
favourites_count: number
reblogged: boolean | null
favourited: boolean | null
muted: boolean | null
sensitive: boolean
spoiler_text: string
visibility: 'public' | 'unlisted' | 'private' | 'direct'
media_attachments: Attachment[]
mentions: Mention[]
tags: Tag[]
card: Card | null
application: Application
language: string | null
pinned: boolean | null
}

View file

@ -52,30 +52,33 @@ class Parser extends EventEmitter {
const event: string = root[0].substr(7)
const data: string = root[1].substr(6)
let jsonObj = {}
try {
const obj = JSON.parse(data)
switch (event) {
case 'update':
this.emit('update', obj as Status)
break
case 'notification':
this.emit('notification', obj as Notification)
break
case 'conversation':
this.emit('conversation', obj as Conversation)
break
case 'delete':
// When delete, data is an ID of the deleted status
this.emit('delete', obj as number)
break
default:
this.emit('error', new Error(`Unknown event has received: ${event}`))
break
}
jsonObj = JSON.parse(data)
} catch (err) {
this.emit('error', new Error(`Error parsing API reply: '${piece}', error message: '${err}'`))
} finally {
continue
// delete event does not have json object
if (event !== 'delete') {
this.emit('error', new Error(`Error parsing API reply: '${piece}', error message: '${err}'`))
continue
}
}
switch (event) {
case 'update':
this.emit('update', jsonObj as Status)
break
case 'notification':
this.emit('notification', jsonObj as Notification)
break
case 'conversation':
this.emit('conversation', jsonObj as Conversation)
break
case 'delete':
// When delete, data is an ID of the deleted status
this.emit('delete', data as string)
break
default:
this.emit('error', new Error(`Unknown event has received: ${event}`))
continue
}
}
offset++

View file

@ -18,7 +18,6 @@ class StreamingError extends Error {
}
}
/**
* StreamListener
* Listener of streaming. It receives data, and parse when streaming.
@ -35,7 +34,6 @@ class StreamListener extends EventEmitter {
private _usedFirstReconnect: boolean
private _stallAbortTimeout: NodeJS.Timer | undefined
/**
* @param url full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user
* @param headers headers of streaming request
@ -113,7 +111,7 @@ class StreamListener extends EventEmitter {
this.request = Request.get(this._buildRequestOption())
this.emit('connect', this.request)
this.request.on('response', (response) => {
this.request.on('response', response => {
// reset our reconnection attempt flag so next attempt goes through with 0 delay
// if we get a transport-level error
this._usedFirstReconnect = false
@ -125,7 +123,7 @@ class StreamListener extends EventEmitter {
}
if (STATUS_CODES_TO_ABORT_ON.indexOf(response.statusCode) > -1) {
let body: string = ''
this.response.on('data', (chunk) => {
this.response.on('data', chunk => {
body += chunk.toString()
try {
@ -141,14 +139,14 @@ class StreamListener extends EventEmitter {
body = ''
})
} else {
this.response.on('data', (chunk) => {
this.response.on('data', chunk => {
this._connectInterval = 0
this._resetStallAbortTimeout()
this.parser.parse(chunk.toString())
})
this.response.on('error', (err) => {
this.response.on('error', err => {
// expose response errors on twit instance
this.emit('error', err)
})
@ -286,7 +284,7 @@ class StreamListener extends EventEmitter {
this.parser.on('conversation', (conversation: Conversation) => {
this.emit('conversation', conversation)
})
this.parser.on('delete', (id: number) => {
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('error', (err: Error) => {

View file

@ -122,17 +122,17 @@ export default class WebSocket extends EventEmitter {
*/
private _getClient(): client {
const cli = new client()
cli.on('connectFailed', (err) => {
cli.on('connectFailed', err => {
console.error(err)
this._reconnect(cli)
})
cli.on('connect', (conn) => {
cli.on('connect', conn => {
this._socketConnection = conn
this.emit('connect', {})
conn.on('error', (err) => {
conn.on('error', err => {
this.emit('error', err)
})
conn.on('close', (code) => {
conn.on('close', code => {
// Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4
if (code === 1000) {
this.emit('close', {})
@ -162,7 +162,7 @@ export default class WebSocket extends EventEmitter {
this.parser.on('notification', (notification: Notification) => {
this.emit('notification', notification)
})
this.parser.on('delete', (id: number) => {
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('conversation', (conversation: Conversation) => {
@ -200,28 +200,37 @@ class Parser extends EventEmitter {
return
}
let event = ''
let payload = ''
let mes = {}
try {
const obj = JSON.parse(data)
const payload: string = obj.payload
const mes = JSON.parse(payload)
switch (obj.event) {
case 'update':
this.emit('update', mes as Status)
break
case 'notification':
this.emit('notification', mes as Notification)
break
case 'delete':
this.emit('delete', mes as number)
break
case 'conversation':
this.emit('conversation', mes as Conversation)
break
default:
this.emit('error', new Error(`Unknown event has received: ${obj}`))
}
event = obj.event
payload = obj.payload
mes = JSON.parse(payload)
} catch (err) {
this.emit('error', new Error(`Error parsing websocket reply: ${data}, error message: ${err}`))
// delete event does not have json object
if (event !== 'delete') {
this.emit('error', new Error(`Error parsing websocket reply: ${data}, error message: ${err}`))
return
}
}
switch (event) {
case 'update':
this.emit('update', mes as Status)
break
case 'notification':
this.emit('notification', mes as Notification)
break
case 'conversation':
this.emit('conversation', mes as Conversation)
break
case 'delete':
this.emit('delete', payload as string)
break
default:
this.emit('error', new Error(`Unknown event has received: ${data}`))
}
return
}