Merge pull request #30180 from backstage/freben/publisher
Add an `EventConsumingGooglePubSubPublisher`, for pushing Backstage events to pubsub
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-events-backend-module-google-pubsub': patch
|
||||
---
|
||||
|
||||
Add an `EventConsumingGooglePubSubPublisher`, for pushing Backstage events to pubsub
|
||||
@@ -19,11 +19,24 @@ events:
|
||||
# The fully qualified name of the subscription
|
||||
subscriptionName: 'projects/my-google-project/subscriptions/github-enterprise-events'
|
||||
# The event system topic to transfer to. This can also be just a plain string
|
||||
targetTopic:
|
||||
# This example picks the topic name from a message attribute + a prefix
|
||||
fromMessageAttribute:
|
||||
attributeName: 'x-github-event'
|
||||
withPrefix: 'github.'
|
||||
targetTopic: 'github.{{ event.attributes.x-github-event }}'
|
||||
```
|
||||
|
||||
The following configuration enables the transfer of events from a Backstage events topic into a Google
|
||||
Pub/Sub topic.
|
||||
|
||||
```yaml
|
||||
events:
|
||||
modules:
|
||||
googlePubSub:
|
||||
eventConsumingGooglePubSubPublisher:
|
||||
subscriptions:
|
||||
# A unique key for your subscription, to be used in logging and metrics
|
||||
mySubscription:
|
||||
# The source topic (or array of topics)
|
||||
sourceTopic: 'github'
|
||||
# The fully qualified name of the target topic
|
||||
targetTopicName: 'projects/my-google-project/topics/github-enterprise-events'
|
||||
```
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -90,6 +90,68 @@ export interface Config {
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Configuration for `EventConsumingGooglePubSubPublisher`, which
|
||||
* consumes messages from the Backstage events system and forwards them
|
||||
* into Google Pub/Sub topics.
|
||||
*/
|
||||
eventConsumingGooglePubSubPublisher?: {
|
||||
subscriptions: {
|
||||
[name: string]: {
|
||||
/**
|
||||
* The name of the events backend topic(s) that messages are
|
||||
* consumed from.
|
||||
*/
|
||||
sourceTopic: string | string[];
|
||||
|
||||
/**
|
||||
* The complete name of the Google Pub/Sub subscription to forward
|
||||
* events to, on the form
|
||||
* `projects/PROJECT_ID/topics/TOPIC_ID`.
|
||||
*
|
||||
* The value can contain placeholders on the form `{{
|
||||
* message.attributes.foo }}`, to mirror attribute `foo` as the
|
||||
* whole or part of the topic name.
|
||||
*
|
||||
* @example
|
||||
*
|
||||
* This example expects the events topic to contain GitHub
|
||||
* webhook events where the HTTP headers were mapped into
|
||||
* event metadata fields. The outcome should be that messages
|
||||
* end up on event topics such as `github.push`,
|
||||
* `github.repository` etc which matches the [`@backstage/plugin-events-backend-module-github`](https://github.com/backstage/backstage/tree/master/plugins/events-backend-module-github) structure.
|
||||
*
|
||||
* ```yaml
|
||||
* targetTopic: 'projects/my-project/topics/github.{{ event.metadata.x-github-event }}'
|
||||
* ```
|
||||
*/
|
||||
targetTopicName: string;
|
||||
|
||||
/**
|
||||
* Event metadata fields are by default copied to the Pub/Sub
|
||||
* message attribute. This setting allows you to override or amend
|
||||
* those attributes.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* The values can contain placeholders on the form `{{ event.metadata.foo }}`,
|
||||
* to mirror metadata field `foo` as the whole or part of a
|
||||
* message attribute value.
|
||||
*
|
||||
* @example
|
||||
*
|
||||
* ```yaml
|
||||
* messageAttributes:
|
||||
* x-gitHub-event: '{{ event.metadata.event }}'
|
||||
* ```
|
||||
*/
|
||||
messageAttributes?: {
|
||||
[key: string]: string;
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
@@ -28,20 +28,21 @@ backend.add(import('../src'));
|
||||
|
||||
backend.add(
|
||||
createBackendPlugin({
|
||||
pluginId: 'example-event-consumer',
|
||||
pluginId: 'example-event-drivers',
|
||||
register(reg) {
|
||||
reg.registerInit({
|
||||
deps: {
|
||||
events: eventsServiceRef,
|
||||
logger: coreServices.logger,
|
||||
scheduler: coreServices.scheduler,
|
||||
},
|
||||
async init({ events, logger }) {
|
||||
events.subscribe({
|
||||
id: 'example-subscription',
|
||||
topics: ['example-topic'],
|
||||
async init({ events, logger, scheduler }) {
|
||||
await events.subscribe({
|
||||
id: 'inbox-subscription',
|
||||
topics: ['inbox-topic'],
|
||||
async onEvent(event) {
|
||||
logger.info(
|
||||
`Received event: ${JSON.stringify(
|
||||
`Received inbox event: ${JSON.stringify(
|
||||
{
|
||||
topic: event.topic,
|
||||
payload: event.eventPayload,
|
||||
@@ -53,6 +54,24 @@ backend.add(
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
await scheduler.scheduleTask({
|
||||
id: 'outbox-task',
|
||||
scope: 'local',
|
||||
frequency: { seconds: 5 },
|
||||
timeout: { seconds: 5 },
|
||||
fn: async () => {
|
||||
await events.publish({
|
||||
topic: 'outbox-topic',
|
||||
eventPayload: {
|
||||
message: 'Hello, world!',
|
||||
},
|
||||
metadata: {
|
||||
source: 'outbox-task',
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
@@ -6,6 +6,6 @@
|
||||
import { BackendFeature } from '@backstage/backend-plugin-api';
|
||||
|
||||
// @public
|
||||
const eventsModuleGooglePubsubConsumingEventPublisher: BackendFeature;
|
||||
export default eventsModuleGooglePubsubConsumingEventPublisher;
|
||||
const _default: BackendFeature;
|
||||
export default _default;
|
||||
```
|
||||
|
||||
+111
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import { PubSub } from '@google-cloud/pubsub';
|
||||
import waitFor from 'wait-for-expect';
|
||||
import { EventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher';
|
||||
|
||||
describe('EventConsumingGooglePubSubPublisher', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const events = mockServices.events.mock();
|
||||
|
||||
let onEvent: undefined | ((event: EventParams) => void);
|
||||
events.subscribe.mockImplementation(async options => {
|
||||
onEvent = options.onEvent;
|
||||
});
|
||||
|
||||
const topic = {
|
||||
publishMessage: jest.fn(),
|
||||
};
|
||||
const pubSub = {
|
||||
close: jest.fn(),
|
||||
topic: jest.fn(() => topic),
|
||||
};
|
||||
const pubSubFactory = jest.fn(() => pubSub as unknown as PubSub);
|
||||
|
||||
beforeEach(() => {
|
||||
onEvent = undefined;
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
it('should go though the expected registration and flows', async () => {
|
||||
const publisher = new EventConsumingGooglePubSubPublisher({
|
||||
logger,
|
||||
events,
|
||||
tasks: [
|
||||
{
|
||||
id: 'my-id',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/my-project/topics/my-topic',
|
||||
mapToTopic: () => ({ project: 'my-project', topic: 'my-topic' }),
|
||||
mapToAttributes: m => ({ ...m.metadata, more: 'yes' }),
|
||||
},
|
||||
],
|
||||
pubSubFactory,
|
||||
});
|
||||
|
||||
// Start up
|
||||
|
||||
await publisher.start();
|
||||
|
||||
expect(events.subscribe).toHaveBeenCalledWith({
|
||||
id: 'EventConsumingGooglePubSubPublisher.my-id',
|
||||
topics: ['my-topic'],
|
||||
onEvent: expect.any(Function),
|
||||
});
|
||||
|
||||
// Publish successfully
|
||||
|
||||
onEvent!({
|
||||
topic: 'my-topic',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { extra: 'data', more: 'yes' },
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(pubSubFactory).toHaveBeenCalledWith('my-project');
|
||||
expect(pubSub.topic).toHaveBeenCalledWith('my-topic');
|
||||
expect(topic.publishMessage).toHaveBeenCalledWith({
|
||||
json: { foo: 'bar' },
|
||||
attributes: { extra: 'data', more: 'yes' },
|
||||
});
|
||||
});
|
||||
|
||||
// Simulate a failed publish
|
||||
|
||||
topic.publishMessage.mockRejectedValueOnce(new Error('Failed to publish'));
|
||||
|
||||
await expect(
|
||||
onEvent!({
|
||||
topic: 'my-topic',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { extra: 'data', more: 'yes' },
|
||||
}),
|
||||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Failed to publish"`);
|
||||
|
||||
expect(logger.error).toHaveBeenCalledWith(
|
||||
'Error publishing Google Pub/Sub message',
|
||||
new Error('Failed to publish'),
|
||||
);
|
||||
|
||||
// Shut down
|
||||
|
||||
await publisher.stop();
|
||||
expect(pubSub.close).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
+153
@@ -0,0 +1,153 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
LoggerService,
|
||||
RootConfigService,
|
||||
RootLifecycleService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { PubSub } from '@google-cloud/pubsub';
|
||||
import { Counter, metrics } from '@opentelemetry/api';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
import { SubscriptionTask } from './types';
|
||||
|
||||
/**
|
||||
* Reads messages off of the events system and forwards them into Google Pub/Sub
|
||||
* topics.
|
||||
*/
|
||||
export class EventConsumingGooglePubSubPublisher {
|
||||
readonly #logger: LoggerService;
|
||||
readonly #events: EventsService;
|
||||
readonly #tasks: SubscriptionTask[];
|
||||
readonly #pubSubFactory: (projectId: string) => PubSub;
|
||||
readonly #metrics: { messages: Counter };
|
||||
#activeClientsByProjectId: Map<string, PubSub>;
|
||||
|
||||
static create(options: {
|
||||
config: RootConfigService;
|
||||
logger: LoggerService;
|
||||
rootLifecycle: RootLifecycleService;
|
||||
events: EventsService;
|
||||
}) {
|
||||
const publisher = new EventConsumingGooglePubSubPublisher({
|
||||
logger: options.logger,
|
||||
events: options.events,
|
||||
tasks: readSubscriptionTasksFromConfig(options.config),
|
||||
pubSubFactory: projectId => new PubSub({ projectId }),
|
||||
});
|
||||
|
||||
options.rootLifecycle.addStartupHook(async () => {
|
||||
await publisher.start();
|
||||
});
|
||||
|
||||
options.rootLifecycle.addBeforeShutdownHook(async () => {
|
||||
await publisher.stop();
|
||||
});
|
||||
|
||||
return publisher;
|
||||
}
|
||||
|
||||
constructor(options: {
|
||||
logger: LoggerService;
|
||||
events: EventsService;
|
||||
tasks: SubscriptionTask[];
|
||||
pubSubFactory: (projectId: string) => PubSub;
|
||||
}) {
|
||||
this.#logger = options.logger;
|
||||
this.#events = options.events;
|
||||
this.#tasks = options.tasks;
|
||||
this.#pubSubFactory = options.pubSubFactory;
|
||||
|
||||
const meter = metrics.getMeter('default');
|
||||
this.#metrics = {
|
||||
messages: meter.createCounter(
|
||||
'events.google.pubsub.publisher.messages.total',
|
||||
{
|
||||
description:
|
||||
'Number of Pub/Sub messages sent by EventConsumingGooglePubSubPublisher',
|
||||
unit: 'short',
|
||||
},
|
||||
),
|
||||
};
|
||||
|
||||
this.#activeClientsByProjectId = new Map();
|
||||
}
|
||||
|
||||
async start() {
|
||||
for (const task of this.#tasks) {
|
||||
this.#logger.info(
|
||||
`Starting publisher: id=${
|
||||
task.id
|
||||
} sourceTopics=${task.sourceTopics.join(',')} targetTopic=${
|
||||
task.targetTopicPattern
|
||||
}`,
|
||||
);
|
||||
|
||||
await this.#events.subscribe({
|
||||
id: `EventConsumingGooglePubSubPublisher.${task.id}`,
|
||||
topics: task.sourceTopics,
|
||||
onEvent: async event => {
|
||||
let status: 'success' | 'failed' | 'ignored' = 'failed';
|
||||
try {
|
||||
const topic = task.mapToTopic(event);
|
||||
if (!topic) {
|
||||
status = 'ignored';
|
||||
return;
|
||||
}
|
||||
|
||||
let pubsub = this.#activeClientsByProjectId.get(topic.project);
|
||||
if (!pubsub) {
|
||||
pubsub = this.#pubSubFactory(topic.project);
|
||||
this.#activeClientsByProjectId.set(topic.project, pubsub);
|
||||
}
|
||||
|
||||
await pubsub.topic(topic.topic).publishMessage({
|
||||
json: event.eventPayload,
|
||||
attributes: task.mapToAttributes(event),
|
||||
});
|
||||
|
||||
status = 'success';
|
||||
} catch (error) {
|
||||
this.#logger.error(
|
||||
'Error publishing Google Pub/Sub message',
|
||||
error,
|
||||
);
|
||||
status = 'failed';
|
||||
throw error;
|
||||
} finally {
|
||||
this.#metrics.messages.add(1, {
|
||||
subscription: task.id,
|
||||
status: status,
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async stop() {
|
||||
const clients = Array.from(this.#activeClientsByProjectId.values());
|
||||
this.#activeClientsByProjectId = new Map();
|
||||
|
||||
await Promise.allSettled(
|
||||
clients.map(async client => {
|
||||
this.#logger.info(`Closing Google Pub/Sub client: ${client.projectId}`);
|
||||
await client.close();
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
+196
@@ -0,0 +1,196 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { readSubscriptionTasksFromConfig } from './config';
|
||||
|
||||
describe('readSubscriptionTasksFromConfig', () => {
|
||||
it('reads with basic targetTopic', () => {
|
||||
const data = {
|
||||
events: {
|
||||
modules: {
|
||||
googlePubSub: {
|
||||
eventConsumingGooglePubSubPublisher: {
|
||||
subscriptions: {
|
||||
subKey1: {
|
||||
sourceTopic: 'my-topic',
|
||||
targetTopicName: 'projects/pid/topics/tid',
|
||||
},
|
||||
subKey2: {
|
||||
sourceTopic: ['my-topic-1', 'my-topic-2'],
|
||||
targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = readSubscriptionTasksFromConfig(
|
||||
mockServices.rootConfig({ data }),
|
||||
);
|
||||
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: 'subKey1',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid',
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
{
|
||||
id: 'subKey2',
|
||||
sourceTopics: ['my-topic-1', 'my-topic-2'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
}),
|
||||
).toEqual({ project: 'pid', topic: 'tid' });
|
||||
expect(
|
||||
result[0].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { attr: 'yes' },
|
||||
}),
|
||||
).toEqual({ attr: 'yes' });
|
||||
});
|
||||
|
||||
it('fills in placeholders', () => {
|
||||
const data = {
|
||||
events: {
|
||||
modules: {
|
||||
googlePubSub: {
|
||||
eventConsumingGooglePubSubPublisher: {
|
||||
subscriptions: {
|
||||
sub1: {
|
||||
sourceTopic: 'my-topic',
|
||||
targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
messageAttributes: {
|
||||
attr1: 'updated.{{ event.metadata.exists }}',
|
||||
attr2: 'updated.{{ event.metadata.missing }}',
|
||||
},
|
||||
},
|
||||
sub2: {
|
||||
sourceTopic: 'my-topic',
|
||||
targetTopicName:
|
||||
'projects/pid/topics/tid.{{ event.metadata.missing }}',
|
||||
messageAttributes: {
|
||||
attr3: 'new',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const result = readSubscriptionTasksFromConfig(
|
||||
mockServices.rootConfig({ data }),
|
||||
);
|
||||
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: 'sub1',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}',
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
{
|
||||
id: 'sub2',
|
||||
sourceTopics: ['my-topic'],
|
||||
targetTopicPattern:
|
||||
'projects/pid/topics/tid.{{ event.metadata.missing }}',
|
||||
mapToTopic: expect.any(Function),
|
||||
mapToAttributes: expect.any(Function),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(
|
||||
result[0].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
}),
|
||||
).toEqual({ project: 'pid', topic: 'tid.a' }); // Message attribute existed, successfully routed
|
||||
expect(
|
||||
result[0].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
}),
|
||||
).toEqual({
|
||||
exists: 'exists',
|
||||
attr1: 'updated.exists', // message attribute existed, was replaced
|
||||
attr2: 'original2', // message attribute did not exist, was not replaced
|
||||
});
|
||||
|
||||
expect(
|
||||
result[1].mapToTopic({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
}),
|
||||
).toBeUndefined(); // Message attribute did not exist, could not be routed
|
||||
expect(
|
||||
result[1].mapToAttributes({
|
||||
topic: 'a',
|
||||
eventPayload: { foo: 'bar' },
|
||||
metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' },
|
||||
}),
|
||||
).toEqual({
|
||||
exists: 'exists',
|
||||
attr1: 'original1',
|
||||
attr2: 'original2',
|
||||
attr3: 'new',
|
||||
});
|
||||
});
|
||||
|
||||
it('rejects malformed subscription name', () => {
|
||||
const data = {
|
||||
events: {
|
||||
modules: {
|
||||
googlePubSub: {
|
||||
eventConsumingGooglePubSubPublisher: {
|
||||
subscriptions: {
|
||||
subKey: {
|
||||
sourceTopic: 'sid',
|
||||
targetTopicName: 'foo',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
expect(() =>
|
||||
readSubscriptionTasksFromConfig(mockServices.rootConfig({ data })),
|
||||
).toThrowErrorMatchingInlineSnapshot(
|
||||
`"Expected Google Pub/Sub 'targetTopicName' to be on the form 'projects/PROJECT_ID/topics/TOPIC_ID' but got 'foo'"`,
|
||||
);
|
||||
});
|
||||
});
|
||||
+146
@@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { RootConfigService } from '@backstage/backend-plugin-api';
|
||||
import { Config } from '@backstage/config';
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import { createPatternResolver } from '../util/createPatternResolver';
|
||||
import { SubscriptionTask } from './types';
|
||||
|
||||
export function readSubscriptionTasksFromConfig(
|
||||
rootConfig: RootConfigService,
|
||||
): SubscriptionTask[] {
|
||||
const subscriptionsConfig = rootConfig.getOptionalConfig(
|
||||
'events.modules.googlePubSub.eventConsumingGooglePubSubPublisher.subscriptions',
|
||||
);
|
||||
if (!subscriptionsConfig) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return subscriptionsConfig.keys().map(subscriptionId => {
|
||||
if (!subscriptionId.match(/^[-_\w]+$/)) {
|
||||
throw new InputError(
|
||||
`Expected Google Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`,
|
||||
);
|
||||
}
|
||||
|
||||
const config = subscriptionsConfig.getConfig(subscriptionId);
|
||||
const sourceTopics = readSourceTopics(config);
|
||||
const mapToTopic = readTopicMapper(config);
|
||||
const mapToAttributes = readAttributeMapper(config);
|
||||
|
||||
return {
|
||||
id: subscriptionId,
|
||||
sourceTopics: sourceTopics,
|
||||
targetTopicPattern: config.getString('targetTopicName'),
|
||||
mapToTopic,
|
||||
mapToAttributes,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function readSourceTopics(config: Config): string[] {
|
||||
if (Array.isArray(config.getOptional('sourceTopic'))) {
|
||||
return config.getStringArray('sourceTopic');
|
||||
}
|
||||
return [config.getString('sourceTopic')];
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the `targetTopicName` configuration field.
|
||||
*/
|
||||
function readTopicMapper(
|
||||
config: Config,
|
||||
): (event: EventParams) => { project: string; topic: string } | undefined {
|
||||
const regex = /^projects\/([^/]+)\/topics\/(.+)$/;
|
||||
|
||||
const targetTopicPattern = config.getString('targetTopicName');
|
||||
let parts = targetTopicPattern.match(regex);
|
||||
if (!parts) {
|
||||
throw new InputError(
|
||||
`Expected Google Pub/Sub 'targetTopicName' to be on the form 'projects/PROJECT_ID/topics/TOPIC_ID' but got '${targetTopicPattern}'`,
|
||||
);
|
||||
}
|
||||
|
||||
const patternResolver = createPatternResolver(targetTopicPattern);
|
||||
|
||||
return event => {
|
||||
try {
|
||||
parts = patternResolver({ event }).match(regex);
|
||||
if (!parts) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
project: parts[1],
|
||||
topic: parts[2],
|
||||
};
|
||||
} catch {
|
||||
// could not map to a topic
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the `messageAttributes` configuration field.
|
||||
*/
|
||||
function readAttributeMapper(
|
||||
config: Config,
|
||||
): (event: EventParams) => Record<string, string> {
|
||||
const setters = new Array<
|
||||
(options: {
|
||||
event: EventParams;
|
||||
attributes: Record<string, string>;
|
||||
}) => void
|
||||
>();
|
||||
|
||||
const eventMetadata = config.getOptionalConfig('messageAttributes');
|
||||
if (eventMetadata) {
|
||||
for (const key of eventMetadata?.keys() ?? []) {
|
||||
const valuePattern = eventMetadata.getString(key);
|
||||
const patternResolver = createPatternResolver(valuePattern);
|
||||
setters.push(({ event, attributes }) => {
|
||||
try {
|
||||
const value = patternResolver({ event });
|
||||
if (value) {
|
||||
attributes[key] = value;
|
||||
}
|
||||
} catch {
|
||||
// ignore silently, keep original
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return event => {
|
||||
const result: Record<string, string> = {};
|
||||
for (const [key, value] of Object.entries(event.metadata ?? {})) {
|
||||
if (value) {
|
||||
if (typeof value === 'string') {
|
||||
result[key] = value;
|
||||
} else if (Array.isArray(value) && value.length > 0) {
|
||||
// Google Pub/Sub does not support array values
|
||||
result[key] = value.join(',');
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const setter of setters) {
|
||||
setter({ event, attributes: result });
|
||||
}
|
||||
return result;
|
||||
};
|
||||
}
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export { eventsModuleEventConsumingGooglePubSubPublisher } from './module';
|
||||
+52
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
coreServices,
|
||||
createBackendModule,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { eventsServiceRef } from '@backstage/plugin-events-node';
|
||||
import { EventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher';
|
||||
|
||||
/**
|
||||
* Reads messages off of the events system and forwards them into Google Pub/Sub
|
||||
* topics.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export const eventsModuleEventConsumingGooglePubSubPublisher =
|
||||
createBackendModule({
|
||||
pluginId: 'events',
|
||||
moduleId: 'event-consuming-google-pubsub-publisher',
|
||||
register(reg) {
|
||||
reg.registerInit({
|
||||
deps: {
|
||||
config: coreServices.rootConfig,
|
||||
logger: coreServices.logger,
|
||||
rootLifecycle: coreServices.rootLifecycle,
|
||||
events: eventsServiceRef,
|
||||
},
|
||||
async init({ config, logger, rootLifecycle, events }) {
|
||||
EventConsumingGooglePubSubPublisher.create({
|
||||
config,
|
||||
logger,
|
||||
rootLifecycle,
|
||||
events,
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
+30
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
|
||||
/**
|
||||
* A configured subscription task.
|
||||
*/
|
||||
export interface SubscriptionTask {
|
||||
id: string;
|
||||
sourceTopics: string[];
|
||||
targetTopicPattern: string;
|
||||
mapToTopic: (
|
||||
event: EventParams,
|
||||
) => { project: string; topic: string } | undefined;
|
||||
mapToAttributes: (event: EventParams) => Record<string, string>;
|
||||
}
|
||||
+1
-1
@@ -188,7 +188,7 @@ describe('readSubscriptionTasksFromConfig', () => {
|
||||
expect(() =>
|
||||
readSubscriptionTasksFromConfig(mockServices.rootConfig({ data })),
|
||||
).toThrowErrorMatchingInlineSnapshot(
|
||||
`"Expected Googoe Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got 'sid'"`,
|
||||
`"Expected Google Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got 'sid'"`,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
+2
-2
@@ -34,7 +34,7 @@ export function readSubscriptionTasksFromConfig(
|
||||
return subscriptionsConfig.keys().map(subscriptionId => {
|
||||
if (!subscriptionId.match(/^[-_\w]+$/)) {
|
||||
throw new InputError(
|
||||
`Expected Googoe Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`,
|
||||
`Expected Google Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ function readSubscriptionName(config: Config): {
|
||||
);
|
||||
if (!parts) {
|
||||
throw new InputError(
|
||||
`Expected Googoe Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got '${subscriptionName}'`,
|
||||
`Expected Google Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got '${subscriptionName}'`,
|
||||
);
|
||||
}
|
||||
return {
|
||||
|
||||
-2
@@ -24,8 +24,6 @@ import { GooglePubSubConsumingEventPublisher } from './GooglePubSubConsumingEven
|
||||
/**
|
||||
* Reads messages off of Google Pub/Sub subscriptions and forwards them into the
|
||||
* Backstage events system.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export const eventsModuleGooglePubsubConsumingEventPublisher =
|
||||
createBackendModule({
|
||||
|
||||
@@ -14,10 +14,19 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { createBackendFeatureLoader } from '@backstage/backend-plugin-api';
|
||||
import { eventsModuleEventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher';
|
||||
import { eventsModuleGooglePubsubConsumingEventPublisher } from './GooglePubSubConsumingEventPublisher';
|
||||
|
||||
/**
|
||||
* The google-pubsub backend module for the events plugin.
|
||||
*
|
||||
* @packageDocumentation
|
||||
*/
|
||||
|
||||
export { eventsModuleGooglePubsubConsumingEventPublisher as default } from './GooglePubSubConsumingEventPublisher';
|
||||
export default createBackendFeatureLoader({
|
||||
*loader() {
|
||||
yield eventsModuleGooglePubsubConsumingEventPublisher;
|
||||
yield eventsModuleEventConsumingGooglePubSubPublisher;
|
||||
},
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user