diff --git a/.changeset/tasty-ghosts-raise.md b/.changeset/tasty-ghosts-raise.md new file mode 100644 index 0000000000..4e9de7043a --- /dev/null +++ b/.changeset/tasty-ghosts-raise.md @@ -0,0 +1,8 @@ +--- +'@backstage/plugin-search-backend-node': patch +--- + +Change search scheduler from starting indexing in a fixed interval (for example +every 60 seconds), to wait a fixed time between index runs. +This makes sure that no second index process for the same document type is +started when the previous one is still running. diff --git a/plugins/search-backend-node/src/Scheduler.ts b/plugins/search-backend-node/src/Scheduler.ts index 6c0b748b03..3e356aa6aa 100644 --- a/plugins/search-backend-node/src/Scheduler.ts +++ b/plugins/search-backend-node/src/Scheduler.ts @@ -15,6 +15,7 @@ */ import { Logger } from 'winston'; +import { runPeriodically } from './runPeriodically'; type TaskEnvelope = { task: Function; @@ -28,7 +29,7 @@ type TaskEnvelope = { export class Scheduler { private logger: Logger; private schedule: TaskEnvelope[]; - private intervalTimeouts: NodeJS.Timeout[] = []; + private runningTasks: Function[] = []; constructor({ logger }: { logger: Logger }) { this.logger = logger; @@ -36,11 +37,12 @@ export class Scheduler { } /** - * Adds each task and interval to the schedule - * + * Adds each task and interval to the schedule. + * When running the tasks, the scheduler waits at least for the time specified + * in the interval once the task was completed, before running it again. */ addToSchedule(task: Function, interval: number) { - if (this.intervalTimeouts.length) { + if (this.runningTasks.length) { throw new Error( 'Cannot add task to schedule that has already been started.', ); @@ -54,13 +56,7 @@ export class Scheduler { start() { this.logger.info('Starting all scheduled search tasks.'); this.schedule.forEach(({ task, interval }) => { - // Fire the task immediately, then schedule it. - task(); - this.intervalTimeouts.push( - setInterval(() => { - task(); - }, interval), - ); + this.runningTasks.push(runPeriodically(() => task(), interval)); }); } @@ -69,9 +65,9 @@ export class Scheduler { */ stop() { this.logger.info('Stopping all scheduled search tasks.'); - this.intervalTimeouts.forEach(timeout => { - clearInterval(timeout); + this.runningTasks.forEach(cancel => { + cancel(); }); - this.intervalTimeouts = []; + this.runningTasks = []; } } diff --git a/plugins/search-backend-node/src/runPeriodically.test.ts b/plugins/search-backend-node/src/runPeriodically.test.ts new file mode 100644 index 0000000000..4d8e991b9e --- /dev/null +++ b/plugins/search-backend-node/src/runPeriodically.test.ts @@ -0,0 +1,80 @@ +/* + * Copyright 2020 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { runPeriodically } from './runPeriodically'; + +jest.useFakeTimers(); + +describe('runPeriodically', () => { + const flushPromises = () => new Promise(setImmediate); + const advanceTimersByTime = async (time: number) => { + jest.advanceTimersByTime(time); + // Advancing the time with jest doesn't run all promises, but only sync code + await flushPromises(); + }; + + it('runs task initially', async () => { + const task = jest.fn(async () => {}); + const cancel = runPeriodically(task, 1000); + expect(task).toHaveBeenCalledTimes(1); + cancel(); + }); + + it('runs at requested interval', async () => { + const task = jest.fn(async () => {}); + const cancel = runPeriodically(task, 1000); + await flushPromises(); + await advanceTimersByTime(1000); + await advanceTimersByTime(1000); + expect(task).toHaveBeenCalledTimes(3); + cancel(); + }); + + it('stops after being canceled', async () => { + const task = jest.fn(async () => {}); + const cancel = runPeriodically(task, 1000); + await flushPromises(); + cancel(); + await advanceTimersByTime(1000); + await advanceTimersByTime(1000); + expect(task).toHaveBeenCalledTimes(1); + }); + + it('continues running after failures', async () => { + const task = jest.fn(async () => { + throw new Error(); + }); + const cancel = runPeriodically(task, 1000); + await flushPromises(); + await advanceTimersByTime(1000); + await advanceTimersByTime(1000); + expect(task).toHaveBeenCalledTimes(3); + cancel(); + }); + + it('waits till a long running task is completed', async () => { + const task = jest.fn( + () => new Promise(resolve => setTimeout(resolve, 10000)), + ); + const cancel = runPeriodically(task, 1000); + await flushPromises(); + await advanceTimersByTime(1000); + expect(task).toHaveBeenCalledTimes(1); + await advanceTimersByTime(9000); + await advanceTimersByTime(1000); + expect(task).toHaveBeenCalledTimes(2); + cancel(); + }); +}); diff --git a/plugins/search-backend-node/src/runPeriodically.ts b/plugins/search-backend-node/src/runPeriodically.ts new file mode 100644 index 0000000000..d5f5f87d7f --- /dev/null +++ b/plugins/search-backend-node/src/runPeriodically.ts @@ -0,0 +1,54 @@ +/* + * Copyright 2020 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Runs a function repeatedly, with a fixed wait between invocations. + * + * Supports async functions, and silently ignores exceptions and rejections. + * + * @param fn The function to run. May return a Promise. + * @param delayMs The delay between a completed function invocation and the + * next. + * @returns A function that, when called, stops the invocation loop. + */ +export function runPeriodically(fn: () => any, delayMs: number): () => void { + let cancel: () => void; + let cancelled = false; + const cancellationPromise = new Promise(resolve => { + cancel = () => { + resolve(); + cancelled = true; + }; + }); + + const startRefresh = async () => { + while (!cancelled) { + try { + await fn(); + } catch { + // ignore intentionally + } + + await Promise.race([ + new Promise(resolve => setTimeout(resolve, delayMs)), + cancellationPromise, + ]); + } + }; + startRefresh(); + + return cancel!; +}