Make sure that the search scheduler doesn't run to many tasks in parallel
Signed-off-by: Oliver Sand <oliver.sand@sda-se.com>
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-node': patch
|
||||
---
|
||||
|
||||
Change search scheduler from starting indexing in a fixed interval (for example
|
||||
every 60 seconds), to wait a fixed time between index runs.
|
||||
This makes sure that no second index process for the same document type is
|
||||
started when the previous one is still running.
|
||||
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
import { Logger } from 'winston';
|
||||
import { runPeriodically } from './runPeriodically';
|
||||
|
||||
type TaskEnvelope = {
|
||||
task: Function;
|
||||
@@ -28,7 +29,7 @@ type TaskEnvelope = {
|
||||
export class Scheduler {
|
||||
private logger: Logger;
|
||||
private schedule: TaskEnvelope[];
|
||||
private intervalTimeouts: NodeJS.Timeout[] = [];
|
||||
private runningTasks: Function[] = [];
|
||||
|
||||
constructor({ logger }: { logger: Logger }) {
|
||||
this.logger = logger;
|
||||
@@ -40,7 +41,7 @@ export class Scheduler {
|
||||
*
|
||||
*/
|
||||
addToSchedule(task: Function, interval: number) {
|
||||
if (this.intervalTimeouts.length) {
|
||||
if (this.runningTasks.length) {
|
||||
throw new Error(
|
||||
'Cannot add task to schedule that has already been started.',
|
||||
);
|
||||
@@ -56,11 +57,7 @@ export class Scheduler {
|
||||
this.schedule.forEach(({ task, interval }) => {
|
||||
// Fire the task immediately, then schedule it.
|
||||
task();
|
||||
this.intervalTimeouts.push(
|
||||
setInterval(() => {
|
||||
task();
|
||||
}, interval),
|
||||
);
|
||||
this.runningTasks.push(runPeriodically(() => task(), interval));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -69,9 +66,9 @@ export class Scheduler {
|
||||
*/
|
||||
stop() {
|
||||
this.logger.info('Stopping all scheduled search tasks.');
|
||||
this.intervalTimeouts.forEach(timeout => {
|
||||
clearInterval(timeout);
|
||||
this.runningTasks.forEach(cancel => {
|
||||
cancel();
|
||||
});
|
||||
this.intervalTimeouts = [];
|
||||
this.runningTasks = [];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Runs a function repeatedly, with a fixed wait between invocations.
|
||||
*
|
||||
* Supports async functions, and silently ignores exceptions and rejections.
|
||||
*
|
||||
* @param fn The function to run. May return a Promise.
|
||||
* @param delayMs The delay between a completed function invocation and the
|
||||
* next.
|
||||
* @returns A function that, when called, stops the invocation loop.
|
||||
*/
|
||||
export function runPeriodically(fn: () => any, delayMs: number): () => void {
|
||||
let cancel: () => void;
|
||||
let cancelled = false;
|
||||
const cancellationPromise = new Promise<void>(resolve => {
|
||||
cancel = () => {
|
||||
resolve();
|
||||
cancelled = true;
|
||||
};
|
||||
});
|
||||
|
||||
const startRefresh = async () => {
|
||||
while (!cancelled) {
|
||||
try {
|
||||
await fn();
|
||||
} catch {
|
||||
// ignore intentionally
|
||||
}
|
||||
|
||||
await Promise.race([
|
||||
new Promise(resolve => setTimeout(resolve, delayMs)),
|
||||
cancellationPromise,
|
||||
]);
|
||||
}
|
||||
};
|
||||
startRefresh();
|
||||
|
||||
return cancel!;
|
||||
}
|
||||
Reference in New Issue
Block a user