fix.plonds-s3-multipart-upload

This commit is contained in:
lincube
2026-06-02 10:09:06 +08:00
parent 04b95020bd
commit 54d97e312d
5 changed files with 308 additions and 6 deletions

View File

@@ -22,6 +22,9 @@ env:
PLONDS_S3_PREFIX: lanmountain/update/plonds PLONDS_S3_PREFIX: lanmountain/update/plonds
PLONDS_S3_PUBLIC_BASE_KEY_PREFIX: lanmountain/update PLONDS_S3_PUBLIC_BASE_KEY_PREFIX: lanmountain/update
PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY: '4' PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY: '4'
PLONDS_S3_MULTIPART_THRESHOLD_MB: '8'
PLONDS_S3_MULTIPART_PART_SIZE_MB: '8'
PLONDS_S3_MULTIPART_CONCURRENCY: '4'
jobs: jobs:
publish: publish:
@@ -119,7 +122,10 @@ jobs:
--s3-secret-key "$S3_SECRET_KEY" \ --s3-secret-key "$S3_SECRET_KEY" \
--s3-public-base-url "$PUBLIC_BASE" \ --s3-public-base-url "$PUBLIC_BASE" \
--s3-public-base-key-prefix "$PLONDS_S3_PUBLIC_BASE_KEY_PREFIX" \ --s3-public-base-key-prefix "$PLONDS_S3_PUBLIC_BASE_KEY_PREFIX" \
--directory-upload-concurrency "$PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY" --directory-upload-concurrency "$PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY" \
--multipart-threshold-mb "$PLONDS_S3_MULTIPART_THRESHOLD_MB" \
--multipart-part-size-mb "$PLONDS_S3_MULTIPART_PART_SIZE_MB" \
--multipart-concurrency "$PLONDS_S3_MULTIPART_CONCURRENCY"
jq -e '.downloads.github.changedZipUrl and .downloads.github.filesZipUrl and .downloads.s3.changedFolderUrl and .downloads.s3.filesFolderUrl' plonds-assets/PLONDS.json >/dev/null jq -e '.downloads.github.changedZipUrl and .downloads.github.filesZipUrl and .downloads.s3.changedFolderUrl and .downloads.s3.filesFolderUrl' plonds-assets/PLONDS.json >/dev/null

View File

@@ -118,6 +118,7 @@ Publisher 上传到 S3 的版本目录:
- `PLONDS.json` 的 downloads 字段同时包含 GitHub 与 S3 的增量包、完整包位置。 - `PLONDS.json` 的 downloads 字段同时包含 GitHub 与 S3 的增量包、完整包位置。
- Publisher 必须先完成版本目录内的 `changed.zip``Files.zip`、解压目录和版本 `PLONDS.json` 上传,再更新 `<prefix>/PLONDS.json` latest 指针。 - Publisher 必须先完成版本目录内的 `changed.zip``Files.zip`、解压目录和版本 `PLONDS.json` 上传,再更新 `<prefix>/PLONDS.json` latest 指针。
- Publisher 的 S3 目录上传必须支持重跑续传;同 key 且大小一致的对象可以跳过,避免失败后从头上传完整包目录。 - Publisher 的 S3 目录上传必须支持重跑续传;同 key 且大小一致的对象可以跳过,避免失败后从头上传完整包目录。
- Publisher 上传大对象时应使用 S3 multipart upload以避免 `changed.zip` / `Files.zip` 在低吞吐链路上被单次 PUT 长时间阻塞。
## 7. 建议代码结构 ## 7. 建议代码结构

View File

@@ -3,6 +3,7 @@ using System.Net;
using System.Net.Http.Headers; using System.Net.Http.Headers;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Xml.Linq;
namespace Plonds.Core.Publishing; namespace Plonds.Core.Publishing;
@@ -29,7 +30,10 @@ public sealed class PlondsS3Client : IDisposable
PublicBaseUrl = Require(options.PublicBaseUrl, nameof(options.PublicBaseUrl)).TrimEnd('/'), PublicBaseUrl = Require(options.PublicBaseUrl, nameof(options.PublicBaseUrl)).TrimEnd('/'),
PublicBaseKeyPrefix = NormalizeOptionalKeyPrefix(options.PublicBaseKeyPrefix), PublicBaseKeyPrefix = NormalizeOptionalKeyPrefix(options.PublicBaseKeyPrefix),
RequestTimeout = options.RequestTimeout <= TimeSpan.Zero ? TimeSpan.FromMinutes(30) : options.RequestTimeout, RequestTimeout = options.RequestTimeout <= TimeSpan.Zero ? TimeSpan.FromMinutes(30) : options.RequestTimeout,
MaxUploadAttempts = Math.Max(1, options.MaxUploadAttempts) MaxUploadAttempts = Math.Max(1, options.MaxUploadAttempts),
MultipartThresholdBytes = Math.Max(5L * 1024 * 1024, options.MultipartThresholdBytes),
MultipartPartSizeBytes = Math.Max(5L * 1024 * 1024, options.MultipartPartSizeBytes),
MultipartConcurrency = Math.Max(1, options.MultipartConcurrency)
}; };
ownsHttpClient = httpClient is null; ownsHttpClient = httpClient is null;
@@ -53,6 +57,19 @@ public sealed class PlondsS3Client : IDisposable
var payloadHash = PayloadUtilities.ComputeSha256(sourcePath); var payloadHash = PayloadUtilities.ComputeSha256(sourcePath);
var contentLength = new FileInfo(sourcePath).Length; var contentLength = new FileInfo(sourcePath).Length;
if (contentLength >= options.MultipartThresholdBytes)
{
try
{
await UploadFileMultipartAsync(sourcePath, key, upload.ContentType, contentLength, cancellationToken).ConfigureAwait(false);
return;
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
Console.Error.WriteLine($"S3 multipart upload failed for {key}; falling back to single PUT. {ex.Message}");
}
}
for (var attempt = 1; attempt <= options.MaxUploadAttempts; attempt++) for (var attempt = 1; attempt <= options.MaxUploadAttempts; attempt++)
{ {
try try
@@ -69,6 +86,216 @@ public sealed class PlondsS3Client : IDisposable
} }
} }
private async Task UploadFileMultipartAsync(
string sourcePath,
string key,
string? contentType,
long contentLength,
CancellationToken cancellationToken)
{
var uploadId = await CreateMultipartUploadAsync(key, contentType, cancellationToken).ConfigureAwait(false);
var partCount = checked((int)((contentLength + options.MultipartPartSizeBytes - 1) / options.MultipartPartSizeBytes));
var parts = new PlondsS3UploadedPart[partCount];
Console.WriteLine($"Uploading S3 object {key} ({FormatBytes(contentLength)}) using multipart upload {uploadId}: {partCount} parts, part size {FormatBytes(options.MultipartPartSizeBytes)}, concurrency {options.MultipartConcurrency}.");
try
{
var completed = 0;
await Parallel.ForEachAsync(
Enumerable.Range(1, partCount),
new ParallelOptions
{
MaxDegreeOfParallelism = options.MultipartConcurrency,
CancellationToken = cancellationToken
},
async (partNumber, token) =>
{
var offset = (long)(partNumber - 1) * options.MultipartPartSizeBytes;
var length = Math.Min(options.MultipartPartSizeBytes, contentLength - offset);
parts[partNumber - 1] = await UploadMultipartPartWithRetriesAsync(
sourcePath,
key,
uploadId,
partNumber,
offset,
length,
token).ConfigureAwait(false);
var done = Interlocked.Increment(ref completed);
Console.WriteLine($"S3 multipart progress {key}: {done}/{partCount} parts uploaded.");
}).ConfigureAwait(false);
await CompleteMultipartUploadAsync(key, uploadId, parts, cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Uploaded S3 object {key} using multipart upload.");
}
catch
{
await AbortMultipartUploadBestEffortAsync(key, uploadId, CancellationToken.None).ConfigureAwait(false);
throw;
}
}
private async Task<string> CreateMultipartUploadAsync(string key, string? contentType, CancellationToken cancellationToken)
{
var requestUri = BuildObjectUri(key, "uploads=");
using var request = new HttpRequestMessage(HttpMethod.Post, requestUri);
if (!string.IsNullOrWhiteSpace(contentType))
{
request.Headers.TryAddWithoutValidation("Content-Type", contentType);
}
SignRequest(request, key, EmptyPayloadHash, DateTimeOffset.UtcNow);
using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var body = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
throw new InvalidOperationException($"S3 create multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(body, 512)}");
}
var uploadId = XDocument.Parse(body).Descendants().FirstOrDefault(element => element.Name.LocalName == "UploadId")?.Value;
return string.IsNullOrWhiteSpace(uploadId)
? throw new InvalidOperationException($"S3 create multipart upload response did not include UploadId for {key}.")
: uploadId;
}
private async Task<PlondsS3UploadedPart> UploadMultipartPartWithRetriesAsync(
string sourcePath,
string key,
string uploadId,
int partNumber,
long offset,
long length,
CancellationToken cancellationToken)
{
for (var attempt = 1; attempt <= options.MaxUploadAttempts; attempt++)
{
try
{
return await UploadMultipartPartOnceAsync(
sourcePath,
key,
uploadId,
partNumber,
offset,
length,
attempt,
cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (attempt < options.MaxUploadAttempts && IsRetriable(ex))
{
var delay = TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, attempt)));
Console.Error.WriteLine($"S3 multipart retry {attempt + 1}/{options.MaxUploadAttempts} for {key} part {partNumber} after {delay.TotalSeconds:0}s: {ex.Message}");
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
}
}
throw new InvalidOperationException($"S3 multipart upload failed for {key} part {partNumber}.");
}
private async Task<PlondsS3UploadedPart> UploadMultipartPartOnceAsync(
string sourcePath,
string key,
string uploadId,
int partNumber,
long offset,
long length,
int attempt,
CancellationToken cancellationToken)
{
var requestUri = BuildObjectUri(key, $"partNumber={partNumber}&uploadId={Uri.EscapeDataString(uploadId)}");
var bytes = new byte[length];
await using (var fileStream = File.OpenRead(sourcePath))
{
fileStream.Seek(offset, SeekOrigin.Begin);
var totalRead = 0;
while (totalRead < bytes.Length)
{
var read = await fileStream.ReadAsync(bytes.AsMemory(totalRead), cancellationToken).ConfigureAwait(false);
if (read == 0)
{
throw new EndOfStreamException($"Unexpected end of file while reading {sourcePath} for part {partNumber}.");
}
totalRead += read;
}
}
var payloadHash = Sha256Hex(bytes);
Console.WriteLine($"Uploading S3 multipart part {partNumber} for {key} ({FormatBytes(length)}), attempt {attempt}/{options.MaxUploadAttempts}.");
using var content = new ByteArrayContent(bytes);
content.Headers.ContentLength = length;
using var request = new HttpRequestMessage(HttpMethod.Put, requestUri)
{
Content = content
};
SignRequest(request, key, payloadHash, DateTimeOffset.UtcNow);
using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
var body = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
throw new InvalidOperationException($"S3 multipart upload failed for {key} part {partNumber}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(body, 512)}");
}
var etag = response.Headers.ETag?.Tag;
if (string.IsNullOrWhiteSpace(etag))
{
throw new InvalidOperationException($"S3 multipart upload did not return ETag for {key} part {partNumber}.");
}
return new PlondsS3UploadedPart(partNumber, etag);
}
private async Task CompleteMultipartUploadAsync(
string key,
string uploadId,
IReadOnlyList<PlondsS3UploadedPart> parts,
CancellationToken cancellationToken)
{
var body = BuildCompleteMultipartUploadBody(parts);
var bodyBytes = Encoding.UTF8.GetBytes(body);
var payloadHash = Sha256Hex(bodyBytes);
var requestUri = BuildObjectUri(key, $"uploadId={Uri.EscapeDataString(uploadId)}");
using var content = new ByteArrayContent(bodyBytes);
content.Headers.ContentType = new MediaTypeHeaderValue("application/xml");
content.Headers.ContentLength = bodyBytes.Length;
using var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
Content = content
};
SignRequest(request, key, payloadHash, DateTimeOffset.UtcNow);
using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
var responseBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
throw new InvalidOperationException($"S3 complete multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(responseBody, 512)}");
}
}
private async Task AbortMultipartUploadBestEffortAsync(string key, string uploadId, CancellationToken cancellationToken)
{
try
{
var requestUri = BuildObjectUri(key, $"uploadId={Uri.EscapeDataString(uploadId)}");
using var request = new HttpRequestMessage(HttpMethod.Delete, requestUri);
SignRequest(request, key, EmptyPayloadHash, DateTimeOffset.UtcNow);
using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
Console.Error.WriteLine($"S3 abort multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}.");
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"S3 abort multipart upload failed for {key}: {ex.Message}");
}
}
public async Task<bool> UploadFileIfChangedAsync(PlondsS3ObjectUpload upload, CancellationToken cancellationToken = default) public async Task<bool> UploadFileIfChangedAsync(PlondsS3ObjectUpload upload, CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(upload); ArgumentNullException.ThrowIfNull(upload);
@@ -204,6 +431,7 @@ public sealed class PlondsS3Client : IDisposable
var dateStamp = now.UtcDateTime.ToString("yyyyMMdd", CultureInfo.InvariantCulture); var dateStamp = now.UtcDateTime.ToString("yyyyMMdd", CultureInfo.InvariantCulture);
var credentialScope = $"{dateStamp}/{options.Region}/{ServiceName}/aws4_request"; var credentialScope = $"{dateStamp}/{options.Region}/{ServiceName}/aws4_request";
var canonicalUri = BuildCanonicalUri(key); var canonicalUri = BuildCanonicalUri(key);
var canonicalQueryString = BuildCanonicalQueryString(request.RequestUri);
var host = request.RequestUri?.IsDefaultPort == true var host = request.RequestUri?.IsDefaultPort == true
? request.RequestUri.Host ? request.RequestUri.Host
: request.RequestUri?.Authority; : request.RequestUri?.Authority;
@@ -227,7 +455,7 @@ public sealed class PlondsS3Client : IDisposable
[ [
request.Method.Method, request.Method.Method,
canonicalUri, canonicalUri,
string.Empty, canonicalQueryString,
canonicalHeaders.ToString(), canonicalHeaders.ToString(),
signedHeaders, signedHeaders,
payloadHash payloadHash
@@ -247,13 +475,14 @@ public sealed class PlondsS3Client : IDisposable
request.Headers.TryAddWithoutValidation("Authorization", authorization); request.Headers.TryAddWithoutValidation("Authorization", authorization);
} }
private Uri BuildObjectUri(string key) private Uri BuildObjectUri(string key, string? query = null)
{ {
var bucketPrefix = Uri.EscapeDataString(options.Bucket).Replace("%2F", "/", StringComparison.OrdinalIgnoreCase); var bucketPrefix = Uri.EscapeDataString(options.Bucket).Replace("%2F", "/", StringComparison.OrdinalIgnoreCase);
var path = $"{options.Endpoint.AbsolutePath.TrimEnd('/')}/{bucketPrefix}/{BuildCanonicalKey(key)}"; var path = $"{options.Endpoint.AbsolutePath.TrimEnd('/')}/{bucketPrefix}/{BuildCanonicalKey(key)}";
var builder = new UriBuilder(options.Endpoint) var builder = new UriBuilder(options.Endpoint)
{ {
Path = path Path = path,
Query = query ?? string.Empty
}; };
return builder.Uri; return builder.Uri;
@@ -272,6 +501,27 @@ public sealed class PlondsS3Client : IDisposable
.Select(Uri.EscapeDataString)); .Select(Uri.EscapeDataString));
} }
private static string BuildCanonicalQueryString(Uri? uri)
{
if (uri is null || string.IsNullOrEmpty(uri.Query))
{
return string.Empty;
}
return string.Join("&", uri.Query.TrimStart('?')
.Split('&', StringSplitOptions.RemoveEmptyEntries)
.Select(parameter =>
{
var parts = parameter.Split('=', 2);
var name = Uri.UnescapeDataString(parts[0]);
var value = parts.Length > 1 ? Uri.UnescapeDataString(parts[1]) : string.Empty;
return new KeyValuePair<string, string>(name, value);
})
.OrderBy(parameter => parameter.Key, StringComparer.Ordinal)
.ThenBy(parameter => parameter.Value, StringComparer.Ordinal)
.Select(parameter => $"{Uri.EscapeDataString(parameter.Key)}={Uri.EscapeDataString(parameter.Value)}"));
}
private static string NormalizeKey(string value) private static string NormalizeKey(string value)
{ {
var normalized = value.Replace('\\', '/').Trim('/'); var normalized = value.Replace('\\', '/').Trim('/');
@@ -320,6 +570,23 @@ public sealed class PlondsS3Client : IDisposable
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(value))).ToLowerInvariant(); return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(value))).ToLowerInvariant();
} }
private static string Sha256Hex(byte[] value)
{
return Convert.ToHexString(SHA256.HashData(value)).ToLowerInvariant();
}
private static string BuildCompleteMultipartUploadBody(IEnumerable<PlondsS3UploadedPart> parts)
{
var document = new XDocument(
new XElement("CompleteMultipartUpload",
parts.OrderBy(part => part.PartNumber)
.Select(part => new XElement("Part",
new XElement("PartNumber", part.PartNumber.ToString(CultureInfo.InvariantCulture)),
new XElement("ETag", part.ETag)))));
return document.ToString(SaveOptions.DisableFormatting);
}
private static byte[] HmacSha256(byte[] key, string data) private static byte[] HmacSha256(byte[] key, string data)
{ {
return HMACSHA256.HashData(key, Encoding.UTF8.GetBytes(data)); return HMACSHA256.HashData(key, Encoding.UTF8.GetBytes(data));
@@ -371,4 +638,6 @@ public sealed class PlondsS3Client : IDisposable
return $"{value:0.##} {units[unit]}"; return $"{value:0.##} {units[unit]}";
} }
private sealed record PlondsS3UploadedPart(int PartNumber, string ETag);
} }

View File

@@ -12,4 +12,10 @@ public sealed record PlondsS3ClientOptions(
public TimeSpan RequestTimeout { get; init; } = TimeSpan.FromMinutes(30); public TimeSpan RequestTimeout { get; init; } = TimeSpan.FromMinutes(30);
public int MaxUploadAttempts { get; init; } = 3; public int MaxUploadAttempts { get; init; } = 3;
public long MultipartThresholdBytes { get; init; } = 8L * 1024 * 1024;
public long MultipartPartSizeBytes { get; init; } = 8L * 1024 * 1024;
public int MultipartConcurrency { get; init; } = 4;
} }

View File

@@ -113,7 +113,12 @@ internal static class PlondsCli
AccessKey: Require(options, "s3-access-key"), AccessKey: Require(options, "s3-access-key"),
SecretKey: Require(options, "s3-secret-key"), SecretKey: Require(options, "s3-secret-key"),
PublicBaseUrl: Require(options, "s3-public-base-url"), PublicBaseUrl: Require(options, "s3-public-base-url"),
PublicBaseKeyPrefix: Get(options, "s3-public-base-key-prefix", string.Empty) ?? string.Empty)) PublicBaseKeyPrefix: Get(options, "s3-public-base-key-prefix", string.Empty) ?? string.Empty)
{
MultipartThresholdBytes = GetLong(options, "multipart-threshold-mb", 8) * 1024 * 1024,
MultipartPartSizeBytes = GetLong(options, "multipart-part-size-mb", 8) * 1024 * 1024,
MultipartConcurrency = GetInt(options, "multipart-concurrency", 4)
})
{ {
DirectoryUploadConcurrency = GetInt(options, "directory-upload-concurrency", 4) DirectoryUploadConcurrency = GetInt(options, "directory-upload-concurrency", 4)
}).ConfigureAwait(false); }).ConfigureAwait(false);
@@ -216,6 +221,9 @@ internal static class PlondsCli
Console.WriteLine(" [--s3-public-base-key-prefix <prefix>] Key prefix already represented by public URL"); Console.WriteLine(" [--s3-public-base-key-prefix <prefix>] Key prefix already represented by public URL");
Console.WriteLine(" [--s3-prefix <prefix>] Object key prefix (default: lanmountain/update/plonds)"); Console.WriteLine(" [--s3-prefix <prefix>] Object key prefix (default: lanmountain/update/plonds)");
Console.WriteLine(" [--directory-upload-concurrency <n>] Parallel file uploads for expanded directories (default: 4)"); Console.WriteLine(" [--directory-upload-concurrency <n>] Parallel file uploads for expanded directories (default: 4)");
Console.WriteLine(" [--multipart-threshold-mb <n>] Use multipart upload for files at or above this size (default: 8)");
Console.WriteLine(" [--multipart-part-size-mb <n>] Multipart upload part size in MiB (default: 8)");
Console.WriteLine(" [--multipart-concurrency <n>] Parallel multipart part uploads (default: 4)");
Console.WriteLine(" [--work-dir <dir>] Temporary publish work directory"); Console.WriteLine(" [--work-dir <dir>] Temporary publish work directory");
} }
@@ -230,4 +238,16 @@ internal static class PlondsCli
? parsed ? parsed
: throw new InvalidOperationException($"Option --{key} must be a positive integer."); : throw new InvalidOperationException($"Option --{key} must be a positive integer.");
} }
private static long GetLong(IReadOnlyDictionary<string, string> options, string key, long defaultValue)
{
if (!options.TryGetValue(key, out var value) || string.IsNullOrWhiteSpace(value))
{
return defaultValue;
}
return long.TryParse(value, out var parsed) && parsed > 0
? parsed
: throw new InvalidOperationException($"Option --{key} must be a positive integer.");
}
} }