Merge pull request #72 from h3poteto/iss-69-websocket

refs #69 Add proxy support for web socket
This commit is contained in:
AkiraFukushima 2019-10-19 13:56:26 +09:00 committed by GitHub
commit a34ca4ef61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 94 additions and 6 deletions

View file

@ -0,0 +1,64 @@
import Mastodon, { Status, Notification, WebSocket, ProxyConfig } from 'megalodon'
import log4js from 'log4js'
declare var process: {
env: {
PLEROMA_ACCESS_TOKEN: string
MASTODON_ACCESS_TOKEN: string
PROXY_HOST: string
PROXY_PORT: number
PROXY_PROTOCOL: string
}
}
const BASE_URL: string = 'wss://pleroma.io'
const access_token: string = process.env.PLEROMA_ACCESS_TOKEN
const proxy: ProxyConfig = {
host: process.env.PROXY_HOST,
port: process.env.PROXY_PORT,
protocol: process.env.PROXY_PROTOCOL
}
const client = new Mastodon(access_token, BASE_URL + '/api/v1', 'megalodon', proxy)
const stream: WebSocket = client.socket('/streaming', 'user')
const logger = log4js.getLogger()
logger.level = 'debug'
stream.on('connect', () => {
logger.debug('connect')
})
stream.on('pong', () => {
logger.debug('pong')
})
stream.on('update', (status: Status) => {
logger.debug(status)
})
stream.on('notification', (notification: Notification) => {
logger.debug(notification)
})
stream.on('delete', (id: number) => {
logger.debug(id)
})
stream.on('error', (err: Error) => {
console.error(err)
})
stream.on('heartbeat', () => {
logger.debug('thump.')
})
stream.on('close', () => {
logger.debug('close')
})
stream.on('parser-error', (err: Error) => {
logger.error(err)
})

View file

@ -46,6 +46,8 @@ export default class Mastodon implements MegalodonInstance {
/**
* @param accessToken access token from OAuth2 authorization
* @param baseUrl hostname or base URL
* @param userAgent UserAgent is specified in header on request.
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
constructor(
accessToken: string,
@ -66,6 +68,7 @@ export default class Mastodon implements MegalodonInstance {
* @param client_name Form Data, which is sent to /api/v1/apps
* @param options Form Data, which is sent to /api/v1/apps. and properties should be **snake_case**
* @param baseUrl base URL of the target
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
public static async registerApp(
client_name: string,
@ -99,6 +102,7 @@ export default class Mastodon implements MegalodonInstance {
* @param client_name your application's name
* @param options Form Data
* @param baseUrl target of base URL
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
public static async createApp(
client_name: string,
@ -167,6 +171,7 @@ export default class Mastodon implements MegalodonInstance {
* @param code will be generated by the link of #generateAuthUrl or #registerApp
* @param baseUrl base URL of the target
* @param redirect_uri must be the same uri as the time when you register your OAuth application
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
public static async fetchAccessToken(
client_id: string,
@ -198,6 +203,7 @@ export default class Mastodon implements MegalodonInstance {
* @param client_secret will be generated by #createApp or #registerApp
* @param refresh_token will be get #fetchAccessToken
* @param baseUrl base URL or the target
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
public static async refreshToken(
client_id: string,
@ -224,6 +230,7 @@ export default class Mastodon implements MegalodonInstance {
* @param path relative path from baseUrl
* @param params Query parameters
* @param baseUrl base URL of the target
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
public static async get<T>(
path: string,
@ -493,7 +500,7 @@ export default class Mastodon implements MegalodonInstance {
*/
public socket(path: string, stream: string): WebSocket {
const url = this.baseUrl + path
const streaming = new WebSocket(url, stream, this.accessToken, this.userAgent)
const streaming = new WebSocket(url, stream, this.accessToken, this.userAgent, this.proxyConfig)
process.nextTick(() => {
streaming.start()
})

View file

@ -39,6 +39,7 @@ class StreamListener extends EventEmitter {
/**
* @param url full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user
* @param headers headers of streaming request
* @param proxyConfig Proxy setting, or set false if don't use proxy.
* @param reconnectInterval reconnection interval[ms]
*/
constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval?: number) {

View file

@ -4,6 +4,7 @@ import { EventEmitter } from 'events'
import { Status } from './entities/status'
import { Notification } from './entities/notification'
import { Conversation } from './entities/conversation'
import proxyAgent, { ProxyConfig } from './proxy_config'
/**
* WebSocket
@ -15,6 +16,7 @@ export default class WebSocket extends EventEmitter {
public stream: string
public parser: Parser
public headers: { [key: string]: string }
public proxyConfig: ProxyConfig | false = false
private _accessToken: string
private _reconnectInterval: number
private _reconnectMaxAttempts: number
@ -30,8 +32,9 @@ export default class WebSocket extends EventEmitter {
* @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28
* @param accessToken The access token.
* @param userAgent The specified User Agent.
* @param proxyConfig Proxy setting, or set false if don't use proxy.
*/
constructor(url: string, stream: string, accessToken: string, userAgent: string) {
constructor(url: string, stream: string, accessToken: string, userAgent: string, proxyConfig: ProxyConfig | false = false) {
super()
this.url = url
this.stream = stream
@ -39,6 +42,7 @@ export default class WebSocket extends EventEmitter {
this.headers = {
'User-Agent': userAgent
}
this.proxyConfig = proxyConfig
this._accessToken = accessToken
this._reconnectInterval = 1000
this._reconnectMaxAttempts = Infinity
@ -63,7 +67,7 @@ export default class WebSocket extends EventEmitter {
private _startWebSocketConnection() {
this._resetConnection()
this._setupParser()
this._client = this._connect(this.url, this.stream, this._accessToken, this.headers)
this._client = this._connect(this.url, this.stream, this._accessToken, this.headers, this.proxyConfig)
this._bindSocket(this._client)
}
@ -108,7 +112,7 @@ export default class WebSocket extends EventEmitter {
this._reconnectCurrentAttempts++
// Call connect methods
console.log('Reconnecting')
this._client = this._connect(this.url, this.stream, this._accessToken, this.headers)
this._client = this._connect(this.url, this.stream, this._accessToken, this.headers, this.proxyConfig)
this._clearBinding()
this._bindSocket(this._client)
}
@ -121,18 +125,30 @@ export default class WebSocket extends EventEmitter {
* @param stream The specified stream name.
* @param accessToken Access token.
* @param headers The specified headers.
* @param proxyConfig Proxy setting, or set false if don't use proxy.
* @return A WebSocket instance.
*/
private _connect(url: string, stream: string, accessToken: string, headers: { [key: string]: string }): WS {
private _connect(
url: string,
stream: string,
accessToken: string,
headers: { [key: string]: string },
proxyConfig: ProxyConfig | false
): WS {
const params: Array<string> = [`stream=${stream}`]
if (accessToken !== null) {
params.push(`access_token=${accessToken}`)
}
const requestURL: string = `${url}/?${params.join('&')}`
const options: WS.ClientOptions = {
let options: WS.ClientOptions = {
headers: headers
}
if (proxyConfig) {
options = Object.assign(proxyConfig, {
agent: proxyAgent(proxyConfig)
})
}
const cli: WS = new WS(requestURL, options)
return cli