Improve chart engine (#8253)

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* Update core.ts

* wip

* wip

* #7361

* delete network chart

* federationChart強化 apRequestChart追加

* tweak
This commit is contained in:
syuilo 2022-02-06 00:13:52 +09:00 committed by GitHub
parent 0b462feff6
commit c1b264e4e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
65 changed files with 1616 additions and 1756 deletions

View file

@ -7,24 +7,19 @@
import * as nestedProperty from 'nested-property';
import autobind from 'autobind-decorator';
import Logger from '../logger';
import { Schema } from '@/misc/schema';
import { EntitySchema, getRepository, Repository, LessThan, Between } from 'typeorm';
import { dateUTC, isTimeSame, isTimeBefore, subtractTime, addTime } from '@/prelude/time';
import { getChartInsertLock } from '@/misc/app-lock';
// Logger for the chart engine.
// NOTE(review): the third argument is false when NODE_ENV is 'test'; its exact meaning is defined by Logger — confirm there.
const logger = new Logger('chart', 'white', process.env.NODE_ENV !== 'test');
// Loose string-keyed object type.
export type Obj = { [key: string]: any };
// Prefix prepended to a schema key to form the name of its numeric column.
const columnPrefix = '___' as const;
// Prefix prepended to a schema key to form the name of the varchar[] column that
// temporarily holds the raw values of a `uniqueIncrement` entry.
const uniqueTempColumnPrefix = 'unique_temp___' as const;
// Replacement for '.' when a schema key is turned into a column name.
const columnDot = '_' as const;
/**
 * Recursively optional view of `T`: every property, at every depth, may be omitted.
 */
export type DeepPartial<T> = {
	[Key in keyof T]?: DeepPartial<T[Key]>;
};
/**
 * Type-level counterpart of replacing every `.` in a key with `columnDot`:
 * the part before the first dot is kept, the remainder is converted recursively.
 */
type KeyToColumnName<T extends string> =
	T extends `${infer Head}.${infer Tail}`
		? `${Head}${typeof columnDot}${KeyToColumnName<Tail>}`
		: T;
/**
 * Maps every numeric leaf of `T` to an array of that leaf type, recursing
 * into every non-numeric property.
 */
type ArrayValue<T> = {
	[Key in keyof T]: T[Key] extends number ? T[Key][] : ArrayValue<T[Key]>;
};
type Log = {
type RawRecord<S extends Schema> = {
id: number;
/**
@ -36,6 +31,10 @@ type Log = {
* Unixタイムスタンプ()
*/
date: number;
} & {
[K in keyof S as `${typeof uniqueTempColumnPrefix}${KeyToColumnName<string & K>}`]: S[K]['uniqueIncrement'] extends true ? string[] : never;
} & {
[K in keyof S as `${typeof columnPrefix}${KeyToColumnName<string & K>}`]: number;
};
const camelToSnake = (str: string): string => {
@ -44,123 +43,72 @@ const camelToSnake = (str: string): string => {
/** Returns a new array containing each distinct element of `array` once, in first-occurrence order. */
const removeDuplicates = (array: any[]) => [...new Set(array)];
/**
 * Chart schema: one entry per chart key, describing how that key is stored.
 */
type Schema = Record<string, {
	/** When true, the key is fed with string arrays rather than numeric deltas (see `Commit`). */
	uniqueIncrement?: boolean;

	/** Size class of the numeric column. */
	range?: 'small' | 'medium' | 'big';

	/** Whether the previous record's value is carried over. */
	accumulate?: boolean;
}>;

/**
 * A change set: for each schema key, optionally either a list of strings
 * (`uniqueIncrement` keys) or a number (all other keys).
 */
type Commit<TSchema extends Schema> = {
	[Key in keyof TSchema]?: TSchema[Key]['uniqueIncrement'] extends true ? string[] : number;
};

/** One numeric value per schema key. */
export type KVs<TSchema extends Schema> = {
	[Key in keyof TSchema]: number;
};

/** One numeric series per schema key. */
type ChartResult<TSchema extends Schema> = {
	[Key in keyof TSchema]: number[];
};
/**
*
*/
// eslint-disable-next-line import/no-default-export
export default abstract class Chart<T extends Record<string, any>> {
private static readonly columnPrefix = '___';
private static readonly columnDot = '_';
export default abstract class Chart<T extends Schema> {
public schema: T;
private name: string;
private buffer: {
diff: DeepPartial<T>;
diff: Commit<T>;
group: string | null;
}[] = [];
public schema: Schema;
protected repositoryForHour: Repository<Log>;
protected repositoryForDay: Repository<Log>;
protected repositoryForHour: Repository<RawRecord<T>>;
protected repositoryForDay: Repository<RawRecord<T>>;
protected abstract genNewLog(latest: T): DeepPartial<T>;
/**
* @param logs
*/
protected abstract aggregate(logs: T[]): T;
protected abstract fetchActual(group: string | null): Promise<DeepPartial<T>>;
protected abstract queryCurrentState(group: string | null): Promise<Partial<KVs<T>>>;
@autobind
private static convertSchemaToFlatColumnDefinitions(schema: Schema) {
const columns = {} as Record<string, unknown>;
const flatColumns = (x: Obj, path?: string) => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}${this.columnDot}${k}` : k;
if (v.type === 'object') {
flatColumns(v.properties, p);
} else if (v.type === 'number') {
columns[this.columnPrefix + p] = {
type: 'bigint',
};
} else if (v.type === 'array' && v.items.type === 'string') {
columns[this.columnPrefix + p] = {
type: 'varchar',
array: true,
};
}
private static convertSchemaToColumnDefinitions(schema: Schema): Record<string, { type: string; array?: boolean; default?: any; }> {
const columns = {} as Record<string, { type: string; array?: boolean; default?: any; }>;
for (const [k, v] of Object.entries(schema)) {
const name = k.replaceAll('.', columnDot);
const type = v.range === 'big' ? 'bigint' : v.range === 'small' ? 'smallint' : 'integer';
if (v.uniqueIncrement) {
columns[uniqueTempColumnPrefix + name] = {
type: 'varchar',
array: true,
default: '{}',
};
columns[columnPrefix + name] = {
type,
default: 0,
};
} else {
columns[columnPrefix + name] = {
type,
default: 0,
};
}
};
flatColumns(schema.properties!);
}
return columns;
}
/**
 * Rebuilds an object from flat column values.
 *
 * Only entries whose name starts with `Chart.columnPrefix` are used; the
 * prefix is stripped, every `Chart.columnDot` becomes `.`, and the result is
 * passed as the path to `nestedProperty.set`.
 *
 * @param x flat record keyed by column name
 * @returns the object populated through `nestedProperty.set`
 */
@autobind
private static convertFlattenColumnsToObject(x: Record<string, unknown>): Record<string, unknown> {
	const nested: Record<string, unknown> = {};

	for (const [column, value] of Object.entries(x)) {
		if (!column.startsWith(Chart.columnPrefix)) continue;

		// `column` looks like ___x_y_z here; turn it into the path x.y.z
		const path = column.slice(Chart.columnPrefix.length).replaceAll(Chart.columnDot, '.');
		nestedProperty.set(nested, path, value);
	}

	return nested;
}
/**
 * Flattens a nested object into a single-level record keyed by column name.
 *
 * Non-array objects are descended into, joining key segments with
 * `columnDot`; every other value (including arrays) is stored under
 * `columnPrefix` + joined path.
 *
 * @param x nested object to flatten
 * @returns flat record of column name to value
 */
@autobind
private static convertObjectToFlattenColumns(x: Record<string, unknown>) {
	const flat = {} as Record<string, number | unknown[]>;

	const walk = (node: Obj, parentPath?: string): void => {
		for (const [key, value] of Object.entries(node)) {
			const path = parentPath ? `${parentPath}${this.columnDot}${key}` : key;
			const isNestedObject = typeof value === 'object' && !Array.isArray(value);

			if (isNestedObject) {
				walk(value, path);
				continue;
			}

			flat[this.columnPrefix + path] = value;
		}
	};

	walk(x);
	return flat;
}
/**
 * Returns a copy of `x` in which every array is replaced by the number of
 * distinct elements it contains. Non-array objects are processed
 * recursively; all other values are copied unchanged.
 *
 * @param x object whose array fields should be counted
 * @returns a new object with the same keys
 */
@autobind
private static countUniqueFields(x: Record<string, unknown>) {
	const summarize = (node: Obj): Record<string, unknown> => {
		const out: Record<string, unknown> = {};

		for (const [key, value] of Object.entries(node)) {
			if (Array.isArray(value)) {
				out[key] = new Set(value).size;
			} else if (typeof value === 'object') {
				out[key] = summarize(value);
			} else {
				out[key] = value;
			}
		}

		return out;
	};

	return summarize(x);
}
/**
 * Converts a diff into a map of column name to a function returning the
 * SQL expression that applies the diff to that column.
 *
 * - positive number: `"col" + n`
 * - negative number: `"col" - |n|`
 * - zero: no entry is produced
 * - array: `array_cat("col", '{...}'::varchar[])`, appending the items
 *
 * Array items are escaped before being embedded: backslashes and double
 * quotes are backslash-escaped for the quoted array element, and single
 * quotes are doubled for the enclosing SQL string literal.
 * NOTE(review): the single-quote handling assumes the connection uses
 * standard-conforming strings (backslash is not special inside '...') — confirm.
 *
 * @param diff column name to numeric delta or list of items to append
 * @returns map of column name to SQL-expression factory
 */
@autobind
private static convertQuery(diff: Record<string, number | unknown[]>) {
	const query: Record<string, () => string> = {};

	for (const [k, v] of Object.entries(diff)) {
		if (typeof v === 'number') {
			if (v > 0) query[k] = () => `"${k}" + ${v}`;
			if (v < 0) query[k] = () => `"${k}" - ${Math.abs(v)}`;
		} else if (Array.isArray(v)) {
			// TODO: also handle items that are not strings
			const items = v
				.map(item => {
					// Escape for a double-quoted array element: \ -> \\ and " -> \"
					const element = String(item).replace(/\\/g, '\\\\').replace(/"/g, '\\"');
					return `"${element}"`;
				})
				.join(',')
				// Escape for the surrounding single-quoted SQL literal: ' -> ''
				.replace(/'/g, "''");

			query[k] = () => `array_cat("${k}", '{${items}}'::varchar[])`;
		}
	}

	return query;
}
/**
 * Converts a Date to a Unix timestamp in whole seconds
 * (milliseconds since the epoch divided by 1000, rounded down).
 */
@autobind
private static dateToTimestamp(x: Date): Log['date'] {
private static dateToTimestamp(x: Date): number {
return Math.floor(x.getTime() / 1000);
}
@ -207,7 +155,7 @@ export default abstract class Chart<T extends Record<string, any>> {
length: 128,
},
} : {}),
...Chart.convertSchemaToFlatColumnDefinitions(schema),
...Chart.convertSchemaToColumnDefinitions(schema),
},
indices: [{
columns: grouped ? ['date', 'group'] : ['date'],
@ -233,37 +181,39 @@ export default abstract class Chart<T extends Record<string, any>> {
};
}
constructor(name: string, schema: Schema, grouped = false) {
constructor(name: string, schema: T, grouped = false) {
this.name = name;
this.schema = schema;
const { hour, day } = Chart.schemaToEntity(name, schema, grouped);
this.repositoryForHour = getRepository<Log>(hour);
this.repositoryForDay = getRepository<Log>(day);
this.repositoryForHour = getRepository<RawRecord<T>>(hour);
this.repositoryForDay = getRepository<RawRecord<T>>(day);
}
@autobind
private getNewLog(latest: T | null): T {
const log = latest ? this.genNewLog(latest) : {};
const flatColumns = (x: Obj, path?: string) => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}.${k}` : k;
if (v.type === 'object') {
flatColumns(v.properties, p);
} else {
if (nestedProperty.get(log, p) == null) {
const emptyValue = v.type === 'number' ? 0 : [];
nestedProperty.set(log, p, emptyValue);
}
}
/**
 * Converts a raw record into key/value form.
 *
 * Only columns whose name starts with `columnPrefix` are used; the prefix is
 * stripped and every `columnDot` in the remainder is turned into `.` to form the key.
 *
 * @param x raw record
 * @returns values keyed by the converted column names
 */
private convertRawRecord(x: RawRecord<T>): KVs<T> {
	const kvs = {} as KVs<T>;
	const valueColumns = Object.keys(x).filter(column => column.startsWith(columnPrefix));

	for (const column of valueColumns) {
		const key = column.slice(columnPrefix.length).replaceAll(columnDot, '.');
		kvs[key] = x[column];
	}

	return kvs;
}
@autobind
private getNewLog(latest: KVs<T> | null): KVs<T> {
const log = {} as Record<keyof T, number>;
for (const [k, v] of Object.entries(this.schema)) {
if (v.accumulate && latest) {
log[k] = latest[k];
} else {
log[k] = 0;
}
};
flatColumns(this.schema.properties!);
return log as T;
}
return log as KVs<T>;
}
@autobind
private getLatestLog(group: string | null, span: 'hour' | 'day'): Promise<Log | null> {
private getLatestLog(group: string | null, span: 'hour' | 'day'): Promise<RawRecord<T> | null> {
const repository =
span === 'hour' ? this.repositoryForHour :
span === 'day' ? this.repositoryForDay :
@ -282,7 +232,7 @@ export default abstract class Chart<T extends Record<string, any>> {
* (=Hour or Day)
*/
@autobind
private async claimCurrentLog(group: string | null, span: 'hour' | 'day'): Promise<Log> {
private async claimCurrentLog(group: string | null, span: 'hour' | 'day'): Promise<RawRecord<T>> {
const [y, m, d, h] = Chart.getCurrentDate();
const current = dateUTC(
@ -306,8 +256,8 @@ export default abstract class Chart<T extends Record<string, any>> {
return currentLog;
}
let log: Log;
let data: T;
let log: RawRecord<T>;
let data: KVs<T>;
// 集計期間が変わってから、初めてのチャート更新なら
// 最も最近のログを持ってくる
@ -318,10 +268,8 @@ export default abstract class Chart<T extends Record<string, any>> {
const latest = await this.getLatestLog(group, span);
if (latest != null) {
const obj = Chart.convertFlattenColumnsToObject(latest) as T;
// 空ログデータを作成
data = this.getNewLog(obj);
data = this.getNewLog(this.convertRawRecord(latest));
} else {
// ログが存在しなかったら
// (Misskeyインスタンスを建てて初めてのチャート更新時など)
@ -346,11 +294,17 @@ export default abstract class Chart<T extends Record<string, any>> {
// ログがあればそれを返して終了
if (currentLog != null) return currentLog;
const columns = {} as Record<string, number | unknown[]>;
for (const [k, v] of Object.entries(data)) {
const name = k.replaceAll('.', columnDot);
columns[columnPrefix + name] = v;
}
// 新規ログ挿入
log = await repository.insert({
date: date,
...(group ? { group: group } : {}),
...Chart.convertObjectToFlattenColumns(data),
...columns,
}).then(x => repository.findOneOrFail(x.identifiers[0]));
logger.info(`${this.name + (group ? `:${group}` : '')}(${span}): New commit created`);
@ -362,7 +316,10 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
protected commit(diff: DeepPartial<T>, group: string | null = null): void {
protected commit(diff: Commit<T>, group: string | null = null): void {
for (const [k, v] of Object.entries(diff)) {
if (v == null || v === 0 || (Array.isArray(v) && v.length === 0)) delete diff[k];
}
this.buffer.push({
diff, group,
});
@ -381,13 +338,11 @@ export default abstract class Chart<T extends Record<string, any>> {
// そのログは本来は 01:00~ のログとしてDBに保存されて欲しいのに、02:00~ のログ扱いになってしまう。
// これを回避するための実装は複雑になりそうなため、一旦保留。
const update = async (logHour: Log, logDay: Log): Promise<void> => {
const update = async (logHour: RawRecord<T>, logDay: RawRecord<T>): Promise<void> => {
const finalDiffs = {} as Record<string, number | unknown[]>;
for (const diff of this.buffer.filter(q => q.group == null || (q.group === logHour.group)).map(q => q.diff)) {
const columns = Chart.convertObjectToFlattenColumns(diff);
for (const [k, v] of Object.entries(columns)) {
for (const [k, v] of Object.entries(diff)) {
if (finalDiffs[k] == null) {
finalDiffs[k] = v;
} else {
@ -400,18 +355,45 @@ export default abstract class Chart<T extends Record<string, any>> {
}
}
const query = Chart.convertQuery(finalDiffs);
const queryForHour: Record<string, number | (() => string)> = {};
const queryForDay: Record<string, number | (() => string)> = {};
for (const [k, v] of Object.entries(finalDiffs)) {
if (typeof v === 'number') {
const name = columnPrefix + k.replaceAll('.', columnDot);
if (v > 0) queryForHour[name] = () => `"${name}" + ${v}`;
if (v < 0) queryForHour[name] = () => `"${name}" - ${Math.abs(v)}`;
if (v > 0) queryForDay[name] = () => `"${name}" + ${v}`;
if (v < 0) queryForDay[name] = () => `"${name}" - ${Math.abs(v)}`;
} else if (Array.isArray(v) && v.length > 0) { // ユニークインクリメント
const name = uniqueTempColumnPrefix + k.replaceAll('.', columnDot);
// TODO: item が文字列以外の場合も対応
// TODO: item をSQLエスケープ
// TODO: 値が重複しないようにしたい
const items = v.map(item => `"${item}"`).join(',');
queryForHour[name] = () => `array_cat("${name}", '{${items}}'::varchar[])`;
queryForDay[name] = () => `array_cat("${name}", '{${items}}'::varchar[])`;
}
}
for (const [k, v] of Object.entries(this.schema)) {
const name = columnPrefix + k.replaceAll('.', columnDot);
if (v.uniqueIncrement) {
const tempColumnName = uniqueTempColumnPrefix + k.replaceAll('.', columnDot);
queryForHour[name] = new Set([...finalDiffs[k], ...logHour[tempColumnName]]).size;
queryForDay[name] = new Set([...finalDiffs[k], ...logDay[tempColumnName]]).size;
}
}
// ログ更新
await Promise.all([
this.repositoryForHour.createQueryBuilder()
.update()
.set(query)
.set(queryForHour)
.where('id = :id', { id: logHour.id })
.execute(),
this.repositoryForDay.createQueryBuilder()
.update()
.set(query)
.set(queryForDay)
.where('id = :id', { id: logDay.id })
.execute(),
]);
@ -435,18 +417,24 @@ export default abstract class Chart<T extends Record<string, any>> {
@autobind
public async resync(group: string | null = null): Promise<void> {
const data = await this.fetchActual(group);
const data = await this.queryCurrentState(group);
const update = async (logHour: Log, logDay: Log): Promise<void> => {
const columns = {} as Record<string, number>;
for (const [k, v] of Object.entries(data)) {
const name = k.replaceAll('.', columnDot);
columns[columnPrefix + name] = v;
}
const update = async (logHour: RawRecord<T>, logDay: RawRecord<T>): Promise<void> => {
await Promise.all([
this.repositoryForHour.createQueryBuilder()
.update()
.set(Chart.convertObjectToFlattenColumns(data))
.set(columns as any)
.where('id = :id', { id: logHour.id })
.execute(),
this.repositoryForDay.createQueryBuilder()
.update()
.set(Chart.convertObjectToFlattenColumns(data))
.set(columns as any)
.where('id = :id', { id: logDay.id })
.execute(),
]);
@ -460,12 +448,39 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
protected async inc(inc: DeepPartial<T>, group: string | null = null): Promise<void> {
await this.commit(inc, group);
/**
 * Empties the temporary unique-value array columns of hourly and daily
 * records dated more than one day ago and less than three days ago.
 *
 * Does nothing when the schema has no `uniqueIncrement` entry, because
 * there is then no column to update.
 */
public async clean(): Promise<void> {
	const current = dateUTC(Chart.getCurrentDate());

	// Chart.dateToTimestamp returns seconds, so the window offsets must be in seconds too.
	const daySeconds = 60 * 60 * 24;
	const now = Chart.dateToTimestamp(current);

	// Older than one day and within three days
	const gt = now - (daySeconds * 3);
	const lt = now - daySeconds;

	const columns = {} as Record<string, string[]>;
	for (const [k, v] of Object.entries(this.schema)) {
		if (v.uniqueIncrement) {
			const name = k.replaceAll('.', columnDot);
			columns[uniqueTempColumnPrefix + name] = [];
		}
	}

	// An UPDATE with nothing to set is pointless (and rejected by the query builder).
	if (Object.keys(columns).length === 0) return;

	await Promise.all([
		this.repositoryForHour.createQueryBuilder()
			.update()
			.set(columns as any)
			.where('date > :gt', { gt })
			.andWhere('date < :lt', { lt })
			.execute(),
		this.repositoryForDay.createQueryBuilder()
			.update()
			.set(columns as any)
			.where('date > :gt', { gt })
			.andWhere('date < :lt', { lt })
			.execute(),
	]);
}
@autobind
public async getChart(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise<ArrayValue<T>> {
public async getChartRaw(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise<ChartResult<T>> {
const [y, m, d, h, _m, _s, _ms] = cursor ? Chart.parseDate(subtractTime(addTime(cursor, 1, span), 1)) : Chart.getCurrentDate();
const [y2, m2, d2, h2] = cursor ? Chart.parseDate(addTime(cursor, 1, span)) : [] as never;
@ -526,7 +541,7 @@ export default abstract class Chart<T extends Record<string, any>> {
}
}
const chart: T[] = [];
const chart: KVs<T>[] = [];
for (let i = (amount - 1); i >= 0; i--) {
const current =
@ -537,17 +552,16 @@ export default abstract class Chart<T extends Record<string, any>> {
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
if (log) {
const data = Chart.convertFlattenColumnsToObject(log);
chart.unshift(Chart.countUniqueFields(data) as T);
chart.unshift(this.convertRawRecord(log));
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest) as T : null;
chart.unshift(Chart.countUniqueFields(this.getNewLog(data)) as T);
const data = latest ? this.convertRawRecord(latest) : null;
chart.unshift(this.getNewLog(data));
}
}
const res = {} as Record<string, unknown>;
const res = {} as ChartResult<T>;
/**
* [{ foo: 1, bar: 5 }, { foo: 2, bar: 6 }, { foo: 3, bar: 7 }]
@ -555,36 +569,26 @@ export default abstract class Chart<T extends Record<string, any>> {
* { foo: [1, 2, 3], bar: [5, 6, 7] }
*
*/
const compact = (x: Obj, path?: string): void => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}.${k}` : k;
if (typeof v === 'object' && !Array.isArray(v)) {
compact(v, p);
for (const record of chart) {
for (const [k, v] of Object.entries(record)) {
if (res[k]) {
res[k].push(v);
} else {
const values = chart.map(s => nestedProperty.get(s, p));
nestedProperty.set(res, p, values);
res[k] = [v];
}
}
};
compact(chart[0]);
return res as ArrayValue<T>;
}
}
export function convertLog(logSchema: Schema): Schema {
const v: Schema = JSON.parse(JSON.stringify(logSchema)); // copy
if (v.type === 'number') {
v.type = 'array';
v.items = {
type: 'number' as const,
optional: false as const, nullable: false as const,
};
} else if (v.type === 'object') {
for (const k of Object.keys(v.properties!)) {
v.properties![k] = convertLog(v.properties![k]);
}
return res;
}
/**
 * Fetches the chart through `getChartRaw` and re-shapes the result by
 * passing each key as a path to `nestedProperty.set`
 * (dot-separated keys presumably become nested objects — confirm against nested-property).
 *
 * @param span   'hour' or 'day'
 * @param amount forwarded to `getChartRaw`
 * @param cursor forwarded to `getChartRaw`
 * @param group  forwarded to `getChartRaw`; defaults to null
 * @returns the object built from the raw chart result
 */
@autobind
public async getChart(span: 'hour' | 'day', amount: number, cursor: Date | null, group: string | null = null): Promise<Record<string, unknown>> {
	const flat = await this.getChartRaw(span, amount, cursor, group);
	const nested = {};

	Object.entries(flat).forEach(([path, series]) => {
		nestedProperty.set(nested, path, series);
	});

	return nested;
}
return v;
}