feat(search-backend-module-elasticsearch): ensure all stale indices are deleted

Signed-off-by: Thomas Cardonne <thomas.cardonne@adevinta.com>
This commit is contained in:
Thomas Cardonne
2024-05-08 00:43:22 +02:00
parent 5b958682f8
commit b18670184f
5 changed files with 68 additions and 107 deletions
+10
View File
@@ -0,0 +1,10 @@
---
'@backstage/plugin-search-backend-module-elasticsearch': minor
---
**BREAKING** The ElasticSearch indexer will now delete stale indices matching the indexer's pattern.
An indexer using the `some-type-index__*` pattern will remove indices matching this pattern after indexation
to prevent stale indices leading to shards exhaustion.
Note: The ElasticSearch indexer already uses wildcards patterns to remove aliases on these indices.
@@ -28,17 +28,13 @@ jest.mock('@elastic/elasticsearch', () => ({
search: jest
.fn()
.mockImplementation(async args => ({ client: 'es', args })),
cat: {
aliases: jest
.fn()
.mockImplementation(async args => ({ client: 'es', args })),
},
helpers: {
bulk: jest
.fn()
.mockImplementation(async args => ({ client: 'es', args })),
},
indices: {
get: jest.fn().mockImplementation(async args => ({ client: 'es', args })),
create: jest
.fn()
.mockImplementation(async args => ({ client: 'es', args })),
@@ -64,17 +60,13 @@ jest.mock('@opensearch-project/opensearch', () => ({
search: jest
.fn()
.mockImplementation(async args => ({ client: 'os', args })),
cat: {
aliases: jest
.fn()
.mockImplementation(async args => ({ client: 'os', args })),
},
helpers: {
bulk: jest
.fn()
.mockImplementation(async args => ({ client: 'os', args })),
},
indices: {
get: jest.fn().mockImplementation(async args => ({ client: 'os', args })),
create: jest
.fn()
.mockImplementation(async args => ({ client: 'os', args })),
@@ -154,6 +146,16 @@ describe('ElasticSearchClientWrapper', () => {
expect(result.args).toStrictEqual(indexTemplate);
});
it('indexList', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
const input = { index: 'xyz-*' };
const result = (await wrapper.listIndices(input)) as any;
expect(result.client).toBe('es');
expect(result.args).toStrictEqual(input);
});
it('indexExists', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
@@ -187,20 +189,6 @@ describe('ElasticSearchClientWrapper', () => {
expect(result.args).toStrictEqual(input);
});
it('getAliases', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
const input = { aliases: ['xyz'] };
const result = (await wrapper.getAliases(input)) as any;
// Should call the OpenSearch client with expected input.
expect(result.client).toBe('es');
expect(result.args).toStrictEqual({
format: 'json',
name: input.aliases,
});
});
it('updateAliases', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(esOptions);
@@ -282,6 +270,16 @@ describe('ElasticSearchClientWrapper', () => {
expect(result.args).toStrictEqual(indexTemplate);
});
it('indexList', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
const input = { index: 'xyz-*' };
const result = (await wrapper.listIndices(input)) as any;
expect(result.client).toBe('os');
expect(result.args).toStrictEqual(input);
});
it('indexExists', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
@@ -315,20 +313,6 @@ describe('ElasticSearchClientWrapper', () => {
expect(result.args).toStrictEqual(input);
});
it('getAliases', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
const input = { aliases: ['xyz'] };
const result = (await wrapper.getAliases(input)) as any;
// Should call the OpenSearch client with expected input.
expect(result.client).toBe('os');
expect(result.args).toStrictEqual({
format: 'json',
name: input.aliases,
});
});
it('updateAliases', async () => {
const wrapper = ElasticSearchClientWrapper.fromClientOptions(osOptions);
@@ -141,6 +141,18 @@ export class ElasticSearchClientWrapper {
throw new Error('No client defined');
}
listIndices(options: { index: string }) {
if (this.openSearchClient) {
return this.openSearchClient.indices.get(options);
}
if (this.elasticSearchClient) {
return this.elasticSearchClient.indices.get(options);
}
throw new Error('No client defined');
}
indexExists(options: { index: string | string[] }) {
if (this.openSearchClient) {
return this.openSearchClient.indices.exists(options);
@@ -177,26 +189,6 @@ export class ElasticSearchClientWrapper {
throw new Error('No client defined');
}
getAliases(options: { aliases: string[] }) {
const { aliases } = options;
if (this.openSearchClient) {
return this.openSearchClient.cat.aliases({
format: 'json',
name: aliases,
});
}
if (this.elasticSearchClient) {
return this.elasticSearchClient.cat.aliases({
format: 'json',
name: aliases,
});
}
throw new Error('No client defined');
}
updateAliases(options: { actions: ElasticSearchAliasAction[] }) {
const filteredActions = options.actions.filter(Boolean);
@@ -30,7 +30,7 @@ const clientWrapper = ElasticSearchClientWrapper.fromClientOptions({
describe('ElasticSearchSearchEngineIndexer', () => {
let indexer: ElasticSearchSearchEngineIndexer;
let bulkSpy: jest.Mock;
let catSpy: jest.Mock;
let getSpy: jest.Mock;
let createSpy: jest.Mock;
let aliasesSpy: jest.Mock;
let deleteSpy: jest.Mock;
@@ -68,30 +68,24 @@ describe('ElasticSearchSearchEngineIndexer', () => {
refreshSpy,
);
catSpy = jest.fn().mockReturnValue([
{
alias: 'some-type-index__search',
index: 'some-type-index__123tobedeleted',
filter: '-',
'routing.index': '-',
'routing.search': '-',
is_write_index: '-',
getSpy = jest.fn().mockReturnValue({
'some-type-index__123tobedeleted': {
aliases: {},
mappings: {},
settings: {},
},
{
alias: 'some-type-index__search_removable',
index: 'some-type-index__456tobedeleted',
filter: '-',
'routing.index': '-',
'routing.search': '-',
is_write_index: '-',
'some-type-index__456tobedeleted': {
aliases: {},
mappings: {},
settings: {},
},
]);
});
mock.add(
{
method: 'GET',
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
path: '/some-type-index__*',
},
catSpy,
getSpy,
);
createSpy = jest.fn().mockReturnValue({
@@ -143,7 +137,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
// Older indices should have been queried for.
expect(catSpy).toHaveBeenCalled();
expect(getSpy).toHaveBeenCalled();
// A new index should have been created.
const createdIndex = createSpy.mock.calls[0][0].path.slice(1);
@@ -163,15 +157,6 @@ describe('ElasticSearchSearchEngineIndexer', () => {
remove: { index: 'some-type-index__*', alias: 'some-type-index__search' },
});
expect(aliasActions[1]).toStrictEqual({
add: {
indices: [
'some-type-index__123tobedeleted',
'some-type-index__456tobedeleted',
],
alias: 'some-type-index__search_removable',
},
});
expect(aliasActions[2]).toStrictEqual({
add: { index: createdIndex, alias: 'some-type-index__search' },
});
@@ -183,7 +168,7 @@ describe('ElasticSearchSearchEngineIndexer', () => {
await TestPipeline.fromIndexer(indexer).withDocuments([]).execute();
// Older indices should have been queried for.
expect(catSpy).toHaveBeenCalled();
expect(getSpy).toHaveBeenCalled();
// A new index should have been created.
expect(createSpy).toHaveBeenNthCalledWith(
@@ -236,17 +221,17 @@ describe('ElasticSearchSearchEngineIndexer', () => {
];
// Update initial alias cat to return nothing.
catSpy = jest.fn().mockReturnValue([]);
getSpy = jest.fn().mockReturnValue({});
mock.clear({
method: 'GET',
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
path: '/some-type-index__*',
});
mock.add(
{
method: 'GET',
path: '/_cat/aliases/some-type-index__search%2Csome-type-index__search_removable',
path: '/some-type-index__*',
},
catSpy,
getSpy,
);
await TestPipeline.fromIndexer(indexer).withDocuments(documents).execute();
@@ -56,7 +56,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
private readonly indexPrefix: string;
private readonly indexSeparator: string;
private readonly alias: string;
private readonly removableAlias: string;
private readonly logger: Logger | LoggerService;
private readonly sourceStream: Readable;
private readonly elasticSearchClientWrapper: ElasticSearchClientWrapper;
@@ -74,7 +73,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
this.indexSeparator = options.indexSeparator;
this.indexName = this.constructIndexName(`${Date.now()}`);
this.alias = options.alias;
this.removableAlias = `${this.alias}_removable`;
this.elasticSearchClientWrapper = options.elasticSearchClientWrapper;
// The ES client bulk helper supports stream-based indexing, but we have to
@@ -108,13 +106,13 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
async initialize(): Promise<void> {
this.logger.info(`Started indexing documents for index ${this.type}`);
const aliases = await this.elasticSearchClientWrapper.getAliases({
aliases: [this.alias, this.removableAlias],
const indices = await this.elasticSearchClientWrapper.listIndices({
index: this.constructIndexName('*'),
});
this.removableIndices = [
...new Set(aliases.body.map((r: Record<string, any>) => r.index)),
] as string[];
for (const key of Object.keys(indices.body)) {
this.removableIndices.push(key);
}
await this.elasticSearchClientWrapper.createIndex({
index: this.indexName,
@@ -171,14 +169,6 @@ export class ElasticSearchSearchEngineIndexer extends BatchSearchEngineIndexer {
{
remove: { index: this.constructIndexName('*'), alias: this.alias },
},
this.removableIndices.length
? {
add: {
indices: this.removableIndices,
alias: this.removableAlias,
},
}
: undefined,
{
add: { index: this.indexName, alias: this.alias },
},