diff --git a/packages/backend/src/GlobalModule.ts b/packages/backend/src/GlobalModule.ts index d648febb86..8bd9370896 100644 --- a/packages/backend/src/GlobalModule.ts +++ b/packages/backend/src/GlobalModule.ts @@ -47,7 +47,16 @@ const $meilisearch: Provider = { const $redis: Provider = { provide: DI.redis, useFactory: (config: Config) => { - return new Redis.Redis(config.redis); + return new Redis.Redis({ + ...config.redis, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, + }); }, inject: [DI.config], }; @@ -55,7 +64,16 @@ const $redis: Provider = { const $redisForPub: Provider = { provide: DI.redisForPub, useFactory: (config: Config) => { - const redis = new Redis.Redis(config.redisForPubsub); + const redis = new Redis.Redis({ + ...config.redisForPubsub, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, + }); return redis; }, inject: [DI.config], @@ -64,7 +82,16 @@ const $redisForPub: Provider = { const $redisForSub: Provider = { provide: DI.redisForSub, useFactory: (config: Config) => { - const redis = new Redis.Redis(config.redisForPubsub); + const redis = new Redis.Redis({ + ...config.redisForPubsub, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, + }); redis.subscribe(config.host); return redis; }, @@ -74,7 +101,16 @@ const $redisForSub: Provider = { const $redisForTimelines: Provider = { provide: DI.redisForTimelines, useFactory: (config: Config) => { - return new Redis.Redis(config.redisForTimelines); + return new Redis.Redis({ + ...config.redisForTimelines, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, + }); }, inject: [DI.config], }; diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index eeae280b09..f20093e802 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -7,6 +7,7 @@ import * as fs from 'node:fs'; import { fileURLToPath } from 'node:url'; import { dirname, resolve } from 'node:path'; import * as yaml from 'js-yaml'; +import type * as Bull from 'bullmq'; import type { RedisOptions } from 'ioredis'; export type RedisOptionsSource = Partial & { @@ -82,6 +83,8 @@ type Source = { outgoingAddress?: string; outgoingAddressFamily?: 'ipv4' | 'ipv6' | 'dual'; + bullmqQueueOptions?: Partial; + bullmqWorkerOptions?: Partial; deliverJobConcurrency?: number; inboxJobConcurrency?: number; relashionshipJobConcurrency?: number; @@ -144,6 +147,8 @@ export type Config = { id: string; outgoingAddress: string | undefined; outgoingAddressFamily: 'ipv4' | 'ipv6' | 'dual' | undefined; + bullmqQueueOptions: Partial; + bullmqWorkerOptions: Partial; deliverJobConcurrency: number | undefined; inboxJobConcurrency: number | undefined; relashionshipJobConcurrency: number | undefined; @@ -266,6 +271,8 @@ export function loadConfig(): Config { clusterLimit: config.clusterLimit, outgoingAddress: config.outgoingAddress, outgoingAddressFamily: config.outgoingAddressFamily, + bullmqQueueOptions: config.bullmqQueueOptions ?? {}, + bullmqWorkerOptions: config.bullmqWorkerOptions ?? {}, deliverJobConcurrency: config.deliverJobConcurrency, inboxJobConcurrency: config.inboxJobConcurrency, relashionshipJobConcurrency: config.relashionshipJobConcurrency, diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 8657d7f540..14d9afb5c1 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -23,49 +23,49 @@ export type WebhookDeliverQueue = Bull.Queue; const $system: Provider = { provide: 'queue:system', - useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config.redisForSystemQueue, QUEUE.SYSTEM)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.SYSTEM, baseQueueOptions(config.redisForSystemQueue, config.bullmqQueueOptions, QUEUE.SYSTEM)), inject: [DI.config], }; const $endedPollNotification: Provider = { provide: 'queue:endedPollNotification', - useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.ENDED_POLL_NOTIFICATION, baseQueueOptions(config.redisForEndedPollNotificationQueue, config.bullmqQueueOptions, QUEUE.ENDED_POLL_NOTIFICATION)), inject: [DI.config], }; const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, QUEUE.DELIVER)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, config.bullmqQueueOptions, QUEUE.DELIVER)), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, QUEUE.INBOX)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, config.bullmqQueueOptions, QUEUE.INBOX)), inject: [DI.config], }; const $db: Provider = { provide: 'queue:db', - useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config.redisForDbQueue, QUEUE.DB)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.DB, baseQueueOptions(config.redisForDbQueue, config.bullmqQueueOptions, QUEUE.DB)), inject: [DI.config], }; const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, QUEUE.RELATIONSHIP)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)), inject: [DI.config], }; const $objectStorage: Provider = { provide: 'queue:objectStorage', - useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.OBJECT_STORAGE, baseQueueOptions(config.redisForObjectStorageQueue, config.bullmqQueueOptions, QUEUE.OBJECT_STORAGE)), inject: [DI.config], }; const $webhookDeliver: Provider = { provide: 'queue:webhookDeliver', - useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER)), + useFactory: (config: Config) => new Bull.Queue(QUEUE.WEBHOOK_DELIVER, baseQueueOptions(config.redisForWebhookDeliverQueue, config.bullmqQueueOptions, QUEUE.WEBHOOK_DELIVER)), inject: [DI.config], }; diff --git a/packages/backend/src/daemons/QueueStatsService.ts b/packages/backend/src/daemons/QueueStatsService.ts index 0efec9627b..ce87d94150 100644 --- a/packages/backend/src/daemons/QueueStatsService.ts +++ b/packages/backend/src/daemons/QueueStatsService.ts @@ -43,8 +43,8 @@ export class QueueStatsService implements OnApplicationShutdown { let activeDeliverJobs = 0; let activeInboxJobs = 0; - const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER)); - const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config.redisForInboxQueue, QUEUE.INBOX)); + const deliverQueueEvents = new Bull.QueueEvents(QUEUE.DELIVER, baseQueueOptions(this.config.redisForDeliverQueue, this.config.bullmqQueueOptions, QUEUE.DELIVER)); + const inboxQueueEvents = new Bull.QueueEvents(QUEUE.INBOX, baseQueueOptions(this.config.redisForInboxQueue, this.config.bullmqQueueOptions, QUEUE.INBOX)); deliverQueueEvents.on('active', () => { activeDeliverJobs++; diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 1b9b398086..1dd5be23cf 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -146,7 +146,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for system`); } }, { - ...baseWorkerOptions(this.config.redisForSystemQueue, QUEUE.SYSTEM), + ...baseWorkerOptions(this.config.redisForSystemQueue, this.config.bullmqWorkerOptions, QUEUE.SYSTEM), autorun: false, }); @@ -185,7 +185,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for db`); } }, { - ...baseWorkerOptions(this.config.redisForDbQueue, QUEUE.DB), + ...baseWorkerOptions(this.config.redisForDbQueue, this.config.bullmqWorkerOptions, QUEUE.DB), autorun: false, }); @@ -201,7 +201,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region deliver this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForDeliverQueue, QUEUE.DELIVER), + ...baseWorkerOptions(this.config.redisForDeliverQueue, this.config.bullmqWorkerOptions, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, limiter: { @@ -225,7 +225,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region inbox this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForInboxQueue, QUEUE.INBOX), + ...baseWorkerOptions(this.config.redisForInboxQueue, this.config.bullmqWorkerOptions, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, limiter: { @@ -249,7 +249,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region webhook deliver this.webhookDeliverQueueWorker = new Bull.Worker(QUEUE.WEBHOOK_DELIVER, (job) => this.webhookDeliverProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForWebhookDeliverQueue, QUEUE.WEBHOOK_DELIVER), + ...baseWorkerOptions(this.config.redisForWebhookDeliverQueue, this.config.bullmqWorkerOptions, QUEUE.WEBHOOK_DELIVER), autorun: false, concurrency: 64, limiter: { @@ -281,7 +281,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for relationship`); } }, { - ...baseWorkerOptions(this.config.redisForRelationshipQueue, QUEUE.RELATIONSHIP), + ...baseWorkerOptions(this.config.redisForRelationshipQueue, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP), autorun: false, concurrency: this.config.relashionshipJobConcurrency ?? 16, limiter: { @@ -308,7 +308,7 @@ export class QueueProcessorService implements OnApplicationShutdown { default: throw new Error(`unrecognized job type ${job.name} for objectStorage`); } }, { - ...baseWorkerOptions(this.config.redisForObjectStorageQueue, QUEUE.OBJECT_STORAGE), + ...baseWorkerOptions(this.config.redisForObjectStorageQueue, this.config.bullmqWorkerOptions, QUEUE.OBJECT_STORAGE), autorun: false, concurrency: 16, }); @@ -325,7 +325,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region ended poll notification this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForEndedPollNotificationQueue, QUEUE.ENDED_POLL_NOTIFICATION), + ...baseWorkerOptions(this.config.redisForEndedPollNotificationQueue, this.config.bullmqWorkerOptions, QUEUE.ENDED_POLL_NOTIFICATION), autorun: false, }); //#endregion diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index d0ce23c05a..98382b7feb 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -18,28 +18,40 @@ export const QUEUE = { WEBHOOK_DELIVER: 'webhookDeliver', }; -export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { +export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { return { + ...queueOptions, connection: { ...config, maxRetriesPerRequest: null, keyPrefix: undefined, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, }, prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, }; } -export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { +export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { return { + ...workerOptions, connection: { ...config, maxRetriesPerRequest: null, keyPrefix: undefined, + reconnectOnError: (err: Error) => { + if ( err.message.includes('READONLY') + || err.message.includes('ETIMEDOUT') + || err.message.includes('Command timed out') + ) return 2; + return 1; + }, }, prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, - skipLockRenewal: false, - lockDuration: 60 * 1000, - lockRenewTime: 30 * 1000, - stalledInterval: 90 * 1000, }; }