Optimize indexer throughput

Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
Eric Peterson
2022-12-22 12:49:09 +01:00
parent 56633804dd
commit aa33a06894
2 changed files with 17 additions and 3 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-search-backend-module-elasticsearch': patch
---
Improved indexing throughput by optimizing when, and how many, documents are made available to the bulk client.
@@ -45,7 +45,6 @@ function duration(startTimestamp: [number, number]): string {
* @public
*/
export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
private received: number = 0;
private processed: number = 0;
private removableIndices: string[] = [];
@@ -59,11 +58,13 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
private readonly logger: Logger;
private readonly sourceStream: Readable;
private readonly elasticSearchClientWrapper: ElasticSearchClientWrapper;
private configuredBatchSize: number;
private bulkResult: Promise<any>;
private bulkClientError?: Error;
constructor(options: ElasticSearchSearchEngineIndexerOptions) {
super({ batchSize: options.batchSize });
this.configuredBatchSize = options.batchSize;
this.logger = options.logger.child({ documentType: options.type });
this.startTimestamp = process.hrtime();
this.type = options.type;
@@ -121,7 +122,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
/**
 * Awaits `isReady()` and then pushes every given document onto the source
 * stream, incrementing the `received` counter once per document.
 *
 * @param documents - The documents to push onto the source stream.
 * @returns A promise that resolves once all documents have been pushed.
 */
async index(documents: IndexableDocument[]): Promise<void> {
// Nothing is pushed until isReady() has resolved.
await this.isReady();
documents.forEach(document => {
// Count the document as received before handing it to the stream.
this.received++;
this.sourceStream.push(document);
});
}
@@ -208,9 +208,18 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
return Promise.reject(this.bulkClientError);
}
// Optimization: if the stream that ES reads from has fewer docs queued
// than the configured batch size, continue early to allow more docs to be
// queued
if (this.sourceStream.readableLength < this.configuredBatchSize) {
return Promise.resolve();
}
// Otherwise, continue periodically checking the stream queue to see if
// ES has consumed the documents and continue when it's ready for more.
return new Promise(resolve => {
const interval = setInterval(() => {
if (this.received === this.processed) {
if (this.sourceStream.readableLength < this.configuredBatchSize) {
clearInterval(interval);
resolve();
}