Restructure the next catalog types and files a bit

Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2021-06-03 10:38:59 +02:00
parent e7a5a34740
commit 9c63be5454
28 changed files with 772 additions and 533 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend': patch
---
Restructure the next catalog types and files a bit
@@ -15,12 +15,11 @@
*/
import { getVoidLogger } from '@backstage/backend-common';
import { DefaultProcessingDatabase } from './database/DefaultProcessingDatabase';
import { DefaultCatalogProcessingEngine } from './DefaultCatalogProcessingEngine';
import { Stitcher } from './Stitcher';
import { CatalogProcessingOrchestrator } from './types';
import waitForExpect from 'wait-for-expect';
import { DefaultProcessingDatabase } from './database/DefaultProcessingDatabase';
import { DefaultCatalogProcessingEngine } from './DefaultCatalogProcessingEngine';
import { CatalogProcessingOrchestrator } from './processing/types';
import { Stitcher } from './stitching/Stitcher';
describe('DefaultCatalogProcessingEngine', () => {
const db = ({
@@ -22,10 +22,10 @@ import {
import { serializeError } from '@backstage/errors';
import { Logger } from 'winston';
import { ProcessingDatabase } from './database/types';
import { Stitcher } from './Stitcher';
import { CatalogProcessingOrchestrator } from './processing/types';
import { Stitcher } from './stitching/Stitcher';
import {
CatalogProcessingEngine,
CatalogProcessingOrchestrator,
EntityProvider,
EntityProviderConnection,
EntityProviderMutation,
@@ -1,387 +0,0 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
Entity,
entityEnvelopeSchemaValidator,
EntityPolicy,
EntityRelationSpec,
entitySchemaValidator,
LocationEntity,
LocationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
parseLocationReference,
stringifyEntityRef,
stringifyLocationReference,
} from '@backstage/catalog-model';
import { ConflictError, InputError } from '@backstage/errors';
import { ScmIntegrationRegistry } from '@backstage/integration';
import path from 'path';
import { Logger } from 'winston';
import {
CatalogProcessor,
CatalogProcessorParser,
CatalogProcessorResult,
} from '../ingestion/processors';
import * as results from '../ingestion/processors/results';
import {
CatalogProcessingOrchestrator,
EntityProcessingRequest,
EntityProcessingResult,
} from './types';
import { locationSpecToLocationEntity } from './util';
const validateEntity = entitySchemaValidator();
const validateEntityEnvelope = entityEnvelopeSchemaValidator();
/**
 * Type guard that tells whether an entity is of the Location kind.
 *
 * Only the `kind` field is inspected; the comparison is case sensitive.
 */
function isLocationEntity(entity: Entity): entity is LocationEntity {
  const { kind } = entity;
  return kind === 'Location';
}
/**
 * Reads the location reference stored in the LOCATION_ANNOTATION annotation
 * of an entity.
 *
 * @throws InputError if the annotation is missing or empty
 */
function getEntityLocationRef(entity: Entity): string {
  const { annotations } = entity.metadata;
  const locationRef = annotations ? annotations[LOCATION_ANNOTATION] : undefined;
  if (locationRef) {
    return locationRef;
  }

  const entityRef = stringifyEntityRef(entity);
  throw new InputError(`Entity '${entityRef}' does not have a location`);
}
/**
 * Reads the origin location reference stored in the
 * ORIGIN_LOCATION_ANNOTATION annotation of an entity.
 *
 * @throws InputError if the annotation is missing or empty
 */
function getEntityOriginLocationRef(entity: Entity): string {
  const { annotations } = entity.metadata;
  const originRef = annotations
    ? annotations[ORIGIN_LOCATION_ANNOTATION]
    : undefined;
  if (originRef) {
    return originRef;
  }

  const entityRef = stringifyEntityRef(entity);
  throw new InputError(`Entity '${entityRef}' does not have an origin location`);
}
/**
 * Resolves a possibly relative target against a base location.
 *
 * The target is returned untouched when the base location is of a different
 * type than the requested one, when the type is neither 'file' nor 'url', or
 * when the resolution itself throws.
 *
 * @param integrations - Used for resolving 'url' targets against the base
 * @param base - The location that the target may be relative to
 * @param type - The location type of the target
 * @param target - The target to resolve
 * @returns The resolved target, or the original target as a fallback
 */
function toAbsoluteUrl(
  integrations: ScmIntegrationRegistry,
  base: LocationSpec,
  type: string,
  target: string,
): string {
  if (base.type === type) {
    try {
      switch (type) {
        case 'file':
          // Only dot-prefixed file targets are treated as relative
          return target.startsWith('.')
            ? path.join(path.dirname(base.target), target)
            : target;
        case 'url':
          return integrations.resolveUrl({ url: target, base: base.target });
        default:
          break;
      }
    } catch {
      // Best effort only; fall back to the target as given
    }
  }
  return target;
}
/**
 * Default implementation of the processing orchestrator. Takes one unprocessed
 * entity and runs it through the configured processors and the entity policy,
 * gathering everything that the processors emit along the way.
 */
export class DefaultCatalogProcessingOrchestrator
  implements CatalogProcessingOrchestrator {
  constructor(
    private readonly options: {
      processors: CatalogProcessor[];
      integrations: ScmIntegrationRegistry;
      logger: Logger;
      parser: CatalogProcessorParser;
      policy: EntityPolicy;
    },
  ) {}

  /**
   * Processes the entity of the given request.
   *
   * @param request - Holds the entity to process; only `request.entity` is
   *   used at the moment
   * @returns `ok: true` with the completed entity and all emitted output, or
   *   `ok: false` with the collected errors
   */
  async process(
    request: EntityProcessingRequest,
  ): Promise<EntityProcessingResult> {
    // TODO: implement dryRun/eager
    return this.processSingleEntity(request.entity);
  }

  /**
   * Runs all processing steps for one entity. Any error thrown by a step is
   * logged as a warning and turned into an `ok: false` result, together with
   * the errors that had been emitted up to that point.
   */
  private async processSingleEntity(
    unprocessedEntity: Entity,
  ): Promise<EntityProcessingResult> {
    const emitter = createEmitter(this.options.logger, unprocessedEntity);

    try {
      // This will be checked and mutated step by step below
      let entity: Entity = unprocessedEntity;

      // NOTE: At this early point, we can only rely on the envelope having to
      // be valid; full entity + kind validation happens after the (potentially
      // mutative) pre-steps. This means that the code below can't make a lot
      // of assumptions about the data despite it using the Entity type.
      try {
        validateEntityEnvelope(entity);
      } catch (e) {
        throw new InputError(
          `Entity envelope failed validation before processing`,
          e,
        );
      }

      // Remembered so that we can detect if a later step changes the identity
      const entityRef = stringifyEntityRef(entity);

      // TODO: which one do we actually use here? source-location? - maybe probably doesn't exist yet?
      const locationRef = getEntityLocationRef(entity);
      const location = parseLocationReference(locationRef);
      const originLocation = parseLocationReference(
        getEntityOriginLocationRef(entity),
      );

      // Pre-process phase, used to populate entities with data that is required during main processing step
      for (const processor of this.options.processors) {
        if (processor.preProcessEntity) {
          try {
            entity = await processor.preProcessEntity(
              entity,
              location,
              emitter.emit,
              originLocation,
            );
          } catch (e) {
            throw new InputError(
              `Processor ${processor.constructor.name} threw an error while preprocessing`,
              e,
            );
          }
        }
      }

      // Enforce entity policies making sure that entities conform to a general schema
      let policyEnforcedEntity: Entity | undefined;
      try {
        policyEnforcedEntity = await this.options.policy.enforce(entity);
      } catch (e) {
        throw new InputError('Policy check failed', e);
      }
      if (!policyEnforcedEntity) {
        throw new Error('Policy unexpectedly returned no data');
      }
      entity = policyEnforcedEntity;

      // Validate that the end result is a valid Entity at all
      try {
        validateEntity(entity);
      } catch (e) {
        throw new ConflictError(
          `Entity envelope failed validation after preprocessing`,
          e,
        );
      }

      // Validate the given entity kind against its schema; the first
      // processor that recognizes the kind wins
      let didValidate = false;
      for (const processor of this.options.processors) {
        if (processor.validateEntityKind) {
          try {
            didValidate = await processor.validateEntityKind(entity);
            if (didValidate) {
              break;
            }
          } catch (e) {
            throw new InputError(
              `Processor ${processor.constructor.name} threw an error while validating the entity`,
              e,
            );
          }
        }
      }
      if (!didValidate) {
        throw new InputError(
          'No processor recognized the entity as valid, possibly caused by a foreign kind or apiVersion',
        );
      }

      // Double check that none of the previous steps tried to change something
      // related to the entity ref, which would break downstream
      if (stringifyEntityRef(entity) !== entityRef) {
        throw new ConflictError(
          'Fatal: The entity kind, namespace, or name changed during processing',
        );
      }

      // Backwards compatible processing of location entities
      if (isLocationEntity(entity)) {
        const { type = location.type } = entity.spec;
        const targets = new Array<string>();
        if (entity.spec.target) {
          targets.push(entity.spec.target);
        }
        if (entity.spec.targets) {
          targets.push(...entity.spec.targets);
        }

        for (const maybeRelativeTarget of targets) {
          if (type === 'file' && maybeRelativeTarget.endsWith(path.sep)) {
            // Report the target that was actually rejected, not the location
            // that the Location entity itself was read from
            emitter.emit(
              results.inputError(
                location,
                `LocationEntityProcessor cannot handle ${type} type location with target ${maybeRelativeTarget} that ends with a path separator`,
              ),
            );
            continue;
          }

          const target = toAbsoluteUrl(
            this.options.integrations,
            location,
            type,
            maybeRelativeTarget,
          );

          // The first processor that reports having read the location wins
          let didRead = false;
          for (const processor of this.options.processors) {
            if (processor.readLocation) {
              try {
                const read = await processor.readLocation(
                  {
                    type,
                    target,
                    presence: 'required',
                  },
                  false,
                  emitter.emit,
                  this.options.parser,
                );
                if (read) {
                  didRead = true;
                  break;
                }
              } catch (e) {
                throw new InputError(
                  `Processor ${processor.constructor.name} threw an error while reading ${type}:${target}`,
                  e,
                );
              }
            }
          }
          if (!didRead) {
            throw new InputError(
              `No processor was able to handle reading of ${type}:${target}`,
            );
          }
        }
      }

      // Main processing step of the entity
      for (const processor of this.options.processors) {
        if (processor.postProcessEntity) {
          try {
            entity = await processor.postProcessEntity(
              entity,
              location,
              emitter.emit,
            );
          } catch (e) {
            throw new InputError(
              `Processor ${processor.constructor.name} threw an error while postprocessing`,
              e,
            );
          }
        }
      }

      return {
        ...emitter.results(),
        completedEntity: entity,
        state: new Map(),
        ok: true,
      };
    } catch (error) {
      this.options.logger.warn(error.message);
      return {
        ok: false,
        errors: emitter.results().errors.concat(error),
      };
    }
  }
}
/**
 * Creates a collector for the items that processors emit while a single
 * entity is being processed.
 *
 * @param logger - Receives a warning for items emitted after `results()` was
 *   called, and a debug message for entities that fail envelope validation
 * @param parentEntity - The entity being processed; its origin location is
 *   copied onto emitted entities, and it is passed along when converting
 *   emitted locations into entities
 * @returns An `emit` callback to hand to processors, and a `results` function
 *   that closes the emitter and returns the collected errors, relations and
 *   deferred entities
 */
function createEmitter(logger: Logger, parentEntity: Entity) {
  let done = false;

  const errors = new Array<Error>();
  const relations = new Array<EntityRelationSpec>();
  const deferredEntities = new Array<Entity>();

  const emit = (i: CatalogProcessorResult) => {
    // Anything that arrives after results() was called is dropped
    if (done) {
      logger.warn(
        `Item of type ${i.type} was emitted after processing had completed at ${
          new Error().stack
        }`,
      );
      return;
    }

    if (i.type === 'entity') {
      let entity: Entity;
      try {
        entity = validateEntityEnvelope(i.entity);
      } catch (e) {
        // The location is an object, so it is spelled out explicitly here;
        // interpolating it directly would log "[object Object]"
        logger.debug(
          `Envelope validation failed at ${i.location.type}:${i.location.target}, ${e}`,
        );
        errors.push(e);
        return;
      }

      // Note that at this point, we have only validated the envelope part of
      // the entity data. Annotations are not part of that, so we have to be
      // defensive. If the annotations were malformed (e.g. were not a valid
      // object), we just skip over this step and let the full entity
      // validation at the next step of processing catch that.
      const annotations = entity.metadata.annotations || {};
      if (typeof annotations === 'object' && !Array.isArray(annotations)) {
        const originLocation = getEntityOriginLocationRef(parentEntity);
        const location = stringifyLocationReference(i.location);
        entity = {
          ...entity,
          metadata: {
            ...entity.metadata,
            annotations: {
              ...annotations,
              [ORIGIN_LOCATION_ANNOTATION]: originLocation,
              [LOCATION_ANNOTATION]: location,
            },
          },
        };
      }

      deferredEntities.push(entity);
    } else if (i.type === 'location') {
      deferredEntities.push(
        locationSpecToLocationEntity(i.location, parentEntity),
      );
    } else if (i.type === 'relation') {
      relations.push(i.relation);
    } else if (i.type === 'error') {
      errors.push(i.error);
    }
  };

  return {
    emit,
    results() {
      // Closes the emitter; later emits are logged and ignored
      done = true;
      return {
        errors,
        relations,
        deferredEntities,
      };
    },
  };
}
@@ -15,7 +15,8 @@
*/
import { DefaultLocationService } from './DefaultLocationService';
import { CatalogProcessingOrchestrator, LocationStore } from './types';
import { CatalogProcessingOrchestrator } from './processing/types';
import { LocationStore } from './types';
describe('DefaultLocationServiceTest', () => {
const orchestrator: jest.Mocked<CatalogProcessingOrchestrator> = {
@@ -27,9 +28,12 @@ describe('DefaultLocationServiceTest', () => {
listLocations: jest.fn(),
getLocation: jest.fn(),
};
beforeEach(() => jest.resetAllMocks());
const locationService = new DefaultLocationService(store, orchestrator);
beforeEach(() => {
jest.resetAllMocks();
});
describe('createLocation', () => {
it('should support dry run', async () => {
orchestrator.process.mockResolvedValueOnce({
@@ -136,6 +140,7 @@ describe('DefaultLocationServiceTest', () => {
});
});
});
describe('listLocations', () => {
it('should call locationStore.deleteLocation', async () => {
await locationService.listLocations();
@@ -14,17 +14,14 @@
* limitations under the License.
*/
import {
LocationSpec,
Location,
Entity,
Location,
LocationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
} from '@backstage/catalog-model';
import {
LocationService,
LocationStore,
CatalogProcessingOrchestrator,
} from './types';
import { CatalogProcessingOrchestrator } from './processing/types';
import { LocationService, LocationStore } from './types';
import { locationSpecToMetadataName } from './util';
export class DefaultLocationService implements LocationService {
@@ -23,7 +23,6 @@ const createLocationStore = async () => {
const connection = { applyMutation: jest.fn() };
const store = new DefaultLocationStore(knex);
await store.connect(connection);
return { store, connection };
};
@@ -14,22 +14,17 @@
* limitations under the License.
*/
import { LocationSpec, Location } from '@backstage/catalog-model';
import {
LocationStore,
EntityProvider,
EntityProviderConnection,
} from './types';
import { v4 as uuid } from 'uuid';
import { locationSpecToLocationEntity } from './util';
import { Location, LocationSpec } from '@backstage/catalog-model';
import { ConflictError, NotFoundError } from '@backstage/errors';
import { Knex } from 'knex';
type DbLocationsRow = {
id: string;
type: string;
target: string;
};
import { v4 as uuid } from 'uuid';
import { DbLocationsRow } from './database/tables';
import {
EntityProvider,
EntityProviderConnection,
LocationStore,
} from './types';
import { locationSpecToLocationEntity } from './util';
export class DefaultLocationStore implements LocationStore, EntityProvider {
private _connection: EntityProviderConnection | undefined;
@@ -67,11 +67,11 @@ import { CatalogProcessingEngine, LocationService } from '../next/types';
import { ConfigLocationEntityProvider } from './ConfigLocationEntityProvider';
import { DefaultProcessingDatabase } from './database/DefaultProcessingDatabase';
import { DefaultCatalogProcessingEngine } from './DefaultCatalogProcessingEngine';
import { DefaultCatalogProcessingOrchestrator } from './DefaultCatalogProcessingOrchestrator';
import { DefaultLocationService } from './DefaultLocationService';
import { DefaultLocationStore } from './DefaultLocationStore';
import { NextEntitiesCatalog } from './NextEntitiesCatalog';
import { Stitcher } from './Stitcher';
import { DefaultCatalogProcessingOrchestrator } from './processing/DefaultCatalogProcessingOrchestrator';
import { Stitcher } from './stitching/Stitcher';
export type CatalogEnvironment = {
logger: Logger;
@@ -14,17 +14,19 @@
* limitations under the License.
*/
import { Knex } from 'knex';
import { DbFinalEntitiesRow } from './Stitcher';
import { EntitiesCatalog } from '../catalog';
import { EntitiesRequest, EntitiesResponse } from '../catalog/types';
import { DbRefreshStateRow } from './database/DefaultProcessingDatabase';
import {
DbEntitiesSearchRow,
DbPageInfo,
EntityPagination,
} from '../database/types';
import { InputError } from '@backstage/errors';
import { Knex } from 'knex';
import {
EntitiesCatalog,
EntitiesRequest,
EntitiesResponse,
} from '../catalog/types';
import { DbPageInfo, EntityPagination } from '../database/types';
import {
DbFinalEntitiesRow,
DbRefreshStateRow,
DbSearchRow,
} from './database/tables';
function parsePagination(
input?: EntityPagination,
@@ -80,7 +82,7 @@ export class NextEntitiesCatalog implements EntitiesCatalog {
// NOTE(freben): This used to be a set of OUTER JOIN, which may seem to
// make a lot of sense. However, it had abysmal performance on sqlite
// when datasets grew large, so we're using IN instead.
const matchQuery = db<DbEntitiesSearchRow>('search')
const matchQuery = db<DbSearchRow>('search')
.select('entity_id')
.where(function keyFilter() {
this.andWhere({ key: key.toLowerCase() });
@@ -33,8 +33,8 @@ import {
parseEntityPaginationParams,
parseEntityTransformParams,
} from '../service/request';
import { LocationService } from './types';
import { disallowReadonlyMode, validateRequestBody } from '../service/util';
import { LocationService } from './types';
export interface RouterOptions {
entitiesCatalog?: EntitiesCatalog;
@@ -14,19 +14,18 @@
* limitations under the License.
*/
// import { DefaultProcessingDatabase } from './DefaultProcessingDatabase';
import { DatabaseManager } from './DatabaseManager';
import { getVoidLogger } from '@backstage/backend-common';
import { Entity, stringifyEntityRef } from '@backstage/catalog-model';
import { JsonObject } from '@backstage/config';
import { Knex } from 'knex';
import * as uuid from 'uuid';
import { DatabaseManager } from './DatabaseManager';
import { DefaultProcessingDatabase } from './DefaultProcessingDatabase';
import {
DbRefreshStateReferencesRow,
DbRefreshStateRow,
DbRelationsRow,
DefaultProcessingDatabase,
} from './DefaultProcessingDatabase';
import { Entity, stringifyEntityRef } from '@backstage/catalog-model';
import * as uuid from 'uuid';
import { getVoidLogger } from '@backstage/backend-common';
import { JsonObject } from '@backstage/config';
} from './tables';
describe('Default Processing Database', () => {
let db: Knex;
@@ -22,6 +22,11 @@ import lodash from 'lodash';
import { v4 as uuid } from 'uuid';
import type { Logger } from 'winston';
import { Transaction } from '../../database';
import {
DbRefreshStateReferencesRow,
DbRefreshStateRow,
DbRelationsRow,
} from './tables';
import {
AddUnprocessedEntitiesOptions,
GetProcessableEntitiesResult,
@@ -31,30 +36,6 @@ import {
UpdateProcessedEntityOptions,
} from './types';
/**
 * Row shape for the refresh state of a single entity.
 *
 * NOTE(review): the entity, cache and errors columns are plain strings here,
 * presumably holding serialized data - confirm against the code that writes
 * these rows.
 */
export type DbRefreshStateRow = {
entity_id: string;
entity_ref: string;
unprocessed_entity: string;
processed_entity?: string;
cache?: string;
next_update_at: string;
last_discovery_at: string; // remove?
errors?: string;
};
/**
 * Row shape for a single relation: a typed edge from a source entity ref to a
 * target entity ref, tagged with the id of the entity that it originated from.
 */
export type DbRelationsRow = {
originating_entity_id: string;
source_entity_ref: string;
target_entity_ref: string;
type: string;
};
/**
 * Row shape for a reference to a target entity ref, coming from either a
 * source key or a source entity ref (both are optional in the type).
 */
export type DbRefreshStateReferencesRow = {
source_key?: string;
source_entity_ref?: string;
target_entity_ref: string;
};
// The number of items that are sent per batch to the database layer, when
// doing .batchInsert calls to knex. This needs to be low enough to not cause
// errors in the underlying engine due to exceeding query limits, but large
@@ -0,0 +1,58 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * Row shape for a stored location: an id plus the location type and target.
 */
export type DbLocationsRow = {
id: string;
type: string;
target: string;
};
/**
 * Row shape for the refresh state of a single entity.
 *
 * NOTE(review): the entity, cache and errors columns are plain strings here,
 * presumably holding serialized data - confirm against the code that writes
 * these rows.
 */
export type DbRefreshStateRow = {
entity_id: string;
entity_ref: string;
unprocessed_entity: string;
processed_entity?: string;
cache?: string;
next_update_at: string;
last_discovery_at: string; // remove?
errors?: string;
};
/**
 * Row shape for a reference to a target entity ref, coming from either a
 * source key or a source entity ref (both are optional in the type).
 */
export type DbRefreshStateReferencesRow = {
source_key?: string;
source_entity_ref?: string;
target_entity_ref: string;
};
/**
 * Row shape for a single relation: a typed edge from a source entity ref to a
 * target entity ref, tagged with the id of the entity that it originated from.
 */
export type DbRelationsRow = {
originating_entity_id: string;
source_entity_ref: string;
target_entity_ref: string;
type: string;
};
/**
 * Row shape for a final entity, keyed by entity id.
 *
 * NOTE(review): `final_entity` is optional and a plain string, presumably the
 * serialized end result of stitching - confirm against the Stitcher.
 */
export type DbFinalEntitiesRow = {
entity_id: string;
hash: string;
stitch_ticket: string;
final_entity?: string;
};
/**
 * Row shape for the search table: one key/value pair belonging to an entity.
 * The value may be null.
 */
export type DbSearchRow = {
entity_id: string;
key: string;
value: string | null;
};
@@ -16,3 +16,5 @@
export { NextCatalogBuilder } from './NextCatalogBuilder';
export { createNextRouter } from './NextRouter';
export * from './processing';
export * from './stitching';
@@ -0,0 +1,333 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
Entity,
EntityPolicy,
LocationEntity,
LocationSpec,
parseLocationReference,
stringifyEntityRef,
} from '@backstage/catalog-model';
import { ConflictError, InputError } from '@backstage/errors';
import { ScmIntegrationRegistry } from '@backstage/integration';
import path from 'path';
import { Logger } from 'winston';
import {
CatalogProcessor,
CatalogProcessorParser,
} from '../../ingestion/processors';
import * as results from '../../ingestion/processors/results';
import {
CatalogProcessingOrchestrator,
EntityProcessingRequest,
EntityProcessingResult,
} from './types';
import { ProcessorOutputCollector } from './ProcessorOutputCollector';
import {
getEntityLocationRef,
getEntityOriginLocationRef,
isLocationEntity,
toAbsoluteUrl,
validateEntity,
validateEntityEnvelope,
} from './util';
/**
 * Data shared between the processing steps of a single entity.
 */
type Context = {
// The entity ref as it was before any step ran; used for detecting changes
entityRef: string;
// Parsed from the location annotation of the entity
location: LocationSpec;
// Parsed from the origin location annotation of the entity
originLocation: LocationSpec;
// Receives everything that processors emit during the steps
collector: ProcessorOutputCollector;
};
/**
 * Default implementation of the processing orchestrator. Takes one unprocessed
 * entity and runs it through a fixed sequence of steps (pre-process, policy,
 * validate, special location handling, post-process), gathering everything
 * that the processors emit along the way.
 */
export class DefaultCatalogProcessingOrchestrator
  implements CatalogProcessingOrchestrator {
  constructor(
    private readonly options: {
      processors: CatalogProcessor[];
      integrations: ScmIntegrationRegistry;
      logger: Logger;
      parser: CatalogProcessorParser;
      policy: EntityPolicy;
    },
  ) {}

  /**
   * Processes the entity of the given request.
   *
   * @param request - Holds the entity to process; only `request.entity` is
   *   used at the moment
   * @returns `ok: true` with the completed entity and all emitted output, or
   *   `ok: false` with the collected errors
   */
  async process(
    request: EntityProcessingRequest,
  ): Promise<EntityProcessingResult> {
    // TODO: implement dryRun/eager
    return this.processSingleEntity(request.entity);
  }

  /**
   * Runs all processing steps for one entity. Any error thrown by a step is
   * logged as a warning and turned into an `ok: false` result, together with
   * the errors that had been emitted up to that point.
   */
  private async processSingleEntity(
    unprocessedEntity: Entity,
  ): Promise<EntityProcessingResult> {
    const collector = new ProcessorOutputCollector(
      this.options.logger,
      unprocessedEntity,
    );

    try {
      // This will be checked and mutated step by step below
      let entity: Entity = unprocessedEntity;

      // NOTE: At this early point, we can only rely on the envelope having to
      // be valid; full entity + kind validation happens after the (potentially
      // mutative) pre-steps. This means that the code below can't make a lot
      // of assumptions about the data despite it using the Entity type.
      try {
        validateEntityEnvelope(entity);
      } catch (e) {
        throw new InputError(
          `Entity envelope failed validation before processing`,
          e,
        );
      }

      // TODO: which one do we actually use for the location?
      // source-location? - maybe probably doesn't exist yet?
      const context: Context = {
        entityRef: stringifyEntityRef(entity),
        location: parseLocationReference(getEntityLocationRef(entity)),
        originLocation: parseLocationReference(
          getEntityOriginLocationRef(entity),
        ),
        collector,
      };

      // Run the steps
      entity = await this.runPreProcessStep(entity, context);
      entity = await this.runPolicyStep(entity);
      await this.runValidateStep(entity, context);
      if (isLocationEntity(entity)) {
        await this.runSpecialLocationStep(entity, context);
      }
      entity = await this.runPostProcessStep(entity, context);

      return {
        ...context.collector.results(),
        completedEntity: entity,
        state: new Map(),
        ok: true,
      };
    } catch (error) {
      this.options.logger.warn(error.message);
      return {
        ok: false,
        errors: collector.results().errors.concat(error),
      };
    }
  }

  /**
   * Pre-process phase, used to populate entities with data that is required
   * during the main processing step. Each processor receives the output of
   * the previous one.
   *
   * @throws InputError wrapping whatever a processor threw
   */
  private async runPreProcessStep(
    entity: Entity,
    context: Context,
  ): Promise<Entity> {
    let result = entity;

    for (const processor of this.options.processors) {
      if (processor.preProcessEntity) {
        try {
          result = await processor.preProcessEntity(
            result,
            context.location,
            context.collector.onEmit,
            context.originLocation,
          );
        } catch (e) {
          throw new InputError(
            `Processor ${processor.constructor.name} threw an error while preprocessing`,
            e,
          );
        }
      }
    }

    return result;
  }

  /**
   * Enforce entity policies making sure that entities conform to a general
   * schema.
   *
   * @throws InputError if the policy rejects the entity
   * @throws Error if the policy returns nothing
   */
  private async runPolicyStep(entity: Entity): Promise<Entity> {
    let policyEnforcedEntity: Entity | undefined;

    try {
      policyEnforcedEntity = await this.options.policy.enforce(entity);
    } catch (e) {
      throw new InputError('Policy check failed', e);
    }

    if (!policyEnforcedEntity) {
      throw new Error('Policy unexpectedly returned no data');
    }

    return policyEnforcedEntity;
  }

  /**
   * Validate the given entity: its ref must be unchanged, it must pass the
   * full entity schema, and at least one processor must recognize its kind.
   *
   * @throws ConflictError if the entity ref changed or schema validation fails
   * @throws InputError if a processor throws, or none recognizes the kind
   */
  private async runValidateStep(
    entity: Entity,
    context: Context,
  ): Promise<void> {
    // Double check that none of the previous steps tried to change something
    // related to the entity ref, which would break downstream
    if (stringifyEntityRef(entity) !== context.entityRef) {
      throw new ConflictError(
        'Fatal: The entity kind, namespace, or name changed during processing',
      );
    }

    // Validate that the end result is a valid Entity at all
    try {
      validateEntity(entity);
    } catch (e) {
      throw new ConflictError(
        `Entity envelope failed validation after preprocessing`,
        e,
      );
    }

    let foundKind = false;
    for (const processor of this.options.processors) {
      if (processor.validateEntityKind) {
        try {
          foundKind = await processor.validateEntityKind(entity);
          if (foundKind) {
            // TODO(freben): It would make sense to keep running, so that
            // multiple processors could have a go at making checks. For
            // example, an org may want to add additional rules on top of the
            // provided ones. But that would be a breaking change, so we'll
            // postpone that to a future processors rewrite.
            break;
          }
        } catch (e) {
          throw new InputError(
            `Processor ${processor.constructor.name} threw an error while validating the entity`,
            e,
          );
        }
      }
    }
    if (!foundKind) {
      throw new InputError(
        'No processor recognized the entity as valid, possibly caused by a foreign kind or apiVersion',
      );
    }
  }

  /**
   * Backwards compatible processing of location entities. Every target of
   * the entity is resolved against the location of the entity itself, and
   * handed to the processors until one of them reports having read it.
   *
   * @throws InputError if a processor throws, or none can read a target
   */
  private async runSpecialLocationStep(
    entity: LocationEntity,
    context: Context,
  ): Promise<void> {
    const { type = context.location.type } = entity.spec;

    const targets = new Array<string>();
    if (entity.spec.target) {
      targets.push(entity.spec.target);
    }
    if (entity.spec.targets) {
      targets.push(...entity.spec.targets);
    }

    for (const maybeRelativeTarget of targets) {
      if (type === 'file' && maybeRelativeTarget.endsWith(path.sep)) {
        // Report the target that was actually rejected, not the location
        // that the Location entity itself was read from
        context.collector.onEmit(
          results.inputError(
            context.location,
            `LocationEntityProcessor cannot handle ${type} type location with target ${maybeRelativeTarget} that ends with a path separator`,
          ),
        );
        continue;
      }

      const target = toAbsoluteUrl(
        this.options.integrations,
        context.location,
        type,
        maybeRelativeTarget,
      );

      // The first processor that reports having read the location wins
      let didRead = false;
      for (const processor of this.options.processors) {
        if (processor.readLocation) {
          try {
            const read = await processor.readLocation(
              {
                type,
                target,
                presence: 'required',
              },
              false,
              context.collector.onEmit,
              this.options.parser,
            );
            if (read) {
              didRead = true;
              break;
            }
          } catch (e) {
            throw new InputError(
              `Processor ${processor.constructor.name} threw an error while reading ${type}:${target}`,
              e,
            );
          }
        }
      }
      if (!didRead) {
        throw new InputError(
          `No processor was able to handle reading of ${type}:${target}`,
        );
      }
    }
  }

  /**
   * Main processing step of the entity. Each processor receives the output
   * of the previous one.
   *
   * @throws InputError wrapping whatever a processor threw
   */
  private async runPostProcessStep(
    entity: Entity,
    context: Context,
  ): Promise<Entity> {
    let result = entity;

    for (const processor of this.options.processors) {
      if (processor.postProcessEntity) {
        try {
          result = await processor.postProcessEntity(
            result,
            context.location,
            context.collector.onEmit,
          );
        } catch (e) {
          throw new InputError(
            `Processor ${processor.constructor.name} threw an error while postprocessing`,
            e,
          );
        }
      }
    }

    return result;
  }
}
@@ -0,0 +1,109 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
Entity,
EntityRelationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
stringifyLocationReference,
} from '@backstage/catalog-model';
import { Logger } from 'winston';
import { CatalogProcessorResult } from '../../ingestion';
import { locationSpecToLocationEntity } from '../util';
import { getEntityOriginLocationRef, validateEntityEnvelope } from './util';
/**
* Helper class for aggregating all of the emitted data from processors.
*/
/**
 * Helper class for aggregating all of the emitted data from processors.
 *
 * Hand `onEmit` to the processors while an entity is being processed, then
 * call `results()` once to close the collector and get what was gathered.
 */
export class ProcessorOutputCollector {
  private readonly errors = new Array<Error>();
  private readonly relations = new Array<EntityRelationSpec>();
  private readonly deferredEntities = new Array<Entity>();
  private done = false;

  /**
   * @param logger - Receives a warning for items emitted after `results()`
   *   was called, and a debug message for entities that fail envelope
   *   validation
   * @param parentEntity - The entity being processed; its origin location is
   *   copied onto emitted entities, and it is passed along when converting
   *   emitted locations into entities
   */
  constructor(
    private readonly logger: Logger,
    private readonly parentEntity: Entity,
  ) {}

  /**
   * A callback, bound to this collector, that can be passed to processors as
   * their emit function.
   */
  get onEmit(): (i: CatalogProcessorResult) => void {
    return i => this.receive(i);
  }

  /**
   * Closes the collector and returns everything that was gathered. Items
   * emitted after this call are logged and ignored.
   */
  results() {
    this.done = true;
    return {
      errors: this.errors,
      relations: this.relations,
      deferredEntities: this.deferredEntities,
    };
  }

  /**
   * Sorts a single emitted item into the right bucket.
   */
  private receive(i: CatalogProcessorResult) {
    if (this.done) {
      this.logger.warn(
        `Item of type ${i.type} was emitted after processing had completed at ${
          new Error().stack
        }`,
      );
      return;
    }

    if (i.type === 'entity') {
      let entity: Entity;
      try {
        entity = validateEntityEnvelope(i.entity);
      } catch (e) {
        // The location is an object, so it is spelled out explicitly here;
        // interpolating it directly would log "[object Object]"
        this.logger.debug(
          `Envelope validation failed at ${i.location.type}:${i.location.target}, ${e}`,
        );
        this.errors.push(e);
        return;
      }

      // Note that at this point, we have only validated the envelope part of
      // the entity data. Annotations are not part of that, so we have to be
      // defensive. If the annotations were malformed (e.g. were not a valid
      // object), we just skip over this step and let the full entity
      // validation at the next step of processing catch that.
      const annotations = entity.metadata.annotations || {};
      if (typeof annotations === 'object' && !Array.isArray(annotations)) {
        const originLocation = getEntityOriginLocationRef(this.parentEntity);
        const location = stringifyLocationReference(i.location);
        entity = {
          ...entity,
          metadata: {
            ...entity.metadata,
            annotations: {
              ...annotations,
              [ORIGIN_LOCATION_ANNOTATION]: originLocation,
              [LOCATION_ANNOTATION]: location,
            },
          },
        };
      }

      this.deferredEntities.push(entity);
    } else if (i.type === 'location') {
      this.deferredEntities.push(
        locationSpecToLocationEntity(i.location, this.parentEntity),
      );
    } else if (i.type === 'relation') {
      this.relations.push(i.relation);
    } else if (i.type === 'error') {
      this.errors.push(i.error);
    }
  }
}
@@ -0,0 +1,22 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export type {
CatalogProcessingOrchestrator,
EntityProcessingRequest,
EntityProcessingResult,
} from './types';
export { DefaultCatalogProcessingOrchestrator } from './DefaultCatalogProcessingOrchestrator';
@@ -0,0 +1,41 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Entity, EntityRelationSpec } from '@backstage/catalog-model';
import { JsonObject } from '@backstage/config';
/**
 * The input handed to {@link CatalogProcessingOrchestrator.process}.
 */
export type EntityProcessingRequest = {
// The entity that the request is about.
entity: Entity;
// JSON state passed along with the entity, keyed by string.
state: Map<string, JsonObject>; // Versions for multiple deployments etc
};
/**
 * The outcome of {@link CatalogProcessingOrchestrator.process}.
 *
 * This is a discriminated union on `ok`. Check that flag before reading
 * anything else: `state`, `completedEntity`, `deferredEntities` and
 * `relations` exist only on the `ok: true` variant, while `errors` is present
 * on both variants.
 */
export type EntityProcessingResult =
| {
ok: true;
state: Map<string, JsonObject>;
completedEntity: Entity;
deferredEntities: Entity[];
relations: EntityRelationSpec[];
// Note that errors can be reported even on the successful variant.
errors: Error[];
}
| {
ok: false;
errors: Error[];
};
/**
 * Contract for a component that accepts an {@link EntityProcessingRequest}
 * and asynchronously produces an {@link EntityProcessingResult}.
 */
export interface CatalogProcessingOrchestrator {
process(request: EntityProcessingRequest): Promise<EntityProcessingResult>;
}
@@ -0,0 +1,81 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
Entity,
entityEnvelopeSchemaValidator,
entitySchemaValidator,
LocationEntity,
LocationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
stringifyEntityRef,
} from '@backstage/catalog-model';
import { InputError } from '@backstage/errors';
import { ScmIntegrationRegistry } from '@backstage/integration';
import path from 'path';
/**
 * Type guard that narrows an entity to a LocationEntity.
 *
 * The decision is made solely on the entity's `kind` field, which must be
 * exactly the string 'Location' (case sensitive).
 */
export function isLocationEntity(entity: Entity): entity is LocationEntity {
  const { kind } = entity;
  return kind === 'Location';
}
/**
 * Reads the location annotation (LOCATION_ANNOTATION) off an entity.
 *
 * @param entity - The entity whose metadata annotations are inspected.
 * @returns The annotation value.
 * @throws InputError if the annotation is missing or is an empty string.
 */
export function getEntityLocationRef(entity: Entity): string {
  const locationRef = entity.metadata.annotations?.[LOCATION_ANNOTATION];
  if (locationRef) {
    return locationRef;
  }

  // Missing (or empty) annotation: report which entity was at fault.
  const entityRef = stringifyEntityRef(entity);
  throw new InputError(`Entity '${entityRef}' does not have a location`);
}
/**
 * Reads the origin location annotation (ORIGIN_LOCATION_ANNOTATION) off an
 * entity.
 *
 * @param entity - The entity whose metadata annotations are inspected.
 * @returns The annotation value.
 * @throws InputError if the annotation is missing or is an empty string.
 */
export function getEntityOriginLocationRef(entity: Entity): string {
  const originRef = entity.metadata.annotations?.[ORIGIN_LOCATION_ANNOTATION];
  if (originRef) {
    return originRef;
  }

  // Missing (or empty) annotation: report which entity was at fault.
  const entityRef = stringifyEntityRef(entity);
  throw new InputError(
    `Entity '${entityRef}' does not have an origin location`,
  );
}
/**
 * Resolves a possibly relative target against a base location.
 *
 * Resolution only happens when the base location has the same type as the
 * requested `type`; otherwise the target is returned untouched.
 *
 * - 'file': targets starting with '.' are joined onto the directory of the
 *   base target; any other target is returned as is.
 * - 'url': resolution is delegated to `integrations.resolveUrl`.
 * - any other type: the target is returned as is.
 *
 * This is a best-effort helper: if resolution throws for any reason, the
 * original target is returned instead of propagating the error.
 *
 * @param integrations - Registry used to resolve 'url' targets.
 * @param base - The location that the target is relative to.
 * @param type - The location type of the target.
 * @param target - The target to resolve.
 * @returns The resolved target, or the unmodified target when no resolution
 *   applies or resolution fails.
 */
export function toAbsoluteUrl(
  integrations: ScmIntegrationRegistry,
  base: LocationSpec,
  type: string,
  target: string,
): string {
  if (base.type !== type) {
    return target;
  }

  try {
    switch (type) {
      case 'file': {
        const isRelative = target.startsWith('.');
        return isRelative
          ? path.join(path.dirname(base.target), target)
          : target;
      }
      case 'url':
        return integrations.resolveUrl({ url: target, base: base.target });
      default:
        return target;
    }
  } catch {
    // Deliberately swallow resolution failures and fall back to the input.
    return target;
  }
}
// Validator instances created once, when this module is loaded, by calling
// the factory functions imported from @backstage/catalog-model.
// NOTE(review): presumably validateEntity checks a complete entity while
// validateEntityEnvelope checks only the common envelope - confirm against
// the catalog-model documentation.
export const validateEntity = entitySchemaValidator();
export const validateEntityEnvelope = entityEnvelopeSchemaValidator();
@@ -17,14 +17,15 @@
import { getVoidLogger } from '@backstage/backend-common';
import { Entity } from '@backstage/catalog-model';
import { Knex } from 'knex';
import { DatabaseManager } from './database/DatabaseManager';
import { DatabaseManager } from '../database/DatabaseManager';
import {
DbFinalEntitiesRow,
DbRefreshStateReferencesRow,
DbRefreshStateRow,
DbRelationsRow,
} from './database/DefaultProcessingDatabase';
import { DbSearchRow } from './search';
import { DbFinalEntitiesRow, Stitcher } from './Stitcher';
DbSearchRow,
} from '../database/tables';
import { Stitcher } from './Stitcher';
describe('Stitcher', () => {
let db: Knex;
@@ -21,32 +21,16 @@ import {
UNSTABLE_EntityStatusItem,
} from '@backstage/catalog-model';
import { SerializedError } from '@backstage/errors';
import { createHash } from 'crypto';
import stableStringify from 'fast-json-stable-stringify';
import { Knex } from 'knex';
import { Logger } from 'winston';
import { buildEntitySearch, DbSearchRow } from './search';
import { v4 as uuid } from 'uuid';
import { DbRefreshStateRow } from './database/DefaultProcessingDatabase';
// The maximum number of items sent per batch to the database layer when
// doing .batchInsert calls to knex. This needs to be low enough to not cause
// errors in the underlying engine due to exceeding query limits, but large
// enough to keep the speed benefits of batching.
const BATCH_SIZE = 50;
// Shape of a database row holding a final entity.
// NOTE(review): presumably this maps to a final_entities table - confirm
// against the database migrations.
export type DbFinalEntitiesRow = {
entity_id: string;
hash: string;
stitch_ticket: string;
// Optional; presumably the serialized entity - TODO confirm.
final_entity?: string;
};
/**
 * Computes a hex-encoded SHA-1 digest of an entity.
 *
 * A shallow copy of the entity is serialized with stableStringify before
 * hashing.
 */
function generateStableHash(entity: Entity) {
  const serialized = stableStringify({ ...entity });
  const hasher = createHash('sha1');
  hasher.update(serialized);
  return hasher.digest('hex');
}
import { Logger } from 'winston';
import {
DbFinalEntitiesRow,
DbRefreshStateRow,
DbSearchRow,
} from '../database/tables';
import { buildEntitySearch } from './buildEntitySearch';
import { BATCH_SIZE, generateStableHash } from './util';
/**
* Performs the act of stitching - to take all of the various outputs from the
@@ -15,9 +15,9 @@
*/
import { Entity, ENTITY_DEFAULT_NAMESPACE } from '@backstage/catalog-model';
import { buildEntitySearch, mapToRows, traverse } from './search';
import { buildEntitySearch, mapToRows, traverse } from './buildEntitySearch';
describe('search', () => {
describe('buildEntitySearch', () => {
describe('traverse', () => {
it('expands lists of strings to several rows', () => {
const input = { a: ['b', 'c', 'd'] };
@@ -19,12 +19,7 @@ import {
ENTITY_DEFAULT_NAMESPACE,
stringifyEntityRef,
} from '@backstage/catalog-model';
// Shape of a single search row: one key/value pair tied to an entity id.
export type DbSearchRow = {
entity_id: string;
key: string;
// Explicitly nullable: a key can be recorded without a value.
value: string | null;
};
import { DbSearchRow } from '../database/tables';
// These are excluded in the generic loop, either because they do not make sense
// to index, or because they are special-case always inserted whether they are
@@ -0,0 +1,17 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export {};
@@ -0,0 +1,31 @@
/*
* Copyright 2021 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Entity } from '@backstage/catalog-model';
import { createHash } from 'crypto';
import stableStringify from 'fast-json-stable-stringify';
// The maximum number of items sent per batch to the database layer when
// doing .batchInsert calls to knex. This needs to be low enough to not cause
// errors in the underlying engine due to exceeding query limits, but large
// enough to keep the speed benefits of batching.
export const BATCH_SIZE = 50;
/**
 * Computes a hex-encoded SHA-1 digest of an entity.
 *
 * A shallow copy of the entity is serialized with stableStringify before
 * hashing.
 *
 * @param entity - The entity to hash.
 * @returns The SHA-1 digest as a hex string.
 */
export function generateStableHash(entity: Entity) {
  const serialized = stableStringify({ ...entity });
  const hasher = createHash('sha1');
  hasher.update(serialized);
  return hasher.digest('hex');
}
+1 -30
View File
@@ -14,13 +14,7 @@
* limitations under the License.
*/
import {
Entity,
EntityRelationSpec,
Location,
LocationSpec,
} from '@backstage/catalog-model';
import { JsonObject } from '@backstage/config';
import { Entity, Location, LocationSpec } from '@backstage/catalog-model';
export interface LocationService {
createLocation(
@@ -56,26 +50,3 @@ export interface EntityProvider {
getProviderName(): string;
connect(connection: EntityProviderConnection): Promise<void>;
}
/**
 * The input handed to {@link CatalogProcessingOrchestrator.process}.
 */
export type EntityProcessingRequest = {
// The entity that the request is about.
entity: Entity;
// JSON state passed along with the entity, keyed by string.
state: Map<string, JsonObject>; // Versions for multiple deployments etc
};
/**
 * The outcome of {@link CatalogProcessingOrchestrator.process}.
 *
 * This is a discriminated union on `ok`. Check that flag before reading
 * anything else: `state`, `completedEntity`, `deferredEntities` and
 * `relations` exist only on the `ok: true` variant, while `errors` is present
 * on both variants.
 */
export type EntityProcessingResult =
| {
ok: true;
state: Map<string, JsonObject>;
completedEntity: Entity;
deferredEntities: Entity[];
relations: EntityRelationSpec[];
// Note that errors can be reported even on the successful variant.
errors: Error[];
}
| {
ok: false;
errors: Error[];
};
/**
 * Contract for a component that accepts an {@link EntityProcessingRequest}
 * and asynchronously produces an {@link EntityProcessingResult}.
 */
export interface CatalogProcessingOrchestrator {
process(request: EntityProcessingRequest): Promise<EntityProcessingResult>;
}
+1 -2
View File
@@ -16,8 +16,8 @@
import {
Entity,
LocationSpec,
LocationEntityV1alpha1,
LocationSpec,
LOCATION_ANNOTATION,
ORIGIN_LOCATION_ANNOTATION,
stringifyEntityRef,
@@ -29,7 +29,6 @@ export function locationSpecToMetadataName(location: LocationSpec) {
const hash = createHash('sha1')
.update(`${location.type}:${location.target}`)
.digest('hex');
return `generated-${hash}`;
}