Fix dataloader caching, and use the proper catalog service ref

Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2025-05-06 14:47:50 +02:00
parent 80aa734ade
commit f6480c7fea
8 changed files with 83 additions and 67 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-notifications-backend-module-slack': patch
---
Fix dataloader caching, and use the proper catalog service ref
@@ -89,6 +89,7 @@ CVEs
daemonsets
Datadog
dataflow
dataloader
dayjs
debounce
debounces
@@ -35,10 +35,10 @@
},
"dependencies": {
"@backstage/backend-plugin-api": "workspace:^",
"@backstage/catalog-client": "workspace:^",
"@backstage/catalog-model": "workspace:^",
"@backstage/config": "workspace:^",
"@backstage/errors": "workspace:^",
"@backstage/plugin-catalog-node": "workspace:^",
"@backstage/plugin-notifications-common": "workspace:^",
"@backstage/plugin-notifications-node": "workspace:^",
"@backstage/types": "workspace:^",
@@ -109,7 +109,6 @@ const DEFAULT_ENTITIES_RESPONSE = {
describe('SlackNotificationProcessor', () => {
const logger = mockServices.logger.mock();
const auth = mockServices.auth();
const discovery = mockServices.discovery();
const config = mockServices.rootConfig({
data: {
app: {
@@ -136,7 +135,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -199,7 +197,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -275,7 +272,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -310,7 +306,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -366,7 +361,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(broadcastConfig, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -405,7 +399,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -429,7 +422,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: [DEFAULT_ENTITIES_RESPONSE.items[2]],
@@ -469,7 +461,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -515,7 +506,6 @@ describe('SlackNotificationProcessor', () => {
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -600,7 +590,6 @@ describe('SlackNotificationProcessor', () => {
const slack = new WebClient();
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -641,7 +630,6 @@ describe('SlackNotificationProcessor', () => {
const slack = new WebClient();
const processor = SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogServiceMock({
entities: DEFAULT_ENTITIES_RESPONSE.items,
@@ -14,12 +14,7 @@
* limitations under the License.
*/
import {
AuthService,
DiscoveryService,
LoggerService,
} from '@backstage/backend-plugin-api';
import { CatalogApi } from '@backstage/catalog-client';
import { AuthService, LoggerService } from '@backstage/backend-plugin-api';
import {
Entity,
isUserEntity,
@@ -39,25 +34,26 @@ import { ChatPostMessageArguments, WebClient } from '@slack/web-api';
import DataLoader from 'dataloader';
import pThrottle from 'p-throttle';
import { ANNOTATION_SLACK_BOT_NOTIFY } from './constants';
import { toChatPostMessageArgs } from './util';
import { ExpiryMap, toChatPostMessageArgs } from './util';
import { CatalogService } from '@backstage/plugin-catalog-node';
export class SlackNotificationProcessor implements NotificationProcessor {
private readonly logger: LoggerService;
private readonly catalog: CatalogApi;
private readonly catalog: CatalogService;
private readonly auth: AuthService;
private readonly slack: WebClient;
private readonly sendNotifications;
private readonly messagesSent: Counter;
private readonly messagesFailed: Counter;
private readonly broadcastChannels?: string[];
private readonly entityLoader: DataLoader<string, Entity | undefined>;
static fromConfig(
config: Config,
options: {
auth: AuthService;
discovery: DiscoveryService;
logger: LoggerService;
catalog: CatalogApi;
catalog: CatalogService;
slack?: WebClient;
broadcastChannels?: string[];
},
@@ -79,9 +75,8 @@ export class SlackNotificationProcessor implements NotificationProcessor {
private constructor(options: {
slack: WebClient;
auth: AuthService;
discovery: DiscoveryService;
logger: LoggerService;
catalog: CatalogApi;
catalog: CatalogService;
broadcastChannels?: string[];
}) {
const { auth, catalog, logger, slack, broadcastChannels } = options;
@@ -91,6 +86,31 @@ export class SlackNotificationProcessor implements NotificationProcessor {
this.slack = slack;
this.broadcastChannels = broadcastChannels;
this.entityLoader = new DataLoader<string, Entity | undefined>(
async entityRefs => {
return await this.catalog
.getEntitiesByRefs(
{
entityRefs: entityRefs.slice(),
fields: [
`kind`,
`spec.profile.email`,
`metadata.annotations.${ANNOTATION_SLACK_BOT_NOTIFY}`,
],
},
{ credentials: await this.auth.getOwnServiceCredentials() },
)
.then(r => r.items);
},
{
name: 'SlackNotificationProcessor.entityLoader',
cacheMap: new ExpiryMap(durationToMilliseconds({ minutes: 10 })),
maxBatchSize: 100,
batchScheduleFn: cb =>
setTimeout(cb, durationToMilliseconds({ milliseconds: 10 })),
},
);
const meter = metrics.getMeter('default');
this.messagesSent = meter.createCounter(
'notifications.processors.slack.sent.count',
@@ -193,7 +213,6 @@ export class SlackNotificationProcessor implements NotificationProcessor {
}),
);
console.log('dispatching message');
await this.sendNotifications(outbound);
return options;
@@ -261,31 +280,6 @@ export class SlackNotificationProcessor implements NotificationProcessor {
};
}
async getEntities(
entityRefs: readonly string[],
): Promise<(Entity | undefined)[]> {
const { token } = await this.auth.getPluginRequestToken({
onBehalfOf: await this.auth.getOwnServiceCredentials(),
targetPluginId: 'catalog',
});
const response = await this.catalog.getEntitiesByRefs(
{
entityRefs: entityRefs.slice(),
fields: [
`kind`,
`spec.profile.email`,
`metadata.annotations.${ANNOTATION_SLACK_BOT_NOTIFY}`,
],
},
{
token,
},
);
return response.items;
}
async replaceUserRefsWithSlackIds(
text?: string,
): Promise<string | undefined> {
@@ -327,13 +321,8 @@ export class SlackNotificationProcessor implements NotificationProcessor {
async getSlackNotificationTarget(
entityRef: string,
): Promise<string | undefined> {
const entityLoader = new DataLoader<string, Entity | undefined>(
entityRefs => this.getEntities(entityRefs),
);
const entity = await entityLoader.load(entityRef);
const entity = await this.entityLoader.load(entityRef);
if (!entity) {
console.log(`Entity not found: ${entityRef}`);
throw new NotFoundError(`Entity not found: ${entityRef}`);
}
@@ -90,3 +90,42 @@ function getColor(severity: NotificationSeverity | undefined) {
return '#00A699'; // Neutral color
}
}
// Simple expiry map for the data loader, which only expects a map that implements set, get, and delete and clear
export class ExpiryMap<K, V> extends Map<K, V> {
#ttlMs: number;
#timestamps: Map<K, number> = new Map();
constructor(ttlMs: number) {
super();
this.#ttlMs = ttlMs;
}
set(key: K, value: V) {
const result = super.set(key, value);
this.#timestamps.set(key, Date.now());
return result;
}
get(key: K) {
if (!this.has(key)) {
return undefined;
}
const timestamp = this.#timestamps.get(key)!;
if (Date.now() - timestamp > this.#ttlMs) {
this.delete(key);
return undefined;
}
return super.get(key);
}
delete(key: K) {
this.#timestamps.delete(key);
return super.delete(key);
}
clear() {
this.#timestamps.clear();
return super.clear();
}
}
@@ -17,9 +17,9 @@ import {
coreServices,
createBackendModule,
} from '@backstage/backend-plugin-api';
import { CatalogClient } from '@backstage/catalog-client';
import { notificationsProcessingExtensionPoint } from '@backstage/plugin-notifications-node';
import { SlackNotificationProcessor } from './lib/SlackNotificationProcessor';
import { catalogServiceRef } from '@backstage/plugin-catalog-node';
/**
* The Slack notification processor for use with the notifications plugin.
@@ -35,21 +35,16 @@ export const notificationsModuleSlack = createBackendModule({
deps: {
auth: coreServices.auth,
config: coreServices.rootConfig,
discovery: coreServices.discovery,
logger: coreServices.logger,
catalog: catalogServiceRef,
notifications: notificationsProcessingExtensionPoint,
},
async init({ auth, config, discovery, logger, notifications }) {
const catalogClient = new CatalogClient({
discoveryApi: discovery,
});
async init({ auth, config, logger, catalog, notifications }) {
notifications.addProcessor(
SlackNotificationProcessor.fromConfig(config, {
auth,
discovery,
logger,
catalog: catalogClient,
catalog,
}),
);
},
-1
View File
@@ -7205,7 +7205,6 @@ __metadata:
dependencies:
"@backstage/backend-plugin-api": "workspace:^"
"@backstage/backend-test-utils": "workspace:^"
"@backstage/catalog-client": "workspace:^"
"@backstage/catalog-model": "workspace:^"
"@backstage/cli": "workspace:^"
"@backstage/config": "workspace:^"