Refactoring
This commit is contained in:
parent
9b780dff04
commit
8129d4dc23
51 changed files with 68 additions and 68 deletions
|
|
@ -1,7 +1,7 @@
|
|||
import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../models/user';
|
||||
import Following from '../../models/following';
|
||||
import FollowRequest from '../../models/follow-request';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import { publishMainStream } from '../stream';
|
||||
import { renderActivity } from '../../remote/activitypub/renderer';
|
||||
import renderFollow from '../../remote/activitypub/renderer/follow';
|
||||
import renderUndo from '../../remote/activitypub/renderer/undo';
|
||||
|
|
|
|||
62
src/services/create-notification.ts
Normal file
62
src/services/create-notification.ts
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
import * as mongo from 'mongodb';
|
||||
import Notification from '../models/notification';
|
||||
import Mute from '../models/mute';
|
||||
import { pack } from '../models/notification';
|
||||
import { publishMainStream } from './stream';
|
||||
import User from '../models/user';
|
||||
import pushSw from './push-notification';
|
||||
|
||||
export default (
|
||||
notifiee: mongo.ObjectID,
|
||||
notifier: mongo.ObjectID,
|
||||
type: string,
|
||||
content?: any
|
||||
) => new Promise<any>(async (resolve, reject) => {
|
||||
if (notifiee.equals(notifier)) {
|
||||
return resolve();
|
||||
}
|
||||
|
||||
// Create notification
|
||||
const notification = await Notification.insert(Object.assign({
|
||||
createdAt: new Date(),
|
||||
notifieeId: notifiee,
|
||||
notifierId: notifier,
|
||||
type: type,
|
||||
isRead: false
|
||||
}, content));
|
||||
|
||||
resolve(notification);
|
||||
|
||||
const packed = await pack(notification);
|
||||
|
||||
// Publish notification event
|
||||
publishMainStream(notifiee, 'notification', packed);
|
||||
|
||||
// Update flag
|
||||
User.update({ _id: notifiee }, {
|
||||
$set: {
|
||||
hasUnreadNotification: true
|
||||
}
|
||||
});
|
||||
|
||||
// 2秒経っても(今回作成した)通知が既読にならなかったら「未読の通知がありますよ」イベントを発行する
|
||||
setTimeout(async () => {
|
||||
const fresh = await Notification.findOne({ _id: notification._id }, { isRead: true });
|
||||
if (!fresh.isRead) {
|
||||
//#region ただしミュートしているユーザーからの通知なら無視
|
||||
const mute = await Mute.find({
|
||||
muterId: notifiee,
|
||||
deletedAt: { $exists: false }
|
||||
});
|
||||
const mutedUserIds = mute.map(m => m.muteeId.toString());
|
||||
if (mutedUserIds.indexOf(notifier.toString()) != -1) {
|
||||
return;
|
||||
}
|
||||
//#endregion
|
||||
|
||||
publishMainStream(notifiee, 'unreadNotification', packed);
|
||||
|
||||
pushSw(notifiee, 'notification', packed);
|
||||
}
|
||||
}, 2000);
|
||||
});
|
||||
|
|
@ -12,7 +12,7 @@ import * as isSvg from 'is-svg';
|
|||
import DriveFile, { IMetadata, getDriveFileBucket, IDriveFile } from '../../models/drive-file';
|
||||
import DriveFolder from '../../models/drive-folder';
|
||||
import { pack } from '../../models/drive-file';
|
||||
import { publishMainStream, publishDriveStream } from '../../stream';
|
||||
import { publishMainStream, publishDriveStream } from '../stream';
|
||||
import { isLocalUser, IUser, IRemoteUser } from '../../models/user';
|
||||
import delFile from './delete-file';
|
||||
import config from '../../config';
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../models/user';
|
||||
import Following from '../../models/following';
|
||||
import Blocking from '../../models/blocking';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import notify from '../../notify';
|
||||
import { publishMainStream } from '../stream';
|
||||
import notify from '../../services/create-notification';
|
||||
import { renderActivity } from '../../remote/activitypub/renderer';
|
||||
import renderFollow from '../../remote/activitypub/renderer/follow';
|
||||
import renderAccept from '../../remote/activitypub/renderer/accept';
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../models/user';
|
||||
import Following from '../../models/following';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import { publishMainStream } from '../stream';
|
||||
import { renderActivity } from '../../remote/activitypub/renderer';
|
||||
import renderFollow from '../../remote/activitypub/renderer/follow';
|
||||
import renderUndo from '../../remote/activitypub/renderer/undo';
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import renderFollow from '../../../remote/activitypub/renderer/follow';
|
|||
import renderAccept from '../../../remote/activitypub/renderer/accept';
|
||||
import { deliver } from '../../../queue';
|
||||
import Following from '../../../models/following';
|
||||
import { publishMainStream } from '../../../stream';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import perUserFollowingChart from '../../../chart/per-user-following';
|
||||
import Logger from '../../../misc/logger';
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer';
|
|||
import renderFollow from '../../../remote/activitypub/renderer/follow';
|
||||
import renderUndo from '../../../remote/activitypub/renderer/undo';
|
||||
import { deliver } from '../../../queue';
|
||||
import { publishMainStream } from '../../../stream';
|
||||
import { publishMainStream } from '../../stream';
|
||||
|
||||
export default async function(followee: IUser, follower: IUser) {
|
||||
if (isRemoteUser(followee)) {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import User, { isLocalUser, isRemoteUser, pack as packUser, IUser } from '../../../models/user';
|
||||
import { publishMainStream } from '../../../stream';
|
||||
import notify from '../../../notify';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import notify from '../../../services/create-notification';
|
||||
import { renderActivity } from '../../../remote/activitypub/renderer';
|
||||
import renderFollow from '../../../remote/activitypub/renderer/follow';
|
||||
import { deliver } from '../../../queue';
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import { renderActivity } from '../../../remote/activitypub/renderer';
|
|||
import renderFollow from '../../../remote/activitypub/renderer/follow';
|
||||
import renderReject from '../../../remote/activitypub/renderer/reject';
|
||||
import { deliver } from '../../../queue';
|
||||
import { publishMainStream } from '../../../stream';
|
||||
import { publishMainStream } from '../../stream';
|
||||
|
||||
export default async function(followee: IUser, follower: IUser) {
|
||||
if (isRemoteUser(follower)) {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import es from '../../db/elasticsearch';
|
||||
import Note, { pack, INote, IChoice } from '../../models/note';
|
||||
import User, { isLocalUser, IUser, isRemoteUser, IRemoteUser, ILocalUser } from '../../models/user';
|
||||
import { publishMainStream, publishHomeTimelineStream, publishLocalTimelineStream, publishHybridTimelineStream, publishGlobalTimelineStream, publishUserListStream, publishHashtagStream } from '../../stream';
|
||||
import { publishMainStream, publishHomeTimelineStream, publishLocalTimelineStream, publishHybridTimelineStream, publishGlobalTimelineStream, publishUserListStream, publishHashtagStream } from '../stream';
|
||||
import Following from '../../models/following';
|
||||
import { deliver } from '../../queue';
|
||||
import renderNote from '../../remote/activitypub/renderer/note';
|
||||
|
|
@ -9,7 +9,7 @@ import renderCreate from '../../remote/activitypub/renderer/create';
|
|||
import renderAnnounce from '../../remote/activitypub/renderer/announce';
|
||||
import { renderActivity } from '../../remote/activitypub/renderer';
|
||||
import DriveFile, { IDriveFile } from '../../models/drive-file';
|
||||
import notify from '../../notify';
|
||||
import notify from '../../services/create-notification';
|
||||
import NoteWatching from '../../models/note-watching';
|
||||
import watch from './watch';
|
||||
import Mute from '../../models/mute';
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import Note, { INote } from '../../models/note';
|
||||
import { IUser, isLocalUser } from '../../models/user';
|
||||
import { publishNoteStream } from '../../stream';
|
||||
import { publishNoteStream } from '../stream';
|
||||
import renderDelete from '../../remote/activitypub/renderer/delete';
|
||||
import { renderActivity } from '../../remote/activitypub/renderer';
|
||||
import { deliver } from '../../queue';
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ import Vote from '../../../models/poll-vote';
|
|||
import Note, { INote } from '../../../models/note';
|
||||
import Watching from '../../../models/note-watching';
|
||||
import watch from '../../../services/note/watch';
|
||||
import { publishNoteStream } from '../../../stream';
|
||||
import notify from '../../../notify';
|
||||
import { publishNoteStream } from '../../stream';
|
||||
import notify from '../../../services/create-notification';
|
||||
import { isLocalUser, IUser } from '../../../models/user';
|
||||
|
||||
export default (user: IUser, note: INote, choice: number) => new Promise(async (res, rej) => {
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
import { IUser, isLocalUser, isRemoteUser } from '../../../models/user';
|
||||
import Note, { INote } from '../../../models/note';
|
||||
import NoteReaction from '../../../models/note-reaction';
|
||||
import { publishNoteStream } from '../../../stream';
|
||||
import notify from '../../../notify';
|
||||
import { publishNoteStream } from '../../stream';
|
||||
import notify from '../../create-notification';
|
||||
import NoteWatching from '../../../models/note-watching';
|
||||
import watch from '../watch';
|
||||
import renderLike from '../../../remote/activitypub/renderer/like';
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { IUser, isLocalUser, isRemoteUser } from '../../../models/user';
|
||||
import Note, { INote } from '../../../models/note';
|
||||
import Reaction from '../../../models/note-reaction';
|
||||
import { publishNoteStream } from '../../../stream';
|
||||
import { publishNoteStream } from '../../stream';
|
||||
import renderLike from '../../../remote/activitypub/renderer/like';
|
||||
import renderUndo from '../../../remote/activitypub/renderer/undo';
|
||||
import { renderActivity } from '../../../remote/activitypub/renderer';
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import * as mongo from 'mongodb';
|
||||
import isObjectId from '../../misc/is-objectid';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import { publishMainStream } from '../stream';
|
||||
import User from '../../models/user';
|
||||
import NoteUnread from '../../models/note-unread';
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import NoteUnread from '../../models/note-unread';
|
|||
import User, { IUser } from '../../models/user';
|
||||
import { INote } from '../../models/note';
|
||||
import Mute from '../../models/mute';
|
||||
import { publishMainStream } from '../../stream';
|
||||
import { publishMainStream } from '../stream';
|
||||
|
||||
export default async function(user: IUser, note: INote, isSpecified = false) {
|
||||
//#region ミュートしているなら無視
|
||||
|
|
|
|||
61
src/services/push-notification.ts
Normal file
61
src/services/push-notification.ts
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
import * as push from 'web-push';
|
||||
import * as mongo from 'mongodb';
|
||||
import Subscription from '../models/sw-subscription';
|
||||
import config from '../config';
|
||||
import fetchMeta from '../misc/fetch-meta';
|
||||
import { IMeta } from '../models/meta';
|
||||
|
||||
let meta: IMeta = null;
|
||||
|
||||
setInterval(() => {
|
||||
fetchMeta().then(m => {
|
||||
meta = m;
|
||||
|
||||
if (meta.enableServiceWorker) {
|
||||
// アプリケーションの連絡先と、サーバーサイドの鍵ペアの情報を登録
|
||||
push.setVapidDetails(config.url,
|
||||
meta.swPublicKey,
|
||||
meta.swPrivateKey);
|
||||
}
|
||||
});
|
||||
}, 3000);
|
||||
|
||||
export default async function(userId: mongo.ObjectID | string, type: string, body?: any) {
|
||||
if (!meta.enableServiceWorker) return;
|
||||
|
||||
if (typeof userId === 'string') {
|
||||
userId = new mongo.ObjectID(userId);
|
||||
}
|
||||
|
||||
// Fetch
|
||||
const subscriptions = await Subscription.find({
|
||||
userId: userId
|
||||
});
|
||||
|
||||
for (const subscription of subscriptions) {
|
||||
const pushSubscription = {
|
||||
endpoint: subscription.endpoint,
|
||||
keys: {
|
||||
auth: subscription.auth,
|
||||
p256dh: subscription.publickey
|
||||
}
|
||||
};
|
||||
|
||||
push.sendNotification(pushSubscription, JSON.stringify({
|
||||
type, body
|
||||
})).catch((err: any) => {
|
||||
//swLogger.info(err.statusCode);
|
||||
//swLogger.info(err.headers);
|
||||
//swLogger.info(err.body);
|
||||
|
||||
if (err.statusCode == 410) {
|
||||
Subscription.remove({
|
||||
userId: userId,
|
||||
endpoint: subscription.endpoint,
|
||||
auth: subscription.auth,
|
||||
publickey: subscription.publickey
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
114
src/services/stream.ts
Normal file
114
src/services/stream.ts
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
import * as mongo from 'mongodb';
|
||||
import redis from '../db/redis';
|
||||
import Xev from 'xev';
|
||||
|
||||
type ID = string | mongo.ObjectID;
|
||||
|
||||
class Publisher {
|
||||
private ev: Xev;
|
||||
|
||||
constructor() {
|
||||
// Redisがインストールされてないときはプロセス間通信を使う
|
||||
if (redis == null) {
|
||||
this.ev = new Xev();
|
||||
}
|
||||
}
|
||||
|
||||
private publish = (channel: string, type: string, value?: any): void => {
|
||||
const message = type == null ? value : value == null ?
|
||||
{ type: type, body: null } :
|
||||
{ type: type, body: value };
|
||||
|
||||
if (this.ev) {
|
||||
this.ev.emit(channel, message);
|
||||
} else {
|
||||
redis.publish('misskey', JSON.stringify({
|
||||
channel: channel,
|
||||
message: message
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
public publishMainStream = (userId: ID, type: string, value?: any): void => {
|
||||
this.publish(`mainStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishDriveStream = (userId: ID, type: string, value?: any): void => {
|
||||
this.publish(`driveStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishNoteStream = (noteId: ID, type: string, value: any): void => {
|
||||
this.publish(`noteStream:${noteId}`, type, {
|
||||
id: noteId,
|
||||
body: value
|
||||
});
|
||||
}
|
||||
|
||||
public publishUserListStream = (listId: ID, type: string, value?: any): void => {
|
||||
this.publish(`userListStream:${listId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishMessagingStream = (userId: ID, otherpartyId: ID, type: string, value?: any): void => {
|
||||
this.publish(`messagingStream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishMessagingIndexStream = (userId: ID, type: string, value?: any): void => {
|
||||
this.publish(`messagingIndexStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishReversiStream = (userId: ID, type: string, value?: any): void => {
|
||||
this.publish(`reversiStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishReversiGameStream = (gameId: ID, type: string, value?: any): void => {
|
||||
this.publish(`reversiGameStream:${gameId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
|
||||
public publishHomeTimelineStream = (userId: ID, note: any): void => {
|
||||
this.publish(`homeTimeline:${userId}`, null, note);
|
||||
}
|
||||
|
||||
public publishLocalTimelineStream = async (note: any): Promise<void> => {
|
||||
this.publish('localTimeline', null, note);
|
||||
}
|
||||
|
||||
public publishHybridTimelineStream = async (userId: ID, note: any): Promise<void> => {
|
||||
this.publish(userId ? `hybridTimeline:${userId}` : 'hybridTimeline', null, note);
|
||||
}
|
||||
|
||||
public publishGlobalTimelineStream = (note: any): void => {
|
||||
this.publish('globalTimeline', null, note);
|
||||
}
|
||||
|
||||
public publishHashtagStream = (note: any): void => {
|
||||
this.publish('hashtag', null, note);
|
||||
}
|
||||
|
||||
public publishApLogStream = (log: any): void => {
|
||||
this.publish('apLog', null, log);
|
||||
}
|
||||
|
||||
public publishAdminStream = (userId: ID, type: string, value?: any): void => {
|
||||
this.publish(`adminStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
}
|
||||
}
|
||||
|
||||
const publisher = new Publisher();
|
||||
|
||||
export default publisher;
|
||||
|
||||
export const publishMainStream = publisher.publishMainStream;
|
||||
export const publishDriveStream = publisher.publishDriveStream;
|
||||
export const publishNoteStream = publisher.publishNoteStream;
|
||||
export const publishUserListStream = publisher.publishUserListStream;
|
||||
export const publishMessagingStream = publisher.publishMessagingStream;
|
||||
export const publishMessagingIndexStream = publisher.publishMessagingIndexStream;
|
||||
export const publishReversiStream = publisher.publishReversiStream;
|
||||
export const publishReversiGameStream = publisher.publishReversiGameStream;
|
||||
export const publishHomeTimelineStream = publisher.publishHomeTimelineStream;
|
||||
export const publishLocalTimelineStream = publisher.publishLocalTimelineStream;
|
||||
export const publishHybridTimelineStream = publisher.publishHybridTimelineStream;
|
||||
export const publishGlobalTimelineStream = publisher.publishGlobalTimelineStream;
|
||||
export const publishHashtagStream = publisher.publishHashtagStream;
|
||||
export const publishApLogStream = publisher.publishApLogStream;
|
||||
export const publishAdminStream = publisher.publishAdminStream;
|
||||
Loading…
Add table
Add a link
Reference in a new issue