refs #102 Get event-stream using axios

This commit is contained in:
AkiraFukushima 2019-12-08 01:24:18 +09:00
parent 4d4c314c29
commit 8d04a42bc7
6 changed files with 164 additions and 12 deletions

View file

@ -1,4 +1,4 @@
import Mastodon, { Status, Notification, StreamListener, ProxyConfig } from 'megalodon'
import Mastodon, { Status, Notification, EventStream, ProxyConfig } from 'megalodon'
declare var process: {
env: {
@ -21,7 +21,7 @@ const proxy: ProxyConfig = {
const client = new Mastodon(access_token, BASE_URL + '/api/v1', 'megalodon', proxy)
const stream: StreamListener = client.stream('/streaming/public')
const stream: EventStream = client.stream('/streaming/public')
stream.on('connect', _ => {
console.log('connect')
})

View file

@ -1,4 +1,4 @@
import Mastodon, { Status, Notification, StreamListener } from 'megalodon'
import Mastodon, { Status, Notification, EventStream } from 'megalodon'
declare var process: {
env: {
@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN
const client = new Mastodon(access_token, BASE_URL + '/api/v1')
const stream: StreamListener = client.stream('/streaming/public')
const stream: EventStream = client.stream('/streaming/public')
stream.on('connect', _ => {
console.log('connect')
})

1
src/axios.d.ts vendored Normal file
View file

@ -0,0 +1 @@
declare module 'axios/lib/adapters/http'

143
src/event_stream.ts Normal file
View file

@ -0,0 +1,143 @@
import { EventEmitter } from 'events'
import axios from 'axios'
import httpAdapter from 'axios/lib/adapters/http'
import Parser from './parser'
import { Status } from './entities/status'
import { Notification } from './entities/notification'
import { Conversation } from './entities/conversation'
const STATUS_CODES_TO_ABORT_ON: Array<number> = [400, 401, 403, 404, 406, 410, 422]
class EventStreamError extends Error {
public statusCode: number
public message: string
constructor(statusCode: number, message: string) {
super()
this.statusCode = statusCode
this.message = message
}
}
class EventStream extends EventEmitter {
public url: string
public headers: object
public parser: Parser
private _connectionClosed: boolean = false
private _reconnectInterval: number
private _reconnectMaxAttempts: number = Infinity
private _reconnectCurrentAttempts: number = 0
constructor(url: string, headers: object, _reconnectInterval: number) {
super()
this.url = url
this.headers = headers
this.parser = new Parser()
this._reconnectInterval = _reconnectInterval
}
public start() {
this._setupParser()
this._connect()
}
private _connect() {
axios.get(this.url, { responseType: 'stream', adapter: httpAdapter }).then(response => {
const stream = response.data
if (response.headers['content-type'] !== 'text/event-stream') {
this.emit('no-event-stream', 'no event')
}
// Response status is error
if (STATUS_CODES_TO_ABORT_ON.includes(response.status)) {
stream.on('data', (chunk: any) => {
let body = chunk.toString()
try {
body = JSON.parse(body)
} catch (jsonDecodeError) {
// if non-JSON text was returned, we'll just attach it to the error as-is.
}
const error: EventStreamError = new EventStreamError(response.status, body)
this.emit('error', error)
this.stop()
})
} else {
stream.on('data', (chunk: any) => {
this.parser.parse(chunk.toString())
})
stream.on('error', (err: Error) => {
this.emit('error', err)
})
}
stream.on('end', (err: Error | undefined | null) => {
if (err) {
console.log(`Closed connection with ${err.message}`)
if (!this._connectionClosed) {
this._reconnect()
}
} else {
this.emit('close', {})
}
})
this.emit('connect', stream)
})
}
private _reconnect() {
setTimeout(() => {
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
console.log('Reconnecting')
this._connect()
}
}, this._reconnectInterval)
}
public stop() {
this._connectionClosed = true
this._resetConnection()
this._resetRetryParams()
}
private _resetConnection() {
// TODO: close axios streaming connection
if (this.parser) {
this.parser.removeAllListeners()
}
}
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}
/**
* Set up parser when receive some data.
**/
private _setupParser() {
this.parser.on('update', (status: Status) => {
this.emit('update', status)
})
this.parser.on('notification', (notification: Notification) => {
this.emit('notification', notification)
})
this.parser.on('conversation', (conversation: Conversation) => {
this.emit('conversation', conversation)
})
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('error', (err: Error) => {
this.emit('parser-error', err)
})
this.parser.on('connection-limit-exceeded', (err: Error) => {
this.emit('error', err)
})
this.parser.on('heartbeat', _ => {
this.emit('heartbeat', 'heartbeat')
})
}
}
export default EventStream

View file

@ -3,6 +3,7 @@ import StreamListener from './stream_listener'
import WebSocket from './web_socket'
import Response from './response'
import OAuth from './oauth'
import EventStream from './event_stream'
/**
* Entities
*/
@ -45,6 +46,7 @@ export {
RequestCanceledError,
isCancel,
ProxyConfig,
EventStream,
/**
* Entities
*/

View file

@ -1,7 +1,8 @@
import { OAuth2 } from 'oauth'
import axios, { AxiosResponse, CancelTokenSource, AxiosRequestConfig } from 'axios'
import StreamListener from './stream_listener'
// import StreamListener from './stream_listener'
import EventStream from './event_stream'
import WebSocket from './web_socket'
import OAuth from './oauth'
import Response from './response'
@ -23,7 +24,7 @@ export interface MegalodonInstance {
post<T = any>(path: string, params: object): Promise<Response<T>>
del(path: string, params: object): Promise<Response<{}>>
cancel(): void
stream(path: string, reconnectInterval: number): StreamListener
stream(path: string, reconnectInterval: number): EventStream
socket(path: string, strea: string): WebSocket
}
@ -128,9 +129,12 @@ export default class Mastodon implements MegalodonInstance {
}
if (options.website) params.website = options.website
return this._post<OAuth.AppDataFromServer>('/api/v1/apps', params, baseUrl, proxyConfig).then(
(res: Response<OAuth.AppDataFromServer>) => OAuth.AppData.from(res.data)
)
return this._post<OAuth.AppDataFromServer>(
'/api/v1/apps',
params,
baseUrl,
proxyConfig
).then((res: Response<OAuth.AppDataFromServer>) => OAuth.AppData.from(res.data))
}
/**
@ -476,15 +480,17 @@ export default class Mastodon implements MegalodonInstance {
* @param reconnectInterval interval of reconnect
* @returns streamListener, which inherits from EventEmitter
*/
public stream(path: string, reconnectInterval = 1000): StreamListener {
public stream(path: string, reconnectInterval = 1000): EventStream {
const headers = {
'Cache-Control': 'no-cache',
Accept: 'text/event-stream',
'Content-Type': 'text/event-stream',
Authorization: `Bearer ${this.accessToken}`,
'User-Agent': this.userAgent
}
const url = this.baseUrl + path
const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval)
const url = this.baseUrl + path + `?access_token=${this.accessToken}`
// const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval)
const streaming = new EventStream(url, headers, reconnectInterval)
process.nextTick(() => {
streaming.start()
})