Deliver posts to remote followers
This commit is contained in:
parent
63906af5ab
commit
32c008d008
7 changed files with 195 additions and 130 deletions
94
src/processor/http/deliver-post.ts
Normal file
94
src/processor/http/deliver-post.ts
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
import Channel from '../../models/channel';
|
||||
import Following from '../../models/following';
|
||||
import ChannelWatching from '../../models/channel-watching';
|
||||
import Post, { pack } from '../../models/post';
|
||||
import User, { isLocalUser } from '../../models/user';
|
||||
import stream, { publishChannelStream } from '../../publishers/stream';
|
||||
import context from '../../remote/activitypub/renderer/context';
|
||||
import renderNote from '../../remote/activitypub/renderer/note';
|
||||
import request from '../../remote/request';
|
||||
|
||||
export default ({ data }) => Post.findOne({ _id: data.id }).then(post => {
|
||||
const promisedPostObj = pack(post);
|
||||
const promises = [];
|
||||
|
||||
// タイムラインへの投稿
|
||||
if (!post.channelId) {
|
||||
promises.push(
|
||||
// Publish event to myself's stream
|
||||
promisedPostObj.then(postObj => {
|
||||
stream(post.userId, 'post', postObj);
|
||||
}),
|
||||
|
||||
Promise.all([
|
||||
User.findOne({ _id: post.userId }),
|
||||
|
||||
// Fetch all followers
|
||||
Following.aggregate([
|
||||
{
|
||||
$lookup: {
|
||||
from: 'users',
|
||||
localField: 'followerId',
|
||||
foreignField: '_id',
|
||||
as: 'follower'
|
||||
}
|
||||
},
|
||||
{
|
||||
$match: {
|
||||
followeeId: post.userId
|
||||
}
|
||||
}
|
||||
], {
|
||||
_id: false
|
||||
})
|
||||
]).then(([user, followers]) => Promise.all(followers.map(following => {
|
||||
if (isLocalUser(following.follower)) {
|
||||
// Publish event to followers stream
|
||||
return promisedPostObj.then(postObj => {
|
||||
stream(following.followerId, 'post', postObj);
|
||||
});
|
||||
}
|
||||
|
||||
return renderNote(user, post).then(rendered => {
|
||||
rendered['@context'] = context;
|
||||
return request(user, following.follower[0].account.inbox, rendered);
|
||||
});
|
||||
})))
|
||||
);
|
||||
}
|
||||
|
||||
// チャンネルへの投稿
|
||||
if (post.channelId) {
|
||||
promises.push(
|
||||
// Increment channel index(posts count)
|
||||
Channel.update({ _id: post.channelId }, {
|
||||
$inc: {
|
||||
index: 1
|
||||
}
|
||||
}),
|
||||
|
||||
// Publish event to channel
|
||||
promisedPostObj.then(postObj => {
|
||||
publishChannelStream(post.channelId, 'post', postObj);
|
||||
}),
|
||||
|
||||
Promise.all([
|
||||
promisedPostObj,
|
||||
|
||||
// Get channel watchers
|
||||
ChannelWatching.find({
|
||||
channelId: post.channelId,
|
||||
// 削除されたドキュメントは除く
|
||||
deletedAt: { $exists: false }
|
||||
})
|
||||
]).then(([postObj, watches]) => {
|
||||
// チャンネルの視聴者(のタイムライン)に配信
|
||||
watches.forEach(w => {
|
||||
stream(w.userId, 'post', postObj);
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
return Promise.all(promises);
|
||||
});
|
||||
|
|
@ -1,6 +1,3 @@
|
|||
import { request } from 'https';
|
||||
import { sign } from 'http-signature';
|
||||
import { URL } from 'url';
|
||||
import User, { isLocalUser, pack as packUser } from '../../models/user';
|
||||
import Following from '../../models/following';
|
||||
import FollowingLog from '../../models/following-log';
|
||||
|
|
@ -9,7 +6,7 @@ import event from '../../publishers/stream';
|
|||
import notify from '../../publishers/notify';
|
||||
import context from '../../remote/activitypub/renderer/context';
|
||||
import render from '../../remote/activitypub/renderer/follow';
|
||||
import config from '../../config';
|
||||
import request from '../../remote/request';
|
||||
|
||||
export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => {
|
||||
const promisedFollower = User.findOne({ _id: followerId });
|
||||
|
|
@ -60,45 +57,10 @@ export default ({ data }) => Following.findOne({ _id: data.following }).then(({
|
|||
followeeEvent = packUser(follower, followee)
|
||||
.then(packed => event(followee._id, 'followed', packed));
|
||||
} else if (isLocalUser(follower)) {
|
||||
followeeEvent = new Promise((resolve, reject) => {
|
||||
const {
|
||||
protocol,
|
||||
hostname,
|
||||
port,
|
||||
pathname,
|
||||
search
|
||||
} = new URL(followee.account.inbox);
|
||||
const rendered = render(follower, followee);
|
||||
rendered['@context'] = context;
|
||||
|
||||
const req = request({
|
||||
protocol,
|
||||
hostname,
|
||||
port,
|
||||
method: 'POST',
|
||||
path: pathname + search,
|
||||
}, res => {
|
||||
res.on('close', () => {
|
||||
if (res.statusCode >= 200 && res.statusCode < 300) {
|
||||
resolve();
|
||||
} else {
|
||||
reject(res);
|
||||
}
|
||||
});
|
||||
|
||||
res.on('data', () => {});
|
||||
res.on('error', reject);
|
||||
});
|
||||
|
||||
sign(req, {
|
||||
authorizationHeaderName: 'Signature',
|
||||
key: follower.account.keypair,
|
||||
keyId: `acct:${follower.username}@${config.host}`
|
||||
});
|
||||
|
||||
const rendered = render(follower, followee);
|
||||
rendered['@context'] = context;
|
||||
|
||||
req.end(JSON.stringify(rendered));
|
||||
});
|
||||
followeeEvent = request(follower, followee.account.inbox, rendered);
|
||||
}
|
||||
|
||||
return Promise.all([followerEvent, followeeEvent]);
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
import deliverPost from './deliver-post';
|
||||
import follow from './follow';
|
||||
import performActivityPub from './perform-activitypub';
|
||||
import processInbox from './process-inbox';
|
||||
import reportGitHubFailure from './report-github-failure';
|
||||
|
||||
const handlers = {
|
||||
deliverPost,
|
||||
follow,
|
||||
performActivityPub,
|
||||
processInbox,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue