Merge pull request #6362 from SDA-SE/feat/techdocs-scheduler

Make sure that the search scheduler doesn't run to many tasks in parallel
This commit is contained in:
Oliver Sand
2021-07-07 19:33:07 +02:00
committed by GitHub
4 changed files with 152 additions and 14 deletions
+8
View File
@@ -0,0 +1,8 @@
---
'@backstage/plugin-search-backend-node': patch
---
Change search scheduler from starting indexing in a fixed interval (for example
every 60 seconds), to wait a fixed time between index runs.
This makes sure that no second index process for the same document type is
started when the previous one is still running.
+10 -14
View File
@@ -15,6 +15,7 @@
*/
import { Logger } from 'winston';
import { runPeriodically } from './runPeriodically';
type TaskEnvelope = {
task: Function;
@@ -28,7 +29,7 @@ type TaskEnvelope = {
export class Scheduler {
private logger: Logger;
private schedule: TaskEnvelope[];
private intervalTimeouts: NodeJS.Timeout[] = [];
private runningTasks: Function[] = [];
constructor({ logger }: { logger: Logger }) {
this.logger = logger;
@@ -36,11 +37,12 @@ export class Scheduler {
}
/**
* Adds each task and interval to the schedule
*
* Adds each task and interval to the schedule.
* When running the tasks, the scheduler waits at least for the time specified
* in the interval once the task was completed, before running it again.
*/
addToSchedule(task: Function, interval: number) {
if (this.intervalTimeouts.length) {
if (this.runningTasks.length) {
throw new Error(
'Cannot add task to schedule that has already been started.',
);
@@ -54,13 +56,7 @@ export class Scheduler {
start() {
this.logger.info('Starting all scheduled search tasks.');
this.schedule.forEach(({ task, interval }) => {
// Fire the task immediately, then schedule it.
task();
this.intervalTimeouts.push(
setInterval(() => {
task();
}, interval),
);
this.runningTasks.push(runPeriodically(() => task(), interval));
});
}
@@ -69,9 +65,9 @@ export class Scheduler {
*/
stop() {
this.logger.info('Stopping all scheduled search tasks.');
this.intervalTimeouts.forEach(timeout => {
clearInterval(timeout);
this.runningTasks.forEach(cancel => {
cancel();
});
this.intervalTimeouts = [];
this.runningTasks = [];
}
}
@@ -0,0 +1,80 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { runPeriodically } from './runPeriodically';
jest.useFakeTimers();
describe('runPeriodically', () => {
const flushPromises = () => new Promise(setImmediate);
const advanceTimersByTime = async (time: number) => {
jest.advanceTimersByTime(time);
// Advancing the time with jest doesn't run all promises, but only sync code
await flushPromises();
};
it('runs task initially', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
expect(task).toHaveBeenCalledTimes(1);
cancel();
});
it('runs at requested interval', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(3);
cancel();
});
it('stops after being canceled', async () => {
const task = jest.fn(async () => {});
const cancel = runPeriodically(task, 1000);
await flushPromises();
cancel();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(1);
});
it('continues running after failures', async () => {
const task = jest.fn(async () => {
throw new Error();
});
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(3);
cancel();
});
it('waits till a long running task is completed', async () => {
const task = jest.fn(
() => new Promise(resolve => setTimeout(resolve, 10000)),
);
const cancel = runPeriodically(task, 1000);
await flushPromises();
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(1);
await advanceTimersByTime(9000);
await advanceTimersByTime(1000);
expect(task).toHaveBeenCalledTimes(2);
cancel();
});
});
@@ -0,0 +1,54 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Runs a function repeatedly, with a fixed wait between invocations.
*
* Supports async functions, and silently ignores exceptions and rejections.
*
* @param fn The function to run. May return a Promise.
* @param delayMs The delay between a completed function invocation and the
* next.
* @returns A function that, when called, stops the invocation loop.
*/
export function runPeriodically(fn: () => any, delayMs: number): () => void {
let cancel: () => void;
let cancelled = false;
const cancellationPromise = new Promise<void>(resolve => {
cancel = () => {
resolve();
cancelled = true;
};
});
const startRefresh = async () => {
while (!cancelled) {
try {
await fn();
} catch {
// ignore intentionally
}
await Promise.race([
new Promise(resolve => setTimeout(resolve, delayMs)),
cancellationPromise,
]);
}
};
startRefresh();
return cancel!;
}