From c3344fbd68440d31d9e93fc260980558d6aed00a Mon Sep 17 00:00:00 2001 From: MeiMei <30769358+mei23@users.noreply.github.com> Date: Sat, 9 Mar 2019 08:57:55 +0900 Subject: [PATCH] To retry AP deliver queue (#4457) --- src/queue/index.ts | 22 +++++++++++++++++++++- src/queue/processors/deliver.ts | 12 +++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/queue/index.ts b/src/queue/index.ts index b4067cd7ce..db1344d2e1 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -25,6 +25,26 @@ export const deliverQueue = initializeQueue('deliver'); export const inboxQueue = initializeQueue('inbox'); export const dbQueue = initializeQueue('db'); +deliverQueue + .on('waiting', (jobId) => { + queueLogger.debug(`[deliver] waiting id=${jobId}`); + }) + .on('active', (job) => { + queueLogger.debug(`[deliver] active id=${job.id} to=${job.data.to}`); + }) + .on('completed', (job, result) => { + queueLogger.debug(`[deliver] completed(${result}) id=${job.id} to=${job.data.to}`); + }) + .on('failed', (job, err) => { + queueLogger.debug(`[deliver] failed(${err}) id=${job.id} to=${job.data.to}`); + }) + .on('error', (error) => { + queueLogger.error(`[deliver] error ${error}`); + }) + .on('stalled', (job) => { + queueLogger.warn(`[deliver] stalled id=${job.id} to=${job.data.to}`); + }); + export function deliver(user: ILocalUser, content: any, to: any) { if (content == null) return null; @@ -38,7 +58,7 @@ export function deliver(user: ILocalUser, content: any, to: any) { attempts: 8, backoff: { type: 'exponential', - delay: 1000 + delay: 60 * 1000 }, removeOnComplete: true, removeOnFail: true diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index b561f33181..4cb1cfc6bb 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -7,7 +7,7 @@ import instanceChart from '../../services/chart/instance'; let latest: string = null; -export default async (job: Bull.Job): Promise => { +export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); try { @@ -29,6 +29,8 @@ export default async (job: Bull.Job): Promise => { instanceChart.requestSent(i.host, true); }); + + return 'Success'; } catch (res) { // Update stats registerOrFetchInstanceDoc(host).then(i => { @@ -46,15 +48,19 @@ export default async (job: Bull.Job): Promise => { if (res != null && res.hasOwnProperty('statusCode')) { queueLogger.warn(`deliver failed: ${res.statusCode} ${res.statusMessage} to=${job.data.to}`); + // 4xx if (res.statusCode >= 400 && res.statusCode < 500) { // HTTPステータスコード4xxはクライアントエラーであり、それはつまり // 何回再送しても成功することはないということなのでエラーにはしないでおく - return; + return `${res.statusCode} ${res.statusMessage}`; } - return res.statusMessage; + // 5xx etc. + throw `${res.statusCode} ${res.statusMessage}`; } else { + // DNS error, socket error, timeout ... queueLogger.warn(`deliver failed: ${res} to=${job.data.to}`); + throw res; } } };