parent
aafe80c121
commit
0bb0c32908
|
@ -16,6 +16,7 @@ import type { AntennasRepository, UserListMembershipsRepository } from '@/models
|
||||||
import { UtilityService } from '@/core/UtilityService.js';
|
import { UtilityService } from '@/core/UtilityService.js';
|
||||||
import { bindThis } from '@/decorators.js';
|
import { bindThis } from '@/decorators.js';
|
||||||
import type { GlobalEvents } from '@/core/GlobalEventService.js';
|
import type { GlobalEvents } from '@/core/GlobalEventService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import type { OnApplicationShutdown } from '@nestjs/common';
|
import type { OnApplicationShutdown } from '@nestjs/common';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
|
@ -38,6 +39,7 @@ export class AntennaService implements OnApplicationShutdown {
|
||||||
|
|
||||||
private utilityService: UtilityService,
|
private utilityService: UtilityService,
|
||||||
private globalEventService: GlobalEventService,
|
private globalEventService: GlobalEventService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
this.antennasFetched = false;
|
this.antennasFetched = false;
|
||||||
this.antennas = [];
|
this.antennas = [];
|
||||||
|
@ -77,9 +79,6 @@ export class AntennaService implements OnApplicationShutdown {
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
public async addNoteToAntennas(note: MiNote, noteUser: { id: MiUser['id']; username: string; host: string | null; }): Promise<void> {
|
public async addNoteToAntennas(note: MiNote, noteUser: { id: MiUser['id']; username: string; host: string | null; }): Promise<void> {
|
||||||
// リモートから遅れて届いた(もしくは後から追加された)投稿日時が古い投稿が追加されるとページネーション時に問題を引き起こすため、3分以内に投稿されたもののみを追加する
|
|
||||||
if (Date.now() - note.createdAt.getTime() > 1000 * 60 * 3) return;
|
|
||||||
|
|
||||||
const antennas = await this.getAntennas();
|
const antennas = await this.getAntennas();
|
||||||
const antennasWithMatchResult = await Promise.all(antennas.map(antenna => this.checkHitAntenna(antenna, note, noteUser).then(hit => [antenna, hit] as const)));
|
const antennasWithMatchResult = await Promise.all(antennas.map(antenna => this.checkHitAntenna(antenna, note, noteUser).then(hit => [antenna, hit] as const)));
|
||||||
const matchedAntennas = antennasWithMatchResult.filter(([, hit]) => hit).map(([antenna]) => antenna);
|
const matchedAntennas = antennasWithMatchResult.filter(([, hit]) => hit).map(([antenna]) => antenna);
|
||||||
|
@ -87,12 +86,7 @@ export class AntennaService implements OnApplicationShutdown {
|
||||||
const redisPipeline = this.redisForTimelines.pipeline();
|
const redisPipeline = this.redisForTimelines.pipeline();
|
||||||
|
|
||||||
for (const antenna of matchedAntennas) {
|
for (const antenna of matchedAntennas) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`antennaTimeline:${antenna.id}`, note.id, 200, redisPipeline);
|
||||||
`antennaTimeline:${antenna.id}`,
|
|
||||||
'MAXLEN', '~', '200',
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
this.globalEventService.publishAntennaStream(antenna.id, 'note', note);
|
this.globalEventService.publishAntennaStream(antenna.id, 'note', note);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,7 @@ import { FileInfoService } from './FileInfoService.js';
|
||||||
import { SearchService } from './SearchService.js';
|
import { SearchService } from './SearchService.js';
|
||||||
import { ClipService } from './ClipService.js';
|
import { ClipService } from './ClipService.js';
|
||||||
import { FeaturedService } from './FeaturedService.js';
|
import { FeaturedService } from './FeaturedService.js';
|
||||||
|
import { RedisTimelineService } from './RedisTimelineService.js';
|
||||||
import { ChartLoggerService } from './chart/ChartLoggerService.js';
|
import { ChartLoggerService } from './chart/ChartLoggerService.js';
|
||||||
import FederationChart from './chart/charts/federation.js';
|
import FederationChart from './chart/charts/federation.js';
|
||||||
import NotesChart from './chart/charts/notes.js';
|
import NotesChart from './chart/charts/notes.js';
|
||||||
|
@ -189,6 +190,7 @@ const $FileInfoService: Provider = { provide: 'FileInfoService', useExisting: Fi
|
||||||
const $SearchService: Provider = { provide: 'SearchService', useExisting: SearchService };
|
const $SearchService: Provider = { provide: 'SearchService', useExisting: SearchService };
|
||||||
const $ClipService: Provider = { provide: 'ClipService', useExisting: ClipService };
|
const $ClipService: Provider = { provide: 'ClipService', useExisting: ClipService };
|
||||||
const $FeaturedService: Provider = { provide: 'FeaturedService', useExisting: FeaturedService };
|
const $FeaturedService: Provider = { provide: 'FeaturedService', useExisting: FeaturedService };
|
||||||
|
const $RedisTimelineService: Provider = { provide: 'RedisTimelineService', useExisting: RedisTimelineService };
|
||||||
|
|
||||||
const $ChartLoggerService: Provider = { provide: 'ChartLoggerService', useExisting: ChartLoggerService };
|
const $ChartLoggerService: Provider = { provide: 'ChartLoggerService', useExisting: ChartLoggerService };
|
||||||
const $FederationChart: Provider = { provide: 'FederationChart', useExisting: FederationChart };
|
const $FederationChart: Provider = { provide: 'FederationChart', useExisting: FederationChart };
|
||||||
|
@ -321,6 +323,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||||
SearchService,
|
SearchService,
|
||||||
ClipService,
|
ClipService,
|
||||||
FeaturedService,
|
FeaturedService,
|
||||||
|
RedisTimelineService,
|
||||||
ChartLoggerService,
|
ChartLoggerService,
|
||||||
FederationChart,
|
FederationChart,
|
||||||
NotesChart,
|
NotesChart,
|
||||||
|
@ -446,6 +449,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||||
$SearchService,
|
$SearchService,
|
||||||
$ClipService,
|
$ClipService,
|
||||||
$FeaturedService,
|
$FeaturedService,
|
||||||
|
$RedisTimelineService,
|
||||||
$ChartLoggerService,
|
$ChartLoggerService,
|
||||||
$FederationChart,
|
$FederationChart,
|
||||||
$NotesChart,
|
$NotesChart,
|
||||||
|
@ -572,6 +576,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||||
SearchService,
|
SearchService,
|
||||||
ClipService,
|
ClipService,
|
||||||
FeaturedService,
|
FeaturedService,
|
||||||
|
RedisTimelineService,
|
||||||
FederationChart,
|
FederationChart,
|
||||||
NotesChart,
|
NotesChart,
|
||||||
UsersChart,
|
UsersChart,
|
||||||
|
@ -696,6 +701,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
|
||||||
$SearchService,
|
$SearchService,
|
||||||
$ClipService,
|
$ClipService,
|
||||||
$FeaturedService,
|
$FeaturedService,
|
||||||
|
$RedisTimelineService,
|
||||||
$FederationChart,
|
$FederationChart,
|
||||||
$NotesChart,
|
$NotesChart,
|
||||||
$UsersChart,
|
$UsersChart,
|
||||||
|
|
|
@ -54,6 +54,7 @@ import { RoleService } from '@/core/RoleService.js';
|
||||||
import { MetaService } from '@/core/MetaService.js';
|
import { MetaService } from '@/core/MetaService.js';
|
||||||
import { SearchService } from '@/core/SearchService.js';
|
import { SearchService } from '@/core/SearchService.js';
|
||||||
import { FeaturedService } from '@/core/FeaturedService.js';
|
import { FeaturedService } from '@/core/FeaturedService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
|
|
||||||
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
|
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
|
||||||
|
|
||||||
|
@ -194,6 +195,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private globalEventService: GlobalEventService,
|
private globalEventService: GlobalEventService,
|
||||||
private queueService: QueueService,
|
private queueService: QueueService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
private noteReadService: NoteReadService,
|
private noteReadService: NoteReadService,
|
||||||
private notificationService: NotificationService,
|
private notificationService: NotificationService,
|
||||||
private relayService: RelayService,
|
private relayService: RelayService,
|
||||||
|
@ -347,14 +349,6 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
|
|
||||||
const note = await this.insertNote(user, data, tags, emojis, mentionedUsers);
|
const note = await this.insertNote(user, data, tags, emojis, mentionedUsers);
|
||||||
|
|
||||||
if (data.channel) {
|
|
||||||
this.redisForTimelines.xadd(
|
|
||||||
`channelTimeline:${data.channel.id}`,
|
|
||||||
'MAXLEN', '~', this.config.perChannelMaxNoteCacheCount.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
|
||||||
|
|
||||||
setImmediate('post created', { signal: this.#shutdownController.signal }).then(
|
setImmediate('post created', { signal: this.#shutdownController.signal }).then(
|
||||||
() => this.postNoteCreated(note, user, data, silent, tags!, mentionedUsers!),
|
() => this.postNoteCreated(note, user, data, silent, tags!, mentionedUsers!),
|
||||||
() => { /* aborted, ignore this */ },
|
() => { /* aborted, ignore this */ },
|
||||||
|
@ -822,20 +816,14 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
|
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
|
||||||
// リモートから遅れて届いた(もしくは後から追加された)投稿日時が古い投稿が追加されるとページネーション時に問題を引き起こすため、3分以内に投稿されたもののみを追加する
|
|
||||||
// TODO: https://github.com/misskey-dev/misskey/issues/11404#issuecomment-1752480890 をやる
|
|
||||||
if (note.userHost != null && (Date.now() - note.createdAt.getTime()) > 1000 * 60 * 3) return;
|
|
||||||
|
|
||||||
const meta = await this.metaService.fetch();
|
const meta = await this.metaService.fetch();
|
||||||
|
|
||||||
const redisPipeline = this.redisForTimelines.pipeline();
|
const r = this.redisForTimelines.pipeline();
|
||||||
|
|
||||||
if (note.channelId) {
|
if (note.channelId) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`channelTimeline:${note.channelId}`, note.id, this.config.perChannelMaxNoteCacheCount, r);
|
||||||
`userTimelineWithChannel:${user.id}`,
|
|
||||||
'MAXLEN', '~', note.userHost == null ? meta.perLocalUserUserTimelineCacheMax.toString() : meta.perRemoteUserUserTimelineCacheMax.toString(),
|
this.redisTimelineService.push(`userTimelineWithChannel:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r);
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
const channelFollowings = await this.channelFollowingsRepository.find({
|
const channelFollowings = await this.channelFollowingsRepository.find({
|
||||||
where: {
|
where: {
|
||||||
|
@ -845,18 +833,9 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
});
|
});
|
||||||
|
|
||||||
for (const channelFollowing of channelFollowings) {
|
for (const channelFollowing of channelFollowings) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimeline:${channelFollowing.followerId}`, note.id, meta.perUserHomeTimelineCacheMax, r);
|
||||||
`homeTimeline:${channelFollowing.followerId}`,
|
|
||||||
'MAXLEN', '~', meta.perUserHomeTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimelineWithFiles:${channelFollowing.followerId}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r);
|
||||||
`homeTimelineWithFiles:${channelFollowing.followerId}`,
|
|
||||||
'MAXLEN', '~', (meta.perUserHomeTimelineCacheMax / 2).toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -894,18 +873,9 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
if (!following.withReplies) continue;
|
if (!following.withReplies) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimeline:${following.followerId}`, note.id, meta.perUserHomeTimelineCacheMax, r);
|
||||||
`homeTimeline:${following.followerId}`,
|
|
||||||
'MAXLEN', '~', meta.perUserHomeTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimelineWithFiles:${following.followerId}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r);
|
||||||
`homeTimelineWithFiles:${following.followerId}`,
|
|
||||||
'MAXLEN', '~', (meta.perUserHomeTimelineCacheMax / 2).toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -921,72 +891,32 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
if (!userListMembership.withReplies) continue;
|
if (!userListMembership.withReplies) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`userListTimeline:${userListMembership.userListId}`, note.id, meta.perUserListTimelineCacheMax, r);
|
||||||
`userListTimeline:${userListMembership.userListId}`,
|
|
||||||
'MAXLEN', '~', meta.perUserListTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`userListTimelineWithFiles:${userListMembership.userListId}`, note.id, meta.perUserListTimelineCacheMax / 2, r);
|
||||||
`userListTimelineWithFiles:${userListMembership.userListId}`,
|
|
||||||
'MAXLEN', '~', (meta.perUserListTimelineCacheMax / 2).toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (note.visibility !== 'specified' || !note.visibleUserIds.some(v => v === user.id)) { // 自分自身のHTL
|
if (note.visibility !== 'specified' || !note.visibleUserIds.some(v => v === user.id)) { // 自分自身のHTL
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimeline:${user.id}`, note.id, meta.perUserHomeTimelineCacheMax, r);
|
||||||
`homeTimeline:${user.id}`,
|
|
||||||
'MAXLEN', '~', meta.perUserHomeTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`homeTimelineWithFiles:${user.id}`, note.id, meta.perUserHomeTimelineCacheMax / 2, r);
|
||||||
`homeTimelineWithFiles:${user.id}`,
|
|
||||||
'MAXLEN', '~', (meta.perUserHomeTimelineCacheMax / 2).toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 自分自身以外への返信
|
// 自分自身以外への返信
|
||||||
if (note.replyId && note.replyUserId !== note.userId) {
|
if (note.replyId && note.replyUserId !== note.userId) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`userTimelineWithReplies:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r);
|
||||||
`userTimelineWithReplies:${user.id}`,
|
|
||||||
'MAXLEN', '~', note.userHost == null ? meta.perLocalUserUserTimelineCacheMax.toString() : meta.perRemoteUserUserTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
} else {
|
} else {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`userTimeline:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax : meta.perRemoteUserUserTimelineCacheMax, r);
|
||||||
`userTimeline:${user.id}`,
|
|
||||||
'MAXLEN', '~', note.userHost == null ? meta.perLocalUserUserTimelineCacheMax.toString() : meta.perRemoteUserUserTimelineCacheMax.toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`userTimelineWithFiles:${user.id}`, note.id, note.userHost == null ? meta.perLocalUserUserTimelineCacheMax / 2 : meta.perRemoteUserUserTimelineCacheMax / 2, r);
|
||||||
`userTimelineWithFiles:${user.id}`,
|
|
||||||
'MAXLEN', '~', note.userHost == null ? (meta.perLocalUserUserTimelineCacheMax / 2).toString() : (meta.perRemoteUserUserTimelineCacheMax / 2).toString(),
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (note.visibility === 'public' && note.userHost == null) {
|
if (note.visibility === 'public' && note.userHost == null) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push('localTimeline', note.id, 1000, r);
|
||||||
'localTimeline',
|
|
||||||
'MAXLEN', '~', '1000',
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
if (note.fileIds.length > 0) {
|
if (note.fileIds.length > 0) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push('localTimelineWithFiles', note.id, 500, r);
|
||||||
'localTimelineWithFiles',
|
|
||||||
'MAXLEN', '~', '500',
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -998,7 +928,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
redisPipeline.exec();
|
r.exec();
|
||||||
}
|
}
|
||||||
|
|
||||||
@bindThis
|
@bindThis
|
||||||
|
|
80
packages/backend/src/core/RedisTimelineService.ts
Normal file
80
packages/backend/src/core/RedisTimelineService.ts
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* SPDX-FileCopyrightText: syuilo and other misskey contributors
|
||||||
|
* SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { Inject, Injectable } from '@nestjs/common';
|
||||||
|
import * as Redis from 'ioredis';
|
||||||
|
import { DI } from '@/di-symbols.js';
|
||||||
|
import { bindThis } from '@/decorators.js';
|
||||||
|
import { IdService } from '@/core/IdService.js';
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class RedisTimelineService {
|
||||||
|
constructor(
|
||||||
|
@Inject(DI.redisForTimelines)
|
||||||
|
private redisForTimelines: Redis.Redis,
|
||||||
|
|
||||||
|
private idService: IdService,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public push(tl: string, id: string, maxlen: number, pipeline: Redis.ChainableCommander) {
|
||||||
|
// リモートから遅れて届いた(もしくは後から追加された)投稿日時が古い投稿が追加されるとページネーション時に問題を引き起こすため、
|
||||||
|
// 3分以内に投稿されたものでない場合、Redisにある最古のIDより新しい場合のみ追加する
|
||||||
|
if (this.idService.parse(id).date.getTime() > Date.now() - 1000 * 60 * 3) {
|
||||||
|
pipeline.lpush('list:' + tl, id);
|
||||||
|
if (Math.random() < 0.1) { // 10%の確率でトリム
|
||||||
|
pipeline.ltrim('list:' + tl, 0, maxlen - 1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 末尾のIDを取得
|
||||||
|
this.redisForTimelines.lindex('list:' + tl, -1).then(lastId => {
|
||||||
|
if (lastId == null || (this.idService.parse(id).date.getTime() > this.idService.parse(lastId).date.getTime())) {
|
||||||
|
this.redisForTimelines.lpush('list:' + tl, id);
|
||||||
|
} else {
|
||||||
|
Promise.resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public get(name: string, untilId?: string | null, sinceId?: string | null) {
|
||||||
|
if (untilId && sinceId) {
|
||||||
|
return this.redisForTimelines.lrange('list:' + name, 0, -1)
|
||||||
|
.then(ids => ids.filter(id => id > untilId && id < sinceId).sort((a, b) => a > b ? -1 : 1));
|
||||||
|
} else if (untilId) {
|
||||||
|
return this.redisForTimelines.lrange('list:' + name, 0, -1)
|
||||||
|
.then(ids => ids.filter(id => id > untilId).sort((a, b) => a > b ? -1 : 1));
|
||||||
|
} else if (sinceId) {
|
||||||
|
return this.redisForTimelines.lrange('list:' + name, 0, -1)
|
||||||
|
.then(ids => ids.filter(id => id < sinceId).sort((a, b) => a < b ? -1 : 1));
|
||||||
|
} else {
|
||||||
|
return this.redisForTimelines.lrange('list:' + name, 0, -1)
|
||||||
|
.then(ids => ids.sort((a, b) => a > b ? -1 : 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@bindThis
|
||||||
|
public getMulti(name: string[], untilId?: string | null, sinceId?: string | null): Promise<string[][]> {
|
||||||
|
const pipeline = this.redisForTimelines.pipeline();
|
||||||
|
for (const n of name) {
|
||||||
|
pipeline.lrange('list:' + n, 0, -1);
|
||||||
|
}
|
||||||
|
return pipeline.exec().then(res => {
|
||||||
|
if (res == null) return [];
|
||||||
|
const tls = res.map(r => r[1] as string[]);
|
||||||
|
return tls.map(ids =>
|
||||||
|
(untilId && sinceId)
|
||||||
|
? ids.filter(id => id > untilId && id < sinceId).sort((a, b) => a > b ? -1 : 1)
|
||||||
|
: untilId
|
||||||
|
? ids.filter(id => id > untilId).sort((a, b) => a > b ? -1 : 1)
|
||||||
|
: sinceId
|
||||||
|
? ids.filter(id => id < sinceId).sort((a, b) => a < b ? -1 : 1)
|
||||||
|
: ids.sort((a, b) => a > b ? -1 : 1),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import { IdService } from '@/core/IdService.js';
|
||||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||||
import { ModerationLogService } from '@/core/ModerationLogService.js';
|
import { ModerationLogService } from '@/core/ModerationLogService.js';
|
||||||
import type { Packed } from '@/misc/json-schema.js';
|
import type { Packed } from '@/misc/json-schema.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import type { OnApplicationShutdown } from '@nestjs/common';
|
import type { OnApplicationShutdown } from '@nestjs/common';
|
||||||
|
|
||||||
export type RolePolicies = {
|
export type RolePolicies = {
|
||||||
|
@ -102,6 +103,7 @@ export class RoleService implements OnApplicationShutdown {
|
||||||
private globalEventService: GlobalEventService,
|
private globalEventService: GlobalEventService,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private moderationLogService: ModerationLogService,
|
private moderationLogService: ModerationLogService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
//this.onMessage = this.onMessage.bind(this);
|
//this.onMessage = this.onMessage.bind(this);
|
||||||
|
|
||||||
|
@ -472,12 +474,7 @@ export class RoleService implements OnApplicationShutdown {
|
||||||
const redisPipeline = this.redisClient.pipeline();
|
const redisPipeline = this.redisClient.pipeline();
|
||||||
|
|
||||||
for (const role of roles) {
|
for (const role of roles) {
|
||||||
redisPipeline.xadd(
|
this.redisTimelineService.push(`roleTimeline:${role.id}`, note.id, 1000, redisPipeline);
|
||||||
`roleTimeline:${role.id}`,
|
|
||||||
'MAXLEN', '~', '1000',
|
|
||||||
'*',
|
|
||||||
'note', note.id);
|
|
||||||
|
|
||||||
this.globalEventService.publishRoleTimelineStream(role.id, 'note', note);
|
this.globalEventService.publishRoleTimelineStream(role.id, 'note', note);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,7 @@ import { NoteReadService } from '@/core/NoteReadService.js';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -69,8 +70,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private noteEntityService: NoteEntityService,
|
private noteEntityService: NoteEntityService,
|
||||||
private queryService: QueryService,
|
private queryService: QueryService,
|
||||||
private noteReadService: NoteReadService,
|
private noteReadService: NoteReadService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const antenna = await this.antennasRepository.findOneBy({
|
const antenna = await this.antennasRepository.findOneBy({
|
||||||
id: ps.antennaId,
|
id: ps.antennaId,
|
||||||
userId: me.id,
|
userId: me.id,
|
||||||
|
@ -85,15 +90,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
lastUsedAt: new Date(),
|
lastUsedAt: new Date(),
|
||||||
});
|
});
|
||||||
|
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
let noteIds = await this.redisTimelineService.get(`antennaTimeline:${antenna.id}`, untilId, sinceId);
|
||||||
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
const noteIds = await this.redisForTimelines.xrevrange(
|
|
||||||
`antennaTimeline:${antenna.id}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId));
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
if (noteIds.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,9 @@ import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
||||||
import ActiveUsersChart from '@/core/chart/charts/active-users.js';
|
import ActiveUsersChart from '@/core/chart/charts/active-users.js';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -66,9 +69,15 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private noteEntityService: NoteEntityService,
|
private noteEntityService: NoteEntityService,
|
||||||
private queryService: QueryService,
|
private queryService: QueryService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
|
private cacheService: CacheService,
|
||||||
private activeUsersChart: ActiveUsersChart,
|
private activeUsersChart: ActiveUsersChart,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
const isRangeSpecified = untilId != null && sinceId != null;
|
||||||
|
|
||||||
const channel = await this.channelsRepository.findOneBy({
|
const channel = await this.channelsRepository.findOneBy({
|
||||||
id: ps.channelId,
|
id: ps.channelId,
|
||||||
});
|
});
|
||||||
|
@ -77,68 +86,66 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
throw new ApiError(meta.errors.noSuchChannel);
|
throw new ApiError(meta.errors.noSuchChannel);
|
||||||
}
|
}
|
||||||
|
|
||||||
let timeline: MiNote[] = [];
|
|
||||||
|
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
let noteIdsRes: [string, string[]][] = [];
|
|
||||||
|
|
||||||
if (!ps.sinceId && !ps.sinceDate) {
|
|
||||||
noteIdsRes = await this.redisForTimelines.xrevrange(
|
|
||||||
`channelTimeline:${channel.id}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
// redis から取得していないとき・取得数が足りないとき
|
|
||||||
if (noteIdsRes.length < limit) {
|
|
||||||
//#region Construct query
|
|
||||||
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate)
|
|
||||||
.andWhere('note.channelId = :channelId', { channelId: channel.id })
|
|
||||||
.innerJoinAndSelect('note.user', 'user')
|
|
||||||
.leftJoinAndSelect('note.reply', 'reply')
|
|
||||||
.leftJoinAndSelect('note.renote', 'renote')
|
|
||||||
.leftJoinAndSelect('reply.user', 'replyUser')
|
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
|
||||||
|
|
||||||
if (me) {
|
|
||||||
this.queryService.generateMutedUserQuery(query, me);
|
|
||||||
this.queryService.generateBlockedUserQuery(query, me);
|
|
||||||
}
|
|
||||||
//#endregion
|
|
||||||
|
|
||||||
timeline = await query.limit(ps.limit).getMany();
|
|
||||||
} else {
|
|
||||||
const noteIds = noteIdsRes.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId);
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
//#region Construct query
|
|
||||||
const query = this.notesRepository.createQueryBuilder('note')
|
|
||||||
.where('note.id IN (:...noteIds)', { noteIds: noteIds })
|
|
||||||
.innerJoinAndSelect('note.user', 'user')
|
|
||||||
.leftJoinAndSelect('note.reply', 'reply')
|
|
||||||
.leftJoinAndSelect('note.renote', 'renote')
|
|
||||||
.leftJoinAndSelect('reply.user', 'replyUser')
|
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
|
||||||
|
|
||||||
if (me) {
|
|
||||||
this.queryService.generateMutedUserQuery(query, me);
|
|
||||||
this.queryService.generateBlockedUserQuery(query, me);
|
|
||||||
}
|
|
||||||
//#endregion
|
|
||||||
|
|
||||||
timeline = await query.getMany();
|
|
||||||
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (me) this.activeUsersChart.read(me);
|
if (me) this.activeUsersChart.read(me);
|
||||||
|
|
||||||
|
if (isRangeSpecified || sinceId == null) {
|
||||||
|
const [
|
||||||
|
userIdsWhoMeMuting,
|
||||||
|
] = me ? await Promise.all([
|
||||||
|
this.cacheService.userMutingsCache.fetch(me.id),
|
||||||
|
]) : [new Set<string>()];
|
||||||
|
|
||||||
|
let noteIds = await this.redisTimelineService.get(`channelTimeline:${channel.id}`, untilId, sinceId);
|
||||||
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
|
|
||||||
|
if (noteIds.length > 0) {
|
||||||
|
const query = this.notesRepository.createQueryBuilder('note')
|
||||||
|
.where('note.id IN (:...noteIds)', { noteIds: noteIds })
|
||||||
|
.innerJoinAndSelect('note.user', 'user')
|
||||||
|
.leftJoinAndSelect('note.reply', 'reply')
|
||||||
|
.leftJoinAndSelect('note.renote', 'renote')
|
||||||
|
.leftJoinAndSelect('reply.user', 'replyUser')
|
||||||
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
|
let timeline = await query.getMany();
|
||||||
|
|
||||||
|
timeline = timeline.filter(note => {
|
||||||
|
if (me && isUserRelated(note, userIdsWhoMeMuting, true)) return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: フィルタで件数が減った場合の埋め合わせ処理
|
||||||
|
|
||||||
|
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
||||||
|
|
||||||
|
if (timeline.length > 0) {
|
||||||
|
return await this.noteEntityService.packMany(timeline, me);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#region fallback to database
|
||||||
|
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate)
|
||||||
|
.andWhere('note.channelId = :channelId', { channelId: channel.id })
|
||||||
|
.innerJoinAndSelect('note.user', 'user')
|
||||||
|
.leftJoinAndSelect('note.reply', 'reply')
|
||||||
|
.leftJoinAndSelect('note.renote', 'renote')
|
||||||
|
.leftJoinAndSelect('reply.user', 'replyUser')
|
||||||
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
|
if (me) {
|
||||||
|
this.queryService.generateMutedUserQuery(query, me);
|
||||||
|
this.queryService.generateBlockedUserQuery(query, me);
|
||||||
|
}
|
||||||
|
//#endregion
|
||||||
|
|
||||||
|
const timeline = await query.limit(ps.limit).getMany();
|
||||||
|
|
||||||
return await this.noteEntityService.packMany(timeline, me);
|
return await this.noteEntityService.packMany(timeline, me);
|
||||||
|
//#endregion
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import { RoleService } from '@/core/RoleService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
import { CacheService } from '@/core/CacheService.js';
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -72,8 +73,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private activeUsersChart: ActiveUsersChart,
|
private activeUsersChart: ActiveUsersChart,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const policies = await this.roleService.getUserPolicies(me.id);
|
const policies = await this.roleService.getUserPolicies(me.id);
|
||||||
if (!policies.ltlAvailable) {
|
if (!policies.ltlAvailable) {
|
||||||
throw new ApiError(meta.errors.stlDisabled);
|
throw new ApiError(meta.errors.stlDisabled);
|
||||||
|
@ -89,27 +94,10 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
this.cacheService.userBlockedCache.fetch(me.id),
|
this.cacheService.userBlockedCache.fetch(me.id),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let timeline: MiNote[] = [];
|
const [htlNoteIds, ltlNoteIds] = await this.redisTimelineService.getMulti([
|
||||||
|
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const redisPipeline = this.redisForTimelines.pipeline();
|
|
||||||
redisPipeline.xrevrange(
|
|
||||||
ps.withFiles ? `homeTimelineWithFiles:${me.id}` : `homeTimeline:${me.id}`,
|
ps.withFiles ? `homeTimelineWithFiles:${me.id}` : `homeTimeline:${me.id}`,
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
);
|
|
||||||
redisPipeline.xrevrange(
|
|
||||||
ps.withFiles ? 'localTimelineWithFiles' : 'localTimeline',
|
ps.withFiles ? 'localTimelineWithFiles' : 'localTimeline',
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
], untilId, sinceId);
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
);
|
|
||||||
const [htlNoteIds, ltlNoteIds] = await redisPipeline.exec().then(res => res ? [
|
|
||||||
(res[0][1] as string[][]).map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId),
|
|
||||||
(res[1][1] as string[][]).map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId),
|
|
||||||
] : []);
|
|
||||||
|
|
||||||
let noteIds = Array.from(new Set([...htlNoteIds, ...ltlNoteIds]));
|
let noteIds = Array.from(new Set([...htlNoteIds, ...ltlNoteIds]));
|
||||||
noteIds.sort((a, b) => a > b ? -1 : 1);
|
noteIds.sort((a, b) => a > b ? -1 : 1);
|
||||||
|
@ -128,7 +116,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
timeline = await query.getMany();
|
let timeline = await query.getMany();
|
||||||
|
|
||||||
timeline = timeline.filter(note => {
|
timeline = timeline.filter(note => {
|
||||||
if (note.userId === me.id) {
|
if (note.userId === me.id) {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import { RoleService } from '@/core/RoleService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { CacheService } from '@/core/CacheService.js';
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -68,8 +69,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private activeUsersChart: ActiveUsersChart,
|
private activeUsersChart: ActiveUsersChart,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const policies = await this.roleService.getUserPolicies(me ? me.id : null);
|
const policies = await this.roleService.getUserPolicies(me ? me.id : null);
|
||||||
if (!policies.ltlAvailable) {
|
if (!policies.ltlAvailable) {
|
||||||
throw new ApiError(meta.errors.ltlDisabled);
|
throw new ApiError(meta.errors.ltlDisabled);
|
||||||
|
@ -85,16 +90,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
this.cacheService.userBlockedCache.fetch(me.id),
|
this.cacheService.userBlockedCache.fetch(me.id),
|
||||||
]) : [new Set<string>(), new Set<string>(), new Set<string>()];
|
]) : [new Set<string>(), new Set<string>(), new Set<string>()];
|
||||||
|
|
||||||
let timeline: MiNote[] = [];
|
let noteIds = await this.redisTimelineService.get(ps.withFiles ? 'localTimelineWithFiles' : 'localTimeline', untilId, sinceId);
|
||||||
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const noteIds = await this.redisForTimelines.xrevrange(
|
|
||||||
ps.withFiles ? 'localTimelineWithFiles' : 'localTimeline',
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId));
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
if (noteIds.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
|
@ -109,7 +106,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
timeline = await query.getMany();
|
let timeline = await query.getMany();
|
||||||
|
|
||||||
timeline = timeline.filter(note => {
|
timeline = timeline.filter(note => {
|
||||||
if (me && (note.userId === me.id)) {
|
if (me && (note.userId === me.id)) {
|
||||||
|
@ -127,6 +124,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// TODO: フィルタした結果件数が足りなかった場合の対応
|
||||||
|
|
||||||
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
||||||
|
|
||||||
process.nextTick(() => {
|
process.nextTick(() => {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import { DI } from '@/di-symbols.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { CacheService } from '@/core/CacheService.js';
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ['notes'],
|
tags: ['notes'],
|
||||||
|
@ -62,8 +63,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private activeUsersChart: ActiveUsersChart,
|
private activeUsersChart: ActiveUsersChart,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const [
|
const [
|
||||||
followings,
|
followings,
|
||||||
userIdsWhoMeMuting,
|
userIdsWhoMeMuting,
|
||||||
|
@ -76,16 +81,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
this.cacheService.userBlockedCache.fetch(me.id),
|
this.cacheService.userBlockedCache.fetch(me.id),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let timeline: MiNote[] = [];
|
let noteIds = await this.redisTimelineService.get(ps.withFiles ? `homeTimelineWithFiles:${me.id}` : `homeTimeline:${me.id}`, untilId, sinceId);
|
||||||
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const noteIds = await this.redisForTimelines.xrevrange(
|
|
||||||
ps.withFiles ? `homeTimelineWithFiles:${me.id}` : `homeTimeline:${me.id}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId));
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
if (noteIds.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
|
@ -100,7 +97,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
timeline = await query.getMany();
|
let timeline = await query.getMany();
|
||||||
|
|
||||||
timeline = timeline.filter(note => {
|
timeline = timeline.filter(note => {
|
||||||
if (note.userId === me.id) {
|
if (note.userId === me.id) {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import { DI } from '@/di-symbols.js';
|
||||||
import { CacheService } from '@/core/CacheService.js';
|
import { CacheService } from '@/core/CacheService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -79,8 +80,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private activeUsersChart: ActiveUsersChart,
|
private activeUsersChart: ActiveUsersChart,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const list = await this.userListsRepository.findOneBy({
|
const list = await this.userListsRepository.findOneBy({
|
||||||
id: ps.listId,
|
id: ps.listId,
|
||||||
userId: me.id,
|
userId: me.id,
|
||||||
|
@ -100,16 +105,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
this.cacheService.userBlockedCache.fetch(me.id),
|
this.cacheService.userBlockedCache.fetch(me.id),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let timeline: MiNote[] = [];
|
let noteIds = await this.redisTimelineService.get(ps.withFiles ? `userListTimelineWithFiles:${list.id}` : `userListTimeline:${list.id}`, untilId, sinceId);
|
||||||
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const noteIds = await this.redisForTimelines.xrevrange(
|
|
||||||
ps.withFiles ? `userListTimelineWithFiles:${list.id}` : `userListTimeline:${list.id}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId));
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
if (noteIds.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
|
@ -124,7 +121,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
.leftJoinAndSelect('renote.user', 'renoteUser')
|
.leftJoinAndSelect('renote.user', 'renoteUser')
|
||||||
.leftJoinAndSelect('note.channel', 'channel');
|
.leftJoinAndSelect('note.channel', 'channel');
|
||||||
|
|
||||||
timeline = await query.getMany();
|
let timeline = await query.getMany();
|
||||||
|
|
||||||
timeline = timeline.filter(note => {
|
timeline = timeline.filter(note => {
|
||||||
if (note.userId === me.id) {
|
if (note.userId === me.id) {
|
||||||
|
|
|
@ -11,6 +11,7 @@ import { QueryService } from '@/core/QueryService.js';
|
||||||
import { DI } from '@/di-symbols.js';
|
import { DI } from '@/di-symbols.js';
|
||||||
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -65,8 +66,12 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
private noteEntityService: NoteEntityService,
|
private noteEntityService: NoteEntityService,
|
||||||
private queryService: QueryService,
|
private queryService: QueryService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
|
||||||
const role = await this.rolesRepository.findOneBy({
|
const role = await this.rolesRepository.findOneBy({
|
||||||
id: ps.roleId,
|
id: ps.roleId,
|
||||||
isPublic: true,
|
isPublic: true,
|
||||||
|
@ -78,14 +83,9 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
if (!role.isExplorable) {
|
if (!role.isExplorable) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const noteIds = await this.redisForTimelines.xrevrange(
|
let noteIds = await this.redisTimelineService.get(`roleTimeline:${role.id}`, untilId, sinceId);
|
||||||
`roleTimeline:${role.id}`,
|
noteIds = noteIds.slice(0, ps.limit);
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId));
|
|
||||||
|
|
||||||
if (noteIds.length === 0) {
|
if (noteIds.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
|
|
|
@ -14,6 +14,7 @@ import { CacheService } from '@/core/CacheService.js';
|
||||||
import { IdService } from '@/core/IdService.js';
|
import { IdService } from '@/core/IdService.js';
|
||||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||||
import { QueryService } from '@/core/QueryService.js';
|
import { QueryService } from '@/core/QueryService.js';
|
||||||
|
import { RedisTimelineService } from '@/core/RedisTimelineService.js';
|
||||||
import { ApiError } from '../../error.js';
|
import { ApiError } from '../../error.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
|
@ -70,42 +71,24 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
private queryService: QueryService,
|
private queryService: QueryService,
|
||||||
private cacheService: CacheService,
|
private cacheService: CacheService,
|
||||||
private idService: IdService,
|
private idService: IdService,
|
||||||
|
private redisTimelineService: RedisTimelineService,
|
||||||
) {
|
) {
|
||||||
super(meta, paramDef, async (ps, me) => {
|
super(meta, paramDef, async (ps, me) => {
|
||||||
const isRangeSpecified = (ps.sinceId != null || ps.sinceDate != null) && (ps.untilId != null || ps.untilDate != null);
|
const untilId = ps.untilId ?? ps.untilDate ? this.idService.genId(new Date(ps.untilDate!)) : null;
|
||||||
|
const sinceId = ps.sinceId ?? ps.sinceDate ? this.idService.genId(new Date(ps.sinceDate!)) : null;
|
||||||
|
const isRangeSpecified = untilId != null && sinceId != null;
|
||||||
|
|
||||||
if (isRangeSpecified || !(ps.sinceId != null || ps.sinceDate != null)) {
|
if (isRangeSpecified || sinceId == null) {
|
||||||
const [
|
const [
|
||||||
userIdsWhoMeMuting,
|
userIdsWhoMeMuting,
|
||||||
] = me ? await Promise.all([
|
] = me ? await Promise.all([
|
||||||
this.cacheService.userMutingsCache.fetch(me.id),
|
this.cacheService.userMutingsCache.fetch(me.id),
|
||||||
]) : [new Set<string>()];
|
]) : [new Set<string>()];
|
||||||
|
|
||||||
const limit = ps.limit + (ps.untilId ? 1 : 0) + (ps.sinceId ? 1 : 0); // untilIdに指定したものも含まれるため+1
|
|
||||||
|
|
||||||
const [noteIdsRes, repliesNoteIdsRes, channelNoteIdsRes] = await Promise.all([
|
const [noteIdsRes, repliesNoteIdsRes, channelNoteIdsRes] = await Promise.all([
|
||||||
this.redisForTimelines.xrevrange(
|
this.redisTimelineService.get(ps.withFiles ? `userTimelineWithFiles:${ps.userId}` : `userTimeline:${ps.userId}`, untilId, sinceId),
|
||||||
ps.withFiles ? `userTimelineWithFiles:${ps.userId}` : `userTimeline:${ps.userId}`,
|
ps.withReplies ? this.redisTimelineService.get(`userTimelineWithReplies:${ps.userId}`, untilId, sinceId) : Promise.resolve([]),
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
ps.withChannelNotes ? this.redisTimelineService.get(`userTimelineWithChannel:${ps.userId}`, untilId, sinceId) : Promise.resolve([]),
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId)),
|
|
||||||
ps.withReplies
|
|
||||||
? this.redisForTimelines.xrevrange(
|
|
||||||
`userTimelineWithReplies:${ps.userId}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId))
|
|
||||||
: Promise.resolve([]),
|
|
||||||
ps.withChannelNotes
|
|
||||||
? this.redisForTimelines.xrevrange(
|
|
||||||
`userTimelineWithChannel:${ps.userId}`,
|
|
||||||
ps.untilId ? this.idService.parse(ps.untilId).date.getTime() : ps.untilDate ?? '+',
|
|
||||||
ps.sinceId ? this.idService.parse(ps.sinceId).date.getTime() : ps.sinceDate ?? '-',
|
|
||||||
'COUNT', limit,
|
|
||||||
).then(res => res.map(x => x[1][1]).filter(x => x !== ps.untilId && x !== ps.sinceId))
|
|
||||||
: Promise.resolve([]),
|
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let noteIds = Array.from(new Set([
|
let noteIds = Array.from(new Set([
|
||||||
|
@ -145,6 +128,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// TODO: フィルタで件数が減った場合の埋め合わせ処理
|
||||||
|
|
||||||
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
timeline.sort((a, b) => a.id > b.id ? -1 : 1);
|
||||||
|
|
||||||
if (timeline.length > 0) {
|
if (timeline.length > 0) {
|
||||||
|
@ -153,9 +138,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// fallback to database
|
//#region fallback to database
|
||||||
|
|
||||||
//#region Construct query
|
|
||||||
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate)
|
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate)
|
||||||
.andWhere('note.userId = :userId', { userId: ps.userId })
|
.andWhere('note.userId = :userId', { userId: ps.userId })
|
||||||
.innerJoinAndSelect('note.user', 'user')
|
.innerJoinAndSelect('note.user', 'user')
|
||||||
|
@ -188,11 +171,11 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
|
||||||
qb.orWhere('0 < (SELECT COUNT(*) FROM poll WHERE poll."noteId" = note.id)');
|
qb.orWhere('0 < (SELECT COUNT(*) FROM poll WHERE poll."noteId" = note.id)');
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
//#endregion
|
|
||||||
|
|
||||||
const timeline = await query.limit(ps.limit).getMany();
|
const timeline = await query.limit(ps.limit).getMany();
|
||||||
|
|
||||||
return await this.noteEntityService.packMany(timeline, me);
|
return await this.noteEntityService.packMany(timeline, me);
|
||||||
|
//#endregion
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue