@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend-module-incremental-ingestion': minor
|
||||
---
|
||||
|
||||
Make incremental providers more resilient to failures
|
||||
+21
-9
@@ -287,6 +287,15 @@ export class IncrementalIngestionDatabaseManager {
|
||||
*/
|
||||
async computeRemoved(provider: string, ingestionId: string) {
|
||||
return await this.client.transaction(async tx => {
|
||||
const rows = await tx('final_entities')
|
||||
.count({ total: '*' })
|
||||
.join('search', 'search.entity_id', 'final_entities.entity_id')
|
||||
.where(
|
||||
'search.key',
|
||||
`metadata.annotations.${INCREMENTAL_ENTITY_PROVIDER_ANNOTATION}`,
|
||||
)
|
||||
.andWhere('search.value', provider);
|
||||
const total = rows.reduce((acc, cur) => acc + (cur.total as number), 0);
|
||||
const removed: { entity: string; ref: string }[] = await tx(
|
||||
'final_entities',
|
||||
)
|
||||
@@ -302,23 +311,26 @@ export class IncrementalIngestionDatabaseManager {
|
||||
.join('search', 'search.entity_id', 'final_entities.entity_id')
|
||||
.whereNotIn(
|
||||
'entity_ref',
|
||||
tx('ingestion_marks')
|
||||
tx('ingestion.ingestion_marks')
|
||||
.join(
|
||||
'ingestion_mark_entities',
|
||||
'ingestion_marks.id',
|
||||
'ingestion_mark_entities.ingestion_mark_id',
|
||||
'ingestion.ingestion_mark_entities',
|
||||
'ingestion.ingestion_marks.id',
|
||||
'ingestion.ingestion_mark_entities.ingestion_mark_id',
|
||||
)
|
||||
.select('ingestion_mark_entities.ref')
|
||||
.where('ingestion_marks.ingestion_id', ingestionId),
|
||||
.select('ingestion.ingestion_mark_entities.ref')
|
||||
.where('ingestion.ingestion_marks.ingestion_id', ingestionId),
|
||||
)
|
||||
.andWhere(
|
||||
'search.key',
|
||||
`metadata.annotations.${INCREMENTAL_ENTITY_PROVIDER_ANNOTATION}`,
|
||||
)
|
||||
.andWhere('search.value', provider);
|
||||
return removed.map(entity => {
|
||||
return { entity: JSON.parse(entity.entity) };
|
||||
});
|
||||
return {
|
||||
total,
|
||||
removed: removed.map(entity => {
|
||||
return { entity: JSON.parse(entity.entity) };
|
||||
}),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
+28
-5
@@ -27,6 +27,8 @@ import { Duration, DurationObjectUnits } from 'luxon';
|
||||
import { v4 } from 'uuid';
|
||||
import { stringifyError } from '@backstage/errors';
|
||||
|
||||
const REMOVAL_THRESHOLD = 5;
|
||||
|
||||
export class IncrementalIngestionEngine implements IterationEngine {
|
||||
private readonly restLength: Duration;
|
||||
private readonly backoff: DurationObjectUnits[];
|
||||
@@ -279,12 +281,33 @@ export class IncrementalIngestionEngine implements IterationEngine {
|
||||
const removed: DeferredEntity[] = [];
|
||||
|
||||
if (done) {
|
||||
removed.push(
|
||||
...(await this.manager.computeRemoved(
|
||||
this.options.provider.getProviderName(),
|
||||
id,
|
||||
)),
|
||||
this.options.logger.info(
|
||||
`incremental-engine: Ingestion '${id}': Final page reached, calculating removed entities`,
|
||||
);
|
||||
const result = await this.manager.computeRemoved(
|
||||
this.options.provider.getProviderName(),
|
||||
id,
|
||||
);
|
||||
const total = result.total + added.length;
|
||||
const percentRemoved =
|
||||
total > 0 ? (result.removed.length / total) * 100 : 0;
|
||||
if (percentRemoved <= REMOVAL_THRESHOLD) {
|
||||
this.options.logger.info(
|
||||
`incremental-engine: Ingestion '${id}': Removing ${result.removed.length} entities that have no matching assets`,
|
||||
);
|
||||
removed.push(...result.removed);
|
||||
} else {
|
||||
const notice = `Attempted to remove ${percentRemoved}% of ${total} matching entities!`;
|
||||
this.options.logger.error(
|
||||
`incremental-engine: Ingestion '${id}': ${notice}`,
|
||||
);
|
||||
await this.manager.updateIngestionRecordById({
|
||||
ingestionId: id,
|
||||
update: {
|
||||
last_error: `REMOVAL_THRESHOLD exceeded on ingestion mark ${markId}: ${notice}`,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
await this.options.connection.applyMutation({
|
||||
|
||||
Reference in New Issue
Block a user