job!
This commit is contained in:
parent
d2ea04fbf2
commit
239824c187
8 changed files with 54 additions and 37 deletions
|
|
@ -84,8 +84,10 @@ export type Source = {
|
|||
|
||||
deliverJobConcurrency?: number;
|
||||
inboxJobConcurrency?: number;
|
||||
relashionshipJobConcurrency?: number;
|
||||
deliverJobPerSec?: number;
|
||||
inboxJobPerSec?: number;
|
||||
relashionshipJobPerSec?: number;
|
||||
deliverJobMaxAttempts?: number;
|
||||
inboxJobMaxAttempts?: number;
|
||||
|
||||
|
|
|
|||
|
|
@ -96,14 +96,14 @@ export class AccountMoveService {
|
|||
const iObj = await this.userEntityService.pack<true, true>(src.id, src, { detail: true, includeSecrets: true });
|
||||
this.globalEventService.publishMainStream(src.id, 'meUpdated', iObj);
|
||||
|
||||
// Unfollow
|
||||
// Unfollow after 24 hours
|
||||
const followings = await this.followingsRepository.findBy({
|
||||
followerId: src.id,
|
||||
});
|
||||
this.queueService.createUnfollowJob(followings.map(following => ({
|
||||
this.queueService.createDelayedUnfollowJob(followings.map(following => ({
|
||||
from: { id: src.id },
|
||||
to: { id: following.followeeId },
|
||||
})));
|
||||
})), process.env.NODE_ENV === 'test' ? 10000 : 1000 * 60 * 60 * 24);
|
||||
|
||||
await this.postMoveProcess(src, dst);
|
||||
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ const $db: Provider = {
|
|||
|
||||
const $relationship: Provider = {
|
||||
provide: 'queue:relationship',
|
||||
useFactory: (config: Config) => q(config, 'relationship'),
|
||||
useFactory: (config: Config) => q(config, 'relationship', config.relashionshipJobPerSec ?? 64),
|
||||
inject: [DI.config],
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -258,6 +258,12 @@ export class QueueService {
|
|||
return this.relationshipQueue.addBulk(jobs);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public createDelayedUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[], delay: number) {
|
||||
const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel, { delay }));
|
||||
return this.relationshipQueue.addBulk(jobs);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
|
||||
const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel));
|
||||
|
|
@ -271,7 +277,7 @@ export class QueueService {
|
|||
}
|
||||
|
||||
@bindThis
|
||||
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): {
|
||||
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData, opts?: Bull.JobOptions): {
|
||||
name: string,
|
||||
data: RelationshipJobData,
|
||||
opts: Bull.JobOptions,
|
||||
|
|
@ -287,6 +293,7 @@ export class QueueService {
|
|||
opts: {
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
...opts,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ export class RelationshipQueueProcessorsService {
|
|||
|
||||
@bindThis
|
||||
public start(q: Bull.Queue): void {
|
||||
const maxJobs = (this.config.deliverJobConcurrency ?? 128) / 4; // conservative?
|
||||
const maxJobs = this.config.relashionshipJobConcurrency ?? 16;
|
||||
q.process('follow', maxJobs, (job) => this.relationshipProcessorService.processFollow(job));
|
||||
q.process('unfollow', maxJobs, (job) => this.relationshipProcessorService.processUnfollow(job));
|
||||
q.process('block', maxJobs, (job) => this.relationshipProcessorService.processBlock(job));
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue