firefish/packages/megalodon/src/misskey/web_socket.ts
2023-09-24 04:27:16 +00:00

459 lines
10 KiB
TypeScript

import WS from "ws";
import dayjs, { Dayjs } from "dayjs";
import { v4 as uuid } from "uuid";
import { EventEmitter } from "events";
import { WebSocketInterface } from "../megalodon";
import proxyAgent, { ProxyConfig } from "../proxy_config";
import MisskeyAPI from "./api_client";
/**
 * WebSocket
 * Misskey does not support HTTP streaming; it offers a WebSocket API instead.
 * This class connects to a Misskey server over WebSocket, subscribes to one
 * logical channel and re-emits the parsed messages.
 *
 * Events emitted by this class: `connect`, `close`, `pong`, `error`,
 * `update`, `notification`, `conversation` and `parser-error`.
 */
export default class WebSocket
  extends EventEmitter
  implements WebSocketInterface
{
  public url: string;
  public channel:
    | "user"
    | "localTimeline"
    | "hybridTimeline"
    | "globalTimeline"
    | "conversation"
    | "list";
  public parser: any;
  public headers: { [key: string]: string };
  public proxyConfig: ProxyConfig | false = false;
  public listId: string | null = null;
  private _converter: MisskeyAPI.Converter;
  private _accessToken: string;
  // Delay in milliseconds before a reconnect attempt is made.
  private _reconnectInterval: number;
  private _reconnectMaxAttempts: number;
  private _reconnectCurrentAttempts: number;
  // True once stop() was called; suppresses reconnects and heartbeats.
  private _connectionClosed: boolean;
  private _client: WS | null = null;
  // Id sent with every channel subscription; the parser drops other ids.
  private _channelID: string;
  private _pongReceivedTimestamp: Dayjs;
  // Delay in milliseconds between a received pong and the next alive check.
  private _heartbeatInterval = 60000;
  private _pongWaiting = false;
  // Handle of the pending reconnect timer, or null when none is scheduled.
  private _reconnectTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * @param url Full url of websocket: e.g. wss://firefish.io/streaming
   * @param channel Channel name is user, localTimeline, hybridTimeline, globalTimeline, conversation or list.
   * @param accessToken The access token.
   * @param listId This parameter is required when you specify list as channel.
   * @param userAgent Value sent as the `User-Agent` header of the handshake.
   * @param proxyConfig Proxy settings, or `false` to connect directly.
   * @param converter Converter used to translate Misskey entities before they are emitted.
   */
  constructor(
    url: string,
    channel:
      | "user"
      | "localTimeline"
      | "hybridTimeline"
      | "globalTimeline"
      | "conversation"
      | "list",
    accessToken: string,
    listId: string | undefined,
    userAgent: string,
    proxyConfig: ProxyConfig | false = false,
    converter: MisskeyAPI.Converter,
  ) {
    super();
    this.url = url;
    this.parser = new Parser();
    this.channel = channel;
    this.headers = {
      "User-Agent": userAgent,
    };
    this.listId = listId === undefined ? null : listId;
    this.proxyConfig = proxyConfig;
    this._accessToken = accessToken;
    this._reconnectInterval = 10000;
    this._reconnectMaxAttempts = Infinity;
    this._reconnectCurrentAttempts = 0;
    this._connectionClosed = false;
    this._channelID = uuid();
    this._pongReceivedTimestamp = dayjs();
    this._converter = converter;
  }

  /**
   * Start websocket connection.
   */
  public start() {
    this._connectionClosed = false;
    this._resetRetryParams();
    this._startWebSocketConnection();
  }

  /**
   * Extract the host (`hostname[:port]`) from an endpoint URL.
   * Works for any scheme (https://, wss://, ...) and drops the path, so
   * `https://example.com/streaming` becomes `example.com`.
   * Input that is not an absolute URL falls back to the legacy behaviour of
   * only stripping a literal `https://`.
   */
  private baseUrlToHost(baseUrl: string): string {
    try {
      const { host } = new URL(baseUrl);
      // `localhost:3000/x` parses as scheme `localhost:` with an empty host.
      if (host !== "") {
        return host;
      }
    } catch {
      // Not an absolute URL; use the legacy fallback below.
    }
    return baseUrl.replace("https://", "");
  }

  /**
   * Reset connection and start new websocket connection.
   */
  private _startWebSocketConnection() {
    this._resetConnection();
    this._setupParser();
    this._client = this._connect();
    this._bindSocket(this._client);
  }

  /**
   * Stop current connection.
   */
  public stop() {
    this._connectionClosed = true;
    this._resetConnection();
    this._resetRetryParams();
  }

  /**
   * Clean up current connection, pending reconnect timer, and listeners.
   */
  private _resetConnection() {
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer);
      this._reconnectTimer = null;
    }
    if (this._client) {
      const client = this._client;
      client.close(1000);
      client.removeAllListeners();
      // The socket may still emit `error` after we detached (for example when
      // it is closed during the handshake). An `error` event without any
      // listener would be thrown, so keep a no-op listener attached.
      client.on("error", () => {
        /* intentionally ignored */
      });
      this._client = null;
    }
    if (this.parser) {
      this.parser.removeAllListeners();
    }
  }

  /**
   * Resets the parameters used in reconnect.
   */
  private _resetRetryParams() {
    this._reconnectCurrentAttempts = 0;
  }

  /**
   * Connect to the endpoint.
   * The access token is passed as the `i` query parameter.
   */
  private _connect(): WS {
    let options: WS.ClientOptions = {
      headers: this.headers,
    };
    if (this.proxyConfig) {
      options = Object.assign(options, {
        agent: proxyAgent(this.proxyConfig),
      });
    }
    return new WS(`${this.url}?i=${this._accessToken}`, options);
  }

  /**
   * Send a single `connect` request for a server-side channel.
   * `params` is only included in the payload when it is provided.
   */
  private _sendConnect(
    client: WS,
    channel: string,
    params?: { [key: string]: unknown },
  ) {
    const body =
      params === undefined
        ? { channel, id: this._channelID }
        : { channel, id: this._channelID, params };
    client.send(JSON.stringify({ type: "connect", body }));
  }

  /**
   * Connect specified channels in websocket.
   */
  private _channel() {
    const client = this._client;
    if (!client) {
      return;
    }
    switch (this.channel) {
      case "conversation":
        this._sendConnect(client, "main");
        break;
      case "user":
        // The user stream is the combination of `main` and `homeTimeline`.
        this._sendConnect(client, "main");
        this._sendConnect(client, "homeTimeline");
        break;
      case "list":
        this._sendConnect(client, "userList", { listId: this.listId });
        break;
      default:
        this._sendConnect(client, this.channel);
        break;
    }
  }

  /**
   * Reconnects to the same endpoint after `_reconnectInterval` milliseconds.
   * Does nothing once stop() has been called, and coalesces overlapping
   * requests into the reconnect that is already scheduled.
   */
  private _reconnect() {
    if (this._connectionClosed || this._reconnectTimer) {
      return;
    }
    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null;
      // stop() may have been called while we were waiting.
      if (this._connectionClosed) {
        return;
      }
      // Skip reconnect when client is connecting.
      // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L365
      if (this._client && this._client.readyState === WS.CONNECTING) {
        return;
      }
      if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
        this._reconnectCurrentAttempts++;
        this._clearBinding();
        if (this._client) {
          // In reconnect, we want to close the connection immediately,
          // because reconnect is necessary when some problems occur.
          this._client.terminate();
        }
        // Call connect methods
        console.log("Reconnecting");
        this._client = this._connect();
        this._bindSocket(this._client);
      }
    }, this._reconnectInterval);
  }

  /**
   * Clear binding event for websocket client.
   */
  private _clearBinding() {
    const client = this._client;
    if (!client) {
      return;
    }
    for (const event of ["close", "pong", "open", "message", "error"]) {
      client.removeAllListeners(event);
    }
    // Keep a no-op `error` listener so that an error emitted by the
    // discarded socket is not thrown as an unhandled `error` event.
    client.on("error", () => {
      /* intentionally ignored */
    });
  }

  /**
   * Bind event for web socket client.
   * @param client A WebSocket instance.
   */
  private _bindSocket(client: WS) {
    client.on("close", (code: number, _reason: Buffer) => {
      if (code === 1000) {
        this.emit("close", {});
        return;
      }
      console.log(`Closed connection with ${code}`);
      if (!this._connectionClosed) {
        this._reconnect();
      }
    });
    client.on("pong", () => {
      this._pongWaiting = false;
      this.emit("pong", {});
      this._pongReceivedTimestamp = dayjs();
      // The timestamp is read when the timer fires (not when it is scheduled),
      // so a newer pong makes older timers skip their ping in _checkAlive.
      setTimeout(
        () => this._checkAlive(this._pongReceivedTimestamp),
        this._heartbeatInterval,
      );
    });
    client.on("open", () => {
      this.emit("connect", {});
      this._channel();
      // Call first ping event.
      setTimeout(() => {
        // The socket may have been closed or replaced in the meantime.
        if (client.readyState === WS.OPEN) {
          client.ping("");
        }
      }, 10000);
    });
    client.on("message", (data: WS.Data, isBinary: boolean) => {
      this.parser.parse(data, isBinary, this._channelID);
    });
    client.on("error", (err: Error) => {
      this.emit("error", err);
    });
  }

  /**
   * Set up parser when receive message.
   * Parsed entities are converted with the injected converter before being
   * re-emitted; parser errors are re-emitted as `parser-error`.
   */
  private _setupParser() {
    this.parser.on("update", (note: MisskeyAPI.Entity.Note) => {
      const host = this.baseUrlToHost(this.url);
      this.emit("update", this._converter.note(note, host));
    });
    this.parser.on(
      "notification",
      (notification: MisskeyAPI.Entity.Notification) => {
        const host = this.baseUrlToHost(this.url);
        this.emit(
          "notification",
          this._converter.notification(notification, host),
        );
      },
    );
    this.parser.on("conversation", (note: MisskeyAPI.Entity.Note) => {
      const host = this.baseUrlToHost(this.url);
      this.emit("conversation", this._converter.noteToConversation(note, host));
    });
    this.parser.on("error", (err: Error) => {
      this.emit("parser-error", err);
    });
  }

  /**
   * Call ping and wait to pong.
   * @param timestamp Time at which the most recent pong was received.
   */
  private _checkAlive(timestamp: Dayjs) {
    const now: Dayjs = dayjs();
    // Block multiple calling, if multiple pong event occur.
    // If the duration is less than the interval, skip the ping.
    const elapsedEnough =
      now.diff(timestamp) > this._heartbeatInterval - 1000;
    if (!elapsedEnough || this._connectionClosed) {
      return;
    }
    // Skip ping when client is connecting.
    // https://github.com/websockets/ws/blob/7.2.1/lib/websocket.js#L289
    if (this._client && this._client.readyState !== WS.CONNECTING) {
      this._pongWaiting = true;
      this._client.ping("");
      // No pong within 10 seconds means the connection is considered dead.
      setTimeout(() => {
        if (this._pongWaiting) {
          this._pongWaiting = false;
          this._reconnect();
        }
      }, 10000);
    }
  }
}
/**
 * Parser
 * Decodes raw websocket messages and re-emits them as events.
 *
 * Emitted events: `update` (note), `notification`, `conversation` (note with
 * `specified` visibility received as a mention), `heartbeat` (empty or
 * non-text message) and `error` (unparsable message or unknown event type).
 */
export class Parser extends EventEmitter {
  /**
   * Parse one websocket message and emit the matching event.
   * Messages that are not of type `channel`, have no body, or belong to a
   * different channel id are silently dropped.
   *
   * @param data Message payload as received from the websocket.
   * @param isBinary Whether the payload is a binary frame; binary frames are treated as heartbeats unless `data` is already a string.
   * @param channelID Parse only messages which have the same channelID.
   */
  public parse(data: WS.Data, isBinary: boolean, channelID: string) {
    type ChannelBody = {
      id: string;
      type: string;
      body: unknown;
    };

    const message = isBinary ? data : data.toString();
    if (typeof message !== "string" || message === "") {
      this.emit("heartbeat", {});
      return;
    }

    let body: ChannelBody;
    try {
      const obj: { type: string; body: ChannelBody } = JSON.parse(message);
      if (obj.type !== "channel" || !obj.body) {
        return;
      }
      body = obj.body;
      if (body.id !== channelID) {
        return;
      }
    } catch (err) {
      // Covers invalid JSON as well as JSON that is not an object (e.g. `null`).
      this.emit(
        "error",
        new Error(
          `Error parsing websocket reply: ${message}, error message: ${err}`,
        ),
      );
      return;
    }

    switch (body.type) {
      case "note":
        this.emit("update", body.body as MisskeyAPI.Entity.Note);
        break;
      case "notification":
        this.emit("notification", body.body as MisskeyAPI.Entity.Notification);
        break;
      case "mention": {
        // The payload comes from the network and may be missing; guard it so
        // a malformed frame cannot throw out of the message handler.
        const note = body.body as MisskeyAPI.Entity.Note | null | undefined;
        if (note && note.visibility === "specified") {
          this.emit("conversation", note);
        }
        break;
      }
      // When renote and followed event, the same notification will be received.
      case "renote":
      case "followed":
      case "follow":
      case "unfollow":
      case "receiveFollowRequest":
      case "meUpdated":
      case "readAllNotifications":
      case "readAllUnreadSpecifiedNotes":
      case "readAllAntennas":
      case "readAllUnreadMentions":
      case "unreadNotification":
        // Ignore these events
        break;
      default:
        this.emit(
          "error",
          new Error(`Unknown event has received: ${JSON.stringify(body)}`),
        );
        break;
    }
  }
}