fix(search): use sub-transaction rollback when truncating document text (#31494)
* fix: implement sub-transactions for partial rollbacks in PgSearchEngineIndexer Signed-off-by: Stanislav Cherkasov <150145013+stanislav-c@users.noreply.github.com> * fix: address Copilot review feedback - Wrap sub-transaction creation in try/catch to handle failures - Await subTx.commit() and subTx.rollback() to prevent race conditions - Use inline eslint-disable comments instead of file-level disable - Use mockResolvedValue for async tx.transaction mock Signed-off-by: Stanislav Cherkasov <150145013+stanislav-c@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Signed-off-by: Stanislav Cherkasov <150145013+stanislav-c@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
c6abc52e1f
commit
aa08b7f135
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-search-backend-module-pg': patch
|
||||
---
|
||||
|
||||
Fix a bug in large document indexing logic by using sub-transaction rollbacks
|
||||
@@ -20,10 +20,18 @@ import { PgSearchEngineIndexer } from './PgSearchEngineIndexer';
|
||||
import { DatabaseStore } from '../database';
|
||||
|
||||
describe('PgSearchEngineIndexer', () => {
|
||||
const subTx = {
|
||||
rollback: jest.fn(),
|
||||
commit: jest.fn(),
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} as any;
|
||||
|
||||
const tx = {
|
||||
rollback: jest.fn(),
|
||||
commit: jest.fn(),
|
||||
transaction: jest.fn().mockResolvedValue(subTx),
|
||||
isCompleted: jest.fn(),
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
} as any;
|
||||
let database: jest.Mocked<DatabaseStore>;
|
||||
let indexer: PgSearchEngineIndexer;
|
||||
@@ -65,8 +73,14 @@ describe('PgSearchEngineIndexer', () => {
|
||||
'my-type',
|
||||
documents,
|
||||
);
|
||||
expect(database.completeInsert).toHaveBeenCalledWith(tx, 'my-type', false);
|
||||
expect(database.completeInsert).toHaveBeenCalledWith(
|
||||
subTx,
|
||||
'my-type',
|
||||
false,
|
||||
);
|
||||
expect(subTx.commit).toHaveBeenCalled();
|
||||
expect(tx.commit).toHaveBeenCalled();
|
||||
expect(subTx.rollback).not.toHaveBeenCalled();
|
||||
expect(tx.rollback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -95,18 +109,20 @@ describe('PgSearchEngineIndexer', () => {
|
||||
expect(database.completeInsert).toHaveBeenCalledTimes(2);
|
||||
expect(database.completeInsert).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
tx,
|
||||
subTx,
|
||||
'my-type',
|
||||
false,
|
||||
);
|
||||
expect(database.completeInsert).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
tx,
|
||||
subTx,
|
||||
'my-type',
|
||||
true,
|
||||
);
|
||||
expect(subTx.commit).toHaveBeenCalled();
|
||||
expect(tx.commit).toHaveBeenCalled();
|
||||
expect(tx.rollback).toHaveBeenCalledTimes(1);
|
||||
expect(subTx.rollback).toHaveBeenCalledTimes(1);
|
||||
expect(tx.rollback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should batch insert documents', async () => {
|
||||
@@ -121,7 +137,11 @@ describe('PgSearchEngineIndexer', () => {
|
||||
expect(database.getTransaction).toHaveBeenCalledTimes(1);
|
||||
expect(database.prepareInsert).toHaveBeenCalledTimes(1);
|
||||
expect(database.insertDocuments).toHaveBeenCalledTimes(4);
|
||||
expect(database.completeInsert).toHaveBeenCalledWith(tx, 'my-type', false);
|
||||
expect(database.completeInsert).toHaveBeenCalledWith(
|
||||
subTx,
|
||||
'my-type',
|
||||
false,
|
||||
);
|
||||
});
|
||||
|
||||
it('should rollback transaction if no documents indexed', async () => {
|
||||
|
||||
@@ -89,15 +89,28 @@ export class PgSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
let retryTruncated = false;
|
||||
|
||||
do {
|
||||
// Sub-transaction allows a partial rollback if a truncated retry is needed
|
||||
let subTx: Knex.Transaction;
|
||||
try {
|
||||
subTx = await this.tx!.transaction();
|
||||
} catch (e) {
|
||||
// If sub-transaction creation fails, rollback the outer transaction
|
||||
// and re-throw the error so that the stream can be closed and
|
||||
// destroyed properly.
|
||||
this.tx!.rollback!(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Attempt to complete and commit the transaction.
|
||||
try {
|
||||
await this.store.completeInsert(this.tx!, this.type, retryTruncated);
|
||||
this.tx!.commit();
|
||||
await this.store.completeInsert(subTx, this.type, retryTruncated);
|
||||
|
||||
// Commit both transactions if successful
|
||||
await subTx.commit();
|
||||
await this.tx!.commit();
|
||||
retryTruncated = false;
|
||||
} catch (e) {
|
||||
// Otherwise, rollback the transaction and re-throw the error so that the
|
||||
// stream can be closed and destroyed properly.
|
||||
this.tx!.rollback!(e);
|
||||
await subTx.rollback(e); // Rollback the first completeInsert attempt
|
||||
|
||||
if (
|
||||
e instanceof Error &&
|
||||
@@ -108,6 +121,9 @@ export class PgSearchEngineIndexer extends BatchSearchEngineIndexer {
|
||||
// retry the operation with truncated text.
|
||||
retryTruncated = true;
|
||||
} else {
|
||||
// Otherwise, rollback the outer transaction and re-throw the error so that the
|
||||
// stream can be closed and destroyed properly.
|
||||
this.tx!.rollback!(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user