feat(notifications-slack): add scope-based message update support
When a Backstage notification is re-sent with the same scope, the notifications backend updates the existing DB record and sets notification.updated. Previously, the SlackNotificationProcessor always called chat.postMessage(), creating duplicate Slack messages. This adds database-backed scope-based update support: - New slack_message_timestamps table to persist Slack message ts values keyed by (scope, channel) - After each chat.postMessage(), store the response ts in the database - When postProcess receives a notification with updated set and a matching stored ts, use chat.update() instead of chat.postMessage() - Scope context is passed as parameters through the call chain to avoid race conditions with concurrent postProcess calls - Scheduled daily cleanup of old timestamp records (24h retention) - New messagesUpdated metrics counter for observability - Graceful degradation when no database is provided - Explicitly picks only supported fields for chat.update calls Signed-off-by: Erik Miller <erik.miller@gusto.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-notifications-backend-module-slack': patch
|
||||
---
|
||||
|
||||
Added scope-based message update support. When a notification is re-sent with the same `scope` and `notification.updated` is set, the processor now calls `chat.update()` on the existing Slack message instead of sending a duplicate via `chat.postMessage()`. Message timestamps are persisted in a new `slack_message_timestamps` database table with automatic daily cleanup. The processor gracefully degrades to the previous behavior when no database is provided.
|
||||
+44
@@ -0,0 +1,44 @@
|
||||
// @ts-check
|
||||
/*
|
||||
* Copyright 2025 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {import('knex').Knex} knex
|
||||
*/
|
||||
exports.up = async function up(knex) {
|
||||
await knex.schema.createTable(
|
||||
'slack_message_timestamps',
|
||||
function tableSetup(table) {
|
||||
table.string('origin', 256).notNullable();
|
||||
table.string('scope', 512).notNullable();
|
||||
table.string('channel', 64).notNullable();
|
||||
table.string('ts', 64).notNullable();
|
||||
table
|
||||
.timestamp('created_at', { useTz: true })
|
||||
.defaultTo(knex.fn.now())
|
||||
.notNullable();
|
||||
table.primary(['origin', 'scope', 'channel']);
|
||||
table.index('created_at', 'idx_slack_message_timestamps_created_at');
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {import('knex').Knex} knex
|
||||
*/
|
||||
exports.down = async function down(knex) {
|
||||
await knex.schema.dropTable('slack_message_timestamps');
|
||||
};
|
||||
@@ -22,7 +22,8 @@
|
||||
"types": "src/index.ts",
|
||||
"files": [
|
||||
"dist",
|
||||
"config.d.ts"
|
||||
"config.d.ts",
|
||||
"migrations"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "backstage-cli package build",
|
||||
@@ -46,6 +47,7 @@
|
||||
"@slack/types": "^2.14.0",
|
||||
"@slack/web-api": "^7.5.0",
|
||||
"dataloader": "^2.0.0",
|
||||
"knex": "^3.0.0",
|
||||
"p-throttle": "^4.1.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
## SQL Report file for "@backstage/plugin-notifications-backend-module-slack"
|
||||
|
||||
> Do not edit this file. It is a report generated by `yarn build:api-reports`
|
||||
|
||||
## Table `slack_message_timestamps`
|
||||
|
||||
| Column | Type | Nullable | Max Length | Default |
|
||||
| ------------ | -------------------------- | -------- | ---------- | ------------------- |
|
||||
| `channel` | `character varying` | false | 64 | - |
|
||||
| `created_at` | `timestamp with time zone` | false | - | `CURRENT_TIMESTAMP` |
|
||||
| `origin` | `character varying` | false | 256 | - |
|
||||
| `scope` | `character varying` | false | 512 | - |
|
||||
| `ts` | `character varying` | false | 64 | - |
|
||||
|
||||
### Indices
|
||||
|
||||
- `idx_slack_message_timestamps_created_at` (`created_at`)
|
||||
- `slack_message_timestamps_pkey` (`origin`, `scope`, `channel`) unique primary
|
||||
+383
@@ -20,6 +20,7 @@ import { SlackNotificationProcessor } from './SlackNotificationProcessor';
|
||||
import { catalogServiceMock } from '@backstage/plugin-catalog-node/testUtils';
|
||||
import { KnownBlock, WebClient } from '@slack/web-api';
|
||||
import { Entity } from '@backstage/catalog-model';
|
||||
import { Knex } from 'knex';
|
||||
import pThrottle from 'p-throttle';
|
||||
import { durationToMilliseconds } from '@backstage/types';
|
||||
|
||||
@@ -45,6 +46,11 @@ jest.mock('@slack/web-api', () => {
|
||||
ts: '1234567890.123456',
|
||||
channel: 'C12345678',
|
||||
})),
|
||||
update: jest.fn(() => ({
|
||||
ok: true,
|
||||
ts: '1234567890.123456',
|
||||
channel: 'C12345678',
|
||||
})),
|
||||
},
|
||||
conversations: {
|
||||
list: jest.fn(() => ({
|
||||
@@ -1615,4 +1621,381 @@ describe('SlackNotificationProcessor', () => {
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('scope-based message updates', () => {
|
||||
function createMockDb() {
|
||||
const store = new Map<
|
||||
string,
|
||||
{
|
||||
origin: string;
|
||||
scope: string;
|
||||
channel: string;
|
||||
ts: string;
|
||||
created_at: Date;
|
||||
}
|
||||
>();
|
||||
|
||||
function storeKey(origin: string, scope: string, channel: string) {
|
||||
return `${origin}:${scope}:${channel}`;
|
||||
}
|
||||
|
||||
// Each call to db() creates a fresh query builder that tracks its own
|
||||
// chained .where()/.insert() arguments, avoiding cross-call interference.
|
||||
function createQueryBuilder(): any {
|
||||
let lastWhereArgs: any;
|
||||
let lastInsertRow: any;
|
||||
const qb: any = {
|
||||
where: jest.fn().mockImplementation((args: any) => {
|
||||
lastWhereArgs = args;
|
||||
return qb;
|
||||
}),
|
||||
first: jest.fn().mockImplementation(() => {
|
||||
if (lastWhereArgs) {
|
||||
const key = storeKey(
|
||||
lastWhereArgs.origin,
|
||||
lastWhereArgs.scope,
|
||||
lastWhereArgs.channel,
|
||||
);
|
||||
return Promise.resolve(store.get(key));
|
||||
}
|
||||
return Promise.resolve(undefined);
|
||||
}),
|
||||
insert: jest.fn().mockImplementation((row: any) => {
|
||||
lastInsertRow = row;
|
||||
store.set(storeKey(row.origin, row.scope, row.channel), row);
|
||||
return qb;
|
||||
}),
|
||||
onConflict: jest.fn().mockReturnThis(),
|
||||
merge: jest.fn().mockImplementation((row: any) => {
|
||||
if (lastInsertRow) {
|
||||
const key = storeKey(
|
||||
lastInsertRow.origin,
|
||||
lastInsertRow.scope,
|
||||
lastInsertRow.channel,
|
||||
);
|
||||
const existing = store.get(key);
|
||||
if (existing) {
|
||||
store.set(key, { ...existing, ...row });
|
||||
}
|
||||
}
|
||||
return Promise.resolve();
|
||||
}),
|
||||
delete: jest.fn().mockResolvedValue(0),
|
||||
};
|
||||
return qb;
|
||||
}
|
||||
|
||||
const db = jest
|
||||
.fn()
|
||||
.mockImplementation(() => createQueryBuilder()) as unknown as Knex;
|
||||
return { db, store, storeKey };
|
||||
}
|
||||
|
||||
function createProcessorWithDb(
|
||||
slack: WebClient,
|
||||
db: Knex,
|
||||
): SlackNotificationProcessor {
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
}),
|
||||
metrics,
|
||||
slack,
|
||||
})[0];
|
||||
processor.setDatabase(db);
|
||||
return processor;
|
||||
}
|
||||
|
||||
it('should store the message timestamp after initial scoped send', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db, store, storeKey } = createMockDb();
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1234',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
|
||||
expect(store.size).toBe(1);
|
||||
const key = storeKey(
|
||||
'plugin',
|
||||
'deployment-failure/my-service/42',
|
||||
'U12345678',
|
||||
);
|
||||
expect(store.get(key)).toEqual(
|
||||
expect.objectContaining({
|
||||
origin: 'plugin',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
channel: 'U12345678',
|
||||
ts: '1234567890.123456',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should use chat.update when the notification has been updated and a stored ts exists', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db, store, storeKey } = createMockDb();
|
||||
|
||||
// Pre-populate the store with a previously sent message.
|
||||
store.set(
|
||||
storeKey('plugin', 'deployment-failure/my-service/42', 'U12345678'),
|
||||
{
|
||||
origin: 'plugin',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
channel: 'U12345678',
|
||||
ts: '1111111111.111111',
|
||||
created_at: new Date(),
|
||||
},
|
||||
);
|
||||
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1234',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
updated: new Date(),
|
||||
payload: {
|
||||
title: 'notification',
|
||||
description: 'Updated with analysis',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: {
|
||||
title: 'notification',
|
||||
description: 'Updated with analysis',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
expect(slack.chat.postMessage).not.toHaveBeenCalled();
|
||||
expect(slack.chat.update).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Verify chat.update receives only the fields it supports (channel, ts,
|
||||
// text, blocks, attachments) and not the full ChatPostMessageArguments.
|
||||
const updateArgs = (slack.chat.update as jest.Mock).mock.calls[0][0];
|
||||
const updateKeys = Object.keys(updateArgs).sort();
|
||||
expect(updateKeys).toEqual(['attachments', 'channel', 'text', 'ts']);
|
||||
expect(updateArgs.channel).toBe('U12345678');
|
||||
expect(updateArgs.ts).toBe('1111111111.111111');
|
||||
});
|
||||
|
||||
it('should fall back to chat.postMessage when notification is updated but no stored ts exists', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db } = createMockDb();
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1234',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
updated: new Date(),
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
// Should fall back to postMessage since there is no stored ts.
|
||||
expect(slack.chat.update).not.toHaveBeenCalled();
|
||||
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should not interact with the database for non-scoped notifications', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db } = createMockDb();
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1234',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
payload: {
|
||||
title: 'notification',
|
||||
},
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: { title: 'notification' },
|
||||
},
|
||||
);
|
||||
|
||||
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
|
||||
// The db function should not have been called since there is no scope.
|
||||
expect(db).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should work without a database (graceful degradation)', async () => {
|
||||
const slack = new WebClient();
|
||||
|
||||
// No db set — the processor should still send messages normally.
|
||||
const processor = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
logger,
|
||||
catalog: catalogServiceMock({
|
||||
entities: DEFAULT_ENTITIES_RESPONSE.items,
|
||||
}),
|
||||
metrics,
|
||||
slack,
|
||||
})[0];
|
||||
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1234',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: {
|
||||
title: 'notification',
|
||||
scope: 'deployment-failure/my-service/42',
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
|
||||
expect(slack.chat.update).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should handle concurrent postProcess calls with different scopes correctly', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db, store, storeKey } = createMockDb();
|
||||
|
||||
// Pre-populate stored timestamps for both scopes.
|
||||
store.set(storeKey('plugin', 'scope-a', 'U12345678'), {
|
||||
origin: 'plugin',
|
||||
scope: 'scope-a',
|
||||
channel: 'U12345678',
|
||||
ts: '1111111111.111111',
|
||||
created_at: new Date(),
|
||||
});
|
||||
store.set(storeKey('plugin', 'scope-b', 'U12345678'), {
|
||||
origin: 'plugin',
|
||||
scope: 'scope-b',
|
||||
channel: 'U12345678',
|
||||
ts: '2222222222.222222',
|
||||
created_at: new Date(),
|
||||
});
|
||||
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
// Fire both postProcess calls concurrently with different scopes.
|
||||
await Promise.all([
|
||||
processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '1',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
updated: new Date(),
|
||||
payload: { title: 'A', scope: 'scope-a' },
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: { title: 'A', scope: 'scope-a' },
|
||||
},
|
||||
),
|
||||
processor.postProcess(
|
||||
{
|
||||
origin: 'plugin',
|
||||
id: '2',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
updated: new Date(),
|
||||
payload: { title: 'B', scope: 'scope-b' },
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: { title: 'B', scope: 'scope-b' },
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
// Both should use chat.update, not postMessage.
|
||||
expect(slack.chat.postMessage).not.toHaveBeenCalled();
|
||||
expect(slack.chat.update).toHaveBeenCalledTimes(2);
|
||||
|
||||
// Each update should use the correct ts for its scope.
|
||||
const updateCalls = (slack.chat.update as jest.Mock).mock.calls;
|
||||
const timestamps = updateCalls.map((call: any[]) => call[0].ts).sort();
|
||||
expect(timestamps).toEqual(['1111111111.111111', '2222222222.222222']);
|
||||
});
|
||||
|
||||
it('should not collide across different origins with the same scope', async () => {
|
||||
const slack = new WebClient();
|
||||
const { db, store, storeKey } = createMockDb();
|
||||
|
||||
// Pre-populate a stored timestamp for origin-a only.
|
||||
store.set(storeKey('origin-a', 'shared-scope', 'U12345678'), {
|
||||
origin: 'origin-a',
|
||||
scope: 'shared-scope',
|
||||
channel: 'U12345678',
|
||||
ts: '1111111111.111111',
|
||||
created_at: new Date(),
|
||||
});
|
||||
|
||||
const processor = createProcessorWithDb(slack, db);
|
||||
|
||||
// Send an update from origin-b with the same scope — should NOT find
|
||||
// the stored ts from origin-a, and should fall back to postMessage.
|
||||
await processor.postProcess(
|
||||
{
|
||||
origin: 'origin-b',
|
||||
id: '1',
|
||||
user: 'user:default/mock',
|
||||
created: new Date(),
|
||||
updated: new Date(),
|
||||
payload: { title: 'From B', scope: 'shared-scope' },
|
||||
},
|
||||
{
|
||||
recipients: { type: 'entity', entityRef: 'user:default/mock' },
|
||||
payload: { title: 'From B', scope: 'shared-scope' },
|
||||
},
|
||||
);
|
||||
|
||||
expect(slack.chat.update).not.toHaveBeenCalled();
|
||||
expect(slack.chat.postMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -34,8 +34,13 @@ import {
|
||||
NotificationSendOptions,
|
||||
} from '@backstage/plugin-notifications-node';
|
||||
import { durationToMilliseconds } from '@backstage/types';
|
||||
import { ChatPostMessageArguments, WebClient } from '@slack/web-api';
|
||||
import {
|
||||
ChatPostMessageArguments,
|
||||
ChatUpdateArguments,
|
||||
WebClient,
|
||||
} from '@slack/web-api';
|
||||
import DataLoader from 'dataloader';
|
||||
import { Knex } from 'knex';
|
||||
import pThrottle from 'p-throttle';
|
||||
import { ANNOTATION_SLACK_BOT_NOTIFY } from './constants';
|
||||
import { BroadcastRoute } from './types';
|
||||
@@ -43,6 +48,12 @@ import { ExpiryMap, toChatPostMessageArgs } from './util';
|
||||
import { CatalogService } from '@backstage/plugin-catalog-node';
|
||||
import { SlackBlockKitRenderer } from '../extensions';
|
||||
|
||||
interface ScopeContext {
|
||||
origin: string;
|
||||
scope?: string;
|
||||
isUpdate?: boolean;
|
||||
}
|
||||
|
||||
export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
private readonly logger: LoggerService;
|
||||
private readonly catalog: CatalogService;
|
||||
@@ -50,9 +61,12 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
private readonly slack: WebClient;
|
||||
private readonly sendNotifications: (
|
||||
opts: ChatPostMessageArguments[],
|
||||
scopeContext?: ScopeContext,
|
||||
) => Promise<void>;
|
||||
private readonly messagesSent: MetricsServiceCounter;
|
||||
private readonly messagesFailed: MetricsServiceCounter;
|
||||
private readonly messagesUpdated: MetricsServiceCounter;
|
||||
private db?: Knex;
|
||||
private readonly broadcastChannels?: string[];
|
||||
private readonly broadcastRoutes?: BroadcastRoute[];
|
||||
private readonly entityLoader: DataLoader<string, Entity | undefined>;
|
||||
@@ -179,25 +193,42 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
unit: '{message}',
|
||||
},
|
||||
);
|
||||
this.messagesUpdated = metrics.createCounter(
|
||||
'notifications.processors.slack.update.count',
|
||||
{
|
||||
description:
|
||||
'Number of existing Slack messages updated via scope matching',
|
||||
unit: '{message}',
|
||||
},
|
||||
);
|
||||
|
||||
const throttle = pThrottle({
|
||||
limit: this.concurrencyLimit,
|
||||
interval: this.throttleInterval,
|
||||
});
|
||||
const throttled = throttle((opts: ChatPostMessageArguments) =>
|
||||
this.sendNotification(opts),
|
||||
const throttled = throttle(
|
||||
(opts: ChatPostMessageArguments, ctx?: ScopeContext) =>
|
||||
this.sendNotification(opts, ctx),
|
||||
);
|
||||
this.sendNotifications = async (opts: ChatPostMessageArguments[]) => {
|
||||
this.sendNotifications = async (
|
||||
opts: ChatPostMessageArguments[],
|
||||
scopeContext?: ScopeContext,
|
||||
) => {
|
||||
const results = await Promise.allSettled(
|
||||
opts.map(message => throttled(message)),
|
||||
opts.map(message => throttled(message, scopeContext)),
|
||||
);
|
||||
|
||||
let successCount = 0;
|
||||
let sentCount = 0;
|
||||
let updateCount = 0;
|
||||
let failureCount = 0;
|
||||
|
||||
results.forEach((result, index) => {
|
||||
if (result.status === 'fulfilled') {
|
||||
successCount++;
|
||||
if (result.value === 'updated') {
|
||||
updateCount++;
|
||||
} else {
|
||||
sentCount++;
|
||||
}
|
||||
} else {
|
||||
this.logger.error(
|
||||
`Failed to send Slack channel notification to ${opts[index].channel}: ${result.reason.message}`,
|
||||
@@ -206,11 +237,16 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
}
|
||||
});
|
||||
|
||||
this.messagesSent.add(successCount);
|
||||
this.messagesSent.add(sentCount);
|
||||
this.messagesUpdated.add(updateCount);
|
||||
this.messagesFailed.add(failureCount);
|
||||
};
|
||||
}
|
||||
|
||||
setDatabase(db: Knex): void {
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
getName(): string {
|
||||
return 'SlackNotificationProcessor';
|
||||
}
|
||||
@@ -337,8 +373,11 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
this.logger.debug(`Sending notification: ${JSON.stringify(payload)}`);
|
||||
});
|
||||
|
||||
// Send notifications
|
||||
await this.sendNotifications(outbound);
|
||||
await this.sendNotifications(outbound, {
|
||||
origin: notification.origin,
|
||||
scope: notification.payload.scope,
|
||||
isUpdate: !!notification.updated,
|
||||
});
|
||||
}
|
||||
|
||||
private async formatPayloadDescriptionForSlack(
|
||||
@@ -434,12 +473,92 @@ export class SlackNotificationProcessor implements NotificationProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async sendNotification(args: ChatPostMessageArguments): Promise<void> {
|
||||
async sendNotification(
|
||||
args: ChatPostMessageArguments,
|
||||
scopeContext?: ScopeContext,
|
||||
): Promise<'sent' | 'updated'> {
|
||||
const channel = args.channel as string;
|
||||
const scope = scopeContext?.scope;
|
||||
|
||||
// If this is a scoped update, try to update the existing Slack message.
|
||||
const origin = scopeContext?.origin;
|
||||
if (scopeContext?.isUpdate && origin && scope && this.db) {
|
||||
const storedTs = await this.getStoredTimestamp(origin, scope, channel);
|
||||
if (storedTs) {
|
||||
const updateArgs = {
|
||||
channel,
|
||||
ts: storedTs,
|
||||
...('text' in args ? { text: args.text } : {}),
|
||||
...('blocks' in args ? { blocks: args.blocks } : {}),
|
||||
...('attachments' in args ? { attachments: args.attachments } : {}),
|
||||
} as ChatUpdateArguments;
|
||||
const updateResponse = await this.slack.chat.update(updateArgs);
|
||||
|
||||
if (!updateResponse.ok) {
|
||||
throw new Error(
|
||||
`Failed to update notification: ${updateResponse.error}`,
|
||||
);
|
||||
}
|
||||
|
||||
return 'updated';
|
||||
}
|
||||
}
|
||||
|
||||
// Send a new message.
|
||||
const response = await this.slack.chat.postMessage(args);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to send notification: ${response.error}`);
|
||||
}
|
||||
|
||||
// Persist the message timestamp for future scope-based updates.
|
||||
if (origin && scope && response.ts && this.db) {
|
||||
await this.saveTimestamp(origin, scope, channel, response.ts);
|
||||
}
|
||||
|
||||
return 'sent';
|
||||
}
|
||||
|
||||
private async getStoredTimestamp(
|
||||
origin: string,
|
||||
scope: string,
|
||||
channel: string,
|
||||
): Promise<string | undefined> {
|
||||
try {
|
||||
const row = await this.db!('slack_message_timestamps')
|
||||
.where({ origin, scope, channel })
|
||||
.first();
|
||||
return row?.ts;
|
||||
} catch (error) {
|
||||
this.logger.warn('Failed to look up stored Slack message timestamp', {
|
||||
origin,
|
||||
scope,
|
||||
channel,
|
||||
error,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private async saveTimestamp(
|
||||
origin: string,
|
||||
scope: string,
|
||||
channel: string,
|
||||
ts: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.db!('slack_message_timestamps')
|
||||
.insert({ origin, scope, channel, ts, created_at: new Date() })
|
||||
.onConflict(['origin', 'scope', 'channel'])
|
||||
.merge({ ts, created_at: new Date() });
|
||||
} catch (error) {
|
||||
this.logger.warn('Failed to persist Slack message timestamp', {
|
||||
origin,
|
||||
scope,
|
||||
channel,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static parseBroadcastRoute(route: Config): BroadcastRoute {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
import {
|
||||
coreServices,
|
||||
createBackendModule,
|
||||
resolvePackagePath,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { metricsServiceRef } from '@backstage/backend-plugin-api/alpha';
|
||||
import { notificationsProcessingExtensionPoint } from '@backstage/plugin-notifications-node';
|
||||
@@ -26,6 +27,13 @@ import {
|
||||
SlackBlockKitRenderer,
|
||||
} from './extensions';
|
||||
|
||||
const MIGRATIONS_DIR = resolvePackagePath(
|
||||
'@backstage/plugin-notifications-backend-module-slack',
|
||||
'migrations',
|
||||
);
|
||||
|
||||
const CLEANUP_RETENTION_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
|
||||
/**
|
||||
* The Slack notification processor for use with the notifications plugin.
|
||||
* This allows sending of notifications via Slack DMs or to channels.
|
||||
@@ -54,17 +62,65 @@ export const notificationsModuleSlack = createBackendModule({
|
||||
catalog: catalogServiceRef,
|
||||
notifications: notificationsProcessingExtensionPoint,
|
||||
metrics: metricsServiceRef,
|
||||
database: coreServices.database,
|
||||
scheduler: coreServices.scheduler,
|
||||
},
|
||||
async init({ auth, config, logger, catalog, notifications, metrics }) {
|
||||
notifications.addProcessor(
|
||||
SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
logger,
|
||||
catalog,
|
||||
metrics,
|
||||
blockKitRenderer,
|
||||
}),
|
||||
);
|
||||
async init({
|
||||
auth,
|
||||
config,
|
||||
logger,
|
||||
catalog,
|
||||
notifications,
|
||||
metrics,
|
||||
database,
|
||||
scheduler,
|
||||
}) {
|
||||
const processors = SlackNotificationProcessor.fromConfig(config, {
|
||||
auth,
|
||||
logger,
|
||||
catalog,
|
||||
metrics,
|
||||
blockKitRenderer,
|
||||
});
|
||||
|
||||
if (processors.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const db = await database.getClient();
|
||||
|
||||
if (!database.migrations?.skip) {
|
||||
await db.migrate.latest({
|
||||
directory: MIGRATIONS_DIR,
|
||||
});
|
||||
}
|
||||
|
||||
// Attach the DB to each processor now that migrations have run.
|
||||
for (const processor of processors) {
|
||||
processor.setDatabase(db);
|
||||
}
|
||||
|
||||
notifications.addProcessor(processors);
|
||||
|
||||
// Clean up old message timestamp records daily. These records are only
|
||||
// needed for the short window between initial send and scope-based
|
||||
// update (typically minutes), so a 24-hour retention is sufficient.
|
||||
await scheduler.scheduleTask({
|
||||
id: 'slack-message-timestamps-cleanup',
|
||||
frequency: { hours: 24 },
|
||||
timeout: { minutes: 5 },
|
||||
initialDelay: { hours: 2 },
|
||||
scope: 'global',
|
||||
fn: async () => {
|
||||
const cutoff = new Date(Date.now() - CLEANUP_RETENTION_MS);
|
||||
const deleted = await db('slack_message_timestamps')
|
||||
.where('created_at', '<=', cutoff)
|
||||
.delete();
|
||||
logger.info('Cleaned up old Slack message timestamps', {
|
||||
deleted,
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user