Update changeset

Signed-off-by: Damon Kaswell <damon.kaswell1@hp.com>
This commit is contained in:
Damon Kaswell
2022-11-28 07:50:31 -08:00
parent 3733c02da0
commit 61d4efe978
3 changed files with 54 additions and 14 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend-module-incremental-ingestion': minor
---
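Make incremental providers more resilient to failures: when the final page of an ingestion would remove more than 5% of the provider's matching entities, the removal is skipped and an error is recorded on the ingestion record instead.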
@@ -287,6 +287,15 @@ export class IncrementalIngestionDatabaseManager {
*/
async computeRemoved(provider: string, ingestionId: string) {
return await this.client.transaction(async tx => {
const rows = await tx('final_entities')
.count({ total: '*' })
.join('search', 'search.entity_id', 'final_entities.entity_id')
.where(
'search.key',
`metadata.annotations.${INCREMENTAL_ENTITY_PROVIDER_ANNOTATION}`,
)
.andWhere('search.value', provider);
const total = rows.reduce((acc, cur) => acc + (cur.total as number), 0);
const removed: { entity: string; ref: string }[] = await tx(
'final_entities',
)
@@ -302,23 +311,26 @@ export class IncrementalIngestionDatabaseManager {
.join('search', 'search.entity_id', 'final_entities.entity_id')
.whereNotIn(
'entity_ref',
tx('ingestion_marks')
tx('ingestion.ingestion_marks')
.join(
'ingestion_mark_entities',
'ingestion_marks.id',
'ingestion_mark_entities.ingestion_mark_id',
'ingestion.ingestion_mark_entities',
'ingestion.ingestion_marks.id',
'ingestion.ingestion_mark_entities.ingestion_mark_id',
)
.select('ingestion_mark_entities.ref')
.where('ingestion_marks.ingestion_id', ingestionId),
.select('ingestion.ingestion_mark_entities.ref')
.where('ingestion.ingestion_marks.ingestion_id', ingestionId),
)
.andWhere(
'search.key',
`metadata.annotations.${INCREMENTAL_ENTITY_PROVIDER_ANNOTATION}`,
)
.andWhere('search.value', provider);
return removed.map(entity => {
return { entity: JSON.parse(entity.entity) };
});
return {
total,
removed: removed.map(entity => {
return { entity: JSON.parse(entity.entity) };
}),
};
});
}
@@ -27,6 +27,8 @@ import { Duration, DurationObjectUnits } from 'luxon';
import { v4 } from 'uuid';
import { stringifyError } from '@backstage/errors';
/**
 * Maximum percentage (0-100) of matching entities that an ingestion may
 * remove once its final page has been reached. The engine compares the
 * computed removal percentage against this value: at or below it, the
 * removed entities are applied; above it, the removal is skipped, an error
 * is logged, and `last_error` is set on the ingestion record.
 */
const REMOVAL_THRESHOLD = 5;
export class IncrementalIngestionEngine implements IterationEngine {
private readonly restLength: Duration;
private readonly backoff: DurationObjectUnits[];
@@ -279,12 +281,33 @@ export class IncrementalIngestionEngine implements IterationEngine {
const removed: DeferredEntity[] = [];
if (done) {
removed.push(
...(await this.manager.computeRemoved(
this.options.provider.getProviderName(),
id,
)),
this.options.logger.info(
`incremental-engine: Ingestion '${id}': Final page reached, calculating removed entities`,
);
const result = await this.manager.computeRemoved(
this.options.provider.getProviderName(),
id,
);
const total = result.total + added.length;
const percentRemoved =
total > 0 ? (result.removed.length / total) * 100 : 0;
if (percentRemoved <= REMOVAL_THRESHOLD) {
this.options.logger.info(
`incremental-engine: Ingestion '${id}': Removing ${result.removed.length} entities that have no matching assets`,
);
removed.push(...result.removed);
} else {
const notice = `Attempted to remove ${percentRemoved}% of ${total} matching entities!`;
this.options.logger.error(
`incremental-engine: Ingestion '${id}': ${notice}`,
);
await this.manager.updateIngestionRecordById({
ingestionId: id,
update: {
last_error: `REMOVAL_THRESHOLD exceeded on ingestion mark ${markId}: ${notice}`,
},
});
}
}
await this.options.connection.applyMutation({