Gracefully handle client-level (vs. response-level) errors
Signed-off-by: Eric Peterson <ericpeterson@spotify.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-elasticsearch': patch
|
||||
---
|
||||
|
||||
Fixed a bug that could cause the backstage backend to unexpectedly terminate when client errors were encountered during the indexing process.
|
||||
+29
@@ -250,4 +250,33 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
// Final deletion shouldn't be called.
|
||||
expect(deleteSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('handles bulk client rejection', async () => {
|
||||
// Given an ES client wrapper that rejects an error
|
||||
const expectedError = new Error('HTTP Timeout');
|
||||
const mockClientWrapper = ElasticSearchClientWrapper.fromClientOptions({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: mock.getConnection(),
|
||||
});
|
||||
mockClientWrapper.bulk = jest.fn().mockRejectedValue(expectedError);
|
||||
|
||||
// And a search engine indexer that uses that client wrapper
|
||||
indexer = new ElasticSearchSearchEngineIndexer({
|
||||
type: 'some-type',
|
||||
indexPrefix: '',
|
||||
indexSeparator: '-index__',
|
||||
alias: 'some-type-index__search',
|
||||
logger: getVoidLogger(),
|
||||
elasticSearchClientWrapper: mockClientWrapper,
|
||||
batchSize: 1000,
|
||||
});
|
||||
|
||||
// When the indexer is run in the test pipeline
|
||||
const { error } = await TestPipeline.fromIndexer(indexer)
|
||||
.withDocuments([{ title: 'a', location: 'a', text: '/a' }])
|
||||
.execute();
|
||||
|
||||
// Then the pipeline should have received the expected error
|
||||
expect(error).toBe(expectedError);
|
||||
});
|
||||
});
|
||||
|
||||
+11
@@ -60,6 +60,7 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
private readonly sourceStream: Readable;
|
||||
private readonly elasticSearchClientWrapper: ElasticSearchClientWrapper;
|
||||
private bulkResult: Promise<any>;
|
||||
private bulkClientError?: Error;
|
||||
|
||||
constructor(options: ElasticSearchSearchEngineIndexerOptions) {
|
||||
super({ batchSize: options.batchSize });
|
||||
@@ -94,6 +95,11 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
},
|
||||
refreshOnCompletion: that.indexName,
|
||||
});
|
||||
|
||||
// Safely catch errors thrown by the bulk helper client, e.g. HTTP timeouts
|
||||
this.bulkResult.catch(e => {
|
||||
this.bulkClientError = e;
|
||||
});
|
||||
}
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
@@ -197,6 +203,11 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
* backpressure in other parts of the indexing pipeline.
|
||||
*/
|
||||
private isReady(): Promise<void> {
|
||||
// Early exit if the underlying ES client encountered an error.
|
||||
if (this.bulkClientError) {
|
||||
return Promise.reject(this.bulkClientError);
|
||||
}
|
||||
|
||||
return new Promise(resolve => {
|
||||
const interval = setInterval(() => {
|
||||
if (this.received === this.processed) {
|
||||
|
||||
Reference in New Issue
Block a user