Apply more deliberate validation of the entities throughout processing, making sure that errors bubble out to the state structure in the end.
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': patch
|
||||
---
|
||||
|
||||
Only validate the envelope for emitted entities, and defer full validation to when they get processed later on.
|
||||
@@ -14,7 +14,11 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { stringifyEntityRef } from '@backstage/catalog-model';
|
||||
import {
|
||||
Entity,
|
||||
entityEnvelopeSchemaValidator,
|
||||
stringifyEntityRef,
|
||||
} from '@backstage/catalog-model';
|
||||
import { serializeError } from '@backstage/errors';
|
||||
import { Logger } from 'winston';
|
||||
import { ProcessingDatabase } from './database/types';
|
||||
@@ -28,6 +32,8 @@ import {
|
||||
} from './types';
|
||||
|
||||
class Connection implements EntityProviderConnection {
|
||||
readonly validateEntityEnvelope = entityEnvelopeSchemaValidator();
|
||||
|
||||
constructor(
|
||||
private readonly config: {
|
||||
processingDatabase: ProcessingDatabase;
|
||||
@@ -37,7 +43,9 @@ class Connection implements EntityProviderConnection {
|
||||
|
||||
async applyMutation(mutation: EntityProviderMutation): Promise<void> {
|
||||
const db = this.config.processingDatabase;
|
||||
|
||||
if (mutation.type === 'full') {
|
||||
this.check(mutation.entities);
|
||||
await db.transaction(async tx => {
|
||||
await db.replaceUnprocessedEntities(tx, {
|
||||
sourceKey: this.config.id,
|
||||
@@ -47,6 +55,9 @@ class Connection implements EntityProviderConnection {
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
this.check(mutation.added);
|
||||
this.check(mutation.removed);
|
||||
await db.transaction(async tx => {
|
||||
await db.replaceUnprocessedEntities(tx, {
|
||||
sourceKey: this.config.id,
|
||||
@@ -56,6 +67,16 @@ class Connection implements EntityProviderConnection {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private check(entities: Entity[]) {
|
||||
for (const entity of entities) {
|
||||
try {
|
||||
this.validateEntityEnvelope(entity);
|
||||
} catch (e) {
|
||||
throw new TypeError(`Malformed entity envelope, ${e}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
@@ -113,14 +134,18 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
// TODO: replace Promise.all with something more sophisticated for parallel processing.
|
||||
await Promise.all(
|
||||
items.map(async item => {
|
||||
const { id, state, unprocessedEntity } = item;
|
||||
const { id, state, unprocessedEntity, entityRef } = item;
|
||||
const result = await this.orchestrator.process({
|
||||
entity: unprocessedEntity,
|
||||
state,
|
||||
});
|
||||
|
||||
for (const error of result.errors) {
|
||||
this.logger.warn(error.message);
|
||||
// TODO(freben): Try to extract the location out of the unprocessed
|
||||
// entity and add as meta to the log lines
|
||||
this.logger.warn(error.message, {
|
||||
entity: entityRef,
|
||||
});
|
||||
}
|
||||
const errorsString = JSON.stringify(
|
||||
result.errors.map(e => serializeError(e)),
|
||||
|
||||
@@ -16,37 +16,51 @@
|
||||
|
||||
import {
|
||||
Entity,
|
||||
EntityRelationSpec,
|
||||
stringifyEntityRef,
|
||||
LOCATION_ANNOTATION,
|
||||
LocationSpec,
|
||||
LocationEntity,
|
||||
entityEnvelopeSchemaValidator,
|
||||
EntityPolicy,
|
||||
EntityRelationSpec,
|
||||
entitySchemaValidator,
|
||||
LocationEntity,
|
||||
LocationSpec,
|
||||
LOCATION_ANNOTATION,
|
||||
ORIGIN_LOCATION_ANNOTATION,
|
||||
stringifyLocationReference,
|
||||
parseLocationReference,
|
||||
stringifyEntityRef,
|
||||
stringifyLocationReference,
|
||||
} from '@backstage/catalog-model';
|
||||
import { ConflictError, InputError } from '@backstage/errors';
|
||||
import { ScmIntegrationRegistry } from '@backstage/integration';
|
||||
import path from 'path';
|
||||
import { Logger } from 'winston';
|
||||
import {
|
||||
CatalogProcessor,
|
||||
CatalogProcessorParser,
|
||||
CatalogProcessorResult,
|
||||
} from '../ingestion/processors';
|
||||
import * as results from '../ingestion/processors/results';
|
||||
import {
|
||||
CatalogProcessingOrchestrator,
|
||||
EntityProcessingRequest,
|
||||
EntityProcessingResult,
|
||||
} from './types';
|
||||
import { Logger } from 'winston';
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { locationSpecToLocationEntity } from './util';
|
||||
import path from 'path';
|
||||
import * as results from '../ingestion/processors/results';
|
||||
import { ScmIntegrationRegistry } from '@backstage/integration';
|
||||
|
||||
const validateEntity = entitySchemaValidator();
|
||||
const validateEntityEnvelope = entityEnvelopeSchemaValidator();
|
||||
|
||||
function isLocationEntity(entity: Entity): entity is LocationEntity {
|
||||
return entity.kind === 'Location';
|
||||
}
|
||||
|
||||
function getEntityLocationRef(entity: Entity): string {
|
||||
const ref = entity.metadata.annotations?.[LOCATION_ANNOTATION];
|
||||
if (!ref) {
|
||||
const entityRef = stringifyEntityRef(entity);
|
||||
throw new InputError(`Entity '${entityRef}' does not have a location`);
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
function getEntityOriginLocationRef(entity: Entity): string {
|
||||
const ref = entity.metadata.annotations?.[ORIGIN_LOCATION_ANNOTATION];
|
||||
if (!ref) {
|
||||
@@ -104,23 +118,33 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
private async processSingleEntity(
|
||||
unprocessedEntity: Entity,
|
||||
): Promise<EntityProcessingResult> {
|
||||
// TODO: validate that this doesn't change during processing
|
||||
const entityRef = stringifyEntityRef(unprocessedEntity);
|
||||
// TODO: which one do we actually use here? source-location? - maybe probably doesn't exist yet?
|
||||
const locationRef =
|
||||
unprocessedEntity.metadata?.annotations?.[LOCATION_ANNOTATION];
|
||||
if (!locationRef) {
|
||||
throw new InputError(`Entity '${entityRef}' does not have a location`);
|
||||
}
|
||||
const location = parseLocationReference(locationRef);
|
||||
const originLocation = parseLocationReference(
|
||||
getEntityOriginLocationRef(unprocessedEntity),
|
||||
);
|
||||
|
||||
const emitter = createEmitter(this.options.logger, unprocessedEntity);
|
||||
try {
|
||||
// This will be checked and mutated step by step below
|
||||
let entity: Entity = unprocessedEntity;
|
||||
|
||||
// NOTE: At this early point, we can only rely on the envelope having to
|
||||
// be valid; full entity + kind validation happens after the (potentially
|
||||
// mutative) pre-steps. This means that the code below can't make a lot
|
||||
// of assumptions about the data despite it using the Entity type.
|
||||
try {
|
||||
validateEntityEnvelope(entity);
|
||||
} catch (e) {
|
||||
throw new InputError(
|
||||
`Entity envelope failed validation before processing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
|
||||
const entityRef = stringifyEntityRef(entity);
|
||||
// TODO: which one do we actually use here? source-location? - maybe probably doesn't exist yet?
|
||||
const locationRef = getEntityLocationRef(entity);
|
||||
const location = parseLocationReference(locationRef);
|
||||
const originLocation = parseLocationReference(
|
||||
getEntityOriginLocationRef(entity),
|
||||
);
|
||||
|
||||
// Pre-process phase, used to populate entities with data that is required during main processing step
|
||||
let entity = unprocessedEntity;
|
||||
for (const processor of this.options.processors) {
|
||||
if (processor.preProcessEntity) {
|
||||
try {
|
||||
@@ -131,48 +155,64 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
originLocation,
|
||||
);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Processor ${processor.constructor.name} threw an error while preprocessing entity ${entityRef} at ${locationRef}, ${e}`,
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while preprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enforce entity policies making sure that entities conform to a general schema
|
||||
let policyEnforcedEntity;
|
||||
let policyEnforcedEntity: Entity | undefined;
|
||||
try {
|
||||
policyEnforcedEntity = await this.options.policy.enforce(entity);
|
||||
} catch (e) {
|
||||
throw new InputError(
|
||||
`Policy check failed while analyzing entity ${entityRef} at ${locationRef}, ${e}`,
|
||||
);
|
||||
throw new InputError('Policy check failed', e);
|
||||
}
|
||||
if (!policyEnforcedEntity) {
|
||||
throw new Error(
|
||||
`Policy unexpectedly returned no data while analyzing entity ${entityRef} at ${locationRef}`,
|
||||
);
|
||||
throw new Error('Policy unexpectedly returned no data');
|
||||
}
|
||||
entity = policyEnforcedEntity;
|
||||
|
||||
// Validate that the end result is a valid Entity at all
|
||||
try {
|
||||
validateEntity(entity);
|
||||
} catch (e) {
|
||||
throw new ConflictError(
|
||||
`Entity envelope failed validation after preprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
|
||||
// Validate the given entity kind against its schema
|
||||
let handled = false;
|
||||
let didValidate = false;
|
||||
for (const processor of this.options.processors) {
|
||||
if (processor.validateEntityKind) {
|
||||
try {
|
||||
handled = await processor.validateEntityKind(entity);
|
||||
if (handled) {
|
||||
didValidate = await processor.validateEntityKind(entity);
|
||||
if (didValidate) {
|
||||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while validating the entity ${entityRef} at ${locationRef}, ${e}`,
|
||||
`Processor ${processor.constructor.name} threw an error while validating the entity`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!handled) {
|
||||
if (!didValidate) {
|
||||
throw new InputError(
|
||||
`No processor recognized the entity ${entityRef} at ${locationRef}`,
|
||||
'No processor recognized the entity as valid, possibly caused by a foreign kind or apiVersion',
|
||||
);
|
||||
}
|
||||
|
||||
// Double check that none of the previous steps tried to change something
|
||||
// related to the entity ref, which would break downstream
|
||||
if (stringifyEntityRef(entity) !== entityRef) {
|
||||
throw new ConflictError(
|
||||
'Fatal: The entity kind, namespace, or name changed during processing',
|
||||
);
|
||||
}
|
||||
|
||||
@@ -204,6 +244,7 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
maybeRelativeTarget,
|
||||
);
|
||||
|
||||
let didRead = false;
|
||||
for (const processor of this.options.processors) {
|
||||
if (processor.readLocation) {
|
||||
try {
|
||||
@@ -218,15 +259,22 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
this.options.parser,
|
||||
);
|
||||
if (read) {
|
||||
didRead = true;
|
||||
break;
|
||||
}
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Processor ${processor.constructor.name} threw an error while postprocessing entity ${entityRef} at ${locationRef}, ${e}`,
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while reading ${type}:${target}`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!didRead) {
|
||||
throw new InputError(
|
||||
`No processor was able to handle reading of ${type}:${target}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,8 +288,9 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
emitter.emit,
|
||||
);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Processor ${processor.constructor.name} threw an error while postprocessing entity ${entityRef} at ${locationRef}, ${e}`,
|
||||
throw new InputError(
|
||||
`Processor ${processor.constructor.name} threw an error while postprocessing`,
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -255,7 +304,10 @@ export class DefaultCatalogProcessingOrchestrator
|
||||
};
|
||||
} catch (error) {
|
||||
this.options.logger.warn(error.message);
|
||||
return { ok: false, errors: emitter.results().errors.concat(error) };
|
||||
return {
|
||||
ok: false,
|
||||
errors: emitter.results().errors.concat(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -276,23 +328,40 @@ function createEmitter(logger: Logger, parentEntity: Entity) {
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (i.type === 'entity') {
|
||||
// TODO(freben): Perform the most basic validation here
|
||||
// (apiVersion, kind, metadata, metadata.name, metadata.namespace, spec)
|
||||
let entity: Entity;
|
||||
try {
|
||||
entity = validateEntityEnvelope(i.entity);
|
||||
} catch (e) {
|
||||
logger.debug(`Envelope validation failed at ${i.location}, ${e}`);
|
||||
errors.push(e);
|
||||
return;
|
||||
}
|
||||
|
||||
const originLocation = getEntityOriginLocationRef(parentEntity);
|
||||
|
||||
deferredEntities.push({
|
||||
...i.entity,
|
||||
metadata: {
|
||||
...i.entity.metadata,
|
||||
annotations: {
|
||||
...i.entity.metadata.annotations,
|
||||
[ORIGIN_LOCATION_ANNOTATION]: originLocation,
|
||||
[LOCATION_ANNOTATION]: stringifyLocationReference(i.location),
|
||||
// Note that at this point, we have only validated the envelope part of
|
||||
// the entity data. Annotations are not part of that, so we have to be
|
||||
// defensive. If the annotations were malformed (e.g. were not a valid
|
||||
// object), we just skip over this step and let the full entity
|
||||
// validation at the next step of processing catch that.
|
||||
const annotations = entity.metadata.annotations || {};
|
||||
if (typeof annotations === 'object' && !Array.isArray(annotations)) {
|
||||
const originLocation = getEntityOriginLocationRef(parentEntity);
|
||||
const location = stringifyLocationReference(i.location);
|
||||
entity = {
|
||||
...entity,
|
||||
metadata: {
|
||||
...entity.metadata,
|
||||
annotations: {
|
||||
...annotations,
|
||||
[ORIGIN_LOCATION_ANNOTATION]: originLocation,
|
||||
[LOCATION_ANNOTATION]: location,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
deferredEntities.push(entity);
|
||||
} else if (i.type === 'location') {
|
||||
deferredEntities.push(
|
||||
locationSpecToLocationEntity(i.location, parentEntity),
|
||||
|
||||
@@ -176,7 +176,7 @@ export class Stitcher {
|
||||
statusItems = parsedErrors.map(e => ({
|
||||
type: ENTITY_STATUS_CATALOG_PROCESSING_TYPE,
|
||||
level: 'error',
|
||||
message: e.toString(),
|
||||
message: `${e.name}: ${e.message}`,
|
||||
error: e,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -14,24 +14,22 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { ConflictError, NotFoundError } from '@backstage/errors';
|
||||
import { stringifyEntityRef, Entity } from '@backstage/catalog-model';
|
||||
import { Knex } from 'knex';
|
||||
import { Transaction } from '../../database';
|
||||
import lodash from 'lodash';
|
||||
|
||||
import {
|
||||
ProcessingDatabase,
|
||||
AddUnprocessedEntitiesOptions,
|
||||
UpdateProcessedEntityOptions,
|
||||
GetProcessableEntitiesResult,
|
||||
ReplaceUnprocessedEntitiesOptions,
|
||||
RefreshStateItem,
|
||||
} from './types';
|
||||
import type { Logger } from 'winston';
|
||||
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { Entity, stringifyEntityRef } from '@backstage/catalog-model';
|
||||
import { JsonObject } from '@backstage/config';
|
||||
import { ConflictError, NotFoundError } from '@backstage/errors';
|
||||
import { Knex } from 'knex';
|
||||
import lodash from 'lodash';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import type { Logger } from 'winston';
|
||||
import { Transaction } from '../../database';
|
||||
import {
|
||||
AddUnprocessedEntitiesOptions,
|
||||
GetProcessableEntitiesResult,
|
||||
ProcessingDatabase,
|
||||
RefreshStateItem,
|
||||
ReplaceUnprocessedEntitiesOptions,
|
||||
UpdateProcessedEntityOptions,
|
||||
} from './types';
|
||||
|
||||
export type DbRefreshStateRow = {
|
||||
entity_id: string;
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
|
||||
import {
|
||||
Entity,
|
||||
LocationSpec,
|
||||
Location,
|
||||
EntityRelationSpec,
|
||||
Location,
|
||||
LocationSpec,
|
||||
} from '@backstage/catalog-model';
|
||||
import { JsonObject } from '@backstage/config';
|
||||
|
||||
|
||||
Reference in New Issue
Block a user