diff --git a/.github/workflows/plonds-uploader.yml b/.github/workflows/plonds-uploader.yml
index b59f0b6..d81eb85 100644
--- a/.github/workflows/plonds-uploader.yml
+++ b/.github/workflows/plonds-uploader.yml
@@ -22,6 +22,9 @@ env:
   PLONDS_S3_PREFIX: lanmountain/update/plonds
   PLONDS_S3_PUBLIC_BASE_KEY_PREFIX: lanmountain/update
   PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY: '4'
+  PLONDS_S3_MULTIPART_THRESHOLD_MB: '8'
+  PLONDS_S3_MULTIPART_PART_SIZE_MB: '8'
+  PLONDS_S3_MULTIPART_CONCURRENCY: '4'
 
 jobs:
   publish:
@@ -119,7 +122,10 @@ jobs:
             --s3-secret-key "$S3_SECRET_KEY" \
             --s3-public-base-url "$PUBLIC_BASE" \
             --s3-public-base-key-prefix "$PLONDS_S3_PUBLIC_BASE_KEY_PREFIX" \
-            --directory-upload-concurrency "$PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY"
+            --directory-upload-concurrency "$PLONDS_S3_DIRECTORY_UPLOAD_CONCURRENCY" \
+            --multipart-threshold-mb "$PLONDS_S3_MULTIPART_THRESHOLD_MB" \
+            --multipart-part-size-mb "$PLONDS_S3_MULTIPART_PART_SIZE_MB" \
+            --multipart-concurrency "$PLONDS_S3_MULTIPART_CONCURRENCY"
 
           jq -e '.downloads.github.changedZipUrl and .downloads.github.filesZipUrl and .downloads.s3.changedFolderUrl and .downloads.s3.filesFolderUrl' plonds-assets/PLONDS.json >/dev/null
 
diff --git a/.trae/specs/plonds-client-service/spec.md b/.trae/specs/plonds-client-service/spec.md
index 9e7dacc..72dbbb3 100644
--- a/.trae/specs/plonds-client-service/spec.md
+++ b/.trae/specs/plonds-client-service/spec.md
@@ -118,6 +118,7 @@ Publisher 上传到 S3 的版本目录:
 - `PLONDS.json` 的 downloads 字段同时包含 GitHub 与 S3 的增量包、完整包位置。
 - Publisher 必须先完成版本目录内的 `changed.zip`、`Files.zip`、解压目录和版本 `PLONDS.json` 上传,再更新 `/PLONDS.json` latest 指针。
 - Publisher 的 S3 目录上传必须支持重跑续传;同 key 且大小一致的对象可以跳过,避免失败后从头上传完整包目录。
+- Publisher 上传大对象时应使用 S3 multipart upload,以避免 `changed.zip` / `Files.zip` 在低吞吐链路上被单次 PUT 长时间阻塞。
 
 ## 7. 建议代码结构
 
diff --git a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3Client.cs b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3Client.cs
index c988f84..6250cc9 100644
--- a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3Client.cs
+++ b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3Client.cs
@@ -3,6 +3,7 @@ using System.Net;
 using System.Net.Http.Headers;
 using System.Security.Cryptography;
 using System.Text;
+using System.Xml.Linq;
 
 namespace Plonds.Core.Publishing;
 
@@ -29,7 +30,10 @@ public sealed class PlondsS3Client : IDisposable
             PublicBaseUrl = Require(options.PublicBaseUrl, nameof(options.PublicBaseUrl)).TrimEnd('/'),
             PublicBaseKeyPrefix = NormalizeOptionalKeyPrefix(options.PublicBaseKeyPrefix),
             RequestTimeout = options.RequestTimeout <= TimeSpan.Zero ? TimeSpan.FromMinutes(30) : options.RequestTimeout,
-            MaxUploadAttempts = Math.Max(1, options.MaxUploadAttempts)
+            MaxUploadAttempts = Math.Max(1, options.MaxUploadAttempts),
+            MultipartThresholdBytes = Math.Max(5L * 1024 * 1024, options.MultipartThresholdBytes),
+            MultipartPartSizeBytes = Math.Max(5L * 1024 * 1024, options.MultipartPartSizeBytes),
+            MultipartConcurrency = Math.Max(1, options.MultipartConcurrency)
         };
 
         ownsHttpClient = httpClient is null;
@@ -53,6 +57,19 @@ public sealed class PlondsS3Client : IDisposable
         var payloadHash = PayloadUtilities.ComputeSha256(sourcePath);
         var contentLength = new FileInfo(sourcePath).Length;
 
+        if (contentLength >= options.MultipartThresholdBytes)
+        {
+            try
+            {
+                await UploadFileMultipartAsync(sourcePath, key, upload.ContentType, contentLength, cancellationToken).ConfigureAwait(false);
+                return;
+            }
+            catch (Exception ex) when (ex is not OperationCanceledException)
+            {
+                Console.Error.WriteLine($"S3 multipart upload failed for {key}; falling back to single PUT. {ex.Message}");
+            }
+        }
+
         for (var attempt = 1; attempt <= options.MaxUploadAttempts; attempt++)
         {
             try
@@ -69,6 +86,227 @@ public sealed class PlondsS3Client : IDisposable
             }
         }
     }
+    /// <summary>Uploads the file as parallel S3 multipart parts and aborts the upload if any step fails.</summary>
+    private async Task UploadFileMultipartAsync(
+        string sourcePath,
+        string key,
+        string? contentType,
+        long contentLength,
+        CancellationToken cancellationToken)
+    {
+        // S3 accepts at most 10,000 parts per upload, so grow the part size for very large files.
+        var partSize = Math.Max(options.MultipartPartSizeBytes, (contentLength + 9_999) / 10_000);
+        var uploadId = await CreateMultipartUploadAsync(key, contentType, cancellationToken).ConfigureAwait(false);
+        var partCount = checked((int)((contentLength + partSize - 1) / partSize));
+        var parts = new PlondsS3UploadedPart[partCount];
+
+        Console.WriteLine($"Uploading S3 object {key} ({FormatBytes(contentLength)}) using multipart upload {uploadId}: {partCount} parts, part size {FormatBytes(partSize)}, concurrency {options.MultipartConcurrency}.");
+
+        try
+        {
+            var completed = 0;
+            await Parallel.ForEachAsync(
+                Enumerable.Range(1, partCount),
+                new ParallelOptions
+                {
+                    MaxDegreeOfParallelism = options.MultipartConcurrency,
+                    CancellationToken = cancellationToken
+                },
+                async (partNumber, token) =>
+                {
+                    var offset = (long)(partNumber - 1) * partSize;
+                    var length = Math.Min(partSize, contentLength - offset);
+                    parts[partNumber - 1] = await UploadMultipartPartWithRetriesAsync(
+                        sourcePath,
+                        key,
+                        uploadId,
+                        partNumber,
+                        offset,
+                        length,
+                        token).ConfigureAwait(false);
+
+                    var done = Interlocked.Increment(ref completed);
+                    Console.WriteLine($"S3 multipart progress {key}: {done}/{partCount} parts uploaded.");
+                }).ConfigureAwait(false);
+
+            await CompleteMultipartUploadAsync(key, uploadId, parts, cancellationToken).ConfigureAwait(false);
+            Console.WriteLine($"Uploaded S3 object {key} using multipart upload.");
+        }
+        catch
+        {
+            await AbortMultipartUploadBestEffortAsync(key, uploadId, CancellationToken.None).ConfigureAwait(false);
+            throw;
+        }
+    }
+
+    /// <summary>Starts a multipart upload for <paramref name="key"/> and returns the UploadId parsed from the XML response.</summary>
+    private async Task<string> CreateMultipartUploadAsync(string key, string? contentType, CancellationToken cancellationToken)
+    {
+        var requestUri = BuildObjectUri(key, "uploads=");
+        using var request = new HttpRequestMessage(HttpMethod.Post, requestUri);
+        if (!string.IsNullOrWhiteSpace(contentType))
+        {
+            // Content-Type is a content header: HttpRequestMessage.Headers rejects it, so carry it on an empty body.
+            request.Content = new ByteArrayContent(Array.Empty<byte>());
+            request.Content.Headers.TryAddWithoutValidation("Content-Type", contentType);
+        }
+
+        SignRequest(request, key, EmptyPayloadHash, DateTimeOffset.UtcNow);
+
+        using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
+        var body = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
+        if (!response.IsSuccessStatusCode)
+        {
+            throw new InvalidOperationException($"S3 create multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(body, 512)}");
+        }
+
+        var uploadId = XDocument.Parse(body).Descendants().FirstOrDefault(element => element.Name.LocalName == "UploadId")?.Value;
+        return string.IsNullOrWhiteSpace(uploadId)
+            ? throw new InvalidOperationException($"S3 create multipart upload response did not include UploadId for {key}.")
+            : uploadId;
+    }
+
+    /// <summary>Uploads one part, retrying retriable failures with exponential backoff capped at 30 seconds.</summary>
+    private async Task<PlondsS3UploadedPart> UploadMultipartPartWithRetriesAsync(
+        string sourcePath,
+        string key,
+        string uploadId,
+        int partNumber,
+        long offset,
+        long length,
+        CancellationToken cancellationToken)
+    {
+        for (var attempt = 1; attempt <= options.MaxUploadAttempts; attempt++)
+        {
+            try
+            {
+                return await UploadMultipartPartOnceAsync(
+                    sourcePath,
+                    key,
+                    uploadId,
+                    partNumber,
+                    offset,
+                    length,
+                    attempt,
+                    cancellationToken).ConfigureAwait(false);
+            }
+            catch (Exception ex) when (attempt < options.MaxUploadAttempts && IsRetriable(ex))
+            {
+                var delay = TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, attempt)));
+                Console.Error.WriteLine($"S3 multipart retry {attempt + 1}/{options.MaxUploadAttempts} for {key} part {partNumber} after {delay.TotalSeconds:0}s: {ex.Message}");
+                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
+            }
+        }
+
+        throw new InvalidOperationException($"S3 multipart upload failed for {key} part {partNumber}.");
+    }
+
+    /// <summary>Reads one byte range of the file into memory, PUTs it as a part and returns the part number with its ETag.</summary>
+    private async Task<PlondsS3UploadedPart> UploadMultipartPartOnceAsync(
+        string sourcePath,
+        string key,
+        string uploadId,
+        int partNumber,
+        long offset,
+        long length,
+        int attempt,
+        CancellationToken cancellationToken)
+    {
+        var requestUri = BuildObjectUri(key, $"partNumber={partNumber}&uploadId={Uri.EscapeDataString(uploadId)}");
+        var bytes = new byte[length];
+        await using (var fileStream = File.OpenRead(sourcePath))
+        {
+            fileStream.Seek(offset, SeekOrigin.Begin);
+            var totalRead = 0;
+            while (totalRead < bytes.Length)
+            {
+                var read = await fileStream.ReadAsync(bytes.AsMemory(totalRead), cancellationToken).ConfigureAwait(false);
+                if (read == 0)
+                {
+                    throw new EndOfStreamException($"Unexpected end of file while reading {sourcePath} for part {partNumber}.");
+                }
+
+                totalRead += read;
+            }
+        }
+
+        var payloadHash = Sha256Hex(bytes);
+        Console.WriteLine($"Uploading S3 multipart part {partNumber} for {key} ({FormatBytes(length)}), attempt {attempt}/{options.MaxUploadAttempts}.");
+
+        using var content = new ByteArrayContent(bytes);
+        content.Headers.ContentLength = length;
+        using var request = new HttpRequestMessage(HttpMethod.Put, requestUri)
+        {
+            Content = content
+        };
+        SignRequest(request, key, payloadHash, DateTimeOffset.UtcNow);
+
+        using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
+        if (!response.IsSuccessStatusCode)
+        {
+            var body = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
+            throw new InvalidOperationException($"S3 multipart upload failed for {key} part {partNumber}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(body, 512)}");
+        }
+
+        var etag = response.Headers.ETag?.Tag;
+        if (string.IsNullOrWhiteSpace(etag))
+        {
+            throw new InvalidOperationException($"S3 multipart upload did not return ETag for {key} part {partNumber}.");
+        }
+
+        return new PlondsS3UploadedPart(partNumber, etag);
+    }
+
+    /// <summary>Finishes the multipart upload by posting the ordered part list; throws when S3 reports a failure.</summary>
+    private async Task CompleteMultipartUploadAsync(
+        string key,
+        string uploadId,
+        IReadOnlyList<PlondsS3UploadedPart> parts,
+        CancellationToken cancellationToken)
+    {
+        var body = BuildCompleteMultipartUploadBody(parts);
+        var bodyBytes = Encoding.UTF8.GetBytes(body);
+        var payloadHash = Sha256Hex(bodyBytes);
+        var requestUri = BuildObjectUri(key, $"uploadId={Uri.EscapeDataString(uploadId)}");
+
+        using var content = new ByteArrayContent(bodyBytes);
+        content.Headers.ContentType = new MediaTypeHeaderValue("application/xml");
+        content.Headers.ContentLength = bodyBytes.Length;
+        using var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
+        {
+            Content = content
+        };
+        SignRequest(request, key, payloadHash, DateTimeOffset.UtcNow);
+
+        using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
+        var responseBody = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
+        // S3 can answer 200 OK and still embed an <Error> document, so the body must be inspected as well.
+        if (!response.IsSuccessStatusCode || responseBody.Contains("<Error>", StringComparison.Ordinal))
+        {
+            throw new InvalidOperationException($"S3 complete multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}. {Truncate(responseBody, 512)}");
+        }
+    }
+
+    /// <summary>Sends an abort for the multipart upload; failures are only logged so the original error is not masked.</summary>
+    private async Task AbortMultipartUploadBestEffortAsync(string key, string uploadId, CancellationToken cancellationToken)
+    {
+        try
+        {
+            var requestUri = BuildObjectUri(key, $"uploadId={Uri.EscapeDataString(uploadId)}");
+            using var request = new HttpRequestMessage(HttpMethod.Delete, requestUri);
+            SignRequest(request, key, EmptyPayloadHash, DateTimeOffset.UtcNow);
+            using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
+            if (!response.IsSuccessStatusCode)
+            {
+                Console.Error.WriteLine($"S3 abort multipart upload failed for {key}: HTTP {(int)response.StatusCode} {response.ReasonPhrase}.");
+            }
+        }
+        catch (Exception ex)
+        {
+            Console.Error.WriteLine($"S3 abort multipart upload failed for {key}: {ex.Message}");
+        }
+    }
+
     public async Task UploadFileIfChangedAsync(PlondsS3ObjectUpload upload, CancellationToken cancellationToken = default)
     {
         ArgumentNullException.ThrowIfNull(upload);
@@ -204,6 +442,7 @@ public sealed class PlondsS3Client : IDisposable
         var dateStamp = now.UtcDateTime.ToString("yyyyMMdd", CultureInfo.InvariantCulture);
         var credentialScope = $"{dateStamp}/{options.Region}/{ServiceName}/aws4_request";
         var canonicalUri = BuildCanonicalUri(key);
+        var canonicalQueryString = BuildCanonicalQueryString(request.RequestUri);
         var host = request.RequestUri?.IsDefaultPort == true
             ? request.RequestUri.Host
             : request.RequestUri?.Authority;
@@ -227,7 +466,7 @@ public sealed class PlondsS3Client : IDisposable
         [
             request.Method.Method,
             canonicalUri,
-            string.Empty,
+            canonicalQueryString,
             canonicalHeaders.ToString(),
             signedHeaders,
             payloadHash
@@ -247,13 +486,14 @@ public sealed class PlondsS3Client : IDisposable
         request.Headers.TryAddWithoutValidation("Authorization", authorization);
     }
 
-    private Uri BuildObjectUri(string key)
+    private Uri BuildObjectUri(string key, string? query = null)
     {
         var bucketPrefix = Uri.EscapeDataString(options.Bucket).Replace("%2F", "/", StringComparison.OrdinalIgnoreCase);
         var path = $"{options.Endpoint.AbsolutePath.TrimEnd('/')}/{bucketPrefix}/{BuildCanonicalKey(key)}";
         var builder = new UriBuilder(options.Endpoint)
         {
-            Path = path
+            Path = path,
+            Query = query ?? string.Empty
         };
 
         return builder.Uri;
@@ -272,6 +512,28 @@ public sealed class PlondsS3Client : IDisposable
             .Select(Uri.EscapeDataString));
     }
 
+    /// <summary>Builds the signing query string: parameters decoded, sorted by name then value, and re-encoded.</summary>
+    private static string BuildCanonicalQueryString(Uri? uri)
+    {
+        if (uri is null || string.IsNullOrEmpty(uri.Query))
+        {
+            return string.Empty;
+        }
+
+        return string.Join("&", uri.Query.TrimStart('?')
+            .Split('&', StringSplitOptions.RemoveEmptyEntries)
+            .Select(parameter =>
+            {
+                var parts = parameter.Split('=', 2);
+                var name = Uri.UnescapeDataString(parts[0]);
+                var value = parts.Length > 1 ? Uri.UnescapeDataString(parts[1]) : string.Empty;
+                return new KeyValuePair<string, string>(name, value);
+            })
+            .OrderBy(parameter => parameter.Key, StringComparer.Ordinal)
+            .ThenBy(parameter => parameter.Value, StringComparer.Ordinal)
+            .Select(parameter => $"{Uri.EscapeDataString(parameter.Key)}={Uri.EscapeDataString(parameter.Value)}"));
+    }
+
     private static string NormalizeKey(string value)
     {
         var normalized = value.Replace('\\', '/').Trim('/');
@@ -320,6 +582,25 @@ public sealed class PlondsS3Client : IDisposable
         return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(value))).ToLowerInvariant();
     }
 
+    /// <summary>Returns the lowercase hexadecimal SHA-256 digest of <paramref name="value"/>.</summary>
+    private static string Sha256Hex(byte[] value)
+    {
+        return Convert.ToHexString(SHA256.HashData(value)).ToLowerInvariant();
+    }
+
+    /// <summary>Serializes the parts, ordered by part number, into the CompleteMultipartUpload XML request body.</summary>
+    private static string BuildCompleteMultipartUploadBody(IEnumerable<PlondsS3UploadedPart> parts)
+    {
+        var document = new XDocument(
+            new XElement("CompleteMultipartUpload",
+                parts.OrderBy(part => part.PartNumber)
+                    .Select(part => new XElement("Part",
+                        new XElement("PartNumber", part.PartNumber.ToString(CultureInfo.InvariantCulture)),
+                        new XElement("ETag", part.ETag)))));
+
+        return document.ToString(SaveOptions.DisableFormatting);
+    }
+
     private static byte[] HmacSha256(byte[] key, string data)
     {
         return HMACSHA256.HashData(key, Encoding.UTF8.GetBytes(data));
@@ -371,4 +652,7 @@ public sealed class PlondsS3Client : IDisposable
 
         return $"{value:0.##} {units[unit]}";
     }
+
+    /// <summary>Part number and ETag of one uploaded part, as listed in the completion request.</summary>
+    private sealed record PlondsS3UploadedPart(int PartNumber, string ETag);
 }
diff --git a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3ClientOptions.cs b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3ClientOptions.cs
index be2ed14..ed05255 100644
--- a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3ClientOptions.cs
+++ b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Core/Publishing/PlondsS3ClientOptions.cs
@@ -12,4 +12,13 @@ public sealed record PlondsS3ClientOptions(
     public TimeSpan RequestTimeout { get; init; } = TimeSpan.FromMinutes(30);
 
     public int MaxUploadAttempts { get; init; } = 3;
+
+    /// <summary>Files at or above this size use multipart upload; the client raises values below 5 MiB to 5 MiB.</summary>
+    public long MultipartThresholdBytes { get; init; } = 8L * 1024 * 1024;
+
+    /// <summary>Requested size of each multipart part; the client raises values below 5 MiB to 5 MiB.</summary>
+    public long MultipartPartSizeBytes { get; init; } = 8L * 1024 * 1024;
+
+    /// <summary>Maximum number of parts uploaded in parallel for one object; the client uses at least 1.</summary>
+    public int MultipartConcurrency { get; init; } = 4;
 }
diff --git a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Tool/Program.cs b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Tool/Program.cs
index 79e4828..002b0fa 100644
--- a/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Tool/Program.cs
+++ b/PenguinLogisticsOnlineNetworkDistributionSystem/src/Plonds.Tool/Program.cs
@@ -113,7 +113,12 @@ internal static class PlondsCli
                     AccessKey: Require(options, "s3-access-key"),
                     SecretKey: Require(options, "s3-secret-key"),
                     PublicBaseUrl: Require(options, "s3-public-base-url"),
-                    PublicBaseKeyPrefix: Get(options, "s3-public-base-key-prefix", string.Empty) ?? string.Empty))
+                    PublicBaseKeyPrefix: Get(options, "s3-public-base-key-prefix", string.Empty) ?? string.Empty)
+                {
+                    MultipartThresholdBytes = GetLong(options, "multipart-threshold-mb", 8) * 1024 * 1024,
+                    MultipartPartSizeBytes = GetLong(options, "multipart-part-size-mb", 8) * 1024 * 1024,
+                    MultipartConcurrency = GetInt(options, "multipart-concurrency", 4)
+                })
                 {
                     DirectoryUploadConcurrency = GetInt(options, "directory-upload-concurrency", 4)
                 }).ConfigureAwait(false);
@@ -216,6 +221,9 @@ internal static class PlondsCli
         Console.WriteLine("  [--s3-public-base-key-prefix <prefix>] Key prefix already represented by public URL");
         Console.WriteLine("  [--s3-prefix <prefix>] Object key prefix (default: lanmountain/update/plonds)");
         Console.WriteLine("  [--directory-upload-concurrency <n>] Parallel file uploads for expanded directories (default: 4)");
+        Console.WriteLine("  [--multipart-threshold-mb <mb>] Use multipart upload for files at or above this size (default: 8)");
+        Console.WriteLine("  [--multipart-part-size-mb <mb>] Multipart upload part size in MiB (default: 8)");
+        Console.WriteLine("  [--multipart-concurrency <n>] Parallel multipart part uploads (default: 4)");
         Console.WriteLine("  [--work-dir <path>] Temporary publish work directory");
     }
 
@@ -230,4 +238,17 @@ internal static class PlondsCli
             ? parsed
             : throw new InvalidOperationException($"Option --{key} must be a positive integer.");
     }
+
+    /// <summary>Reads a positive 64-bit integer option, returning <paramref name="defaultValue"/> when it is absent or blank.</summary>
+    private static long GetLong(IReadOnlyDictionary<string, string?> options, string key, long defaultValue)
+    {
+        if (!options.TryGetValue(key, out var value) || string.IsNullOrWhiteSpace(value))
+        {
+            return defaultValue;
+        }
+
+        return long.TryParse(value, out var parsed) && parsed > 0
+            ? parsed
+            : throw new InvalidOperationException($"Option --{key} must be a positive integer.");
+    }
 }