Support AWS OpenSearch Serverless

skipping _refresh call as it is not supported

Signed-off-by: Andrew Ochsner <andrew.ochsner@cognizant.com>
This commit is contained in:
Andrew Ochsner
2023-09-27 22:02:47 -05:00
parent 998d5a704e
commit 006df4a581
4 changed files with 58 additions and 3 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-search-backend-module-elasticsearch': patch
---
Support AWS OpenSearch Serverless search backend. Does not support \_refresh endpoint.
@@ -26,7 +26,10 @@ import { isEmpty, isNumber, isNaN as nan } from 'lodash';
import { AwsSigv4Signer } from '@opensearch-project/opensearch/aws';
import { RequestSigner } from 'aws4';
import { Config } from '@backstage/config';
import { ElasticSearchClientOptions } from './ElasticSearchClientOptions';
import {
ElasticSearchClientOptions,
OpenSearchElasticSearchClientOptions,
} from './ElasticSearchClientOptions';
import { ElasticSearchClientWrapper } from './ElasticSearchClientWrapper';
import { ElasticSearchCustomIndexTemplate } from './types';
import { ElasticSearchSearchEngineIndexer } from './ElasticSearchSearchEngineIndexer';
@@ -162,6 +165,7 @@ export class ElasticSearchSearchEngine implements SearchEngine {
logger.info('Initializing Elastic.co ElasticSearch search engine.');
} else if (clientOptions.provider === 'aws') {
logger.info('Initializing AWS OpenSearch search engine.');
logger.info(JSON.stringify(clientOptions));
} else if (clientOptions.provider === 'opensearch') {
logger.info('Initializing OpenSearch search engine.');
} else {
@@ -302,6 +306,11 @@ export class ElasticSearchSearchEngine implements SearchEngine {
elasticSearchClientWrapper: this.elasticSearchClientWrapper,
logger: indexerLogger,
batchSize: this.batchSize,
skipRefresh:
(
this
.elasticSearchClientOptions as OpenSearchElasticSearchClientOptions
)?.service === 'aoss',
});
// Attempt cleanup upon failure.
@@ -473,6 +482,8 @@ export class ElasticSearchSearchEngine implements SearchEngine {
return {
provider: 'aws',
node: config.getString('node'),
region: config.getOptionalString('region'),
service,
...(sslConfig
? {
ssl: {
@@ -34,6 +34,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
let createSpy: jest.Mock;
let aliasesSpy: jest.Mock;
let deleteSpy: jest.Mock;
let refreshSpy: jest.Mock;
beforeEach(() => {
// Instantiate the indexer to be tested.
@@ -45,6 +46,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
logger: getVoidLogger(),
elasticSearchClientWrapper: clientWrapper,
batchSize: 1000,
skipRefresh: false,
});
// Set up all requisite Elastic mocks.
@@ -57,12 +59,13 @@ describe('ElasticSearchSearchEngineIndexer', () => {
},
bulkSpy,
);
refreshSpy = jest.fn().mockReturnValue({});
mock.add(
{
method: 'GET',
path: '/:index/_refresh',
},
jest.fn().mockReturnValue({}),
refreshSpy,
);
catSpy = jest.fn().mockReturnValue([
@@ -212,6 +215,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
// Ensure multiple bulk requests were made.
expect(bulkSpy).toHaveBeenCalledTimes(2);
expect(refreshSpy).toHaveBeenCalledTimes(1);
// Ensure the first and last documents were included in the payloads.
const docLocations: string[] = [
@@ -269,6 +273,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
logger: getVoidLogger(),
elasticSearchClientWrapper: mockClientWrapper,
batchSize: 1000,
skipRefresh: false,
});
// When the indexer is run in the test pipeline
@@ -279,4 +284,37 @@ describe('ElasticSearchSearchEngineIndexer', () => {
// Then the pipeline should have received the expected error
expect(error).toBe(expectedError);
});
it('indexes documents, skip refresh', async () => {
// Instantiate the indexer to be tested.
indexer = new ElasticSearchSearchEngineIndexer({
type: 'some-type',
indexPrefix: '',
indexSeparator: '-index__',
alias: 'some-type-index__search',
logger: getVoidLogger(),
elasticSearchClientWrapper: clientWrapper,
batchSize: 1000,
skipRefresh: true,
});
const documents = [
{
title: 'testTerm',
text: 'testText',
location: 'test/location',
},
{
title: 'Another test',
text: 'Some more text',
location: 'test/location/2',
},
];
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
// Ensure bulk called but refresh not
expect(bulkSpy).toHaveBeenCalledTimes(1);
expect(refreshSpy).toHaveBeenCalledTimes(0);
});
});
@@ -33,6 +33,7 @@ export type ElasticSearchSearchEngineIndexerOptions = {
logger: Logger | LoggerService;
elasticSearchClientWrapper: ElasticSearchClientWrapper;
batchSize: number;
skipRefresh: boolean;
};
function duration(startTimestamp: [number, number]): string {
@@ -95,7 +96,7 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
index: { _index: that.indexName },
};
},
refreshOnCompletion: that.indexName,
refreshOnCompletion: !options.skipRefresh && that.indexName,
});
// Safely catch errors thrown by the bulk helper client, e.g. HTTP timeouts