enhance(Queue): ジョブキューの設定の項目をキューごとに分ける (MisskeyIO#301)

This commit is contained in:
まっちゃとーにゅ 2023-12-29 14:58:35 +09:00 committed by GitHub
parent 9ade4bc326
commit 3dd8c675d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 25 deletions

View file

@ -9,7 +9,7 @@ import { dirname, resolve } from 'node:path';
import * as yaml from 'js-yaml';
import type { RedisOptions } from 'ioredis';
type RedisOptionsSource = Partial<RedisOptions> & {
export type RedisOptionsSource = Partial<RedisOptions> & {
host: string;
port: number;
family?: number;
@ -47,6 +47,14 @@ type Source = {
redis: RedisOptionsSource;
redisForPubsub?: RedisOptionsSource;
redisForJobQueue?: RedisOptionsSource;
redisForSystemQueue?: RedisOptionsSource;
redisForEndedPollNotificationQueue?: RedisOptionsSource;
redisForDeliverQueue?: RedisOptionsSource;
redisForInboxQueue?: RedisOptionsSource;
redisForDbQueue?: RedisOptionsSource;
redisForRelationshipQueue?: RedisOptionsSource;
redisForObjectStorageQueue?: RedisOptionsSource;
redisForWebhookDeliverQueue?: RedisOptionsSource;
redisForTimelines?: RedisOptionsSource;
meilisearch?: {
host: string;
@ -164,7 +172,14 @@ export type Config = {
videoThumbnailGenerator: string | null;
redis: RedisOptions & RedisOptionsSource;
redisForPubsub: RedisOptions & RedisOptionsSource;
redisForJobQueue: RedisOptions & RedisOptionsSource;
redisForSystemQueue: RedisOptions & RedisOptionsSource;
redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource;
redisForDeliverQueue: RedisOptions & RedisOptionsSource;
redisForInboxQueue: RedisOptions & RedisOptionsSource;
redisForDbQueue: RedisOptions & RedisOptionsSource;
redisForRelationshipQueue: RedisOptions & RedisOptionsSource;
redisForObjectStorageQueue: RedisOptions & RedisOptionsSource;
redisForWebhookDeliverQueue: RedisOptions & RedisOptionsSource;
redisForTimelines: RedisOptions & RedisOptionsSource;
perChannelMaxNoteCacheCount: number;
perUserNotificationsMaxCount: number;
@ -209,6 +224,7 @@ export function loadConfig(): Config {
: null;
const internalMediaProxy = `${scheme}://${host}/proxy`;
const redis = convertRedisOptions(config.redis, host);
const redisForJobQueue = config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis;
return {
version,
@ -231,7 +247,14 @@ export function loadConfig(): Config {
meilisearch: config.meilisearch,
redis,
redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis,
redisForJobQueue: config.redisForJobQueue ? convertRedisOptions(config.redisForJobQueue, host) : redis,
redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue,
redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue,
redisForDeliverQueue: config.redisForDeliverQueue ? convertRedisOptions(config.redisForDeliverQueue, host) : redisForJobQueue,
redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue,
redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue,
redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue,
redisForObjectStorageQueue: config.redisForObjectStorageQueue ? convertRedisOptions(config.redisForObjectStorageQueue, host) : redisForJobQueue,
redisForWebhookDeliverQueue: config.redisForWebhookDeliverQueue ? convertRedisOptions(config.redisForWebhookDeliverQueue, host) : redisForJobQueue,
redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis,
id: config.id,
proxy: config.proxy,

View file

@ -23,49 +23,49 @@ export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;
const $system: Provider = {
provide: 'queue:system',
useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config, QUEUE.SYSTEM)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config.redisForSystemQueue, QUEUE.SYSTEM)),
inject: [DI.config],
};
const $endedPollNotification: Provider = {
provide: 'queue:endedPollNotification',
useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config, QUEUE.ENDED_POLL_NOTIFICATION)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION)),
inject: [DI.config],
};
const $deliver: Provider = {
provide: 'queue:deliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config, QUEUE.DELIVER)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, QUEUE.DELIVER)),
inject: [DI.config],
};
const $inbox: Provider = {
provide: 'queue:inbox',
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config, QUEUE.INBOX)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, QUEUE.INBOX)),
inject: [DI.config],
};
const $db: Provider = {
provide: 'queue:db',
useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config, QUEUE.DB)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config.redisForDbQueue, QUEUE.DB)),
inject: [DI.config],
};
const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config, QUEUE.RELATIONSHIP)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, QUEUE.RELATIONSHIP)),
inject: [DI.config],
};
const $objectStorage: Provider = {
provide: 'queue:objectStorage',
useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config, QUEUE.OBJECT_STORAGE)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE)),
inject: [DI.config],
};
const $webhookDeliver: Provider = {
provide: 'queue:webhookDeliver',
useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config, QUEUE.WEBHOOK_DELIVER)),
useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER)),
inject: [DI.config],
};

View file

@ -43,8 +43,8 @@ export class QueueStatsService implements OnApplicationShutdown {
let activeDeliverJobs = 0;
let activeInboxJobs = 0;
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config, QUEUE.DELIVER));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config, QUEUE.INBOX));
const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER));
const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config.redisForInboxQueue, QUEUE.INBOX));
deliverQueueEvents.on('active', () => {
activeDeliverJobs++;

View file

@ -146,7 +146,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
}, {
...baseQueueOptions(this.config, QUEUE.SYSTEM),
...baseQueueOptions(this.config.redisForSystemQueue, QUEUE.SYSTEM),
autorun: false,
});
@ -185,7 +185,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for db`);
}
}, {
...baseQueueOptions(this.config, QUEUE.DB),
...baseQueueOptions(this.config.redisForDbQueue, QUEUE.DB),
autorun: false,
});
@ -201,7 +201,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region deliver
this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.DELIVER),
...baseQueueOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER),
autorun: false,
concurrency: this.config.deliverJobConcurrency ?? 128,
limiter: {
@ -225,7 +225,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region inbox
this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.INBOX),
...baseQueueOptions(this.config.redisForInboxQueue, QUEUE.INBOX),
autorun: false,
concurrency: this.config.inboxJobConcurrency ?? 16,
limiter: {
@ -249,7 +249,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region webhook deliver
this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.WEBHOOK_DELIVER),
...baseQueueOptions(this.config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER),
autorun: false,
concurrency: 64,
limiter: {
@ -281,7 +281,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for relationship`);
}
}, {
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
...baseQueueOptions(this.config.redisForRelationshipQueue, QUEUE.RELATIONSHIP),
autorun: false,
concurrency: this.config.relashionshipJobConcurrency ?? 16,
limiter: {
@ -308,7 +308,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
default: throw new Error(`unrecognized job type ${job.name} for objectStorage`);
}
}, {
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
...baseQueueOptions(this.config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE),
autorun: false,
concurrency: 16,
});
@ -325,7 +325,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
//#region ended poll notification
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
...baseQueueOptions(this.config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION),
autorun: false,
});
//#endregion

View file

@ -3,8 +3,9 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Config } from '@/config.js';
import type * as Bull from 'bullmq';
import type { RedisOptions } from "ioredis";
import type { RedisOptionsSource } from '@/config.js';
export const QUEUE = {
DELIVER: 'deliver',
@ -17,13 +18,13 @@ export const QUEUE = {
WEBHOOK_DELIVER: 'webhookDeliver',
};
export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions {
return {
connection: {
...config.redisForJobQueue,
...config,
maxRetriesPerRequest: null,
keyPrefix: undefined,
},
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`,
};
}