diff --git a/.changeset/moody-doors-hug.md b/.changeset/moody-doors-hug.md new file mode 100644 index 0000000000..033bdeedd9 --- /dev/null +++ b/.changeset/moody-doors-hug.md @@ -0,0 +1,5 @@ +--- +'@backstage/plugin-events-backend-module-google-pubsub': patch +--- + +Add an `EventConsumingGooglePubSubPublisher`, for pushing Backstage events to pubsub diff --git a/plugins/events-backend-module-google-pubsub/README.md b/plugins/events-backend-module-google-pubsub/README.md index 67ddcfacb4..9752fa30c6 100644 --- a/plugins/events-backend-module-google-pubsub/README.md +++ b/plugins/events-backend-module-google-pubsub/README.md @@ -19,11 +19,24 @@ events: # The fully qualified name of the subscription subscriptionName: 'projects/my-google-project/subscriptions/github-enterprise-events' # The event system topic to transfer to. This can also be just a plain string - targetTopic: - # This example picks the topic name from a message attribute + a prefix - fromMessageAttribute: - attributeName: 'x-github-event' - withPrefix: 'github.' + targetTopic: 'github.{{ event.attributes.x-github-event }}' +``` + +The following configuration enables the transfer of events from a Backstage events topic into a Google +Pub/Sub topic. + +```yaml +events: + modules: + googlePubSub: + eventConsumingGooglePubSubPublisher: + subscriptions: + # A unique key for your subscription, to be used in logging and metrics + mySubscription: + # The source topic (or array of topics) + sourceTopic: 'github' + # The fully qualified name of the target topic + targetTopicName: 'projects/my-google-project/topics/github-enterprise-events' ``` ## Installation diff --git a/plugins/events-backend-module-google-pubsub/config.d.ts b/plugins/events-backend-module-google-pubsub/config.d.ts index 3e70489b9d..1097b71e00 100644 --- a/plugins/events-backend-module-google-pubsub/config.d.ts +++ b/plugins/events-backend-module-google-pubsub/config.d.ts @@ -90,6 +90,68 @@ export interface Config { }; }; }; + + /** + * Configuration for `EventConsumingGooglePubSubPublisher`, which + * consumes messages from the Backstage events system and forwards them + * into Google Pub/Sub topics. + */ + eventConsumingGooglePubSubPublisher?: { + subscriptions: { + [name: string]: { + /** + * The name of the events backend topic(s) that messages are + * consumed from. + */ + sourceTopic: string | string[]; + + /** + * The complete name of the Google Pub/Sub subscription to forward + * events to, on the form + * `projects/PROJECT_ID/topics/TOPIC_ID`. + * + * The value can contain placeholders on the form `{{ + * message.attributes.foo }}`, to mirror attribute `foo` as the + * whole or part of the topic name. + * + * @example + * + * This example expects the events topic to contain GitHub + * webhook events where the HTTP headers were mapped into + * event metadata fields. The outcome should be that messages + * end up on event topics such as `github.push`, + * `github.repository` etc which matches the [`@backstage/plugin-events-backend-module-github`](https://github.com/backstage/backstage/tree/master/plugins/events-backend-module-github) structure. + * + * ```yaml + * targetTopic: 'projects/my-project/topics/github.{{ event.metadata.x-github-event }}' + * ``` + */ + targetTopicName: string; + + /** + * Event metadata fields are by default copied to the Pub/Sub + * message attribute. This setting allows you to override or amend + * those attributes. + * + * @remarks + * + * The values can contain placeholders on the form `{{ event.metadata.foo }}`, + * to mirror metadata field `foo` as the whole or part of a + * message attribute value. + * + * @example + * + * ```yaml + * messageAttributes: + * x-gitHub-event: '{{ event.metadata.event }}' + * ``` + */ + messageAttributes?: { + [key: string]: string; + }; + }; + }; + }; }; }; }; diff --git a/plugins/events-backend-module-google-pubsub/dev/index.ts b/plugins/events-backend-module-google-pubsub/dev/index.ts index 752217e6db..78a7e775cf 100644 --- a/plugins/events-backend-module-google-pubsub/dev/index.ts +++ b/plugins/events-backend-module-google-pubsub/dev/index.ts @@ -28,20 +28,21 @@ backend.add(import('../src')); backend.add( createBackendPlugin({ - pluginId: 'example-event-consumer', + pluginId: 'example-event-drivers', register(reg) { reg.registerInit({ deps: { events: eventsServiceRef, logger: coreServices.logger, + scheduler: coreServices.scheduler, }, - async init({ events, logger }) { - events.subscribe({ - id: 'example-subscription', - topics: ['example-topic'], + async init({ events, logger, scheduler }) { + await events.subscribe({ + id: 'inbox-subscription', + topics: ['inbox-topic'], async onEvent(event) { logger.info( - `Received event: ${JSON.stringify( + `Received inbox event: ${JSON.stringify( { topic: event.topic, payload: event.eventPayload, @@ -53,6 +54,24 @@ backend.add( ); }, }); + + await scheduler.scheduleTask({ + id: 'outbox-task', + scope: 'local', + frequency: { seconds: 5 }, + timeout: { seconds: 5 }, + fn: async () => { + await events.publish({ + topic: 'outbox-topic', + eventPayload: { + message: 'Hello, world!', + }, + metadata: { + source: 'outbox-task', + }, + }); + }, + }); }, }); }, diff --git a/plugins/events-backend-module-google-pubsub/report.api.md b/plugins/events-backend-module-google-pubsub/report.api.md index bdb057d7fc..b42c1f80ff 100644 --- a/plugins/events-backend-module-google-pubsub/report.api.md +++ b/plugins/events-backend-module-google-pubsub/report.api.md @@ -6,6 +6,6 @@ import { BackendFeature } from '@backstage/backend-plugin-api'; // @public -const eventsModuleGooglePubsubConsumingEventPublisher: BackendFeature; -export default eventsModuleGooglePubsubConsumingEventPublisher; +const _default: BackendFeature; +export default _default; ``` diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts new file mode 100644 index 0000000000..e5af68add7 --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.test.ts @@ -0,0 +1,111 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { mockServices } from '@backstage/backend-test-utils'; +import { EventParams } from '@backstage/plugin-events-node'; +import { PubSub } from '@google-cloud/pubsub'; +import waitFor from 'wait-for-expect'; +import { EventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher'; + +describe('EventConsumingGooglePubSubPublisher', () => { + const logger = mockServices.logger.mock(); + const events = mockServices.events.mock(); + + let onEvent: undefined | ((event: EventParams) => void); + events.subscribe.mockImplementation(async options => { + onEvent = options.onEvent; + }); + + const topic = { + publishMessage: jest.fn(), + }; + const pubSub = { + close: jest.fn(), + topic: jest.fn(() => topic), + }; + const pubSubFactory = jest.fn(() => pubSub as unknown as PubSub); + + beforeEach(() => { + onEvent = undefined; + jest.clearAllMocks(); + }); + + it('should go though the expected registration and flows', async () => { + const publisher = new EventConsumingGooglePubSubPublisher({ + logger, + events, + tasks: [ + { + id: 'my-id', + sourceTopics: ['my-topic'], + targetTopicPattern: 'projects/my-project/topics/my-topic', + mapToTopic: () => ({ project: 'my-project', topic: 'my-topic' }), + mapToAttributes: m => ({ ...m.metadata, more: 'yes' }), + }, + ], + pubSubFactory, + }); + + // Start up + + await publisher.start(); + + expect(events.subscribe).toHaveBeenCalledWith({ + id: 'EventConsumingGooglePubSubPublisher.my-id', + topics: ['my-topic'], + onEvent: expect.any(Function), + }); + + // Publish successfully + + onEvent!({ + topic: 'my-topic', + eventPayload: { foo: 'bar' }, + metadata: { extra: 'data', more: 'yes' }, + }); + + await waitFor(() => { + expect(pubSubFactory).toHaveBeenCalledWith('my-project'); + expect(pubSub.topic).toHaveBeenCalledWith('my-topic'); + expect(topic.publishMessage).toHaveBeenCalledWith({ + json: { foo: 'bar' }, + attributes: { extra: 'data', more: 'yes' }, + }); + }); + + // Simulate a failed publish + + topic.publishMessage.mockRejectedValueOnce(new Error('Failed to publish')); + + await expect( + onEvent!({ + topic: 'my-topic', + eventPayload: { foo: 'bar' }, + metadata: { extra: 'data', more: 'yes' }, + }), + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failed to publish"`); + + expect(logger.error).toHaveBeenCalledWith( + 'Error publishing Google Pub/Sub message', + new Error('Failed to publish'), + ); + + // Shut down + + await publisher.stop(); + expect(pubSub.close).toHaveBeenCalled(); + }); +}); diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts new file mode 100644 index 0000000000..4a88bc2e79 --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/EventConsumingGooglePubSubPublisher.ts @@ -0,0 +1,153 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + LoggerService, + RootConfigService, + RootLifecycleService, +} from '@backstage/backend-plugin-api'; +import { EventsService } from '@backstage/plugin-events-node'; +import { PubSub } from '@google-cloud/pubsub'; +import { Counter, metrics } from '@opentelemetry/api'; +import { readSubscriptionTasksFromConfig } from './config'; +import { SubscriptionTask } from './types'; + +/** + * Reads messages off of the events system and forwards them into Google Pub/Sub + * topics. + */ +export class EventConsumingGooglePubSubPublisher { + readonly #logger: LoggerService; + readonly #events: EventsService; + readonly #tasks: SubscriptionTask[]; + readonly #pubSubFactory: (projectId: string) => PubSub; + readonly #metrics: { messages: Counter }; + #activeClientsByProjectId: Map; + + static create(options: { + config: RootConfigService; + logger: LoggerService; + rootLifecycle: RootLifecycleService; + events: EventsService; + }) { + const publisher = new EventConsumingGooglePubSubPublisher({ + logger: options.logger, + events: options.events, + tasks: readSubscriptionTasksFromConfig(options.config), + pubSubFactory: projectId => new PubSub({ projectId }), + }); + + options.rootLifecycle.addStartupHook(async () => { + await publisher.start(); + }); + + options.rootLifecycle.addBeforeShutdownHook(async () => { + await publisher.stop(); + }); + + return publisher; + } + + constructor(options: { + logger: LoggerService; + events: EventsService; + tasks: SubscriptionTask[]; + pubSubFactory: (projectId: string) => PubSub; + }) { + this.#logger = options.logger; + this.#events = options.events; + this.#tasks = options.tasks; + this.#pubSubFactory = options.pubSubFactory; + + const meter = metrics.getMeter('default'); + this.#metrics = { + messages: meter.createCounter( + 'events.google.pubsub.publisher.messages.total', + { + description: + 'Number of Pub/Sub messages sent by EventConsumingGooglePubSubPublisher', + unit: 'short', + }, + ), + }; + + this.#activeClientsByProjectId = new Map(); + } + + async start() { + for (const task of this.#tasks) { + this.#logger.info( + `Starting publisher: id=${ + task.id + } sourceTopics=${task.sourceTopics.join(',')} targetTopic=${ + task.targetTopicPattern + }`, + ); + + await this.#events.subscribe({ + id: `EventConsumingGooglePubSubPublisher.${task.id}`, + topics: task.sourceTopics, + onEvent: async event => { + let status: 'success' | 'failed' | 'ignored' = 'failed'; + try { + const topic = task.mapToTopic(event); + if (!topic) { + status = 'ignored'; + return; + } + + let pubsub = this.#activeClientsByProjectId.get(topic.project); + if (!pubsub) { + pubsub = this.#pubSubFactory(topic.project); + this.#activeClientsByProjectId.set(topic.project, pubsub); + } + + await pubsub.topic(topic.topic).publishMessage({ + json: event.eventPayload, + attributes: task.mapToAttributes(event), + }); + + status = 'success'; + } catch (error) { + this.#logger.error( + 'Error publishing Google Pub/Sub message', + error, + ); + status = 'failed'; + throw error; + } finally { + this.#metrics.messages.add(1, { + subscription: task.id, + status: status, + }); + } + }, + }); + } + } + + async stop() { + const clients = Array.from(this.#activeClientsByProjectId.values()); + this.#activeClientsByProjectId = new Map(); + + await Promise.allSettled( + clients.map(async client => { + this.#logger.info(`Closing Google Pub/Sub client: ${client.projectId}`); + await client.close(); + }), + ); + } +} diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts new file mode 100644 index 0000000000..c3a5c1a5bb --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.test.ts @@ -0,0 +1,196 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { mockServices } from '@backstage/backend-test-utils'; +import { readSubscriptionTasksFromConfig } from './config'; + +describe('readSubscriptionTasksFromConfig', () => { + it('reads with basic targetTopic', () => { + const data = { + events: { + modules: { + googlePubSub: { + eventConsumingGooglePubSubPublisher: { + subscriptions: { + subKey1: { + sourceTopic: 'my-topic', + targetTopicName: 'projects/pid/topics/tid', + }, + subKey2: { + sourceTopic: ['my-topic-1', 'my-topic-2'], + targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}', + }, + }, + }, + }, + }, + }, + }; + + const result = readSubscriptionTasksFromConfig( + mockServices.rootConfig({ data }), + ); + + expect(result).toEqual([ + { + id: 'subKey1', + sourceTopics: ['my-topic'], + targetTopicPattern: 'projects/pid/topics/tid', + mapToTopic: expect.any(Function), + mapToAttributes: expect.any(Function), + }, + { + id: 'subKey2', + sourceTopics: ['my-topic-1', 'my-topic-2'], + targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}', + mapToTopic: expect.any(Function), + mapToAttributes: expect.any(Function), + }, + ]); + + expect( + result[0].mapToTopic({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { attr: 'yes' }, + }), + ).toEqual({ project: 'pid', topic: 'tid' }); + expect( + result[0].mapToAttributes({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { attr: 'yes' }, + }), + ).toEqual({ attr: 'yes' }); + }); + + it('fills in placeholders', () => { + const data = { + events: { + modules: { + googlePubSub: { + eventConsumingGooglePubSubPublisher: { + subscriptions: { + sub1: { + sourceTopic: 'my-topic', + targetTopicName: 'projects/pid/topics/tid.{{ event.topic }}', + messageAttributes: { + attr1: 'updated.{{ event.metadata.exists }}', + attr2: 'updated.{{ event.metadata.missing }}', + }, + }, + sub2: { + sourceTopic: 'my-topic', + targetTopicName: + 'projects/pid/topics/tid.{{ event.metadata.missing }}', + messageAttributes: { + attr3: 'new', + }, + }, + }, + }, + }, + }, + }, + }; + + const result = readSubscriptionTasksFromConfig( + mockServices.rootConfig({ data }), + ); + + expect(result).toEqual([ + { + id: 'sub1', + sourceTopics: ['my-topic'], + targetTopicPattern: 'projects/pid/topics/tid.{{ event.topic }}', + mapToTopic: expect.any(Function), + mapToAttributes: expect.any(Function), + }, + { + id: 'sub2', + sourceTopics: ['my-topic'], + targetTopicPattern: + 'projects/pid/topics/tid.{{ event.metadata.missing }}', + mapToTopic: expect.any(Function), + mapToAttributes: expect.any(Function), + }, + ]); + + expect( + result[0].mapToTopic({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + }), + ).toEqual({ project: 'pid', topic: 'tid.a' }); // Message attribute existed, successfully routed + expect( + result[0].mapToAttributes({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + }), + ).toEqual({ + exists: 'exists', + attr1: 'updated.exists', // message attribute existed, was replaced + attr2: 'original2', // message attribute did not exist, was not replaced + }); + + expect( + result[1].mapToTopic({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + }), + ).toBeUndefined(); // Message attribute did not exist, could not be routed + expect( + result[1].mapToAttributes({ + topic: 'a', + eventPayload: { foo: 'bar' }, + metadata: { exists: 'exists', attr1: 'original1', attr2: 'original2' }, + }), + ).toEqual({ + exists: 'exists', + attr1: 'original1', + attr2: 'original2', + attr3: 'new', + }); + }); + + it('rejects malformed subscription name', () => { + const data = { + events: { + modules: { + googlePubSub: { + eventConsumingGooglePubSubPublisher: { + subscriptions: { + subKey: { + sourceTopic: 'sid', + targetTopicName: 'foo', + }, + }, + }, + }, + }, + }, + }; + + expect(() => + readSubscriptionTasksFromConfig(mockServices.rootConfig({ data })), + ).toThrowErrorMatchingInlineSnapshot( + `"Expected Google Pub/Sub 'targetTopicName' to be on the form 'projects/PROJECT_ID/topics/TOPIC_ID' but got 'foo'"`, + ); + }); +}); diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts new file mode 100644 index 0000000000..7ccd78cfcb --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/config.ts @@ -0,0 +1,146 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { RootConfigService } from '@backstage/backend-plugin-api'; +import { Config } from '@backstage/config'; +import { InputError } from '@backstage/errors'; +import { EventParams } from '@backstage/plugin-events-node'; +import { createPatternResolver } from '../util/createPatternResolver'; +import { SubscriptionTask } from './types'; + +export function readSubscriptionTasksFromConfig( + rootConfig: RootConfigService, +): SubscriptionTask[] { + const subscriptionsConfig = rootConfig.getOptionalConfig( + 'events.modules.googlePubSub.eventConsumingGooglePubSubPublisher.subscriptions', + ); + if (!subscriptionsConfig) { + return []; + } + + return subscriptionsConfig.keys().map(subscriptionId => { + if (!subscriptionId.match(/^[-_\w]+$/)) { + throw new InputError( + `Expected Google Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`, + ); + } + + const config = subscriptionsConfig.getConfig(subscriptionId); + const sourceTopics = readSourceTopics(config); + const mapToTopic = readTopicMapper(config); + const mapToAttributes = readAttributeMapper(config); + + return { + id: subscriptionId, + sourceTopics: sourceTopics, + targetTopicPattern: config.getString('targetTopicName'), + mapToTopic, + mapToAttributes, + }; + }); +} + +function readSourceTopics(config: Config): string[] { + if (Array.isArray(config.getOptional('sourceTopic'))) { + return config.getStringArray('sourceTopic'); + } + return [config.getString('sourceTopic')]; +} + +/** + * Handles the `targetTopicName` configuration field. + */ +function readTopicMapper( + config: Config, +): (event: EventParams) => { project: string; topic: string } | undefined { + const regex = /^projects\/([^/]+)\/topics\/(.+)$/; + + const targetTopicPattern = config.getString('targetTopicName'); + let parts = targetTopicPattern.match(regex); + if (!parts) { + throw new InputError( + `Expected Google Pub/Sub 'targetTopicName' to be on the form 'projects/PROJECT_ID/topics/TOPIC_ID' but got '${targetTopicPattern}'`, + ); + } + + const patternResolver = createPatternResolver(targetTopicPattern); + + return event => { + try { + parts = patternResolver({ event }).match(regex); + if (!parts) { + return undefined; + } + return { + project: parts[1], + topic: parts[2], + }; + } catch { + // could not map to a topic + return undefined; + } + }; +} + +/** + * Handles the `messageAttributes` configuration field. + */ +function readAttributeMapper( + config: Config, +): (event: EventParams) => Record { + const setters = new Array< + (options: { + event: EventParams; + attributes: Record; + }) => void + >(); + + const eventMetadata = config.getOptionalConfig('messageAttributes'); + if (eventMetadata) { + for (const key of eventMetadata?.keys() ?? []) { + const valuePattern = eventMetadata.getString(key); + const patternResolver = createPatternResolver(valuePattern); + setters.push(({ event, attributes }) => { + try { + const value = patternResolver({ event }); + if (value) { + attributes[key] = value; + } + } catch { + // ignore silently, keep original + } + }); + } + } + + return event => { + const result: Record = {}; + for (const [key, value] of Object.entries(event.metadata ?? {})) { + if (value) { + if (typeof value === 'string') { + result[key] = value; + } else if (Array.isArray(value) && value.length > 0) { + // Google Pub/Sub does not support array values + result[key] = value.join(','); + } + } + } + for (const setter of setters) { + setter({ event, attributes: result }); + } + return result; + }; +} diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/index.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/index.ts new file mode 100644 index 0000000000..3e5713a28d --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/index.ts @@ -0,0 +1,17 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export { eventsModuleEventConsumingGooglePubSubPublisher } from './module'; diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/module.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/module.ts new file mode 100644 index 0000000000..498996fee0 --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/module.ts @@ -0,0 +1,52 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + coreServices, + createBackendModule, +} from '@backstage/backend-plugin-api'; +import { eventsServiceRef } from '@backstage/plugin-events-node'; +import { EventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher'; + +/** + * Reads messages off of the events system and forwards them into Google Pub/Sub + * topics. + * + * @public + */ +export const eventsModuleEventConsumingGooglePubSubPublisher = + createBackendModule({ + pluginId: 'events', + moduleId: 'event-consuming-google-pubsub-publisher', + register(reg) { + reg.registerInit({ + deps: { + config: coreServices.rootConfig, + logger: coreServices.logger, + rootLifecycle: coreServices.rootLifecycle, + events: eventsServiceRef, + }, + async init({ config, logger, rootLifecycle, events }) { + EventConsumingGooglePubSubPublisher.create({ + config, + logger, + rootLifecycle, + events, + }); + }, + }); + }, + }); diff --git a/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts new file mode 100644 index 0000000000..36b3aeaa78 --- /dev/null +++ b/plugins/events-backend-module-google-pubsub/src/EventConsumingGooglePubSubPublisher/types.ts @@ -0,0 +1,30 @@ +/* + * Copyright 2025 The Backstage Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { EventParams } from '@backstage/plugin-events-node'; + +/** + * A configured subscription task. + */ +export interface SubscriptionTask { + id: string; + sourceTopics: string[]; + targetTopicPattern: string; + mapToTopic: ( + event: EventParams, + ) => { project: string; topic: string } | undefined; + mapToAttributes: (event: EventParams) => Record; +} diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts index bfe3c20773..b4034123f0 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.test.ts @@ -188,7 +188,7 @@ describe('readSubscriptionTasksFromConfig', () => { expect(() => readSubscriptionTasksFromConfig(mockServices.rootConfig({ data })), ).toThrowErrorMatchingInlineSnapshot( - `"Expected Googoe Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got 'sid'"`, + `"Expected Google Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got 'sid'"`, ); }); }); diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts index fdcab2e2e5..d8dfb2ada2 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/config.ts @@ -34,7 +34,7 @@ export function readSubscriptionTasksFromConfig( return subscriptionsConfig.keys().map(subscriptionId => { if (!subscriptionId.match(/^[-_\w]+$/)) { throw new InputError( - `Expected Googoe Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`, + `Expected Google Pub/Sub subscription ID to consist of letters, numbers, dashes and underscores, but got '${subscriptionId}'`, ); } @@ -63,7 +63,7 @@ function readSubscriptionName(config: Config): { ); if (!parts) { throw new InputError( - `Expected Googoe Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got '${subscriptionName}'`, + `Expected Google Pub/Sub 'subscriptionName' to be on the form 'projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID' but got '${subscriptionName}'`, ); } return { diff --git a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/module.ts b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/module.ts index 14cfdd2bd8..3d5fce14ee 100644 --- a/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/module.ts +++ b/plugins/events-backend-module-google-pubsub/src/GooglePubSubConsumingEventPublisher/module.ts @@ -24,8 +24,6 @@ import { GooglePubSubConsumingEventPublisher } from './GooglePubSubConsumingEven /** * Reads messages off of Google Pub/Sub subscriptions and forwards them into the * Backstage events system. - * - * @public */ export const eventsModuleGooglePubsubConsumingEventPublisher = createBackendModule({ diff --git a/plugins/events-backend-module-google-pubsub/src/index.ts b/plugins/events-backend-module-google-pubsub/src/index.ts index 2b79fcb9a1..0dcbe3c5d8 100644 --- a/plugins/events-backend-module-google-pubsub/src/index.ts +++ b/plugins/events-backend-module-google-pubsub/src/index.ts @@ -14,10 +14,19 @@ * limitations under the License. */ +import { createBackendFeatureLoader } from '@backstage/backend-plugin-api'; +import { eventsModuleEventConsumingGooglePubSubPublisher } from './EventConsumingGooglePubSubPublisher'; +import { eventsModuleGooglePubsubConsumingEventPublisher } from './GooglePubSubConsumingEventPublisher'; + /** * The google-pubsub backend module for the events plugin. * * @packageDocumentation */ -export { eventsModuleGooglePubsubConsumingEventPublisher as default } from './GooglePubSubConsumingEventPublisher'; +export default createBackendFeatureLoader({ + *loader() { + yield eventsModuleGooglePubsubConsumingEventPublisher; + yield eventsModuleEventConsumingGooglePubSubPublisher; + }, +});