Fix dataloader caching, and use the proper catalog service ref
Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-notifications-backend-module-slack': patch
|
||||
---
|
||||
|
||||
Fix dataloader caching, and use the proper catalog service ref
|
||||
@@ -89,6 +89,7 @@ CVEs
|
||||
daemonsets
|
||||
Datadog
|
||||
dataflow
|
||||
dataloader
|
||||
dayjs
|
||||
debounce
|
||||
debounces
|
||||
|
||||
@@ -35,10 +35,10 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@backstage/backend-plugin-api": "workspace:^",
|
||||
"@backstage/catalog-client": "workspace:^",
|
||||
"@backstage/catalog-model": "workspace:^",
|
||||
"@backstage/config": "workspace:^",
|
||||
"@backstage/errors": "workspace:^",
|
||||
"@backstage/plugin-catalog-node": "workspace:^",
|
||||
"@backstage/plugin-notifications-common": "workspace:^",
|
||||
"@backstage/plugin-notifications-node": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
|
||||
@@ -109,7 +109,6 @@ const DEFAULT_ENTITIES_RESPONSE = {
|
||||
describe('SlackNotificationProcessor', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const auth = mockServices.auth();
|
||||
const discovery = mockServices.discovery();
|
||||
const config = mockServices.rootConfig({
|
||||
data: {
|
||||
app: {
|
||||
@@ -136,7 +135,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -199,7 +197,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -275,7 +272,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -310,7 +306,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -366,7 +361,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(broadcastConfig, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -405,7 +399,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -429,7 +422,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: [DEFAULT_ENTITIES_RESPONSE.items[2]],
|
||||
@@ -469,7 +461,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -515,7 +506,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -600,7 +590,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
const slack = new WebClient();
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
@@ -641,7 +630,6 @@ describe('SlackNotificationProcessor', () => {
|
||||
const slack = new WebClient();
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
|
||||
@@ -14,12 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
AuthService,
|
||||
DiscoveryService,
|
||||
LoggerService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { CatalogApi } from '@backstage/catalog-client';
|
||||
import { AuthService, LoggerService } from '@backstage/backend-plugin-api';
|
||||
import {
|
||||
Entity,
|
||||
isUserEntity,
|
||||
@@ -39,25 +34,26 @@ import { ChatPostMessageArguments, WebClient } from '@slack/web-api';
|
||||
import DataLoader from 'dataloader';
|
||||
import pThrottle from 'p-throttle';
|
||||
import { ANNOTATION_SLACK_BOT_NOTIFY } from './constants';
|
||||
import { toChatPostMessageArgs } from './util';
|
||||
import { ExpiryMap, toChatPostMessageArgs } from './util';
|
||||
import { CatalogService } from '@backstage/plugin-catalog-node';
|
||||
|
||||
export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
private readonly logger: LoggerService;
|
||||
private readonly catalog: CatalogApi;
|
||||
private readonly catalog: CatalogService;
|
||||
private readonly auth: AuthService;
|
||||
private readonly slack: WebClient;
|
||||
private readonly sendNotifications;
|
||||
private readonly messagesSent: Counter;
|
||||
private readonly messagesFailed: Counter;
|
||||
private readonly broadcastChannels?: string[];
|
||||
private readonly entityLoader: DataLoader<string, Entity | undefined>;
|
||||
|
||||
static fromConfig(
|
||||
config: Config,
|
||||
options: {
|
||||
auth: AuthService;
|
||||
discovery: DiscoveryService;
|
||||
logger: LoggerService;
|
||||
catalog: CatalogApi;
|
||||
catalog: CatalogService;
|
||||
slack?: WebClient;
|
||||
broadcastChannels?: string[];
|
||||
},
|
||||
@@ -79,9 +75,8 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
private constructor(options: {
|
||||
slack: WebClient;
|
||||
auth: AuthService;
|
||||
discovery: DiscoveryService;
|
||||
logger: LoggerService;
|
||||
catalog: CatalogApi;
|
||||
catalog: CatalogService;
|
||||
broadcastChannels?: string[];
|
||||
}) {
|
||||
const { auth, catalog, logger, slack, broadcastChannels } = options;
|
||||
@@ -91,6 +86,31 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
this.slack = slack;
|
||||
this.broadcastChannels = broadcastChannels;
|
||||
|
||||
this.entityLoader = new DataLoader<string, Entity | undefined>(
|
||||
async entityRefs => {
|
||||
return await this.catalog
|
||||
.getEntitiesByRefs(
|
||||
{
|
||||
entityRefs: entityRefs.slice(),
|
||||
fields: [
|
||||
`kind`,
|
||||
`spec.profile.email`,
|
||||
`metadata.annotations.${ANNOTATION_SLACK_BOT_NOTIFY}`,
|
||||
],
|
||||
},
|
||||
{ credentials: await this.auth.getOwnServiceCredentials() },
|
||||
)
|
||||
.then(r => r.items);
|
||||
},
|
||||
{
|
||||
name: 'SlackNotificationProcessor.entityLoader',
|
||||
cacheMap: new ExpiryMap(durationToMilliseconds({ minutes: 10 })),
|
||||
maxBatchSize: 100,
|
||||
batchScheduleFn: cb =>
|
||||
setTimeout(cb, durationToMilliseconds({ milliseconds: 10 })),
|
||||
},
|
||||
);
|
||||
|
||||
const meter = metrics.getMeter('default');
|
||||
this.messagesSent = meter.createCounter(
|
||||
'notifications.processors.slack.sent.count',
|
||||
@@ -193,7 +213,6 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
}),
|
||||
);
|
||||
|
||||
console.log('dispatching message');
|
||||
await this.sendNotifications(outbound);
|
||||
|
||||
return options;
|
||||
@@ -261,31 +280,6 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
};
|
||||
}
|
||||
|
||||
async getEntities(
|
||||
entityRefs: readonly string[],
|
||||
): Promise<(Entity | undefined)[]> {
|
||||
const { token } = await this.auth.getPluginRequestToken({
|
||||
onBehalfOf: await this.auth.getOwnServiceCredentials(),
|
||||
targetPluginId: 'catalog',
|
||||
});
|
||||
|
||||
const response = await this.catalog.getEntitiesByRefs(
|
||||
{
|
||||
entityRefs: entityRefs.slice(),
|
||||
fields: [
|
||||
`kind`,
|
||||
`spec.profile.email`,
|
||||
`metadata.annotations.${ANNOTATION_SLACK_BOT_NOTIFY}`,
|
||||
],
|
||||
},
|
||||
{
|
||||
token,
|
||||
},
|
||||
);
|
||||
|
||||
return response.items;
|
||||
}
|
||||
|
||||
async replaceUserRefsWithSlackIds(
|
||||
text?: string,
|
||||
): Promise<string | undefined> {
|
||||
@@ -327,13 +321,8 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
async getSlackNotificationTarget(
|
||||
entityRef: string,
|
||||
): Promise<string | undefined> {
|
||||
const entityLoader = new DataLoader<string, Entity | undefined>(
|
||||
entityRefs => this.getEntities(entityRefs),
|
||||
);
|
||||
const entity = await entityLoader.load(entityRef);
|
||||
|
||||
const entity = await this.entityLoader.load(entityRef);
|
||||
if (!entity) {
|
||||
console.log(`Entity not found: ${entityRef}`);
|
||||
throw new NotFoundError(`Entity not found: ${entityRef}`);
|
||||
}
|
||||
|
||||
|
||||
@@ -90,3 +90,42 @@ function getColor(severity: NotificationSeverity | undefined) {
|
||||
return '#00A699'; // Neutral color
|
||||
}
|
||||
}
|
||||
|
||||
// Simple expiry map for the data loader, which only expects a map that implements set, get, and delete and clear
|
||||
export class ExpiryMap<K, V> extends Map<K, V> {
|
||||
#ttlMs: number;
|
||||
#timestamps: Map<K, number> = new Map();
|
||||
|
||||
constructor(ttlMs: number) {
|
||||
super();
|
||||
this.#ttlMs = ttlMs;
|
||||
}
|
||||
|
||||
set(key: K, value: V) {
|
||||
const result = super.set(key, value);
|
||||
this.#timestamps.set(key, Date.now());
|
||||
return result;
|
||||
}
|
||||
|
||||
get(key: K) {
|
||||
if (!this.has(key)) {
|
||||
return undefined;
|
||||
}
|
||||
const timestamp = this.#timestamps.get(key)!;
|
||||
if (Date.now() - timestamp > this.#ttlMs) {
|
||||
this.delete(key);
|
||||
return undefined;
|
||||
}
|
||||
return super.get(key);
|
||||
}
|
||||
|
||||
delete(key: K) {
|
||||
this.#timestamps.delete(key);
|
||||
return super.delete(key);
|
||||
}
|
||||
|
||||
clear() {
|
||||
this.#timestamps.clear();
|
||||
return super.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,9 +17,9 @@ import {
|
||||
coreServices,
|
||||
createBackendModule,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { CatalogClient } from '@backstage/catalog-client';
|
||||
import { notificationsProcessingExtensionPoint } from '@backstage/plugin-notifications-node';
|
||||
import { SlackNotificationProcessor } from './lib/SlackNotificationProcessor';
|
||||
import { catalogServiceRef } from '@backstage/plugin-catalog-node';
|
||||
|
||||
/**
|
||||
* The Slack notification processor for use with the notifications plugin.
|
||||
@@ -35,21 +35,16 @@ export const notificationsModuleSlack = createBackendModule({
|
||||
deps: {
|
||||
auth: coreServices.auth,
|
||||
config: coreServices.rootConfig,
|
||||
discovery: coreServices.discovery,
|
||||
logger: coreServices.logger,
|
||||
catalog: catalogServiceRef,
|
||||
notifications: notificationsProcessingExtensionPoint,
|
||||
},
|
||||
async init({ auth, config, discovery, logger, notifications }) {
|
||||
const catalogClient = new CatalogClient({
|
||||
discoveryApi: discovery,
|
||||
});
|
||||
|
||||
async init({ auth, config, logger, catalog, notifications }) {
|
||||
notifications.addProcessor(
|
||||
SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
discovery,
|
||||
logger,
|
||||
catalog: catalogClient,
|
||||
catalog,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
||||
@@ -7205,7 +7205,6 @@ __metadata:
|
||||
dependencies:
|
||||
"@backstage/backend-plugin-api": "workspace:^"
|
||||
"@backstage/backend-test-utils": "workspace:^"
|
||||
"@backstage/catalog-client": "workspace:^"
|
||||
"@backstage/catalog-model": "workspace:^"
|
||||
"@backstage/cli": "workspace:^"
|
||||
"@backstage/config": "workspace:^"
|
||||
|
||||
Reference in New Issue
Block a user