feat(search-backend-module-elasticsearch): ensure all stale indices are deleted
Signed-off-by: Thomas Cardonne <thomas.cardonne@adevinta.com>
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-elasticsearch': minor
|
||||
---
|
||||
|
||||
**BREAKING** The ElasticSearch indexer will now delete stale indices matching the indexer's pattern.
|
||||
|
||||
An indexer using the `some-type-index__*` pattern will remove indices matching this pattern after indexation
|
||||
to prevent stale indices leading to shards exhaustion.
|
||||
|
||||
Note: The ElasticSearch indexer already uses wildcards patterns to remove aliases on these indices.
|
||||
+22
-38
@@ -28,17 +28,13 @@ jest.mock('@elastic/elasticsearch', () => ({
|
||||
search: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'es', args })),
|
||||
cat: {
|
||||
aliases: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'es', args })),
|
||||
},
|
||||
helpers: {
|
||||
bulk: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'es', args })),
|
||||
},
|
||||
indices: {
|
||||
get: jest.fn().mockImplementation(async args => ({ client: 'es', args })),
|
||||
create: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'es', args })),
|
||||
@@ -64,17 +60,13 @@ jest.mock('@opensearch-project/opensearch', () => ({
|
||||
search: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'os', args })),
|
||||
cat: {
|
||||
aliases: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'os', args })),
|
||||
},
|
||||
helpers: {
|
||||
bulk: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'os', args })),
|
||||
},
|
||||
indices: {
|
||||
get: jest.fn().mockImplementation(async args => ({ client: 'os', args })),
|
||||
create: jest
|
||||
.fn()
|
||||
.mockImplementation(async args => ({ client: 'os', args })),
|
||||
@@ -154,6 +146,16 @@ describe('ElasticSearchClientWrapper', () => {
|
||||
expect(result.args).toStrictEqual(indexTemplate);
|
||||
});
|
||||
|
||||
it('indexList', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
|
||||
|
||||
const input = { index: 'xyz-*' };
|
||||
const result = (await wrapper.listIndices(input)) as any;
|
||||
|
||||
expect(result.client).toBe('es');
|
||||
expect(result.args).toStrictEqual(input);
|
||||
});
|
||||
|
||||
it('indexExists', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
|
||||
|
||||
@@ -187,20 +189,6 @@ describe('ElasticSearchClientWrapper', () => {
|
||||
expect(result.args).toStrictEqual(input);
|
||||
});
|
||||
|
||||
it('getAliases', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
|
||||
|
||||
const input = { aliases: ['xyz'] };
|
||||
const result = (await wrapper.getAliases(input)) as any;
|
||||
|
||||
// Should call the OpenSearch client with expected input.
|
||||
expect(result.client).toBe('es');
|
||||
expect(result.args).toStrictEqual({
|
||||
format: 'json',
|
||||
name: input.aliases,
|
||||
});
|
||||
});
|
||||
|
||||
it('updateAliases', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
|
||||
|
||||
@@ -282,6 +270,16 @@ describe('ElasticSearchClientWrapper', () => {
|
||||
expect(result.args).toStrictEqual(indexTemplate);
|
||||
});
|
||||
|
||||
it('indexList', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
|
||||
|
||||
const input = { index: 'xyz-*' };
|
||||
const result = (await wrapper.listIndices(input)) as any;
|
||||
|
||||
expect(result.client).toBe('os');
|
||||
expect(result.args).toStrictEqual(input);
|
||||
});
|
||||
|
||||
it('indexExists', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
|
||||
|
||||
@@ -315,20 +313,6 @@ describe('ElasticSearchClientWrapper', () => {
|
||||
expect(result.args).toStrictEqual(input);
|
||||
});
|
||||
|
||||
it('getAliases', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
|
||||
|
||||
const input = { aliases: ['xyz'] };
|
||||
const result = (await wrapper.getAliases(input)) as any;
|
||||
|
||||
// Should call the OpenSearch client with expected input.
|
||||
expect(result.client).toBe('os');
|
||||
expect(result.args).toStrictEqual({
|
||||
format: 'json',
|
||||
name: input.aliases,
|
||||
});
|
||||
});
|
||||
|
||||
it('updateAliases', async () => {
|
||||
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
|
||||
|
||||
|
||||
+12
-20
@@ -141,6 +141,18 @@ export class ElasticSearchClientWrapper {
|
||||
throw new Error('No client defined');
|
||||
}
|
||||
|
||||
listIndices(options: { index: string }) {
|
||||
if (this.openSearchClient) {
|
||||
return this.openSearchClient.indices.get(options);
|
||||
}
|
||||
|
||||
if (this.elasticSearchClient) {
|
||||
return this.elasticSearchClient.indices.get(options);
|
||||
}
|
||||
|
||||
throw new Error('No client defined');
|
||||
}
|
||||
|
||||
indexExists(options: { index: string | string[] }) {
|
||||
if (this.openSearchClient) {
|
||||
return this.openSearchClient.indices.exists(options);
|
||||
@@ -177,26 +189,6 @@ export class ElasticSearchClientWrapper {
|
||||
throw new Error('No client defined');
|
||||
}
|
||||
|
||||
getAliases(options: { aliases: string[] }) {
|
||||
const { aliases } = options;
|
||||
|
||||
if (this.openSearchClient) {
|
||||
return this.openSearchClient.cat.aliases({
|
||||
format: 'json',
|
||||
name: aliases,
|
||||
});
|
||||
}
|
||||
|
||||
if (this.elasticSearchClient) {
|
||||
return this.elasticSearchClient.cat.aliases({
|
||||
format: 'json',
|
||||
name: aliases,
|
||||
});
|
||||
}
|
||||
|
||||
throw new Error('No client defined');
|
||||
}
|
||||
|
||||
updateAliases(options: { actions: ElasticSearchAliasAction[] }) {
|
||||
const filteredActions = options.actions.filter(Boolean);
|
||||
|
||||
|
||||
+19
-34
@@ -30,7 +30,7 @@ const clientWrapper = ElasticSearchClientWrapper.fromClientOptions({
|
||||
describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
let indexer: ElasticSearchSearchEngineIndexer;
|
||||
let bulkSpy: jest.Mock;
|
||||
let catSpy: jest.Mock;
|
||||
let getSpy: jest.Mock;
|
||||
let createSpy: jest.Mock;
|
||||
let aliasesSpy: jest.Mock;
|
||||
let deleteSpy: jest.Mock;
|
||||
@@ -68,30 +68,24 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
refreshSpy,
|
||||
);
|
||||
|
||||
catSpy = jest.fn().mockReturnValue([
|
||||
{
|
||||
alias: 'some-type-index__search',
|
||||
index: 'some-type-index__123tobedeleted',
|
||||
filter: '-',
|
||||
'routing.index': '-',
|
||||
'routing.search': '-',
|
||||
is_write_index: '-',
|
||||
getSpy = jest.fn().mockReturnValue({
|
||||
'some-type-index__123tobedeleted': {
|
||||
aliases: {},
|
||||
mappings: {},
|
||||
settings: {},
|
||||
},
|
||||
{
|
||||
alias: 'some-type-index__search_removable',
|
||||
index: 'some-type-index__456tobedeleted',
|
||||
filter: '-',
|
||||
'routing.index': '-',
|
||||
'routing.search': '-',
|
||||
is_write_index: '-',
|
||||
'some-type-index__456tobedeleted': {
|
||||
aliases: {},
|
||||
mappings: {},
|
||||
settings: {},
|
||||
},
|
||||
]);
|
||||
});
|
||||
mock.add(
|
||||
{
|
||||
method: 'GET',
|
||||
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
|
||||
path: '/some-type-index__*',
|
||||
},
|
||||
catSpy,
|
||||
getSpy,
|
||||
);
|
||||
|
||||
createSpy = jest.fn().mockReturnValue({
|
||||
@@ -143,7 +137,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
|
||||
|
||||
// Older indices should have been queried for.
|
||||
expect(catSpy).toHaveBeenCalled();
|
||||
expect(getSpy).toHaveBeenCalled();
|
||||
|
||||
// A new index should have been created.
|
||||
const createdIndex = createSpy.mock.calls[0][0].path.slice(1);
|
||||
@@ -163,15 +157,6 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
remove: { index: 'some-type-index__*', alias: 'some-type-index__search' },
|
||||
});
|
||||
expect(aliasActions[1]).toStrictEqual({
|
||||
add: {
|
||||
indices: [
|
||||
'some-type-index__123tobedeleted',
|
||||
'some-type-index__456tobedeleted',
|
||||
],
|
||||
alias: 'some-type-index__search_removable',
|
||||
},
|
||||
});
|
||||
expect(aliasActions[2]).toStrictEqual({
|
||||
add: { index: createdIndex, alias: 'some-type-index__search' },
|
||||
});
|
||||
|
||||
@@ -183,7 +168,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
await TestPipeline.fromIndexer(indexer).withDocuments([]).execute();
|
||||
|
||||
// Older indices should have been queried for.
|
||||
expect(catSpy).toHaveBeenCalled();
|
||||
expect(getSpy).toHaveBeenCalled();
|
||||
|
||||
// A new index should have been created.
|
||||
expect(createSpy).toHaveBeenNthCalledWith(
|
||||
@@ -236,17 +221,17 @@ describe('ElasticSearchSearchEngineIndexer', () => {
|
||||
];
|
||||
|
||||
// Update initial alias cat to return nothing.
|
||||
catSpy = jest.fn().mockReturnValue([]);
|
||||
getSpy = jest.fn().mockReturnValue({});
|
||||
mock.clear({
|
||||
method: 'GET',
|
||||
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
|
||||
path: '/some-type-index__*',
|
||||
});
|
||||
mock.add(
|
||||
{
|
||||
method: 'GET',
|
||||
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
|
||||
path: '/some-type-index__*',
|
||||
},
|
||||
catSpy,
|
||||
getSpy,
|
||||
);
|
||||
|
||||
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
|
||||
|
||||
+5
-15
@@ -56,7 +56,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
private readonly indexPrefix: string;
|
||||
private readonly indexSeparator: string;
|
||||
private readonly alias: string;
|
||||
private readonly removableAlias: string;
|
||||
private readonly logger: Logger | LoggerService;
|
||||
private readonly sourceStream: Readable;
|
||||
private readonly elasticSearchClientWrapper: ElasticSearchClientWrapper;
|
||||
@@ -74,7 +73,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
this.indexSeparator = options.indexSeparator;
|
||||
this.indexName = this.constructIndexName(`${Date.now()}`);
|
||||
this.alias = options.alias;
|
||||
this.removableAlias = `${this.alias}_removable`;
|
||||
this.elasticSearchClientWrapper = options.elasticSearchClientWrapper;
|
||||
|
||||
// The ES client bulk helper supports stream-based indexing, but we have to
|
||||
@@ -108,13 +106,13 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
async initialize(): Promise<void> {
|
||||
this.logger.info(`Started indexing documents for index ${this.type}`);
|
||||
|
||||
const aliases = await this.elasticSearchClientWrapper.getAliases({
|
||||
aliases: [this.alias, this.removableAlias],
|
||||
const indices = await this.elasticSearchClientWrapper.listIndices({
|
||||
index: this.constructIndexName('*'),
|
||||
});
|
||||
|
||||
this.removableIndices = [
|
||||
...new Set(aliases.body.map((r: Record<string, any>) => r.index)),
|
||||
] as string[];
|
||||
for (const key of Object.keys(indices.body)) {
|
||||
this.removableIndices.push(key);
|
||||
}
|
||||
|
||||
await this.elasticSearchClientWrapper.createIndex({
|
||||
index: this.indexName,
|
||||
@@ -171,14 +169,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
{
|
||||
remove: { index: this.constructIndexName('*'), alias: this.alias },
|
||||
},
|
||||
this.removableIndices.length
|
||||
? {
|
||||
add: {
|
||||
indices: this.removableIndices,
|
||||
alias: this.removableAlias,
|
||||
},
|
||||
}
|
||||
: undefined,
|
||||
{
|
||||
add: { index: this.indexName, alias: this.alias },
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user