Optimize indexer throughput
Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-elasticsearch': patch
|
||||
---
|
||||
|
||||
Improved index throughput by optimizing when and how many documents were made available to the bulk client.
|
||||
+12
-3
@@ -45,7 +45,6 @@ function duration(startTimestamp: [number, number]): string {
|
||||
* @public
|
||||
*/
|
||||
export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
private received: number = 0;
|
||||
private processed: number = 0;
|
||||
private removableIndices: string[] = [];
|
||||
|
||||
@@ -59,11 +58,13 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
private readonly logger: Logger;
|
||||
private readonly sourceStream: Readable;
|
||||
private readonly elasticSearchClientWrapper: ElasticSearchClientWrapper;
|
||||
private configuredBatchSize: number;
|
||||
private bulkResult: Promise<any>;
|
||||
private bulkClientError?: Error;
|
||||
|
||||
constructor(options: ElasticSearchSearchEngineIndexerOptions) {
|
||||
super({ batchSize: options.batchSize });
|
||||
this.configuredBatchSize = options.batchSize;
|
||||
this.logger = options.logger.child({ documentType: options.type });
|
||||
this.startTimestamp = process.hrtime();
|
||||
this.type = options.type;
|
||||
@@ -121,7 +122,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
async index(documents: IndexableDocument[]): Promise<void> {
|
||||
await this.isReady();
|
||||
documents.forEach(document => {
|
||||
this.received++;
|
||||
this.sourceStream.push(document);
|
||||
});
|
||||
}
|
||||
@@ -208,9 +208,18 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
return Promise.reject(this.bulkClientError);
|
||||
}
|
||||
|
||||
// Optimization: if the stream that ES reads from has fewer docs queued
|
||||
// than the configured batch size, continue early to allow more docs to be
|
||||
// queued
|
||||
if (this.sourceStream.readableLength < this.configuredBatchSize) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
// Otherwise, continue periodically checking the stream queue to see if
|
||||
// ES has consumed the documents and continue when it's ready for more.
|
||||
return new Promise(resolve => {
|
||||
const interval = setInterval(() => {
|
||||
if (this.received === this.processed) {
|
||||
if (this.sourceStream.readableLength < this.configuredBatchSize) {
|
||||
clearInterval(interval);
|
||||
resolve();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user