mizzkey/packages/backend/src/server/api/stream/index.ts

300 lines
8.2 KiB
TypeScript
Raw Normal View History

/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import * as WebSocket from 'ws';
import type { MiUser } from '@/models/entities/User.js';
import type { MiAccessToken } from '@/models/entities/AccessToken.js';
2023-03-10 06:22:37 +01:00
import type { Packed } from '@/misc/json-schema.js';
2022-09-17 20:27:08 +02:00
import type { NoteReadService } from '@/core/NoteReadService.js';
import type { NotificationService } from '@/core/NotificationService.js';
import { bindThis } from '@/decorators.js';
2023-04-05 03:21:10 +02:00
import { CacheService } from '@/core/CacheService.js';
import { MiUserProfile } from '@/models/index.js';
2022-09-17 20:27:08 +02:00
import type { ChannelsService } from './ChannelsService.js';
import type { EventEmitter } from 'events';
import type Channel from './channel.js';
import type { StreamEventEmitter, StreamMessages } from './types.js';
2020-03-28 03:24:37 +01:00
/** Maximum number of notes kept in the per-connection note cache. */
const MAX_CACHED_NOTES = 32;

/** Interval, in milliseconds, between refreshes of the per-user cached state. */
const FETCH_INTERVAL_MS = 1000 * 10;

/**
 * Returns true when `value` is a non-null object, i.e. something that can be
 * safely destructured or have properties read from it.
 */
function isRecord(value: unknown): value is Record<string, any> {
	return typeof value === 'object' && value !== null;
}

/**
 * Extracts a non-empty string `id` from a client-supplied payload.
 * Returns null when the payload is missing, is not an object, or carries no usable id.
 */
function extractId(payload: unknown): string | null {
	if (!isRecord(payload)) return null;
	const id = payload.id;
	return typeof id === 'string' && id.length > 0 ? id : null;
}

/**
 * Main stream connection
 *
 * Holds the state of a single streaming client: the (optional) authenticated
 * user and access token, the channels the client has connected to, the notes
 * it subscribed to, and a small cache of recently delivered notes.
 */
// eslint-disable-next-line import/no-default-export
export default class Connection {
	public user?: MiUser;
	public token?: MiAccessToken;
	private wsConnection: WebSocket.WebSocket;
	public subscriber: StreamEventEmitter;
	private channels: Channel[] = [];
	/** Note id -> number of active subscriptions requested by the client. */
	private subscribingNotes: Map<string, number> = new Map();
	/** Most recently cached notes, newest first, capped at MAX_CACHED_NOTES. */
	private cachedNotes: Packed<'Note'>[] = [];
	public userProfile: MiUserProfile | null = null;
	public following: Set<string> = new Set();
	public followingChannels: Set<string> = new Set();
	public userIdsWhoMeMuting: Set<string> = new Set();
	public userIdsWhoBlockingMe: Set<string> = new Set();
	public userIdsWhoMeMutingRenotes: Set<string> = new Set();
	private fetchIntervalId: NodeJS.Timer | null = null;

	constructor(
		private channelsService: ChannelsService,
		private noteReadService: NoteReadService,
		private notificationService: NotificationService,
		private cacheService: CacheService,

		user: MiUser | null | undefined,
		token: MiAccessToken | null | undefined,
	) {
		if (user) this.user = user;
		if (token) this.token = token;
	}

	/**
	 * Reloads the user's profile, followings, followed channels, mutings,
	 * blocks and renote mutings from the cache service and stores them on
	 * this connection. Does nothing for connections without a user.
	 */
	@bindThis
	public async fetch() {
		if (this.user == null) return;
		const [userProfile, following, followingChannels, userIdsWhoMeMuting, userIdsWhoBlockingMe, userIdsWhoMeMutingRenotes] = await Promise.all([
			this.cacheService.userProfileCache.fetch(this.user.id),
			this.cacheService.userFollowingsCache.fetch(this.user.id),
			this.cacheService.userFollowingChannelsCache.fetch(this.user.id),
			this.cacheService.userMutingsCache.fetch(this.user.id),
			this.cacheService.userBlockedCache.fetch(this.user.id),
			this.cacheService.renoteMutingsCache.fetch(this.user.id),
		]);
		this.userProfile = userProfile;
		this.following = following;
		this.followingChannels = followingChannels;
		this.userIdsWhoMeMuting = userIdsWhoMeMuting;
		this.userIdsWhoBlockingMe = userIdsWhoBlockingMe;
		this.userIdsWhoMeMutingRenotes = userIdsWhoMeMutingRenotes;
	}

	/**
	 * For connections with a user: performs an initial fetch() and starts a
	 * timer that repeats it every FETCH_INTERVAL_MS. The timer is started at
	 * most once, even if init() is called repeatedly.
	 */
	@bindThis
	public async init() {
		if (this.user != null) {
			await this.fetch();

			if (!this.fetchIntervalId) {
				this.fetchIntervalId = setInterval(this.fetch, FETCH_INTERVAL_MS);
			}
		}
	}

	/**
	 * Attaches this connection to its event source and WebSocket: incoming
	 * WebSocket messages are dispatched by onWsConnectionMessage, and
	 * 'broadcast' events from the subscriber are forwarded to the client.
	 *
	 * @param subscriber event emitter delivering stream events
	 * @param wsConnection WebSocket of the connected client
	 */
	@bindThis
	public async listen(subscriber: EventEmitter, wsConnection: WebSocket.WebSocket) {
		this.subscriber = subscriber;

		this.wsConnection = wsConnection;
		this.wsConnection.on('message', this.onWsConnectionMessage);

		this.subscriber.on('broadcast', data => {
			this.onBroadcastMessage(data);
		});
	}

	/**
	 * Handles a raw message received from the client. The message is parsed
	 * as JSON and dispatched on its `type`; anything that is not valid JSON,
	 * is not an object, or has an unknown type is silently ignored.
	 */
	@bindThis
	private async onWsConnectionMessage(data: WebSocket.RawData) {
		let obj: unknown;

		try {
			obj = JSON.parse(data.toString());
		} catch {
			return;
		}

		// JSON such as `null`, `1` or `"x"` parses fine but is not a message.
		if (!isRecord(obj)) return;

		const { type, body } = obj;

		switch (type) {
			case 'readNotification': this.onReadNotification(body); break;
			case 'subNote': this.onSubscribeNote(body); break;
			case 's': this.onSubscribeNote(body); break; // alias
			case 'sr': this.onSubscribeNote(body); this.readNote(body); break; // subscribe and mark as read
			case 'unsubNote': this.onUnsubscribeNote(body); break;
			case 'un': this.onUnsubscribeNote(body); break; // alias
			case 'connect': this.onChannelConnectRequested(body); break;
			case 'disconnect': this.onChannelDisconnectRequested(body); break;
			case 'channel': this.onChannelMessageRequested(body); break;
			case 'ch': this.onChannelMessageRequested(body); break; // alias
		}
	}

	/**
	 * Forwards a broadcast event to the client unchanged.
	 */
	@bindThis
	private onBroadcastMessage(data: StreamMessages['broadcast']['payload']) {
		this.sendMessageToWs(data.type, data.body);
	}

	/**
	 * Stores a note, plus its reply and renote targets when present, in the
	 * per-connection cache. An already cached note with the same id is
	 * replaced in place; otherwise the note is put at the front and the cache
	 * is truncated to MAX_CACHED_NOTES entries.
	 */
	@bindThis
	public cacheNote(note: Packed<'Note'>) {
		const add = (target: Packed<'Note'>) => {
			const existIndex = this.cachedNotes.findIndex(n => n.id === target.id);
			if (existIndex > -1) {
				this.cachedNotes[existIndex] = target;
				return;
			}

			this.cachedNotes.unshift(target);
			if (this.cachedNotes.length > MAX_CACHED_NOTES) {
				this.cachedNotes.splice(MAX_CACHED_NOTES);
			}
		};

		add(note);
		if (note.reply) add(note.reply);
		if (note.renote) add(note.renote);
	}

	/**
	 * Marks the cached note identified by `body.id` as read for the current
	 * user. Ignored when the payload has no id, the note is not in the cache,
	 * there is no user, or the note was written by the user themselves.
	 */
	@bindThis
	private readNote(body: any) {
		const id = extractId(body);
		if (id == null) return;

		const note = this.cachedNotes.find(n => n.id === id);
		if (note == null) return;

		if (this.user && (note.userId !== this.user.id)) {
			this.noteReadService.read(this.user.id, [note]);
		}
	}

	/**
	 * Marks all notifications of the current user as read.
	 * Ignored for connections without a user.
	 */
	@bindThis
	private onReadNotification(payload: any) {
		if (this.user == null) return;
		this.notificationService.readAllNotification(this.user.id);
	}

	/**
	 * Handles a request to subscribe to updates of a note. Subscriptions are
	 * reference counted per note id; the stream listener is attached only on
	 * the first subscription.
	 */
	@bindThis
	private onSubscribeNote(payload: any) {
		const id = extractId(payload);
		if (id == null) return;

		const count = (this.subscribingNotes.get(id) ?? 0) + 1;
		this.subscribingNotes.set(id, count);

		if (count === 1) {
			this.subscriber.on(`noteStream:${id}`, this.onNoteStreamMessage);
		}
	}

	/**
	 * Handles a request to unsubscribe from updates of a note. The stream
	 * listener is detached once the last subscription is released. Requests
	 * for notes that are not currently subscribed are ignored.
	 */
	@bindThis
	private onUnsubscribeNote(payload: any) {
		const id = extractId(payload);
		if (id == null) return;

		const count = this.subscribingNotes.get(id);
		// Without this guard an unmatched unsubscribe would corrupt the counter.
		if (count == null) return;

		if (count <= 1) {
			this.subscribingNotes.delete(id);
			this.subscriber.off(`noteStream:${id}`, this.onNoteStreamMessage);
		} else {
			this.subscribingNotes.set(id, count - 1);
		}
	}

	/**
	 * Relays a note stream event to the client as a 'noteUpdated' message.
	 */
	@bindThis
	private async onNoteStreamMessage(data: StreamMessages['note']['payload']) {
		this.sendMessageToWs('noteUpdated', {
			id: data.body.id,
			type: data.type,
			body: data.body.body,
		});
	}

	/**
	 * Handles a channel connection request from the client.
	 */
	@bindThis
	private onChannelConnectRequested(payload: any) {
		if (!isRecord(payload)) return;
		const { channel, id, params, pong } = payload;
		this.connectChannel(id, params, channel, pong);
	}

	/**
	 * Handles a channel disconnection request from the client.
	 */
	@bindThis
	private onChannelDisconnectRequested(payload: any) {
		if (!isRecord(payload)) return;
		const { id } = payload;
		this.disconnectChannel(id);
	}

	/**
	 * Sends a message to the client as JSON of the form `{ type, body }`.
	 *
	 * @param type message type
	 * @param payload value sent as the message body
	 */
	@bindThis
	public sendMessageToWs(type: string, payload: any) {
		this.wsConnection.send(JSON.stringify({
			type: type,
			body: payload,
		}));
	}

	/**
	 * Connects this connection to a channel.
	 *
	 * @param id identifier chosen by the client for this channel connection
	 * @param params parameters passed to the channel's init (defaults to `{}`)
	 * @param channel name of the channel to connect to
	 * @param pong when true, a 'connected' message carrying `id` is sent back
	 */
	@bindThis
	public connectChannel(id: string, params: any, channel: string, pong = false) {
		const channelService = this.channelsService.getChannelService(channel);

		if (channelService.requireCredential && this.user == null) {
			return;
		}

		// Connecting to a shareable channel that is already connected would be pointless, so ignore it.
		if (channelService.shouldShare && this.channels.some(c => c.chName === channel)) {
			return;
		}

		const ch: Channel = channelService.create(id, this);
		this.channels.push(ch);
		ch.init(params ?? {});

		if (pong) {
			this.sendMessageToWs('connected', {
				id: id,
			});
		}
	}

	/**
	 * Disconnects from a channel, disposing it when it supports disposal.
	 *
	 * @param id identifier of the channel connection to close
	 */
	@bindThis
	public disconnectChannel(id: string) {
		const channel = this.channels.find(c => c.id === id);

		if (channel) {
			if (channel.dispose) channel.dispose();
			this.channels = this.channels.filter(c => c.id !== id);
		}
	}

	/**
	 * Routes a client message to the connected channel whose id matches `data.id`.
	 *
	 * @param data message with the target channel `id`, plus `type` and `body`
	 */
	@bindThis
	private onChannelMessageRequested(data: any) {
		if (!isRecord(data)) return;

		const channel = this.channels.find(c => c.id === data.id);
		if (channel != null && channel.onMessage != null) {
			channel.onMessage(data.type, data.body);
		}
	}

	/**
	 * Releases the resources held by this connection: stops the periodic
	 * fetch timer and disposes every connected channel. Safe to call more
	 * than once.
	 */
	@bindThis
	public dispose() {
		if (this.fetchIntervalId) {
			clearInterval(this.fetchIntervalId);
			this.fetchIntervalId = null;
		}

		for (const c of this.channels) {
			if (c.dispose) c.dispose();
		}
		// Forget disposed channels so they cannot be disposed a second time.
		this.channels = [];
	}
}