backend-common: fix stuck s3 reading

Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
Patrik Oldsberg
2022-05-20 15:36:04 +02:00
parent 48121b12e9
commit 5b22a8c97f
3 changed files with 146 additions and 138 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-common': patch
---
Fix a bug in the URL Reading towards AWS S3 where it would hang indefinitely.
@@ -18,6 +18,9 @@ import { ConfigReader } from '@backstage/config';
import { JsonObject } from '@backstage/types';
import { getVoidLogger } from '../logging';
import { DefaultReadTreeResponseFactory } from './tree';
import { rest } from 'msw';
import { setupServer } from 'msw/node';
import { setupRequestMockHandlers } from '@backstage/backend-test-utils';
import { AwsS3UrlReader, parseUrl } from './AwsS3UrlReader';
import {
AwsS3Integration,
@@ -111,6 +114,9 @@ describe('parseUrl', () => {
});
describe('AwsS3UrlReader', () => {
const worker = setupServer();
setupRequestMockHandlers(worker);
const createReader = (config: JsonObject): UrlReaderPredicateTuple[] => {
return AwsS3UrlReader.factory({
config: new ConfigReader(config),
@@ -119,10 +125,6 @@ describe('AwsS3UrlReader', () => {
});
};
afterEach(() => {
AWSMock.restore();
});
it('creates a dummy reader without the awsS3 field', () => {
const entries = createReader({
integrations: {},
@@ -209,40 +211,34 @@ describe('AwsS3UrlReader', () => {
});
describe('read', () => {
let awsS3UrlReader: AwsS3UrlReader;
const [{ reader }] = createReader({
integrations: {
awsS3: [
{
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
},
],
},
});
beforeAll(() => {
AWSMock.setSDKInstance(aws);
AWSMock.mock(
'S3',
'getObject',
Buffer.from(
require('fs').readFileSync(
path.resolve(
__dirname,
'__fixtures__/awsS3/awsS3-mock-object.yaml',
beforeEach(() => {
worker.use(
rest.get(
'https://test-bucket.s3.amazonaws.com/awsS3-mock-object.yaml',
(_, res, ctx) =>
res(
ctx.status(200),
ctx.set('ETag', '123abc'),
ctx.body('site_name: Test'),
),
),
),
);
const s3 = new aws.S3();
awsS3UrlReader = new AwsS3UrlReader(
new AwsS3Integration(
readAwsS3IntegrationConfig(
new ConfigReader({
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
}),
),
),
{ s3, treeResponseFactory },
);
});
it('returns contents of an object in a bucket', async () => {
const response = await awsS3UrlReader.read(
const response = await reader.read(
'https://test-bucket.s3.us-east-2.amazonaws.com/awsS3-mock-object.yaml',
);
expect(response.toString().trim()).toBe('site_name: Test');
@@ -250,7 +246,7 @@ describe('AwsS3UrlReader', () => {
it('rejects unknown targets', async () => {
await expect(
awsS3UrlReader.read(
reader.read(
'https://test-bucket.s3.us-east-2.NOTamazonaws.com/file.yaml',
),
).rejects.toThrow(
@@ -262,59 +258,53 @@ describe('AwsS3UrlReader', () => {
});
describe('readUrl', () => {
let awsS3UrlReader: AwsS3UrlReader;
const [{ reader }] = createReader({
integrations: {
awsS3: [
{
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
},
],
},
});
beforeEach(() => {
AWSMock.setSDKInstance(aws);
AWSMock.mock(
'S3',
'getObject',
Buffer.from(
require('fs').readFileSync(
path.resolve(
__dirname,
'__fixtures__/awsS3/awsS3-mock-object.yaml',
worker.use(
rest.get(
'https://test-bucket.s3.amazonaws.com/awsS3-mock-object.yaml',
(_, res, ctx) =>
res(
ctx.status(200),
ctx.set('ETag', '123abc'),
ctx.body('site_name: Test'),
),
),
),
);
const s3 = new aws.S3();
awsS3UrlReader = new AwsS3UrlReader(
new AwsS3Integration(
readAwsS3IntegrationConfig(
new ConfigReader({
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
}),
),
),
{ s3, treeResponseFactory },
);
});
it('returns contents of an object in a bucket via buffer', async () => {
const response = await awsS3UrlReader.readUrl(
const response = await reader.readUrl!(
'https://test-bucket.s3.us-east-2.amazonaws.com/awsS3-mock-object.yaml',
);
expect(response.etag).toBe('123abc');
const buffer = await response.buffer();
expect(buffer.toString().trim()).toBe('site_name: Test');
});
it('returns contents of an object in a bucket via stream', async () => {
const response = await awsS3UrlReader.readUrl(
const response = await reader.readUrl!(
'https://test-bucket.s3.us-east-2.amazonaws.com/awsS3-mock-object.yaml',
);
expect(response.etag).toBe('123abc');
const fromStream = await getRawBody(response.stream!());
expect(fromStream.toString().trim()).toBe('site_name: Test');
});
it('rejects unknown targets', async () => {
await expect(
awsS3UrlReader.readUrl(
reader.readUrl!(
'https://test-bucket.s3.us-east-2.NOTamazonaws.com/file.yaml',
),
).rejects.toThrow(
@@ -325,45 +315,73 @@ describe('AwsS3UrlReader', () => {
});
});
describe('readUrl with etag', () => {
let awsS3UrlReader: AwsS3UrlReader;
describe('readUrl towards custom host', () => {
const [{ reader }] = createReader({
integrations: {
awsS3: [
{
host: 'localhost:4566',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
endpoint: 'http://localhost:4566',
s3ForcePathStyle: true,
},
],
},
});
beforeAll(() => {
AWSMock.setSDKInstance(aws);
AWSMock.mock('S3', 'getObject', (_, callback) => {
const error: aws.AWSError = {
code: 'NotModified',
message: 'Not Modified',
statusCode: 304,
name: 'oops',
time: new Date('2019-01-01T00:00:00.000Z'),
};
callback(error, undefined);
});
const s3 = new aws.S3();
awsS3UrlReader = new AwsS3UrlReader(
new AwsS3Integration(
readAwsS3IntegrationConfig(
new ConfigReader({
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
}),
),
beforeEach(() => {
worker.use(
rest.get(
'http://localhost:4566/test-bucket/awsS3-mock-object.yaml',
(_, res, ctx) =>
res(
ctx.status(200),
ctx.set('ETag', '123abc'),
ctx.body('site_name: Test'),
),
),
);
});
it('returns contents of an object in a bucket via buffer', async () => {
const response = await reader.readUrl!(
'http://localhost:4566/test-bucket/awsS3-mock-object.yaml',
);
expect(response.etag).toBe('123abc');
const buffer = await response.buffer();
expect(buffer.toString().trim()).toBe('site_name: Test');
});
});
describe('readUrl with etag', () => {
const [{ reader }] = createReader({
integrations: {
awsS3: [
{
host: 'amazonaws.com',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
},
],
},
});
beforeEach(() => {
worker.use(
rest.get(
'https://test-bucket.s3.amazonaws.com/awsS3-mock-object.yaml',
(_, res, ctx) => res(ctx.status(304)),
),
{ s3, treeResponseFactory },
);
});
it('returns contents of an object in a bucket', async () => {
await expect(
awsS3UrlReader.readUrl(
reader.readUrl!(
'https://test-bucket.s3.us-east-2.amazonaws.com/awsS3-mock-object.yaml',
{
etag: 'abc123',
etag: '123abc',
},
),
).rejects.toThrow(NotModifiedError);
@@ -424,47 +442,4 @@ describe('AwsS3UrlReader', () => {
expect(body.toString().trim()).toBe('site_name: Test');
});
});
describe('readNonAwsHost', () => {
let awsS3UrlReader: AwsS3UrlReader;
beforeAll(() => {
AWSMock.setSDKInstance(aws);
AWSMock.mock(
'S3',
'getObject',
Buffer.from(
require('fs').readFileSync(
path.resolve(
__dirname,
'__fixtures__/awsS3/awsS3-mock-object.yaml',
),
),
),
);
const s3 = new aws.S3();
awsS3UrlReader = new AwsS3UrlReader(
new AwsS3Integration(
readAwsS3IntegrationConfig(
new ConfigReader({
host: 'localhost:4566',
accessKeyId: 'fake-access-key',
secretAccessKey: 'fake-secret-key',
endpoint: 'http://localhost:4566',
s3ForcePathStyle: true,
}),
),
),
{ s3, treeResponseFactory },
);
});
it('returns contents of an object in a bucket', async () => {
const response = await awsS3UrlReader.read(
'http://localhost:4566/test-bucket/awsS3-mock-object.yaml',
);
expect(response.toString().trim()).toBe('site_name: Test');
});
});
});
@@ -214,10 +214,38 @@ export class AwsS3UrlReader implements UrlReader {
const request = this.deps.s3.getObject(params);
options?.signal?.addEventListener('abort', () => request.abort());
const etag = (await request.promise()).ETag;
return ReadUrlResponseFactory.fromReadable(request.createReadStream(), {
etag,
// Since we're consuming the read stream we need to consume headers and errors via events.
const etagPromise = new Promise<string | undefined>((resolve, reject) => {
request.on('httpHeaders', (status, headers) => {
if (status < 400) {
if (status === 200) {
resolve(headers.etag);
} else if (status !== 304 /* not modified */) {
reject(
new Error(
`S3 readUrl request received unexpected status '${status}' in response`,
),
);
}
}
});
request.on('error', error => reject(error));
request.on('complete', () =>
reject(
new Error('S3 readUrl request completed without receiving headers'),
),
);
});
const stream = request.createReadStream();
stream.on('error', () => {
// The AWS SDK forwards request errors to the stream, so we need to
// ignore those errors here or the process will crash.
});
return ReadUrlResponseFactory.fromReadable(stream, {
etag: await etagPromise,
});
} catch (e) {
if (e.statusCode === 304) {