Support AWS OpenSearch Serverless
skipping _refresh call as it is not supported Signed-off-by: Andrew Ochsner <andrew.ochsner@cognizant.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-elasticsearch': patch
|
||||
---
|
||||
|
||||
Support AWS OpenSearch Serverless search backend. Does not support \_refresh endpoint.
|
||||
+12
-1
@@ -26,7 +26,10 @@ import { isEmpty, isNumber, isNaN as nan } from 'lodash';
|
||||
import { AwsSigv4Signer } from '@opensearch-project/opensearch/aws';
|
||||
import { RequestSigner } from 'aws4';
|
||||
import { Config } from '@backstage/config';
|
||||
import { ElasticSearchClientOptions } from './ElasticSearchClientOptions';
|
||||
import {
|
||||
ElasticSearchClientOptions,
|
||||
OpenSearchElasticSearchClientOptions,
|
||||
} from './ElasticSearchClientOptions';
|
||||
import { ElasticSearchClientWrapper } from './ElasticSearchClientWrapper';
|
||||
import { ElasticSearchCustomIndexTemplate } from './types';
|
||||
import { ElasticSearchSearchEngineIndexer } from './ElasticSearchSearchEngineIndexer';
|
||||
@@ -162,6 +165,7 @@ export class ElasticSearchSearchEngine implements SearchEngine {
|
||||
logger.info('Initializing Elastic.co ElasticSearch search engine.');
|
||||
} else if (clientOptions.provider === 'aws') {
|
||||
logger.info('Initializing AWS OpenSearch search engine.');
|
||||
logger.info(JSON.stringify(clientOptions));
|
||||
} else if (clientOptions.provider === 'opensearch') {
|
||||
logger.info('Initializing OpenSearch search engine.');
|
||||
} else {
|
||||
@@ -302,6 +306,11 @@ export class ElasticSearchSearchEngine implements SearchEngine {
|
||||
elasticSearchClientWrapper: this.elasticSearchClientWrapper,
|
||||
logger: indexerLogger,
|
||||
batchSize: this.batchSize,
|
||||
skipRefresh:
|
||||
(
|
||||
this
|
||||
.elasticSearchClientOptions as OpenSearchElasticSearchClientOptions
|
||||
)?.service === 'aoss',
|
||||
});
|
||||
|
||||
// Attempt cleanup upon failure.
|
||||
@@ -473,6 +482,8 @@ export class ElasticSearchSearchEngine implements SearchEngine {
|
||||
return {
|
||||
provider: 'aws',
|
||||
node: config.getString('node'),
|
||||
region: config.getOptionalString('region'),
|
||||
service,
|
||||
...(sslConfig
|
||||
? {
|
||||
ssl: {
|
||||
|
||||
+39
-1
@@ -34,6 +34,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
let createSpy: jest.Mock;
|
||||
let aliasesSpy: jest.Mock;
|
||||
let deleteSpy: jest.Mock;
|
||||
let refreshSpy: jest.Mock;
|
||||
|
||||
beforeEach(() => {
|
||||
// Instantiate the indexer to be tested.
|
||||
@@ -45,6 +46,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
logger: getVoidLogger(),
|
||||
elasticSearchClientWrapper: clientWrapper,
|
||||
batchSize: 1000,
|
||||
skipRefresh: false,
|
||||
});
|
||||
|
||||
// Set up all requisite Elastic mocks.
|
||||
@@ -57,12 +59,13 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
},
|
||||
bulkSpy,
|
||||
);
|
||||
refreshSpy = jest.fn().mockReturnValue({});
|
||||
mock.add(
|
||||
{
|
||||
method: 'GET',
|
||||
path: '/:index/_refresh',
|
||||
},
|
||||
jest.fn().mockReturnValue({}),
|
||||
refreshSpy,
|
||||
);
|
||||
|
||||
catSpy = jest.fn().mockReturnValue([
|
||||
@@ -212,6 +215,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
|
||||
// Ensure multiple bulk requests were made.
|
||||
expect(bulkSpy).toHaveBeenCalledTimes(2);
|
||||
expect(refreshSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Ensure the first and last documents were included in the payloads.
|
||||
const docLocations: string[] = [
|
||||
@@ -269,6 +273,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
logger: getVoidLogger(),
|
||||
elasticSearchClientWrapper: mockClientWrapper,
|
||||
batchSize: 1000,
|
||||
skipRefresh: false,
|
||||
});
|
||||
|
||||
// When the indexer is run in the test pipeline
|
||||
@@ -279,4 +284,37 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
// Then the pipeline should have received the expected error
|
||||
expect(error).toBe(expectedError);
|
||||
});
|
||||
|
||||
it('indexes documents, skip refresh', async () => {
|
||||
// Instantiate the indexer to be tested.
|
||||
indexer = new ElasticSearchSearchEngineIndexer({
|
||||
type: 'some-type',
|
||||
indexPrefix: '',
|
||||
indexSeparator: '-index__',
|
||||
alias: 'some-type-index__search',
|
||||
logger: getVoidLogger(),
|
||||
elasticSearchClientWrapper: clientWrapper,
|
||||
batchSize: 1000,
|
||||
skipRefresh: true,
|
||||
});
|
||||
|
||||
const documents = [
|
||||
{
|
||||
title: 'testTerm',
|
||||
text: 'testText',
|
||||
location: 'test/location',
|
||||
},
|
||||
{
|
||||
title: 'Another test',
|
||||
text: 'Some more text',
|
||||
location: 'test/location/2',
|
||||
},
|
||||
];
|
||||
|
||||
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
|
||||
|
||||
// Ensure bulk called but refresh not
|
||||
expect(bulkSpy).toHaveBeenCalledTimes(1);
|
||||
expect(refreshSpy).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
});
|
||||
|
||||
+2
-1
@@ -33,6 +33,7 @@ export type ElasticSearchSearchEngineIndexerOptions = {
|
||||
logger: Logger | LoggerService;
|
||||
elasticSearchClientWrapper: ElasticSearchClientWrapper;
|
||||
batchSize: number;
|
||||
skipRefresh: boolean;
|
||||
};
|
||||
|
||||
function duration(startTimestamp: [number, number]): string {
|
||||
@@ -95,7 +96,7 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
index: { _index: that.indexName },
|
||||
};
|
||||
},
|
||||
refreshOnCompletion: that.indexName,
|
||||
refreshOnCompletion: !options.skipRefresh && that.indexName,
|
||||
});
|
||||
|
||||
// Safely catch errors thrown by the bulk helper client, e.g. HTTP timeouts
|
||||
|
||||
Reference in New Issue
Block a user