@@ -103,43 +103,42 @@ export class DefaultCatalogProcessingEngine implements CatalogProcessingEngine {
|
||||
});
|
||||
});
|
||||
|
||||
if (items.length) {
|
||||
const { id, state, unprocessedEntity } = items[0];
|
||||
|
||||
const result = await this.orchestrator.process({
|
||||
entity: unprocessedEntity,
|
||||
state,
|
||||
});
|
||||
for (const error of result.errors) {
|
||||
this.logger.warn(error.message);
|
||||
}
|
||||
if (!result.ok) {
|
||||
return;
|
||||
}
|
||||
|
||||
result.completedEntity.metadata.uid = id;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateProcessedEntity(tx, {
|
||||
id,
|
||||
processedEntity: result.completedEntity,
|
||||
state: result.state,
|
||||
errors: JSON.stringify(result.errors),
|
||||
relations: result.relations,
|
||||
deferredEntities: result.deferredEntities,
|
||||
});
|
||||
});
|
||||
|
||||
const setOfThingsToStitch = new Set<string>([
|
||||
stringifyEntityRef(result.completedEntity),
|
||||
...result.relations.map(relation =>
|
||||
stringifyEntityRef(relation.source),
|
||||
),
|
||||
]);
|
||||
await this.stitcher.stitch(setOfThingsToStitch);
|
||||
} else {
|
||||
if (!items.length) {
|
||||
// No items to process, wait and try again.
|
||||
await this.wait();
|
||||
return;
|
||||
}
|
||||
|
||||
const { id, state, unprocessedEntity } = items[0];
|
||||
|
||||
const result = await this.orchestrator.process({
|
||||
entity: unprocessedEntity,
|
||||
state,
|
||||
});
|
||||
for (const error of result.errors) {
|
||||
this.logger.warn(error.message);
|
||||
}
|
||||
if (!result.ok) {
|
||||
return;
|
||||
}
|
||||
|
||||
result.completedEntity.metadata.uid = id;
|
||||
await this.processingDatabase.transaction(async tx => {
|
||||
await this.processingDatabase.updateProcessedEntity(tx, {
|
||||
id,
|
||||
processedEntity: result.completedEntity,
|
||||
state: result.state,
|
||||
errors: JSON.stringify(result.errors),
|
||||
relations: result.relations,
|
||||
deferredEntities: result.deferredEntities,
|
||||
});
|
||||
});
|
||||
|
||||
const setOfThingsToStitch = new Set<string>([
|
||||
stringifyEntityRef(result.completedEntity),
|
||||
...result.relations.map(relation => stringifyEntityRef(relation.source)),
|
||||
]);
|
||||
await this.stitcher.stitch(setOfThingsToStitch);
|
||||
}
|
||||
|
||||
private async wait() {
|
||||
|
||||
Reference in New Issue
Block a user