Modifying large size files upload to S3

Signed-off-by: Rudra Sharans <rudra099999@gmail.com>
This commit is contained in:
Rudra Sharans
2025-10-08 00:01:00 +05:30
parent da2a66eac6
commit 703f8c08bd
3 changed files with 329 additions and 68 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-techdocs-node': patch
---
There was an issue in the uploading of large size files to the AWS S3. We have modified the logic by adding retry along with multipart uploading functionality.
@@ -31,7 +31,7 @@ import {
AwsCredentialProviderOptions,
DefaultAwsCredentialsManager,
} from '@backstage/integration-aws-node';
import { mockClient, AwsClientStub } from 'aws-sdk-client-mock';
import { mockClient } from 'aws-sdk-client-mock';
import express from 'express';
import request from 'supertest';
import path from 'path';
@@ -44,9 +44,10 @@ import {
} from '@backstage/backend-test-utils';
const env = process.env;
let s3Mock: AwsClientStub<S3Client>;
let s3Mock: any;
const mockDir = createMockDirectory();
// Create a new MockDirectory for each test to avoid Windows file locking issues
let mockDir: ReturnType<typeof createMockDirectory>;
function getMockCredentialProvider(): Promise<AwsCredentialProvider> {
return Promise.resolve({
@@ -155,7 +156,7 @@ describe('AwsS3Publish', () => {
build_timestamp: 612741599,
};
const directory = getEntityRootDir(entity);
let directory: string;
const files = {
'index.html': '',
@@ -176,7 +177,7 @@ describe('AwsS3Publish', () => {
},
};
beforeEach(() => {
beforeEach(async () => {
process.env = { ...env };
process.env.AWS_REGION = 'us-west-2';
@@ -185,20 +186,26 @@ describe('AwsS3Publish', () => {
getMockCredentialProvider(),
);
// Create a fresh mockdirectory for each test to avoid windows file locking
mockDir = createMockDirectory();
// Calculate directory path with the new mockDir instance
directory = getEntityRootDir(entity);
// Set up the test files
mockDir.setContent({
[directory]: files,
});
s3Mock = mockClient(S3Client);
s3Mock = mockClient(S3Client as any);
s3Mock.on(HeadObjectCommand).callsFake(input => {
s3Mock.on(HeadObjectCommand).callsFake((input: any) => {
if (!fs.pathExistsSync(mockDir.resolve(input.Key))) {
throw new Error('File does not exist');
}
return {};
});
s3Mock.on(GetObjectCommand).callsFake(input => {
s3Mock.on(GetObjectCommand).callsFake((input: any) => {
if (fs.pathExistsSync(mockDir.resolve(input.Key))) {
return {
Body: Readable.from(fs.readFileSync(mockDir.resolve(input.Key))),
@@ -208,14 +215,14 @@ describe('AwsS3Publish', () => {
throw new Error(`The file ${input.Key} does not exist!`);
});
s3Mock.on(HeadBucketCommand).callsFake(input => {
s3Mock.on(HeadBucketCommand).callsFake((input: any) => {
if (input.Bucket === 'errorBucket') {
throw new Error('Bucket does not exist');
}
return {};
});
s3Mock.on(ListObjectsV2Command).callsFake(input => {
s3Mock.on(ListObjectsV2Command).callsFake((input: any) => {
if (
input.Bucket === 'delete_stale_files_success' ||
input.Bucket === 'delete_stale_files_error'
@@ -227,7 +234,7 @@ describe('AwsS3Publish', () => {
return {};
});
s3Mock.on(DeleteObjectCommand).callsFake(input => {
s3Mock.on(DeleteObjectCommand).callsFake((input: any) => {
if (input.Bucket === 'delete_stale_files_error') {
throw new Error('Message');
}
@@ -235,7 +242,7 @@ describe('AwsS3Publish', () => {
});
s3Mock.on(UploadPartCommand).rejects();
s3Mock.on(PutObjectCommand).callsFake(input => {
s3Mock.on(PutObjectCommand).callsFake((input: any) => {
mockDir.addContent({ [input.Key]: input.Body });
});
});
@@ -320,7 +327,7 @@ describe('AwsS3Publish', () => {
`default/component/backstage/assets/main.css`,
]),
});
});
}, 30000);
it('should publish a directory as well when legacy casing is used', async () => {
const publisher = await createPublisherFromConfig({
@@ -333,7 +340,7 @@ describe('AwsS3Publish', () => {
`default/Component/backstage/assets/main.css`,
]),
});
});
}, 30000);
it('should publish a directory when root path is specified', async () => {
const publisher = await createPublisherFromConfig({
@@ -346,7 +353,7 @@ describe('AwsS3Publish', () => {
`backstage-data/techdocs/default/component/backstage/assets/main.css`,
]),
});
});
}, 30000);
it('should publish a directory when root path is specified and legacy casing is used', async () => {
const publisher = await createPublisherFromConfig({
@@ -360,7 +367,7 @@ describe('AwsS3Publish', () => {
`backstage-data/techdocs/default/Component/backstage/assets/main.css`,
]),
});
});
}, 30000);
it('should publish a directory when sse is specified', async () => {
const publisher = await createPublisherFromConfig({
@@ -373,7 +380,7 @@ describe('AwsS3Publish', () => {
'default/component/backstage/assets/main.css',
]),
});
});
}, 30000);
it('should fail to publish a directory', async () => {
const wrongPathToGeneratedDirectory = mockDir.resolve(
@@ -408,7 +415,7 @@ describe('AwsS3Publish', () => {
expect(loggerInfoSpy).toHaveBeenLastCalledWith(
`Successfully deleted stale files for Entity ${entity.metadata.name}. Total number of files: 1`,
);
});
}, 30000);
it('should log error when the stale files deletion fails', async () => {
const bucketName = 'delete_stale_files_error';
@@ -419,7 +426,7 @@ describe('AwsS3Publish', () => {
expect(loggerErrorSpy).toHaveBeenLastCalledWith(
'Unable to delete file(s) from AWS S3. Error: Message',
);
});
}, 30000);
});
describe('hasDocsBeenGenerated', () => {
@@ -427,7 +434,7 @@ describe('AwsS3Publish', () => {
const publisher = await createPublisherFromConfig();
await publisher.publish({ entity, directory });
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
});
}, 30000);
it('should return true if docs has been generated even if the legacy case is enabled', async () => {
const publisher = await createPublisherFromConfig({
@@ -435,7 +442,7 @@ describe('AwsS3Publish', () => {
});
await publisher.publish({ entity, directory });
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
});
}, 30000);
it('should return true if docs has been generated if root path is specified', async () => {
const publisher = await createPublisherFromConfig({
@@ -443,7 +450,7 @@ describe('AwsS3Publish', () => {
});
await publisher.publish({ entity, directory });
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
});
}, 30000);
it('should return true if docs has been generated if root path is specified and legacy casing is used', async () => {
const publisher = await createPublisherFromConfig({
@@ -452,7 +459,7 @@ describe('AwsS3Publish', () => {
});
await publisher.publish({ entity, directory });
expect(await publisher.hasDocsBeenGenerated(entity)).toBe(true);
});
}, 30000);
it('should return false if docs has not been generated', async () => {
const publisher = await createPublisherFromConfig();
@@ -475,7 +482,7 @@ describe('AwsS3Publish', () => {
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
techdocsMetadata,
);
});
}, 30000);
it('should return tech docs metadata even if the legacy case is enabled', async () => {
const publisher = await createPublisherFromConfig({
@@ -485,7 +492,7 @@ describe('AwsS3Publish', () => {
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
techdocsMetadata,
);
});
}, 30000);
it('should return tech docs metadata even if root path is specified', async () => {
const publisher = await createPublisherFromConfig({
@@ -495,7 +502,7 @@ describe('AwsS3Publish', () => {
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
techdocsMetadata,
);
});
}, 30000);
it('should return tech docs metadata if root path is specified and legacy casing is used', async () => {
const publisher = await createPublisherFromConfig({
@@ -506,7 +513,7 @@ describe('AwsS3Publish', () => {
expect(await publisher.fetchTechDocsMetadata(entityName)).toStrictEqual(
techdocsMetadata,
);
});
}, 30000);
it('should return tech docs metadata when json encoded with single quotes', async () => {
const techdocsMetadataPath = path.join(
@@ -528,7 +535,7 @@ describe('AwsS3Publish', () => {
);
fs.writeFileSync(techdocsMetadataPath, techdocsMetadataContent);
});
}, 30000);
it('should return an error if the techdocs_metadata.json file is not present', async () => {
const publisher = await createPublisherFromConfig();
@@ -549,7 +556,7 @@ describe('AwsS3Publish', () => {
});
it('should return an error if the techdocs_metadata.json file cannot be read from stream', async () => {
s3Mock.on(GetObjectCommand).callsFake(_ => {
s3Mock.on(GetObjectCommand).callsFake((_: any) => {
return {
Body: new ErrorReadable('No stream!'),
};
@@ -582,7 +589,7 @@ describe('AwsS3Publish', () => {
const publisher = await createPublisherFromConfig();
await publisher.publish({ entity, directory });
app = express().use(publisher.docsRouter());
});
}, 30000);
it('should pass expected object path to bucket', async () => {
// Ensures leading slash is trimmed and encoded path is decoded.
@@ -688,7 +695,7 @@ describe('AwsS3Publish', () => {
});
it('should return 404 if file cannot be read from stream', async () => {
s3Mock.on(GetObjectCommand).callsFake(_ => {
s3Mock.on(GetObjectCommand).callsFake((_: any) => {
return {
Body: new ErrorReadable('No stream!'),
};
+286 -37
View File
@@ -26,10 +26,12 @@ import {
DeleteObjectCommand,
HeadBucketCommand,
HeadObjectCommand,
PutObjectCommand,
PutObjectCommandInput,
ListObjectsV2CommandOutput,
ListObjectsV2Command,
S3Client,
S3ServiceException,
} from '@aws-sdk/client-s3';
import { fromTemporaryCredentials } from '@aws-sdk/credential-providers';
import { NodeHttpHandler } from '@smithy/node-http-handler';
@@ -84,6 +86,7 @@ export class AwsS3Publish implements PublisherBase {
private readonly logger: LoggerService;
private readonly bucketRootPath: string;
private readonly sse?: 'aws:kms' | 'AES256';
private readonly maxAttempts: number;
constructor(options: {
storageClient: S3Client;
@@ -92,6 +95,7 @@ export class AwsS3Publish implements PublisherBase {
logger: LoggerService;
bucketRootPath: string;
sse?: 'aws:kms' | 'AES256';
maxAttempts: number;
}) {
this.storageClient = options.storageClient;
this.bucketName = options.bucketName;
@@ -99,6 +103,7 @@ export class AwsS3Publish implements PublisherBase {
this.logger = options.logger;
this.bucketRootPath = options.bucketRootPath;
this.sse = options.sse;
this.maxAttempts = options.maxAttempts;
}
static async fromConfig(
@@ -175,10 +180,22 @@ export class AwsS3Publish implements PublisherBase {
...(region && { region }),
...(endpoint && { endpoint }),
...(forcePathStyle && { forcePathStyle }),
...(maxAttempts && { maxAttempts }),
// Enhanced retry configuration for better reliability
maxAttempts: maxAttempts || 5,
retryMode: 'adaptive',
...(httpsProxy && {
requestHandler: new NodeHttpHandler({
httpsAgent: new HttpsProxyAgent({ proxy: httpsProxy }),
// Enhanced connection setting for large file uploads
connectionTimeout: 60000,
socketTimeout: 120000,
}),
}),
// Add default request handler with enhanced timeouts if no proxy
...(!httpsProxy && {
requestHandler: new NodeHttpHandler({
connectionTimeout: 60000,
socketTimeout: 120000,
}),
}),
});
@@ -195,6 +212,7 @@ export class AwsS3Publish implements PublisherBase {
legacyPathCasing,
logger,
sse,
maxAttempts: maxAttempts || 5,
});
}
@@ -250,6 +268,126 @@ export class AwsS3Publish implements PublisherBase {
return explicitCredentials;
}
/**
* Custom retry wrapper for S3 operations with detailed error handling.
*/
private async retryOperation<TOutput>(
operation: () => Promise<TOutput>,
operationName: string,
maxAttempts: number = 3,
): Promise<TOutput> {
let attempts = maxAttempts;
let LastError: S3ServiceException;
while (attempts > 0) {
try {
return await operation();
} catch (error: unknown) {
LastError = error as S3ServiceException;
attempts--;
const httpStatusCode = LastError.$metadata?.httpStatusCode;
const errorCode = LastError.name;
this.logger.warn(`${operationName} failed.`, {
errorCode,
httpStatusCode,
attemptsRemaining: attempts,
currentAttempt: maxAttempts - attempts,
totalAttempts: maxAttempts,
error: LastError.message,
});
// Determine if we should retry based on error type
const shouldRetry = this.shouldRetryOperation(LastError, attempts);
if (!shouldRetry || attempts === 0) {
this.logger.error(
`${operationName} failed after all retries: ${LastError.message}`,
);
throw LastError;
}
// Enhanced exponential backoff with jitter for upload operation
let baseDelay = 1000;
if (operationName.startsWith('Upload-')) {
// for uploads use longer base delay due to potential multipart commplexity
baseDelay = 2000;
}
const backoffDelay = Math.min(
baseDelay * Math.pow(2, maxAttempts - attempts),
30000,
);
const jitter = Math.random() * 1000;
const totalDelay = backoffDelay + jitter;
await new Promise(resolve => setTimeout(resolve, totalDelay));
}
}
// Final attempt without retry wrapper
return operation();
}
/**
* Determines if an S3 operation should be retried based on the error details.
*/
private shouldRetryOperation(
error: S3ServiceException,
attemptsRemaining: number,
): boolean {
const httpStatusCode = error.$metadata?.httpStatusCode;
const errorCode = error.name;
// Handle invalid part errors first - these are retriable for multipart uploads
if (errorCode === 'InvalidPart') {
return attemptsRemaining > 0;
}
// Dont retry for client errors (4xx) except specific cases
if (httpStatusCode && httpStatusCode >= 400 && httpStatusCode < 500) {
// Retry specfic 4xx errors that might be transient
const retriable4xxErrors = [
'RequestTimeOut',
'RequestTimeoutException',
'PriorRequestNotComplete',
'ConnectionError',
'RequestTimeToooSkewed',
'InvalidPart',
'NoSuchUpload',
];
if (!retriable4xxErrors.includes(errorCode)) {
return false;
}
}
// Always retry for server errors (5xx)
if (httpStatusCode && httpStatusCode >= 500) {
return attemptsRemaining > 0;
}
// Retry specific network/connection errors and multipart upload errors
const retriableErrors = [
'NetworkingError',
'TimeoutError',
'ConnectionError',
'ECONNRESET',
'ENOTFOUND',
'ECONNREFUSED',
'ETIMEDOUT',
'ServiceUnavailable',
'SlowDown',
'Throttling',
'ThrottlingException',
'ProvisionedThroughputExceededException',
// Multipart upload specific errors - now handled above but kept for completeness
'InvalidPart',
'NoSuchUpload',
'UploadTimeout',
'EntityTooLarge',
'InternalError',
'IncompleteBody',
'RequestTimeout',
];
return (
retriableErrors.some(
retriableError =>
errorCode.includes(retriableError) ||
error.message.includes(retriableError),
) && attemptsRemaining > 0
);
}
/**
* Check if the defined bucket exists. Being able to connect means the configuration is good
@@ -273,13 +411,15 @@ export class AwsS3Publish implements PublisherBase {
'explicitly defining credentials and region in techdocs.publisher.awsS3 in app config or ' +
'by using environment variables. Refer to https://backstage.io/docs/features/techdocs/using-cloud-storage',
);
this.logger.error(`from AWS client library`, error);
this.logger.error(
`from AWS client library`,
error instanceof Error ? error : new Error(String(error)),
);
return {
isAvailable: false,
};
}
}
/**
* Upload all the files from the generated `directory` to the S3 bucket.
* Directory structure used in the bucket is - entityNamespace/entityKind/entityName/index.html
@@ -293,6 +433,9 @@ export class AwsS3Publish implements PublisherBase {
const bucketRootPath = this.bucketRootPath;
const sse = this.sse;
// Track timing for performance monitoring
const publishStartTime = Date.now();
// First, try to retrieve a list of all individual files currently existing
let existingFiles: string[] = [];
try {
@@ -302,9 +445,20 @@ export class AwsS3Publish implements PublisherBase {
useLegacyPathCasing,
bucketRootPath,
);
existingFiles = await this.getAllObjectsFromBucket({
prefix: remoteFolder,
});
const response = await this.retryOperation(
async () => {
const listCommand = new ListObjectsV2Command({
Bucket: this.bucketName,
Prefix: remoteFolder,
});
return this.storageClient.send(listCommand);
},
'ListObjects',
this.maxAttempts,
);
existingFiles = (response.Contents || [])
.map(f => f.Key || '')
.filter(f => !!f);
} catch (e) {
assertError(e);
this.logger.error(
@@ -320,30 +474,103 @@ export class AwsS3Publish implements PublisherBase {
// e.g. ['index.html', 'sub-page/index.html', 'assets/images/favicon.png']
absoluteFilesToUpload = await getFileTreeRecursively(directory);
let uploadCounter = 0;
await bulkStorageOperation(
async absoluteFilePath => {
uploadCounter++;
const relativeFilePath = path.relative(directory, absoluteFilePath);
const fileStream = fs.createReadStream(absoluteFilePath);
const s3Key = getCloudPathForLocalPath(
entity,
relativeFilePath,
useLegacyPathCasing,
bucketRootPath,
);
const params: PutObjectCommandInput = {
Bucket: this.bucketName,
Key: getCloudPathForLocalPath(
entity,
relativeFilePath,
useLegacyPathCasing,
bucketRootPath,
),
Body: fileStream,
Key: s3Key,
Body: absoluteFilePath,
...(sse && { ServerSideEncryption: sse }),
};
objects.push(params.Key!);
// Get file stats before upload
const stats = await fs.stat(absoluteFilePath);
const fileSizeInBytes = stats.size;
const upload = new Upload({
client: this.storageClient,
params,
});
return upload.done();
// Use retry wrapper for uploads with enhanced error handling
try {
const result = await this.retryOperation(
async () => {
const fiveMB = 5 * 1024 * 1024;
// For files smaller than 5MB, use simple PutObject to avoid multipart complexity
if (fileSizeInBytes < fiveMB) {
const fileContent = await fs.readFile(absoluteFilePath);
const putParams = { ...params, Body: fileContent };
return this.storageClient.send(
new PutObjectCommand(putParams),
);
}
// For files 5MB and larger, use multipart upload with enhanced configuration
const calaculatedPartSize = Math.max(
fiveMB,
Math.ceil(fileSizeInBytes / 10000),
);
const upload = new Upload({
client: this.storageClient,
params,
// Configure miltipart upload option for better reliability
partSize: calaculatedPartSize,
queueSize: 3,
leavePartsOnError: false,
});
return upload.done();
},
`Upload-${params.Key}`,
this.maxAttempts,
);
return result;
} catch (error) {
const s3Error = error as any;
const errorName = s3Error?.name || 'Unknown';
// Check if this is a multipart upload failure that we can handle
if (
fileSizeInBytes >= 5 * 1024 * 1024 &&
(errorName === 'InvalidPart' || errorName === 'NoSuchUpload')
) {
this.logger.warn(
`Multipart upload failed for ${params.Key}, Attempting simple upload fallback.`,
);
try {
// Attempt simple upload as a fallback
const fileContent = await fs.readFile(absoluteFilePath);
const simpleParams = { ...params, Body: fileContent };
const fallbackResult = await this.storageClient.send(
new PutObjectCommand(simpleParams),
);
this.logger.info(
`Simple upload fallback succeeded for ${params.Key}`,
);
return fallbackResult;
} catch (fallbackError) {
this.logger.error(
`Both multipart and simple upload failed for ${params.Key}: ${
fallbackError instanceof Error
? fallbackError.message
: String(fallbackError)
}`,
);
// Fall through to throw original error
}
}
this.logger.error(
`Upload failed for ${params.Key}: ${
error instanceof Error ? error.message : String(error)
}`,
);
throw error;
}
},
absoluteFilesToUpload,
{ concurrencyLimit: 10 },
@@ -373,17 +600,21 @@ export class AwsS3Publish implements PublisherBase {
await bulkStorageOperation(
async relativeFilePath => {
return await this.storageClient.send(
new DeleteObjectCommand({
Bucket: this.bucketName,
Key: relativeFilePath,
}),
return this.retryOperation(
async () => {
const deleteCommand = new DeleteObjectCommand({
Bucket: this.bucketName,
Key: relativeFilePath,
});
return this.storageClient.send(deleteCommand);
},
'DeleteObject',
this.maxAttempts,
);
},
staleFiles,
{ concurrencyLimit: 10 },
);
this.logger.info(
`Successfully deleted stale files for Entity ${entity.metadata.name}. Total number of files: ${staleFiles.length}`,
);
@@ -391,6 +622,13 @@ export class AwsS3Publish implements PublisherBase {
const errorMessage = `Unable to delete file(s) from AWS S3. ${error}`;
this.logger.error(errorMessage);
}
const publishEndTime = Date.now();
const publishDurationMs = publishEndTime - publishStartTime;
this.logger.info(
`Successfully published ${objects.length} files for ${
entity.metadata.name
} in ${Math.round(publishDurationMs / 1000)}s`,
);
return { objects };
}
@@ -413,11 +651,16 @@ export class AwsS3Publish implements PublisherBase {
}
try {
const resp = await this.storageClient.send(
new GetObjectCommand({
Bucket: this.bucketName,
Key: `${entityRootDir}/techdocs_metadata.json`,
}),
const resp = await this.retryOperation(
async () => {
const getCommand = new GetObjectCommand({
Bucket: this.bucketName,
Key: `${entityRootDir}/techdocs_metadata.json`,
});
return this.storageClient.send(getCommand);
},
'GetTechDocsMetadata',
this.maxAttempts,
);
const techdocsMetadataJson = await streamToBuffer(
@@ -598,12 +841,18 @@ export class AwsS3Publish implements PublisherBase {
let allObjects: ListObjectsV2CommandOutput;
// Iterate through every file in the root of the publisher.
do {
allObjects = await this.storageClient.send(
new ListObjectsV2Command({
Bucket: this.bucketName,
ContinuationToken: nextContinuation,
...(prefix ? { Prefix: prefix } : {}),
}),
const currentToken = nextContinuation;
allObjects = await this.retryOperation(
async () => {
const listCommand = new ListObjectsV2Command({
Bucket: this.bucketName,
ContinuationToken: currentToken,
...(prefix ? { Prefix: prefix } : {}),
});
return this.storageClient.send(listCommand);
},
'GetAllObjects',
this.maxAttempts,
);
objects.push(
...(allObjects.Contents || []).map(f => f.Key || '').filter(f => !!f),