refactor: make sure promises are settled before app shutdown (#12942)
👍
This commit is contained in:
parent
145d28a8e4
commit
0c2118e963
|
@ -3,7 +3,6 @@
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { setTimeout } from 'node:timers/promises';
|
|
||||||
import { Global, Inject, Module } from '@nestjs/common';
|
import { Global, Inject, Module } from '@nestjs/common';
|
||||||
import * as Redis from 'ioredis';
|
import * as Redis from 'ioredis';
|
||||||
import { DataSource } from 'typeorm';
|
import { DataSource } from 'typeorm';
|
||||||
|
@ -12,6 +11,7 @@ import { DI } from './di-symbols.js';
|
||||||
import { Config, loadConfig } from './config.js';
|
import { Config, loadConfig } from './config.js';
|
||||||
import { createPostgresDataSource } from './postgres.js';
|
import { createPostgresDataSource } from './postgres.js';
|
||||||
import { RepositoryModule } from './models/RepositoryModule.js';
|
import { RepositoryModule } from './models/RepositoryModule.js';
|
||||||
|
import { allSettled } from './misc/promise-tracker.js';
|
||||||
import type { Provider, OnApplicationShutdown } from '@nestjs/common';
|
import type { Provider, OnApplicationShutdown } from '@nestjs/common';
|
||||||
|
|
||||||
const $config: Provider = {
|
const $config: Provider = {
|
||||||
|
@ -94,14 +94,9 @@ export class GlobalModule implements OnApplicationShutdown {
|
||||||
) { }
|
) { }
|
||||||
|
|
||||||
public async dispose(): Promise<void> {
|
public async dispose(): Promise<void> {
|
||||||
if (process.env.NODE_ENV === 'test') {
|
// Wait for all potential DB queries
|
||||||
// XXX:
|
await allSettled();
|
||||||
// Shutting down the existing connections causes errors on Jest as
|
// And then disconnect from DB
|
||||||
// Misskey has asynchronous postgres/redis connections that are not
|
|
||||||
// awaited.
|
|
||||||
// Let's wait for some random time for them to finish.
|
|
||||||
await setTimeout(5000);
|
|
||||||
}
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
this.db.destroy(),
|
this.db.destroy(),
|
||||||
this.redisClient.disconnect(),
|
this.redisClient.disconnect(),
|
||||||
|
|
|
@ -58,6 +58,7 @@ import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
|
||||||
import { UtilityService } from '@/core/UtilityService.js';
|
import { UtilityService } from '@/core/UtilityService.js';
|
||||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||||
import { isReply } from '@/misc/is-reply.js';
|
import { isReply } from '@/misc/is-reply.js';
|
||||||
|
import { trackPromise } from '@/misc/promise-tracker.js';
|
||||||
|
|
||||||
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
|
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
|
||||||
|
|
||||||
|
@ -676,7 +677,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
this.relayService.deliverToRelays(user, noteActivity);
|
this.relayService.deliverToRelays(user, noteActivity);
|
||||||
}
|
}
|
||||||
|
|
||||||
dm.execute();
|
trackPromise(dm.execute());
|
||||||
})();
|
})();
|
||||||
}
|
}
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
|
@ -14,6 +14,7 @@ import { IdService } from '@/core/IdService.js';
|
||||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||||
import type { NoteUnreadsRepository, MutingsRepository, NoteThreadMutingsRepository } from '@/models/_.js';
|
import type { NoteUnreadsRepository, MutingsRepository, NoteThreadMutingsRepository } from '@/models/_.js';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
|
import { trackPromise } from '@/misc/promise-tracker.js';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class NoteReadService implements OnApplicationShutdown {
|
export class NoteReadService implements OnApplicationShutdown {
|
||||||
|
@ -107,7 +108,7 @@ export class NoteReadService implements OnApplicationShutdown {
|
||||||
|
|
||||||
// TODO: ↓まとめてクエリしたい
|
// TODO: ↓まとめてクエリしたい
|
||||||
|
|
||||||
this.noteUnreadsRepository.countBy({
|
trackPromise(this.noteUnreadsRepository.countBy({
|
||||||
userId: userId,
|
userId: userId,
|
||||||
isMentioned: true,
|
isMentioned: true,
|
||||||
}).then(mentionsCount => {
|
}).then(mentionsCount => {
|
||||||
|
@ -115,9 +116,9 @@ export class NoteReadService implements OnApplicationShutdown {
|
||||||
// 全て既読になったイベントを発行
|
// 全て既読になったイベントを発行
|
||||||
this.globalEventService.publishMainStream(userId, 'readAllUnreadMentions');
|
this.globalEventService.publishMainStream(userId, 'readAllUnreadMentions');
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
|
|
||||||
this.noteUnreadsRepository.countBy({
|
trackPromise(this.noteUnreadsRepository.countBy({
|
||||||
userId: userId,
|
userId: userId,
|
||||||
isSpecified: true,
|
isSpecified: true,
|
||||||
}).then(specifiedCount => {
|
}).then(specifiedCount => {
|
||||||
|
@ -125,7 +126,7 @@ export class NoteReadService implements OnApplicationShutdown {
|
||||||
// 全て既読になったイベントを発行
|
// 全て既読になったイベントを発行
|
||||||
this.globalEventService.publishMainStream(userId, 'readAllUnreadSpecifiedNotes');
|
this.globalEventService.publishMainStream(userId, 'readAllUnreadSpecifiedNotes');
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import { CacheService } from '@/core/CacheService.js';
|
||||||
import type { Config } from '@/config.js';
|
import type { Config } from '@/config.js';
|
||||||
import { UserListService } from '@/core/UserListService.js';
|
import { UserListService } from '@/core/UserListService.js';
|
||||||
import type { FilterUnionByProperty } from '@/types.js';
|
import type { FilterUnionByProperty } from '@/types.js';
|
||||||
|
import { trackPromise } from '@/misc/promise-tracker.js';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class NotificationService implements OnApplicationShutdown {
|
export class NotificationService implements OnApplicationShutdown {
|
||||||
|
@ -74,7 +75,18 @@ export class NotificationService implements OnApplicationShutdown {
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public async createNotification<T extends MiNotification['type']>(
|
public createNotification<T extends MiNotification['type']>(
|
||||||
|
notifieeId: MiUser['id'],
|
||||||
|
type: T,
|
||||||
|
data: Omit<FilterUnionByProperty<MiNotification, 'type', T>, 'type' | 'id' | 'createdAt' | 'notifierId'>,
|
||||||
|
notifierId?: MiUser['id'] | null,
|
||||||
|
) {
|
||||||
|
trackPromise(
|
||||||
|
this.#createNotificationInternal(notifieeId, type, data, notifierId),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async #createNotificationInternal<T extends MiNotification['type']>(
|
||||||
notifieeId: MiUser['id'],
|
notifieeId: MiUser['id'],
|
||||||
type: T,
|
type: T,
|
||||||
data: Omit<FilterUnionByProperty<MiNotification, 'type', T>, 'type' | 'id' | 'createdAt' | 'notifierId'>,
|
data: Omit<FilterUnionByProperty<MiNotification, 'type', T>, 'type' | 'id' | 'createdAt' | 'notifierId'>,
|
||||||
|
|
|
@ -3,12 +3,12 @@
|
||||||
* SPDX-License-Identifier: AGPL-3.0-only
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { setTimeout } from 'node:timers/promises';
|
|
||||||
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
|
import { Inject, Module, OnApplicationShutdown } from '@nestjs/common';
|
||||||
import * as Bull from 'bullmq';
|
import * as Bull from 'bullmq';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import type { Config } from '@/config.js';
|
import type { Config } from '@/config.js';
|
||||||
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
|
import { QUEUE, baseQueueOptions } from '@/queue/const.js';
|
||||||
|
import { allSettled } from '@/misc/promise-tracker.js';
|
||||||
import type { Provider } from '@nestjs/common';
|
import type { Provider } from '@nestjs/common';
|
||||||
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
|
import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js';
|
||||||
|
|
||||||
|
@ -106,14 +106,9 @@ export class QueueModule implements OnApplicationShutdown {
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
public async dispose(): Promise<void> {
|
public async dispose(): Promise<void> {
|
||||||
if (process.env.NODE_ENV === 'test') {
|
// Wait for all potential queue jobs
|
||||||
// XXX:
|
await allSettled();
|
||||||
// Shutting down the existing connections causes errors on Jest as
|
// And then close all queues
|
||||||
// Misskey has asynchronous postgres/redis connections that are not
|
|
||||||
// awaited.
|
|
||||||
// Let's wait for some random time for them to finish.
|
|
||||||
await setTimeout(5000);
|
|
||||||
}
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
this.systemQueue.close(),
|
this.systemQueue.close(),
|
||||||
this.endedPollNotificationQueue.close(),
|
this.endedPollNotificationQueue.close(),
|
||||||
|
|
|
@ -28,6 +28,7 @@ import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||||
import { CustomEmojiService } from '@/core/CustomEmojiService.js';
|
import { CustomEmojiService } from '@/core/CustomEmojiService.js';
|
||||||
import { RoleService } from '@/core/RoleService.js';
|
import { RoleService } from '@/core/RoleService.js';
|
||||||
import { FeaturedService } from '@/core/FeaturedService.js';
|
import { FeaturedService } from '@/core/FeaturedService.js';
|
||||||
|
import { trackPromise } from '@/misc/promise-tracker.js';
|
||||||
|
|
||||||
const FALLBACK = '❤';
|
const FALLBACK = '❤';
|
||||||
const PER_NOTE_REACTION_USER_PAIR_CACHE_MAX = 16;
|
const PER_NOTE_REACTION_USER_PAIR_CACHE_MAX = 16;
|
||||||
|
@ -268,7 +269,7 @@ export class ReactionService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dm.execute();
|
trackPromise(dm.execute());
|
||||||
}
|
}
|
||||||
//#endregion
|
//#endregion
|
||||||
}
|
}
|
||||||
|
@ -316,7 +317,7 @@ export class ReactionService {
|
||||||
dm.addDirectRecipe(reactee as MiRemoteUser);
|
dm.addDirectRecipe(reactee as MiRemoteUser);
|
||||||
}
|
}
|
||||||
dm.addFollowersRecipe();
|
dm.addFollowersRecipe();
|
||||||
dm.execute();
|
trackPromise(dm.execute());
|
||||||
}
|
}
|
||||||
//#endregion
|
//#endregion
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,7 +144,7 @@ class DeliverManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// deliver
|
// deliver
|
||||||
this.queueService.deliverMany(this.actor, this.activity, inboxes);
|
await this.queueService.deliverMany(this.actor, this.activity, inboxes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
23
packages/backend/src/misc/promise-tracker.ts
Normal file
23
packages/backend/src/misc/promise-tracker.ts
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: syuilo and other misskey contributors
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
const promiseRefs: Set<WeakRef<Promise<unknown>>> = new Set();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tracks promises that other modules decided not to wait for,
|
||||||
|
* and makes sure they are all settled before fully closing down the server.
|
||||||
|
*/
|
||||||
|
export function trackPromise(promise: Promise<unknown>) {
|
||||||
|
if (process.env.NODE_ENV !== 'test') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const ref = new WeakRef(promise);
|
||||||
|
promiseRefs.add(ref);
|
||||||
|
promise.finally(() => promiseRefs.delete(ref));
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function allSettled(): Promise<void> {
|
||||||
|
await Promise.allSettled([...promiseRefs].map(r => r.deref()));
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
|
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
|
||||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||||
|
import { trackPromise } from '@/misc/promise-tracker.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -92,7 +93,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
|
|
||||||
antenna.isActive = true;
|
antenna.isActive = true;
|
||||||
antenna.lastUsedAt = new Date();
|
antenna.lastUsedAt = new Date();
|
||||||
this.antennasRepository.update(antenna.id, antenna);
|
trackPromise(this.antennasRepository.update(antenna.id, antenna));
|
||||||
|
|
||||||
if (needPublishEvent) {
|
if (needPublishEvent) {
|
||||||
this.globalEventService.publishInternalEvent('antennaUpdated', antenna);
|
this.globalEventService.publishInternalEvent('antennaUpdated', antenna);
|
||||||
|
|
Loading…
Reference in a new issue