Apply more deliberate validation of the entities throughout processing, making sure that errors bubble out to the state structure in the end.

Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2021-06-01 16:17:25 +02:00
parent 1422f9dbae
commit e7a5a34740
6 changed files with 179 additions and 82 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Only validate the envelope for emitted entities, and defer full validation to when they get processed later on.
@@ -14,7 +14,11 @@
* limitations under the License.
*/
import { stringifyEntityRef } from '@backstage/catalog-model';
import {
Entity,
entityEnvelopeSchemaValidator,
stringifyEntityRef,
} from '@backstage/catalog-model';
import { serializeError } from '@backstage/errors';
import { Logger } from 'winston';
import { ProcessingDatabase } from './database/types';
@@ -28,6 +32,8 @@ import {
} from './types';
class Connection implements EntityProviderConnection {
readonly validateEntityEnvelope = entityEnvelopeSchemaValidator();
constructor(
private readonly config: {
processingDatabase: ProcessingDatabase;
@@ -37,7 +43,9 @@ class Connection implements EntityProviderConnection {
async applyMutation(mutation: EntityProviderMutation): Promise<void> {
const db = this.config.processingDatabase;
if (mutation.type === 'full') {
this.check(mutation.entities);
await db.transaction(async tx => {
await db.replaceUnprocessedEntities(tx, {
sourceKey: this.config.id,
@@ -47,6 +55,9 @@ class Connection implements EntityProviderConnection {
});
return;
}
this.check(mutation.added);
this.check(mutation.removed);
await db.transaction(async tx => {
await db.replaceUnprocessedEntities(tx, {
sourceKey: this.config.id,
@@ -56,6 +67,16 @@ class Connection implements EntityProviderConnection {
});
});
}
private check(entities: Entity[]) {
for (const entity of entities) {
try {
this.validateEntityEnvelope(entity);
} catch (e) {
throw new TypeError(`Malformed entity envelope, ${e}`);
}
}
}
}
export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
@@ -113,14 +134,18 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
// TODO: replace Promise.all with something more sophisticated for parallel processing.
await Promise.all(
items.map(async item => {
const { id, state, unprocessedEntity } = item;
const { id, state, unprocessedEntity, entityRef } = item;
const result = await this.orchestrator.process({
entity: unprocessedEntity,
state,
});
for (const error of result.errors) {
this.logger.warn(error.message);
// TODO(freben): Try to extract the location out of the unprocessed
// entity and add as meta to the log lines
this.logger.warn(error.message, {
entity: entityRef,
});
}
const errorsString = JSON.stringify(
result.errors.map(e => serializeError(e)),
@@ -16,37 +16,51 @@
import {
Entity,
EntityRelationSpec,
stringifyEntityRef,
LOCATION_ANNOTATION,
LocationSpec,
LocationEntity,
entityEnvelopeSchemaValidator,
EntityPolicy,
EntityRelationSpec,
entitySchemaValidator,
LocationEntity,
LocationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
stringifyLocationReference,
parseLocationReference,
stringifyEntityRef,
stringifyLocationReference,
} from '@backstage/catalog-model';
import { ConflictError, InputError } from '@backstage/errors';
import { ScmIntegrationRegistry } from '@backstage/integration';
import path from 'path';
import { Logger } from 'winston';
import {
CatalogProcessor,
CatalogProcessorParser,
CatalogProcessorResult,
} from '../ingestion/processors';
import * as results from '../ingestion/processors/results';
import {
CatalogProcessingOrchestrator,
EntityProcessingRequest,
EntityProcessingResult,
} from './types';
import { Logger } from 'winston';
import { InputError } from '@backstage/errors';
import { locationSpecToLocationEntity } from './util';
import path from 'path';
import * as results from '../ingestion/processors/results';
import { ScmIntegrationRegistry } from '@backstage/integration';
const validateEntity = entitySchemaValidator();
const validateEntityEnvelope = entityEnvelopeSchemaValidator();
function isLocationEntity(entity: Entity): entity is LocationEntity {
return entity.kind === 'Location';
}
function getEntityLocationRef(entity: Entity): string {
const ref = entity.metadata.annotations?.[LOCATION_ANNOTATION];
if (!ref) {
const entityRef = stringifyEntityRef(entity);
throw new InputError(`Entity '${entityRef}' does not have a location`);
}
return ref;
}
function getEntityOriginLocationRef(entity: Entity): string {
const ref = entity.metadata.annotations?.[ORIGIN_LOCATION_ANNOTATION];
if (!ref) {
@@ -104,23 +118,33 @@ export class DefaultCatalogProcessingOrchestrator
private async processSingleEntity(
unprocessedEntity: Entity,
): Promise<EntityProcessingResult> {
// TODO: validate that this doesn't change during processing
const entityRef = stringifyEntityRef(unprocessedEntity);
// TODO: which one do we actually use here? source-location? - maybe probably doesn't exist yet?
const locationRef =
unprocessedEntity.metadata?.annotations?.[LOCATION_ANNOTATION];
if (!locationRef) {
throw new InputError(`Entity '${entityRef}' does not have a location`);
}
const location = parseLocationReference(locationRef);
const originLocation = parseLocationReference(
getEntityOriginLocationRef(unprocessedEntity),
);
const emitter = createEmitter(this.options.logger, unprocessedEntity);
try {
// This will be checked and mutated step by step below
let entity: Entity = unprocessedEntity;
// NOTE: At this early point, we can only rely on the envelope having to
// be valid; full entity + kind validation happens after the (potentially
// mutative) pre-steps. This means that the code below can't make a lot
// of assumptions about the data despite it using the Entity type.
try {
validateEntityEnvelope(entity);
} catch (e) {
throw new InputError(
`Entity envelope failed validation before processing`,
e,
);
}
const entityRef = stringifyEntityRef(entity);
// TODO: which one do we actually use here? source-location? - maybe probably doesn't exist yet?
const locationRef = getEntityLocationRef(entity);
const location = parseLocationReference(locationRef);
const originLocation = parseLocationReference(
getEntityOriginLocationRef(entity),
);
// Pre-process phase, used to populate entities with data that is required during main processing step
let entity = unprocessedEntity;
for (const processor of this.options.processors) {
if (processor.preProcessEntity) {
try {
@@ -131,48 +155,64 @@ export class DefaultCatalogProcessingOrchestrator
originLocation,
);
} catch (e) {
throw new Error(
`Processor ${processor.constructor.name} threw an error while preprocessing entity ${entityRef} at ${locationRef}, ${e}`,
throw new InputError(
`Processor ${processor.constructor.name} threw an error while preprocessing`,
e,
);
}
}
}
// Enforce entity policies making sure that entities conform to a general schema
let policyEnforcedEntity;
let policyEnforcedEntity: Entity | undefined;
try {
policyEnforcedEntity = await this.options.policy.enforce(entity);
} catch (e) {
throw new InputError(
`Policy check failed while analyzing entity ${entityRef} at ${locationRef}, ${e}`,
);
throw new InputError('Policy check failed', e);
}
if (!policyEnforcedEntity) {
throw new Error(
`Policy unexpectedly returned no data while analyzing entity ${entityRef} at ${locationRef}`,
);
throw new Error('Policy unexpectedly returned no data');
}
entity = policyEnforcedEntity;
// Validate that the end result is a valid Entity at all
try {
validateEntity(entity);
} catch (e) {
throw new ConflictError(
`Entity envelope failed validation after preprocessing`,
e,
);
}
// Validate the given entity kind against its schema
let handled = false;
let didValidate = false;
for (const processor of this.options.processors) {
if (processor.validateEntityKind) {
try {
handled = await processor.validateEntityKind(entity);
if (handled) {
didValidate = await processor.validateEntityKind(entity);
if (didValidate) {
break;
}
} catch (e) {
throw new InputError(
`Processor ${processor.constructor.name} threw an error while validating the entity ${entityRef} at ${locationRef}, ${e}`,
`Processor ${processor.constructor.name} threw an error while validating the entity`,
e,
);
}
}
}
if (!handled) {
if (!didValidate) {
throw new InputError(
`No processor recognized the entity ${entityRef} at ${locationRef}`,
'No processor recognized the entity as valid, possibly caused by a foreign kind or apiVersion',
);
}
// Double check that none of the previous steps tried to change something
// related to the entity ref, which would break downstream
if (stringifyEntityRef(entity) !== entityRef) {
throw new ConflictError(
'Fatal: The entity kind, namespace, or name changed during processing',
);
}
@@ -204,6 +244,7 @@ export class DefaultCatalogProcessingOrchestrator
maybeRelativeTarget,
);
let didRead = false;
for (const processor of this.options.processors) {
if (processor.readLocation) {
try {
@@ -218,15 +259,22 @@ export class DefaultCatalogProcessingOrchestrator
this.options.parser,
);
if (read) {
didRead = true;
break;
}
} catch (e) {
throw new Error(
`Processor ${processor.constructor.name} threw an error while postprocessing entity ${entityRef} at ${locationRef}, ${e}`,
throw new InputError(
`Processor ${processor.constructor.name} threw an error while reading ${type}:${target}`,
e,
);
}
}
}
if (!didRead) {
throw new InputError(
`No processor was able to handle reading of ${type}:${target}`,
);
}
}
}
@@ -240,8 +288,9 @@ export class DefaultCatalogProcessingOrchestrator
emitter.emit,
);
} catch (e) {
throw new Error(
`Processor ${processor.constructor.name} threw an error while postprocessing entity ${entityRef} at ${locationRef}, ${e}`,
throw new InputError(
`Processor ${processor.constructor.name} threw an error while postprocessing`,
e,
);
}
}
@@ -255,7 +304,10 @@ export class DefaultCatalogProcessingOrchestrator
};
} catch (error) {
this.options.logger.warn(error.message);
return { ok: false, errors: emitter.results().errors.concat(error) };
return {
ok: false,
errors: emitter.results().errors.concat(error),
};
}
}
}
@@ -276,23 +328,40 @@ function createEmitter(logger: Logger, parentEntity: Entity) {
);
return;
}
if (i.type === 'entity') {
// TODO(freben): Perform the most basic validation here
// (apiVersion, kind, metadata, metadata.name, metadata.namespace, spec)
let entity: Entity;
try {
entity = validateEntityEnvelope(i.entity);
} catch (e) {
logger.debug(`Envelope validation failed at ${i.location}, ${e}`);
errors.push(e);
return;
}
const originLocation = getEntityOriginLocationRef(parentEntity);
deferredEntities.push({
...i.entity,
metadata: {
...i.entity.metadata,
annotations: {
...i.entity.metadata.annotations,
[ORIGIN_LOCATION_ANNOTATION]: originLocation,
[LOCATION_ANNOTATION]: stringifyLocationReference(i.location),
// Note that at this point, we have only validated the envelope part of
// the entity data. Annotations are not part of that, so we have to be
// defensive. If the annotations were malformed (e.g. were not a valid
// object), we just skip over this step and let the full entity
// validation at the next step of processing catch that.
const annotations = entity.metadata.annotations || {};
if (typeof annotations === 'object' && !Array.isArray(annotations)) {
const originLocation = getEntityOriginLocationRef(parentEntity);
const location = stringifyLocationReference(i.location);
entity = {
...entity,
metadata: {
...entity.metadata,
annotations: {
...annotations,
[ORIGIN_LOCATION_ANNOTATION]: originLocation,
[LOCATION_ANNOTATION]: location,
},
},
},
});
};
}
deferredEntities.push(entity);
} else if (i.type === 'location') {
deferredEntities.push(
locationSpecToLocationEntity(i.location, parentEntity),
+1 -1
View File
@@ -176,7 +176,7 @@ export class Stitcher {
statusItems = parsedErrors.map(e => ({
type: ENTITY_STATUS_CATALOG_PROCESSING_TYPE,
level: 'error',
message: e.toString(),
message: `${e.name}: ${e.message}`,
error: e,
}));
}
@@ -14,24 +14,22 @@
* limitations under the License.
*/
import { ConflictError, NotFoundError } from '@backstage/errors';
import { stringifyEntityRef, Entity } from '@backstage/catalog-model';
import { Knex } from 'knex';
import { Transaction } from '../../database';
import lodash from 'lodash';
import {
ProcessingDatabase,
AddUnprocessedEntitiesOptions,
UpdateProcessedEntityOptions,
GetProcessableEntitiesResult,
ReplaceUnprocessedEntitiesOptions,
RefreshStateItem,
} from './types';
import type { Logger } from 'winston';
import { v4 as uuid } from 'uuid';
import { Entity, stringifyEntityRef } from '@backstage/catalog-model';
import { JsonObject } from '@backstage/config';
import { ConflictError, NotFoundError } from '@backstage/errors';
import { Knex } from 'knex';
import lodash from 'lodash';
import { v4 as uuid } from 'uuid';
import type { Logger } from 'winston';
import { Transaction } from '../../database';
import {
AddUnprocessedEntitiesOptions,
GetProcessableEntitiesResult,
ProcessingDatabase,
RefreshStateItem,
ReplaceUnprocessedEntitiesOptions,
UpdateProcessedEntityOptions,
} from './types';
export type DbRefreshStateRow = {
entity_id: string;
+2 -2
View File
@@ -16,9 +16,9 @@
import {
Entity,
LocationSpec,
Location,
EntityRelationSpec,
Location,
LocationSpec,
} from '@backstage/catalog-model';
import { JsonObject } from '@backstage/config';