From 80a2172715b6dd225a331d8f2cbccc78dcbd1302 Mon Sep 17 00:00:00 2001
From: syuilo <syuilotan@yahoo.co.jp>
Date: Sun, 10 Mar 2019 19:16:33 +0900
Subject: [PATCH] Resolve #4462

---
 .../common/views/deck/deck.widgets-column.vue |   1 +
 src/client/app/common/views/widgets/index.ts  |   1 +
 src/client/app/common/views/widgets/queue.vue | 157 ++++++++++++++++++
 src/client/app/desktop/views/home/home.vue    |   1 +
 src/client/app/mobile/views/pages/widgets.vue |   1 +
 src/daemons/queue-stats.ts                    |  43 +++++
 src/index.ts                                  |   2 +
 src/server/api/stream/channels/index.ts       |   2 +
 src/server/api/stream/channels/queue-stats.ts |  41 +++++
 9 files changed, 249 insertions(+)
 create mode 100644 src/client/app/common/views/widgets/queue.vue
 create mode 100644 src/daemons/queue-stats.ts
 create mode 100644 src/server/api/stream/channels/queue-stats.ts

diff --git a/src/client/app/common/views/deck/deck.widgets-column.vue b/src/client/app/common/views/deck/deck.widgets-column.vue
index 47a584a53a..fcf3afd3f7 100644
--- a/src/client/app/common/views/deck/deck.widgets-column.vue
+++ b/src/client/app/common/views/deck/deck.widgets-column.vue
@@ -26,6 +26,7 @@
 					<option value="hashtags">{{ $t('@.widgets.hashtags') }}</option>
 					<option value="posts-monitor">{{ $t('@.widgets.posts-monitor') }}</option>
 					<option value="server">{{ $t('@.widgets.server') }}</option>
+					<option value="queue">{{ $t('@.widgets.queue') }}</option>
 					<option value="nav">{{ $t('@.widgets.nav') }}</option>
 					<option value="tips">{{ $t('@.widgets.tips') }}</option>
 				</select>
diff --git a/src/client/app/common/views/widgets/index.ts b/src/client/app/common/views/widgets/index.ts
index 05aa08375b..d923a01941 100644
--- a/src/client/app/common/views/widgets/index.ts
+++ b/src/client/app/common/views/widgets/index.ts
@@ -31,3 +31,4 @@ Vue.component('mkw-version', wVersion);
 Vue.component('mkw-hashtags', wHashtags);
 Vue.component('mkw-instance', wInstance);
 Vue.component('mkw-post-form', wPostForm);
+Vue.component('mkw-queue', () => import('./queue.vue').then(m => m.default));
diff --git a/src/client/app/common/views/widgets/queue.vue b/src/client/app/common/views/widgets/queue.vue
new file mode 100644
index 0000000000..18bfeb3ba9
--- /dev/null
+++ b/src/client/app/common/views/widgets/queue.vue
@@ -0,0 +1,157 @@
+<template>
+<div>
+	<ui-container :show-header="!props.compact">
+		<template #header><fa :icon="faTasks"/>Queue</template>
+
+		<div class="mntrproz">
+			<div>
+				<b>In</b>
+				<span v-if="latestStats">{{ latestStats.inbox.active | number }} / {{ latestStats.inbox.delayed | number }}</span>
+				<div ref="in"></div>
+			</div>
+			<div>
+				<b>Out</b>
+				<span v-if="latestStats">{{ latestStats.deliver.active | number }} / {{ latestStats.deliver.delayed | number }}</span>
+				<div ref="out"></div>
+			</div>
+		</div>
+	</ui-container>
+</div>
+</template>
+
+<script lang="ts">
+import define from '../../define-widget';
+import { faTasks } from '@fortawesome/free-solid-svg-icons';
+import ApexCharts from 'apexcharts';
+
+export default define({
+	name: 'queue',
+	props: () => ({
+		compact: false
+	})
+}).extend({
+	data() {
+		return {
+			stats: [],
+			inChart: null,
+			outChart: null,
+			faTasks
+		};
+	},
+
+	watch: {
+		stats(stats) {
+			this.inChart.updateSeries([{
+				data: stats.map((x, i) => ({ x: i, y: x.inbox.active }))
+			}, {
+				data: stats.map((x, i) => ({ x: i, y: x.inbox.delayed }))
+			}]);
+			this.outChart.updateSeries([{
+				data: stats.map((x, i) => ({ x: i, y: x.deliver.active }))
+			}, {
+				data: stats.map((x, i) => ({ x: i, y: x.deliver.delayed }))
+			}]);
+		}
+	},
+
+	computed: {
+		latestStats(): any {
+			return this.stats[this.stats.length - 1];
+		}
+	},
+
+	mounted() {
+		const chartOpts = {
+			chart: {
+				type: 'area',
+				height: 70,
+				animations: {
+					dynamicAnimation: {
+						enabled: false
+					}
+				},
+				sparkline: {
+					enabled: true,
+				}
+			},
+			tooltip: {
+				enabled: false
+			},
+			stroke: {
+				curve: 'straight',
+				width: 1
+			},
+			series: [{
+				data: [] as any
+			}, {
+				data: [] as any
+			}],
+			yaxis: {
+				min: 0,
+			}
+		};
+
+		this.inChart = new ApexCharts(this.$refs.in, chartOpts);
+		this.outChart = new ApexCharts(this.$refs.out, chartOpts);
+
+		this.inChart.render();
+		this.outChart.render();
+
+		const connection = this.$root.stream.useSharedConnection('queueStats');
+		connection.on('stats', this.onStats);
+		connection.on('statsLog', this.onStatsLog);
+		connection.send('requestLog', {
+			id: Math.random().toString().substr(2, 8),
+			length: 50
+		});
+
+		this.$once('hook:beforeDestroy', () => {
+			connection.dispose();
+			this.inChart.destroy();
+			this.outChart.destroy();
+		});
+	},
+
+	methods: {
+		func() {
+			this.props.compact = !this.props.compact;
+			this.save();
+		},
+
+		onStats(stats) {
+			this.stats.push(stats);
+			if (this.stats.length > 50) this.stats.shift();
+		},
+
+		onStatsLog(statsLog) {
+			for (const stats of statsLog.reverse()) {
+				this.onStats(stats);
+			}
+		}
+	}
+});
+</script>
+
+<style lang="stylus" scoped>
+.mntrproz
+	display flex
+	padding 4px
+
+	> div
+		width 50%
+		padding 4px
+
+		> b
+			display block
+			font-size 12px
+			color var(--text)
+
+		> span
+			position absolute
+			top 4px
+			right 4px
+			opacity 0.7
+			font-size 12px
+			color var(--text)
+
+</style>
diff --git a/src/client/app/desktop/views/home/home.vue b/src/client/app/desktop/views/home/home.vue
index 740aa1289d..fb7af5a9ad 100644
--- a/src/client/app/desktop/views/home/home.vue
+++ b/src/client/app/desktop/views/home/home.vue
@@ -27,6 +27,7 @@
 						<option value="hashtags">{{ $t('@.widgets.hashtags') }}</option>
 						<option value="posts-monitor">{{ $t('@.widgets.posts-monitor') }}</option>
 						<option value="server">{{ $t('@.widgets.server') }}</option>
+						<option value="queue">{{ $t('@.widgets.queue') }}</option>
 						<option value="nav">{{ $t('@.widgets.nav') }}</option>
 						<option value="tips">{{ $t('@.widgets.tips') }}</option>
 					</select>
diff --git a/src/client/app/mobile/views/pages/widgets.vue b/src/client/app/mobile/views/pages/widgets.vue
index 7722104aff..96dcb977fa 100644
--- a/src/client/app/mobile/views/pages/widgets.vue
+++ b/src/client/app/mobile/views/pages/widgets.vue
@@ -19,6 +19,7 @@
 					<option value="posts-monitor">{{ $t('@.widgets.posts-monitor') }}</option>
 					<option value="version">{{ $t('@.widgets.version') }}</option>
 					<option value="server">{{ $t('@.widgets.server') }}</option>
+					<option value="queue">{{ $t('@.widgets.queue') }}</option>
 					<option value="memo">{{ $t('@.widgets.memo') }}</option>
 					<option value="nav">{{ $t('@.widgets.nav') }}</option>
 					<option value="tips">{{ $t('@.widgets.tips') }}</option>
diff --git a/src/daemons/queue-stats.ts b/src/daemons/queue-stats.ts
new file mode 100644
index 0000000000..26f2bf7c03
--- /dev/null
+++ b/src/daemons/queue-stats.ts
@@ -0,0 +1,43 @@
+import * as Deque from 'double-ended-queue';
+import Xev from 'xev';
+import { deliverQueue, inboxQueue } from '../queue';
+
+const ev = new Xev();
+
+const interval = 1000;
+
+/**
+ * Report queue stats regularly
+ */
+export default function() {
+	const log = new Deque<any>();
+
+	ev.on('requestQueueStatsLog', x => {
+		ev.emit(`queueStatsLog:${x.id}`, log.toArray().slice(0, x.length || 50));
+	});
+
+	async function tick() {
+		const deliverJobCounts = await deliverQueue.getJobCounts();
+		const inboxJobCounts = await inboxQueue.getJobCounts();
+
+		const stats = {
+			deliver: {
+				active: Math.floor(Math.random() * 100),
+				delayed: Math.floor(Math.random() * 1000),
+			},
+			inbox: {
+				active: Math.floor(Math.random() * 100),
+				delayed: Math.floor(Math.random() * 1000),
+			}
+		};
+
+		ev.emit('queueStats', stats);
+
+		log.unshift(stats);
+		if (log.length > 200) log.pop();
+	}
+
+	tick();
+
+	setInterval(tick, interval);
+}
diff --git a/src/index.ts b/src/index.ts
index 0df38b5966..e55ba5115d 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -16,6 +16,7 @@ import Xev from 'xev';
 import Logger from './services/logger';
 import serverStats from './daemons/server-stats';
 import notesStats from './daemons/notes-stats';
+import queueStats from './daemons/queue-stats';
 import loadConfig from './config/load';
 import { Config } from './config/types';
 import { lessThan } from './prelude/array';
@@ -50,6 +51,7 @@ function main() {
 		if (program.daemons) {
 			serverStats();
 			notesStats();
+			queueStats();
 		}
 	}
 
diff --git a/src/server/api/stream/channels/index.ts b/src/server/api/stream/channels/index.ts
index 02f71b5851..4527fb1e46 100644
--- a/src/server/api/stream/channels/index.ts
+++ b/src/server/api/stream/channels/index.ts
@@ -5,6 +5,7 @@ import hybridTimeline from './hybrid-timeline';
 import globalTimeline from './global-timeline';
 import notesStats from './notes-stats';
 import serverStats from './server-stats';
+import queueStats from './queue-stats';
 import userList from './user-list';
 import messaging from './messaging';
 import messagingIndex from './messaging-index';
@@ -23,6 +24,7 @@ export default {
 	globalTimeline,
 	notesStats,
 	serverStats,
+	queueStats,
 	userList,
 	messaging,
 	messagingIndex,
diff --git a/src/server/api/stream/channels/queue-stats.ts b/src/server/api/stream/channels/queue-stats.ts
new file mode 100644
index 0000000000..0bda0cfcb9
--- /dev/null
+++ b/src/server/api/stream/channels/queue-stats.ts
@@ -0,0 +1,41 @@
+import autobind from 'autobind-decorator';
+import Xev from 'xev';
+import Channel from '../channel';
+
+const ev = new Xev();
+
+export default class extends Channel {
+	public readonly chName = 'queueStats';
+	public static shouldShare = true;
+	public static requireCredential = false;
+
+	@autobind
+	public async init(params: any) {
+		ev.addListener('queueStats', this.onStats);
+	}
+
+	@autobind
+	private onStats(stats: any) {
+		this.send('stats', stats);
+	}
+
+	@autobind
+	public onMessage(type: string, body: any) {
+		switch (type) {
+			case 'requestLog':
+				ev.once(`queueStatsLog:${body.id}`, statsLog => {
+					this.send('statsLog', statsLog);
+				});
+				ev.emit('requestQueueStatsLog', {
+					id: body.id,
+					length: body.length
+				});
+				break;
+		}
+	}
+
+	@autobind
+	public dispose() {
+		ev.removeListener('queueStats', this.onStats);
+	}
+}