diff --git a/.changeset/ninety-eggs-argue.md b/.changeset/ninety-eggs-argue.md new file mode 100644 index 0000000000..01f861fe4b --- /dev/null +++ b/.changeset/ninety-eggs-argue.md @@ -0,0 +1,84 @@ +--- +'@backstage/plugin-search-backend-node': minor +'@backstage/create-app': patch +--- + +**BREAKING**: `IndexBuilder.addCollator()` now requires a `schedule` parameter (replacing `defaultRefreshIntervalSeconds`) which is expected to be a `TaskRunner` that is configured with the desired search indexing schedule for the given collator. + +`Scheduler.addToSchedule()` now takes a new parameter object (`ScheduleTaskParameters`) with two new options `id` and `scheduledRunner` in addition to the migrated `task` argument. + +NOTE: The search backend plugin now creates a dedicated database for coordinating indexing tasks. + +To make this change to an existing app, add `luxon` as a dependency of your backend package (if it is not one already) and make the following changes to `packages/backend/src/plugins/search.ts`: + +```diff ++import { Duration } from 'luxon'; + +/* ... */ + ++ const schedule = env.scheduler.createScheduledTaskRunner({ ++ frequency: Duration.fromObject({ seconds: 600 }), ++ timeout: Duration.fromObject({ seconds: 900 }), ++ initialDelay: Duration.fromObject({ seconds: 3 }), ++ }); + + indexBuilder.addCollator({ +- defaultRefreshIntervalSeconds: 600, ++ schedule, + factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { + discovery: env.discovery, + tokenManager: env.tokenManager, + }), + }); + + indexBuilder.addCollator({ +- defaultRefreshIntervalSeconds: 600, ++ schedule, + factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, { + discovery: env.discovery, + tokenManager: env.tokenManager, + }), + }); + + const { scheduler } = await indexBuilder.build(); +- setTimeout(() => scheduler.start(), 3000); ++ scheduler.start(); +/* ... 
*/ +``` + +NOTE: For scenarios where the `lunr` search engine is used in a multi-node configuration, a non-distributed `TaskRunner` like the following should be implemented to ensure consistency across nodes (alternatively, you can configure +the search plugin to use a non-distributed DB such as [SQLite](https://backstage.io/docs/tutorials/configuring-plugin-databases#postgresql-and-sqlite-3)): + +```diff ++import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks'; + +/* ... */ + ++ const schedule: TaskRunner = { ++ run: async (task: TaskInvocationDefinition) => { ++ const startRefresh = async () => { ++ while (!task.signal?.aborted) { ++ try { ++ await task.fn(task.signal); ++ } catch { ++ // ignore intentionally ++ } ++ ++ await new Promise(resolve => setTimeout(resolve, 600 * 1000)); ++ } ++ }; ++ startRefresh(); ++ }, ++ }; + + indexBuilder.addCollator({ +- defaultRefreshIntervalSeconds: 600, ++ schedule, + factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { + discovery: env.discovery, + tokenManager: env.tokenManager, + }), + }); + +/* ... */ +``` diff --git a/docs/features/search/concepts.md b/docs/features/search/concepts.md index 052f71376f..57ab061974 100644 --- a/docs/features/search/concepts.md +++ b/docs/features/search/concepts.md @@ -84,7 +84,9 @@ index-time. There are many ways a search index could be built and maintained, but Backstage Search chooses to completely rebuild indices on a schedule. Different collators can be configured to refresh at different intervals, depending on how often the -source information is updated. +source information is updated. When search indexing is distributed among multiple +backend nodes, coordination to prevent clashes is typically handled by a +distributed `TaskRunner`. 
### The Search Page diff --git a/docs/features/search/getting-started.md b/docs/features/search/getting-started.md index f38d02a3d7..bc17b1eea2 100644 --- a/docs/features/search/getting-started.md +++ b/docs/features/search/getting-started.md @@ -149,6 +149,7 @@ import { import { PluginEnvironment } from '../types'; import { DefaultCatalogCollator } from '@backstage/plugin-catalog-backend'; import { Router } from 'express'; +import { Duration } from 'luxon'; export default async function createPlugin( env: PluginEnvironment, @@ -161,9 +162,15 @@ export default async function createPlugin( searchEngine, }); + const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 600 }), + timeout: Duration.fromObject({ seconds: 900 }), + initialDelay: Duration.fromObject({ seconds: 3 }), + }); + indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, - collator: new DefaultCatalogCollator({ + schedule: every10MinutesSchedule, + factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { discovery: env.discovery, tokenManager: env.tokenManager, }), @@ -287,32 +294,87 @@ which are responsible for providing documents number of collators with the `IndexBuilder` like this: ```typescript +import { Duration } from 'luxon'; + const indexBuilder = new IndexBuilder({ logger: env.logger, searchEngine }); +const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 600 }), + timeout: Duration.fromObject({ seconds: 900 }), + initialDelay: Duration.fromObject({ seconds: 3 }), +}); + +const everyHourSchedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 3600 }), + timeout: Duration.fromObject({ seconds: 5400 }), + initialDelay: Duration.fromObject({ seconds: 3 }), +}); + indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, - collator: new DefaultCatalogCollator({ + schedule: every10MinutesSchedule, + factory: 
DefaultCatalogCollatorFactory.fromConfig(env.config, { discovery: env.discovery, tokenManager: env.tokenManager, }), }); indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 3600, - collator: new MyCustomCollator(), + schedule: everyHourSchedule, + factory: new MyCustomCollatorFactory(), }); ``` Backstage Search builds and maintains its index [on a schedule](./concepts.md#the-scheduler). You can change how often the indexes are rebuilt for a given type of document. You may want to do this if -your documents are updated more or less frequently. You can do so by modifying -its `defaultRefreshIntervalSeconds` value, like this: +your documents are updated more or less frequently. You can do so by configuring +a scheduled `TaskRunner` to pass into the `schedule` value, like this: ```typescript {3} +const every10MinutesSchedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 600 }), + timeout: Duration.fromObject({ seconds: 900 }), + initialDelay: Duration.fromObject({ seconds: 3 }), +}); + indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, - collator: new DefaultCatalogCollator({ + schedule: every10MinutesSchedule, + factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { + discovery: env.discovery, + tokenManager: env.tokenManager, + }), +}); +``` + +Note: if you are using the in-memory Lunr search engine, you probably want to +implement a non-distributed `TaskRunner` like the following to ensure consistency +if you're running multiple search backend nodes (alternatively, you can configure +the search plugin to use a non-distributed database such as +[SQLite](../../tutorials/configuring-plugin-databases.md#postgresql-and-sqlite-3)): + +```typescript +import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks'; + +const schedule: TaskRunner = { + run: async (task: TaskInvocationDefinition) => { + const startRefresh = async () => { + while (!task.signal?.aborted) { + try { + await 
task.fn(task.signal); + } catch { + // ignore intentionally + } + + await new Promise(resolve => setTimeout(resolve, 600 * 1000)); + } + }; + startRefresh(); + }, +}; + +indexBuilder.addCollator({ + schedule, + factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { discovery: env.discovery, tokenManager: env.tokenManager, }), diff --git a/packages/backend/package.json b/packages/backend/package.json index 3fde1cdc11..aed934a028 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -68,6 +68,7 @@ "express": "^4.17.1", "express-promise-router": "^4.1.0", "express-prom-bundle": "^6.3.6", + "luxon": "^2.0.2", "pg": "^8.3.0", "pg-connection-string": "^2.3.0", "prom-client": "^14.0.1", @@ -77,7 +78,8 @@ "@backstage/cli": "^0.17.0-next.1", "@types/dockerode": "^3.3.0", "@types/express": "^4.17.6", - "@types/express-serve-static-core": "^4.17.5" + "@types/express-serve-static-core": "^4.17.5", + "@types/luxon": "^2.0.4" }, "files": [ "dist" diff --git a/packages/backend/src/plugins/search.ts b/packages/backend/src/plugins/search.ts index cb675a842c..052b7380c3 100644 --- a/packages/backend/src/plugins/search.ts +++ b/packages/backend/src/plugins/search.ts @@ -26,6 +26,7 @@ import { } from '@backstage/plugin-search-backend-node'; import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend'; import { Router } from 'express'; +import { Duration } from 'luxon'; import { PluginEnvironment } from '../types'; async function createSearchEngine( @@ -55,10 +56,18 @@ export default async function createPlugin( searchEngine, }); + const schedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 600 }), + timeout: Duration.fromObject({ seconds: 900 }), + // A 3 second delay gives the backend server a chance to initialize before + // any collators are executed, which may attempt requests against the API. 
+ initialDelay: Duration.fromObject({ seconds: 3 }), + }); + // Collators are responsible for gathering documents known to plugins. This // particular collator gathers entities from the software catalog. indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, + schedule, factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { discovery: env.discovery, tokenManager: env.tokenManager, @@ -66,7 +75,7 @@ export default async function createPlugin( }); indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, + schedule, factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, { discovery: env.discovery, logger: env.logger, @@ -77,10 +86,8 @@ export default async function createPlugin( // The scheduler controls when documents are gathered from collators and sent // to the search engine for indexing. const { scheduler } = await indexBuilder.build(); + scheduler.start(); - // A 3 second delay gives the backend server a chance to initialize before - // any collators are executed, which may attempt requests against the API. 
- setTimeout(() => scheduler.start(), 3000); useHotCleanup(module, () => scheduler.stop()); return await createRouter({ diff --git a/packages/create-app/templates/default-app/packages/backend/package.json.hbs b/packages/create-app/templates/default-app/packages/backend/package.json.hbs index cd4d3e1222..b91366dee8 100644 --- a/packages/create-app/templates/default-app/packages/backend/package.json.hbs +++ b/packages/create-app/templates/default-app/packages/backend/package.json.hbs @@ -40,6 +40,7 @@ "dockerode": "^3.3.1", "express": "^4.17.1", "express-promise-router": "^4.1.0", + "luxon": "^2.0.2", {{#if dbTypePG}} "pg": "^8.3.0", {{/if}} @@ -52,7 +53,8 @@ "@backstage/cli": "^{{version '@backstage/cli'}}", "@types/dockerode": "^3.3.0", "@types/express": "^4.17.6", - "@types/express-serve-static-core": "^4.17.5" + "@types/express-serve-static-core": "^4.17.5", + "@types/luxon": "^2.0.4" }, "files": [ "dist" diff --git a/packages/create-app/templates/default-app/packages/backend/src/plugins/search.ts.hbs b/packages/create-app/templates/default-app/packages/backend/src/plugins/search.ts.hbs index 8f44a35b16..bda10ae21b 100644 --- a/packages/create-app/templates/default-app/packages/backend/src/plugins/search.ts.hbs +++ b/packages/create-app/templates/default-app/packages/backend/src/plugins/search.ts.hbs @@ -11,6 +11,7 @@ import { PluginEnvironment } from '../types'; import { DefaultCatalogCollatorFactory } from '@backstage/plugin-catalog-backend'; import { DefaultTechDocsCollatorFactory } from '@backstage/plugin-techdocs-backend'; import { Router } from 'express'; +import { Duration } from 'luxon'; export default async function createPlugin( env: PluginEnvironment, @@ -31,10 +32,18 @@ export default async function createPlugin( searchEngine, }); + const schedule = env.scheduler.createScheduledTaskRunner({ + frequency: Duration.fromObject({ seconds: 600 }), + timeout: Duration.fromObject({ seconds: 900 }), + // A 3 second delay gives the backend server a chance to 
initialize before + // any collators are executed, which may attempt requests against the API. + initialDelay: Duration.fromObject({ seconds: 3 }), + }); + // Collators are responsible for gathering documents known to plugins. This // collator gathers entities from the software catalog. indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, + schedule, factory: DefaultCatalogCollatorFactory.fromConfig(env.config, { discovery: env.discovery, tokenManager: env.tokenManager, @@ -43,7 +52,7 @@ export default async function createPlugin( // collator gathers entities from techdocs. indexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 600, + schedule, factory: DefaultTechDocsCollatorFactory.fromConfig(env.config, { discovery: env.discovery, logger: env.logger, @@ -54,10 +63,8 @@ export default async function createPlugin( // The scheduler controls when documents are gathered from collators and sent // to the search engine for indexing. const { scheduler } = await indexBuilder.build(); + scheduler.start(); - // A 3 second delay gives the backend server a chance to initialize before - // any collators are executed, which may attempt requests against the API. 
- setTimeout(() => scheduler.start(), 3000); useHotCleanup(module, () => scheduler.stop()); return await createRouter({ diff --git a/plugins/search-backend-node/api-report.md b/plugins/search-backend-node/api-report.md index ebc49df280..61b8a124f7 100644 --- a/plugins/search-backend-node/api-report.md +++ b/plugins/search-backend-node/api-report.md @@ -16,6 +16,8 @@ import { QueryTranslator } from '@backstage/plugin-search-common'; import { Readable } from 'stream'; import { SearchEngine } from '@backstage/plugin-search-common'; import { SearchQuery } from '@backstage/plugin-search-common'; +import { TaskFunction } from '@backstage/backend-tasks'; +import { TaskRunner } from '@backstage/backend-tasks'; import { Transform } from 'stream'; import { Writable } from 'stream'; @@ -52,10 +54,7 @@ export abstract class DecoratorBase extends Transform { // @beta (undocumented) export class IndexBuilder { constructor({ logger, searchEngine }: IndexBuilderOptions); - addCollator({ - factory, - defaultRefreshIntervalSeconds, - }: RegisterCollatorParameters): void; + addCollator({ factory, schedule }: RegisterCollatorParameters): void; addDecorator({ factory }: RegisterDecoratorParameters): void; build(): Promise<{ scheduler: Scheduler; @@ -111,8 +110,8 @@ export class LunrSearchEngineIndexer extends BatchSearchEngineIndexer { // @beta export interface RegisterCollatorParameters { - defaultRefreshIntervalSeconds: number; factory: DocumentCollatorFactory; + schedule: TaskRunner; } // @beta @@ -123,11 +122,21 @@ export interface RegisterDecoratorParameters { // @beta (undocumented) export class Scheduler { constructor({ logger }: { logger: Logger }); - addToSchedule(task: Function, interval: number): void; + addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters): void; start(): void; stop(): void; } +// @public +export interface ScheduleTaskParameters { + // (undocumented) + id: string; + // (undocumented) + scheduledRunner: TaskRunner; + // (undocumented) + task: 
TaskFunction; +} + export { SearchEngine }; // @beta diff --git a/plugins/search-backend-node/package.json b/plugins/search-backend-node/package.json index e6fcac1398..3eea462c20 100644 --- a/plugins/search-backend-node/package.json +++ b/plugins/search-backend-node/package.json @@ -23,11 +23,13 @@ "clean": "backstage-cli package clean" }, "dependencies": { + "@backstage/backend-tasks": "^0.3.0-next.2", "@backstage/errors": "^1.0.0", "@backstage/plugin-search-common": "^0.3.3-next.1", "@types/lunr": "^2.3.3", "lodash": "^4.17.21", "lunr": "^2.3.9", + "node-abort-controller": "^3.0.1", "winston": "^3.2.1" }, "devDependencies": { diff --git a/plugins/search-backend-node/src/IndexBuilder.test.ts b/plugins/search-backend-node/src/IndexBuilder.test.ts index 6a32343205..70d00f6974 100644 --- a/plugins/search-backend-node/src/IndexBuilder.test.ts +++ b/plugins/search-backend-node/src/IndexBuilder.test.ts @@ -15,6 +15,7 @@ */ import { getVoidLogger } from '@backstage/backend-common'; +import { TaskInvocationDefinition, TaskRunner } from '@backstage/backend-tasks'; import { DocumentCollatorFactory, DocumentDecoratorFactory, @@ -53,9 +54,15 @@ class DifferentlyTypedDocumentDecoratorFactory extends TestDocumentDecoratorFact describe('IndexBuilder', () => { let testSearchEngine: SearchEngine; let testIndexBuilder: IndexBuilder; + let testScheduledTaskRunner: TaskRunner; beforeEach(() => { const logger = getVoidLogger(); + testScheduledTaskRunner = { + run: async (task: TaskInvocationDefinition & { fn: () => void }) => { + task.fn(); + }, + }; testSearchEngine = new LunrSearchEngine({ logger }); testIndexBuilder = new IndexBuilder({ logger, @@ -65,35 +72,32 @@ describe('IndexBuilder', () => { describe('addCollator', () => { it('adds a collator', async () => { - jest.useFakeTimers(); const testCollatorFactory = new TestDocumentCollatorFactory(); const collatorSpy = jest.spyOn(testCollatorFactory, 'getCollator'); // Add a collator. 
testIndexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 6, factory: testCollatorFactory, + schedule: testScheduledTaskRunner, }); // Build the index and ensure the collator was invoked. const { scheduler } = await testIndexBuilder.build(); scheduler.start(); - jest.advanceTimersByTime(6000); expect(collatorSpy).toHaveBeenCalled(); }); }); describe('addDecorator', () => { it('adds a decorator', async () => { - jest.useFakeTimers(); const testCollatorFactory = new TestDocumentCollatorFactory(); const testDecoratorFactory = new TestDocumentDecoratorFactory(); const decoratorSpy = jest.spyOn(testDecoratorFactory, 'getDecorator'); // Add a collator. testIndexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 6, factory: testCollatorFactory, + schedule: testScheduledTaskRunner, }); // Add a decorator. @@ -104,14 +108,12 @@ describe('IndexBuilder', () => { // Build the index and ensure the decorator was invoked. const { scheduler } = await testIndexBuilder.build(); scheduler.start(); - jest.advanceTimersByTime(6000); // wait for async decorator execution await Promise.resolve(); expect(decoratorSpy).toHaveBeenCalled(); }); it('adds a type-specific decorator', async () => { - jest.useFakeTimers(); const testCollatorFactory = new TypedDocumentCollatorFactory(); const testDecoratorFactory = new TypedDocumentDecoratorFactory(); jest.spyOn(testCollatorFactory, 'getCollator'); @@ -119,8 +121,8 @@ describe('IndexBuilder', () => { // Add a collator. testIndexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 6, factory: testCollatorFactory, + schedule: testScheduledTaskRunner, }); // Add a decorator for the same type. @@ -131,7 +133,6 @@ describe('IndexBuilder', () => { // Build the index and ensure the decorator was invoked. 
const { scheduler } = await testIndexBuilder.build(); scheduler.start(); - jest.advanceTimersByTime(6000); // wait for async decorator execution await Promise.resolve(); expect(decoratorSpy).toHaveBeenCalled(); @@ -146,8 +147,8 @@ describe('IndexBuilder', () => { // Add a collator. testIndexBuilder.addCollator({ - defaultRefreshIntervalSeconds: 6, factory: testCollatorFactory, + schedule: testScheduledTaskRunner, }); // Add a decorator for a different type. @@ -158,7 +159,6 @@ describe('IndexBuilder', () => { // Build the index and ensure the decorator was not invoked. const { scheduler } = await testIndexBuilder.build(); scheduler.start(); - jest.advanceTimersByTime(6000); expect(collatorSpy).toHaveBeenCalled(); expect(decoratorSpy).not.toHaveBeenCalled(); }); diff --git a/plugins/search-backend-node/src/IndexBuilder.ts b/plugins/search-backend-node/src/IndexBuilder.ts index 69c1b2e6fc..235bac905a 100644 --- a/plugins/search-backend-node/src/IndexBuilder.ts +++ b/plugins/search-backend-node/src/IndexBuilder.ts @@ -13,32 +13,25 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - import { - DocumentCollatorFactory, DocumentDecoratorFactory, DocumentTypeInfo, SearchEngine, } from '@backstage/plugin-search-common'; import { Transform, pipeline } from 'stream'; import { Logger } from 'winston'; -import { Scheduler } from './index'; +import { Scheduler } from './Scheduler'; import { IndexBuilderOptions, RegisterCollatorParameters, RegisterDecoratorParameters, } from './types'; -interface CollatorEnvelope { - factory: DocumentCollatorFactory; - refreshInterval: number; -} - /** * @beta */ export class IndexBuilder { - private collators: Record; + private collators: Record; private decorators: Record; private documentTypes: Record; private searchEngine: SearchEngine; @@ -64,16 +57,13 @@ export class IndexBuilder { * Makes the index builder aware of a collator that should be executed at the * given refresh interval. */ - addCollator({ - factory, - defaultRefreshIntervalSeconds, - }: RegisterCollatorParameters): void { + addCollator({ factory, schedule }: RegisterCollatorParameters): void { this.logger.info( `Added ${factory.constructor.name} collator factory for type ${factory.type}`, ); this.collators[factory.type] = { - refreshInterval: defaultRefreshIntervalSeconds, factory, + schedule, }; this.documentTypes[factory.type] = { visibilityPermission: factory.visibilityPermission, @@ -106,51 +96,57 @@ export class IndexBuilder { * scheduler returned to the caller. */ async build(): Promise<{ scheduler: Scheduler }> { - const scheduler = new Scheduler({ logger: this.logger }); + const scheduler = new Scheduler({ + logger: this.logger, + }); Object.keys(this.collators).forEach(type => { - scheduler.addToSchedule(async () => { - // Instantiate the collator. - const collator = await this.collators[type].factory.getCollator(); - this.logger.info( - `Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`, - ); - - // Instantiate all relevant decorators. 
- const decorators: Transform[] = await Promise.all( - (this.decorators['*'] || []) - .concat(this.decorators[type] || []) - .map(async factory => { - const decorator = await factory.getDecorator(); - this.logger.info( - `Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`, - ); - return decorator; - }), - ); - - // Instantiate the indexer. - const indexer = await this.searchEngine.getIndexer(type); - - // Compose collator/decorators/indexer into a pipeline - return new Promise(done => { - pipeline( - [collator, ...decorators, indexer], - (error: NodeJS.ErrnoException | null) => { - if (error) { - this.logger.error( - `Collating documents for ${type} failed: ${error}`, - ); - } else { - this.logger.info(`Collating documents for ${type} succeeded`); - } - - // Signal index pipeline completion! - done(); - }, + scheduler.addToSchedule({ + id: `search_index_${type.replace('-', '_').toLocaleLowerCase('en-US')}`, + scheduledRunner: this.collators[type].schedule, + task: async () => { + // Instantiate the collator. + const collator = await this.collators[type].factory.getCollator(); + this.logger.info( + `Collating documents for ${type} via ${this.collators[type].factory.constructor.name}`, ); - }); - }, this.collators[type].refreshInterval * 1000); + + // Instantiate all relevant decorators. + const decorators: Transform[] = await Promise.all( + (this.decorators['*'] || []) + .concat(this.decorators[type] || []) + .map(async factory => { + const decorator = await factory.getDecorator(); + this.logger.info( + `Attached decorator via ${factory.constructor.name} to ${type} index pipeline.`, + ); + return decorator; + }), + ); + + // Instantiate the indexer. 
+ const indexer = await this.searchEngine.getIndexer(type); + + // Compose collator/decorators/indexer into a pipeline + return new Promise(done => { + pipeline( + [collator, ...decorators, indexer], + (error: NodeJS.ErrnoException | null) => { + if (error) { + this.logger.error( + `Collating documents for ${type} failed: ${error}`, + ); + } else { + this.logger.info(`Collating documents for ${type} succeeded`); + } + + // Signal index pipeline completion! + done(); + }, + ); + }); + }, + }); }); return { diff --git a/plugins/search-backend-node/src/Scheduler.test.ts b/plugins/search-backend-node/src/Scheduler.test.ts index de6add13fe..7eaa6e8c9d 100644 --- a/plugins/search-backend-node/src/Scheduler.test.ts +++ b/plugins/search-backend-node/src/Scheduler.test.ts @@ -29,31 +29,107 @@ describe('Scheduler', () => { describe('addToSchedule', () => { it('should not add a task and interval to schedule, if already started', async () => { - jest.useFakeTimers(); const mockTask1 = jest.fn(); const mockTask2 = jest.fn(); + const mockScheduledTaskRunner1 = { + run: jest.fn(), + }; + const mockScheduledTaskRunner2 = { + run: jest.fn(), + }; // Add a task and interval to schedule - testScheduler.addToSchedule(mockTask1, 2); + testScheduler.addToSchedule({ + id: 'id1', + task: mockTask1, + scheduledRunner: mockScheduledTaskRunner1, + }); // Starts scheduling process testScheduler.start(); // Throws Error if task and interval is added to a already started schedule - expect(() => testScheduler.addToSchedule(mockTask2, 2)).toThrowError(); + expect(() => + testScheduler.addToSchedule({ + id: 'id2', + task: mockTask2, + scheduledRunner: mockScheduledTaskRunner2, + }), + ).toThrowError(); - jest.runOnlyPendingTimers(); - expect(mockTask1).toHaveBeenCalled(); - expect(mockTask2).not.toHaveBeenCalled(); + expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id1', + fn: mockTask1, + }), + ); + 
+ expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id2', + fn: mockTask2, + }), + ); + }); + + it('should not add a task to schedule, if it already exists', async () => { + const mockTask1 = jest.fn(); + const mockTask2 = jest.fn(); + const mockScheduledTaskRunner1 = { + run: jest.fn(), + }; + const mockScheduledTaskRunner2 = { + run: jest.fn(), + }; + + // Add a task to schedule + testScheduler.addToSchedule({ + id: 'id1', + task: mockTask1, + scheduledRunner: mockScheduledTaskRunner1, + }); + + // Throws Error if a task is added under an id that is already scheduled + expect(() => + testScheduler.addToSchedule({ + id: 'id1', + task: mockTask2, + scheduledRunner: mockScheduledTaskRunner2, + }), + ).toThrowError(); + + // Starts scheduling process + testScheduler.start(); + + expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id1', + fn: mockTask1, + }), + ); + expect(mockScheduledTaskRunner2.run).not.toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id2', + fn: mockTask2, + }), + ); }); it('should be possible to add a task and interval to schedule, if already started, but stopped in between', async () => { - jest.useFakeTimers(); const mockTask1 = jest.fn(); const mockTask2 = jest.fn(); + const mockScheduledTaskRunner1 = { + run: jest.fn(), + }; + const mockScheduledTaskRunner2 = { + run: jest.fn(), + }; // Add a task and interval to schedule - testScheduler.addToSchedule(mockTask1, 2); + testScheduler.addToSchedule({ + id: 'id1', + task: mockTask1, + scheduledRunner: mockScheduledTaskRunner1, + }); // Starts scheduling process testScheduler.start(); @@ -63,15 +139,28 @@ describe('Scheduler', () => { // Shouldn't throw error, as it is stopped. 
expect(() => - testScheduler.addToSchedule(mockTask2, 4), + testScheduler.addToSchedule({ + id: 'id2', + task: mockTask2, + scheduledRunner: mockScheduledTaskRunner2, + }), ).not.toThrowError(); // Starts scheduling process testScheduler.start(); - jest.runOnlyPendingTimers(); - expect(mockTask1).toHaveBeenCalled(); - expect(mockTask2).toHaveBeenCalled(); + expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id1', + fn: mockTask1, + }), + ); + expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id2', + fn: mockTask2, + }), + ); }); }); @@ -79,16 +168,40 @@ describe('Scheduler', () => { it('should execute tasks on start', () => { const mockTask1 = jest.fn(); const mockTask2 = jest.fn(); + const mockScheduledTaskRunner1 = { + run: jest.fn(), + }; + const mockScheduledTaskRunner2 = { + run: jest.fn(), + }; // Add tasks and interval to schedule - testScheduler.addToSchedule(mockTask1, 2); - testScheduler.addToSchedule(mockTask2, 2); + testScheduler.addToSchedule({ + id: 'id1', + task: mockTask1, + scheduledRunner: mockScheduledTaskRunner1, + }); + testScheduler.addToSchedule({ + id: 'id2', + task: mockTask2, + scheduledRunner: mockScheduledTaskRunner2, + }); // Starts scheduling process testScheduler.start(); - expect(mockTask1).toHaveBeenCalled(); - expect(mockTask2).toHaveBeenCalled(); + expect(mockScheduledTaskRunner1.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id1', + fn: mockTask1, + }), + ); + expect(mockScheduledTaskRunner2.run).toHaveBeenCalledWith( + expect.objectContaining({ + id: 'id2', + fn: mockTask2, + }), + ); }); }); }); diff --git a/plugins/search-backend-node/src/Scheduler.ts b/plugins/search-backend-node/src/Scheduler.ts index 6debaa3dcd..9f3d95b112 100644 --- a/plugins/search-backend-node/src/Scheduler.ts +++ b/plugins/search-backend-node/src/Scheduler.ts @@ -14,29 +14,38 @@ * limitations under the License. 
*/ +import { AbortController } from 'node-abort-controller'; import { Logger } from 'winston'; -import { runPeriodically } from './runPeriodically'; +import { TaskFunction, TaskRunner } from '@backstage/backend-tasks'; type TaskEnvelope = { - task: Function; - interval: number; + task: TaskFunction; + scheduledRunner: TaskRunner; }; /** - * TODO: coordination, error handling + * @public ScheduleTaskParameters */ +export interface ScheduleTaskParameters { + id: string; + task: TaskFunction; + scheduledRunner: TaskRunner; +} /** * @beta */ export class Scheduler { private logger: Logger; - private schedule: TaskEnvelope[]; - private runningTasks: Function[] = []; + private schedule: { [id: string]: TaskEnvelope }; + private abortController: AbortController; + private isRunning: boolean; constructor({ logger }: { logger: Logger }) { this.logger = logger; - this.schedule = []; + this.schedule = {}; + this.abortController = new AbortController(); + this.isRunning = false; } /** @@ -44,13 +53,18 @@ export class Scheduler { * When running the tasks, the scheduler waits at least for the time specified * in the interval once the task was completed, before running it again. 
*/ - addToSchedule(task: Function, interval: number) { - if (this.runningTasks.length) { + addToSchedule({ id, task, scheduledRunner }: ScheduleTaskParameters) { + if (this.isRunning) { throw new Error( 'Cannot add task to schedule that has already been started.', ); } - this.schedule.push({ task, interval }); + + if (this.schedule[id]) { + throw new Error(`Task with id ${id} already exists.`); + } + + this.schedule[id] = { task, scheduledRunner }; } /** @@ -58,8 +72,14 @@ export class Scheduler { */ start() { this.logger.info('Starting all scheduled search tasks.'); - this.schedule.forEach(({ task, interval }) => { - this.runningTasks.push(runPeriodically(() => task(), interval)); + this.isRunning = true; + Object.keys(this.schedule).forEach(id => { + const { task, scheduledRunner } = this.schedule[id]; + scheduledRunner.run({ + id, + fn: task, + signal: this.abortController.signal, + }); }); } @@ -68,9 +88,7 @@ export class Scheduler { */ stop() { this.logger.info('Stopping all scheduled search tasks.'); - this.runningTasks.forEach(cancel => { - cancel(); - }); - this.runningTasks = []; + this.abortController.abort(); + this.isRunning = false; } } diff --git a/plugins/search-backend-node/src/index.ts b/plugins/search-backend-node/src/index.ts index e84c5d6a4d..e5202ac640 100644 --- a/plugins/search-backend-node/src/index.ts +++ b/plugins/search-backend-node/src/index.ts @@ -36,6 +36,8 @@ export type { export * from './indexing'; export * from './test-utils'; +export type { ScheduleTaskParameters } from './Scheduler'; + /** * @deprecated Import from @backstage/plugin-search-common instead */ diff --git a/plugins/search-backend-node/src/runPeriodically.test.ts b/plugins/search-backend-node/src/runPeriodically.test.ts deleted file mode 100644 index 0f2ee44b8d..0000000000 --- a/plugins/search-backend-node/src/runPeriodically.test.ts +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2020 The Backstage Authors - * - * Licensed under the Apache License, Version 2.0 (the 
"License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import { runPeriodically } from './runPeriodically'; - -jest.useFakeTimers(); - -describe('runPeriodically', () => { - const flushPromises = async () => { - const promise = new Promise(resolve => process.nextTick(resolve)); - jest.runAllTicks(); - await promise; - }; - const advanceTimersByTime = async (time: number) => { - jest.advanceTimersByTime(time); - // Advancing the time with jest doesn't run all promises, but only sync code - await flushPromises(); - }; - - it('runs task initially', async () => { - const task = jest.fn(async () => {}); - const cancel = runPeriodically(task, 1000); - expect(task).toHaveBeenCalledTimes(1); - cancel(); - }); - - it('runs at requested interval', async () => { - const task = jest.fn(async () => {}); - const cancel = runPeriodically(task, 1000); - await flushPromises(); - await advanceTimersByTime(1000); - await advanceTimersByTime(1000); - expect(task).toHaveBeenCalledTimes(3); - cancel(); - }); - - it('stops after being canceled', async () => { - const task = jest.fn(async () => {}); - const cancel = runPeriodically(task, 1000); - await flushPromises(); - cancel(); - await advanceTimersByTime(1000); - await advanceTimersByTime(1000); - expect(task).toHaveBeenCalledTimes(1); - }); - - it('continues running after failures', async () => { - const task = jest.fn(async () => { - throw new Error(); - }); - const cancel = runPeriodically(task, 1000); - await flushPromises(); - await advanceTimersByTime(1000); - await 
advanceTimersByTime(1000); - expect(task).toHaveBeenCalledTimes(3); - cancel(); - }); - - it('waits till a long running task is completed', async () => { - const task = jest.fn( - () => new Promise(resolve => setTimeout(resolve, 10000)), - ); - const cancel = runPeriodically(task, 1000); - await flushPromises(); - await advanceTimersByTime(1000); - expect(task).toHaveBeenCalledTimes(1); - await advanceTimersByTime(9000); - await advanceTimersByTime(1000); - expect(task).toHaveBeenCalledTimes(2); - cancel(); - }); -}); diff --git a/plugins/search-backend-node/src/runPeriodically.ts b/plugins/search-backend-node/src/runPeriodically.ts deleted file mode 100644 index 2f3104e221..0000000000 --- a/plugins/search-backend-node/src/runPeriodically.ts +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2020 The Backstage Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Runs a function repeatedly, with a fixed wait between invocations. - * - * Supports async functions, and silently ignores exceptions and rejections. - * - * @param fn - The function to run. May return a Promise. - * @param delayMs - The delay between a completed function invocation and the - * next. - * @returns A function that, when called, stops the invocation loop. 
- */ -export function runPeriodically(fn: () => any, delayMs: number): () => void { - let cancel: () => void; - let cancelled = false; - const cancellationPromise = new Promise(resolve => { - cancel = () => { - resolve(); - cancelled = true; - }; - }); - - const startRefresh = async () => { - while (!cancelled) { - try { - await fn(); - } catch { - // ignore intentionally - } - - await Promise.race([ - new Promise(resolve => setTimeout(resolve, delayMs)), - cancellationPromise, - ]); - } - }; - startRefresh(); - - return cancel!; -} diff --git a/plugins/search-backend-node/src/types.ts b/plugins/search-backend-node/src/types.ts index d21bfaf453..a92398e8c8 100644 --- a/plugins/search-backend-node/src/types.ts +++ b/plugins/search-backend-node/src/types.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import { TaskRunner } from '@backstage/backend-tasks'; import { DocumentCollatorFactory, DocumentDecoratorFactory, @@ -35,10 +36,10 @@ export type IndexBuilderOptions = { */ export interface RegisterCollatorParameters { /** - * The default interval (in seconds) that the provided collator will be called (can be overridden in config). + * The schedule for which the provided collator will be called, commonly the result of + * {@link @backstage/backend-tasks#PluginTaskScheduler.createScheduledTaskRunner} */ - defaultRefreshIntervalSeconds: number; - + schedule: TaskRunner; /** * The class responsible for returning the document collator of the given type. */