いちおう動くようにはなった

Signed-off-by: mattyatea <mattyacocacora0@gmail.com>
This commit is contained in:
mattyatea 2023-11-06 18:15:29 +09:00
parent f72228f428
commit e133a6b6a4
No known key found for this signature in database
GPG key ID: 068E54E2C33BEF9A
21 changed files with 597 additions and 8 deletions

View file

@ -10,6 +10,7 @@ import { QueueLoggerService } from './QueueLoggerService.js';
import { QueueProcessorService } from './QueueProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
@ -71,6 +72,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
RelationshipProcessorService,
WebhookDeliverProcessorService,
EndedPollNotificationProcessorService,
ScheduleNotePostProcessorService,
DeliverProcessorService,
InboxProcessorService,
AggregateRetentionProcessorService,

View file

@ -11,6 +11,7 @@ import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { WebhookDeliverProcessorService } from './processors/WebhookDeliverProcessorService.js';
import { EndedPollNotificationProcessorService } from './processors/EndedPollNotificationProcessorService.js';
import { ScheduleNotePostProcessorService } from './processors/ScheduleNotePostProcessorService.js';
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
@ -78,6 +79,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private relationshipQueueWorker: Bull.Worker;
private objectStorageQueueWorker: Bull.Worker;
private endedPollNotificationQueueWorker: Bull.Worker;
private schedulerNotePostQueueWorker: Bull.Worker;
constructor(
@Inject(DI.config)
@ -86,6 +88,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private queueLoggerService: QueueLoggerService,
private webhookDeliverProcessorService: WebhookDeliverProcessorService,
private endedPollNotificationProcessorService: EndedPollNotificationProcessorService,
private scheduleNotePostProcessorService: ScheduleNotePostProcessorService,
private deliverProcessorService: DeliverProcessorService,
private inboxProcessorService: InboxProcessorService,
private deleteDriveFilesProcessorService: DeleteDriveFilesProcessorService,
@ -320,6 +323,13 @@ export class QueueProcessorService implements OnApplicationShutdown {
.on('stalled', (jobId) => objectStorageLogger.warn(`stalled id=${jobId}`));
//#endregion
//#region schedule note post
this.schedulerNotePostQueueWorker = new Bull.Worker(QUEUE.SCHEDULE_NOTE_POST, (job) => this.scheduleNotePostProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.SCHEDULE_NOTE_POST),
autorun: false,
});
//#endregion
//#region ended poll notification
this.endedPollNotificationQueueWorker = new Bull.Worker(QUEUE.ENDED_POLL_NOTIFICATION, (job) => this.endedPollNotificationProcessorService.process(job), {
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
@ -339,6 +349,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.relationshipQueueWorker.run(),
this.objectStorageQueueWorker.run(),
this.endedPollNotificationQueueWorker.run(),
this.schedulerNotePostQueueWorker.run(),
]);
}
@ -353,6 +364,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
this.relationshipQueueWorker.close(),
this.objectStorageQueueWorker.close(),
this.endedPollNotificationQueueWorker.close(),
this.schedulerNotePostQueueWorker.close(),
]);
}

View file

@ -11,6 +11,7 @@ export const QUEUE = {
INBOX: 'inbox',
SYSTEM: 'system',
ENDED_POLL_NOTIFICATION: 'endedPollNotification',
SCHEDULE_NOTE_POST: 'scheduleNotePost',
DB: 'db',
RELATIONSHIP: 'relationship',
OBJECT_STORAGE: 'objectStorage',

View file

@ -0,0 +1,30 @@
/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/
import { Inject, Injectable } from '@nestjs/common';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { NoteCreateService } from '@/core/NoteCreateService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { ScheduleNotePostJobData } from '../types.js';
@Injectable()
export class ScheduleNotePostProcessorService {
private logger: Logger;
constructor(
private noteCreateService: NoteCreateService,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('ended-poll-notification');
}
@bindThis
public async process(job: Bull.Job<ScheduleNotePostJobData>): Promise<void> {
job.data.note.createdAt = new Date();
await this.noteCreateService.create(job.data.user, job.data.note);
}
}

View file

@ -6,9 +6,13 @@
import type { Antenna } from '@/server/api/endpoints/i/import-antennas.js';
import type { MiDriveFile } from '@/models/DriveFile.js';
import type { MiNote } from '@/models/Note.js';
import type { MiUser } from '@/models/User.js';
import type { MiLocalUser, MiUser } from '@/models/User.js';
import type { MiWebhook } from '@/models/Webhook.js';
import type { IActivity } from '@/core/activitypub/type.js';
import { IPoll } from '@/models/Poll.js';
import { MiNoteSchedule } from '@/models/NoteSchedule.js';
import { MiChannel } from '@/models/Channel.js';
import { MiApp } from '@/models/App.js';
import type httpSignature from '@peertube/http-signature';
export type DeliverJobData = {
@ -104,6 +108,38 @@ export type EndedPollNotificationJobData = {
noteId: MiNote['id'];
};
export type ScheduleNotePostJobData = {
note: {
name?: string | null;
text?: string | null;
reply?: MiNote | null;
renote?: MiNote | null;
files?: MiDriveFile[] | null;
poll?: IPoll | null;
schedule?: MiNoteSchedule | null;
localOnly?: boolean | null;
reactionAcceptance?: MiNote['reactionAcceptance'];
cw?: string | null;
visibility?: string;
visibleUsers?: MinimumUser[] | null;
channel?: MiChannel | null;
apMentions?: MinimumUser[] | null;
apHashtags?: string[] | null;
apEmojis?: string[] | null;
uri?: string | null;
url?: string | null;
app?: MiApp | null;
};
user: MiUser & {host: null, uri: null};
}
type MinimumUser = {
id: MiUser['id'];
host: MiUser['host'];
username: MiUser['username'];
uri: MiUser['uri'];
};
export type WebhookDeliverJobData = {
type: string;
content: unknown;