
fix(registry/storage/driver/s3-aws): use a consistent multipart chunk size (#4424)

Milos Gajdos 2024-11-05 11:39:43 +00:00 committed by GitHub
commit 099201adde
6 changed files with 179 additions and 255 deletions

@@ -80,9 +80,6 @@ Amazon S3 or S3 compatible services for object storage.
`loglevel`: (optional) Valid values are: `off` (default), `debug`, `debugwithsigning`, `debugwithhttpbody`, `debugwithrequestretries`, `debugwithrequesterrors` and `debugwitheventstreambody`. See the [AWS SDK for Go API reference](https://docs.aws.amazon.com/sdk-for-go/api/aws/#LogLevelType) for details.
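The `loglevel` strings above are handed to the AWS SDK's logger configuration. As a hedged illustration only (this is not the driver's actual `getS3LogLevelFromParam`), such a value could be mapped onto the SDK's `aws.LogLevelType` constants like this:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
)

// logLevelFromString is illustrative: it maps a docs-style loglevel value onto
// an aws-sdk-go v1 log level. The driver's real mapping may differ in detail.
func logLevelFromString(s string) aws.LogLevelType {
	switch strings.ToLower(s) {
	case "debug":
		return aws.LogDebug
	case "debugwithsigning":
		return aws.LogDebugWithSigning
	case "debugwithhttpbody":
		return aws.LogDebugWithHTTPBody
	case "debugwithrequestretries":
		return aws.LogDebugWithRequestRetries
	case "debugwithrequesterrors":
		return aws.LogDebugWithRequestErrors
	case "debugwitheventstreambody":
		return aws.LogDebugWithEventStreamBody
	default:
		return aws.LogOff // "off" and unknown values disable SDK logging
	}
}

func main() {
	fmt.Println(logLevelFromString("debugwithhttpbody") == aws.LogDebugWithHTTPBody) // true
}
```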
**NOTE:** Currently the S3 storage driver only supports S3 API compatible storage that
allows parts of a multipart upload to vary in size. [Cloudflare R2 is not supported.](https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations)
## S3 permission scopes
The following AWS policy is required by the registry for push and pull. Make sure to replace `S3_BUCKET_NAME` with the name of your bucket.

@@ -21,7 +21,7 @@ import (
"math"
"net/http"
"path/filepath"
"reflect"
"slices"
"sort"
"strconv"
"strings"
@@ -48,10 +48,6 @@ const driverName = "s3aws"
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = 5 * 1024 * 1024
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 * 1024 * 1024 * 1024
const defaultChunkSize = 2 * minChunkSize
const (
@@ -107,7 +103,7 @@ type DriverParameters struct {
Secure bool
SkipVerify bool
V4Auth bool
ChunkSize int64
ChunkSize int
MultipartCopyChunkSize int64
MultipartCopyMaxConcurrency int64
MultipartCopyThresholdSize int64
@@ -158,7 +154,7 @@ var _ storagedriver.StorageDriver = &driver{}
type driver struct {
S3 *s3.S3
Bucket string
ChunkSize int64
ChunkSize int
Encrypt bool
KeyID string
MultipartCopyChunkSize int64
@@ -313,22 +309,22 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
keyID = ""
}
chunkSize, err := getParameterAsInt64(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
chunkSize, err := getParameterAsInteger(parameters, "chunksize", defaultChunkSize, minChunkSize, maxChunkSize)
if err != nil {
return nil, err
}
multipartCopyChunkSize, err := getParameterAsInt64(parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
multipartCopyChunkSize, err := getParameterAsInteger[int64](parameters, "multipartcopychunksize", defaultMultipartCopyChunkSize, minChunkSize, maxChunkSize)
if err != nil {
return nil, err
}
multipartCopyMaxConcurrency, err := getParameterAsInt64(parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
multipartCopyMaxConcurrency, err := getParameterAsInteger[int64](parameters, "multipartcopymaxconcurrency", defaultMultipartCopyMaxConcurrency, 1, math.MaxInt64)
if err != nil {
return nil, err
}
multipartCopyThresholdSize, err := getParameterAsInt64(parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
multipartCopyThresholdSize, err := getParameterAsInteger[int64](parameters, "multipartcopythresholdsize", defaultMultipartCopyThresholdSize, 0, maxChunkSize)
if err != nil {
return nil, err
}
@@ -424,29 +420,29 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
}
params := DriverParameters{
fmt.Sprint(accessKey),
fmt.Sprint(secretKey),
fmt.Sprint(bucket),
region,
fmt.Sprint(regionEndpoint),
forcePathStyleBool,
encryptBool,
fmt.Sprint(keyID),
secureBool,
skipVerifyBool,
v4Bool,
chunkSize,
multipartCopyChunkSize,
multipartCopyMaxConcurrency,
multipartCopyThresholdSize,
fmt.Sprint(rootDirectory),
storageClass,
fmt.Sprint(userAgent),
objectACL,
fmt.Sprint(sessionToken),
useDualStackBool,
accelerateBool,
getS3LogLevelFromParam(parameters["loglevel"]),
AccessKey: fmt.Sprint(accessKey),
SecretKey: fmt.Sprint(secretKey),
Bucket: fmt.Sprint(bucket),
Region: region,
RegionEndpoint: fmt.Sprint(regionEndpoint),
ForcePathStyle: forcePathStyleBool,
Encrypt: encryptBool,
KeyID: fmt.Sprint(keyID),
Secure: secureBool,
SkipVerify: skipVerifyBool,
V4Auth: v4Bool,
ChunkSize: chunkSize,
MultipartCopyChunkSize: multipartCopyChunkSize,
MultipartCopyMaxConcurrency: multipartCopyMaxConcurrency,
MultipartCopyThresholdSize: multipartCopyThresholdSize,
RootDirectory: fmt.Sprint(rootDirectory),
StorageClass: storageClass,
UserAgent: fmt.Sprint(userAgent),
ObjectACL: objectACL,
SessionToken: fmt.Sprint(sessionToken),
UseDualStack: useDualStackBool,
Accelerate: accelerateBool,
LogLevel: getS3LogLevelFromParam(parameters["loglevel"]),
}
return New(ctx, params)
@@ -479,33 +475,29 @@ func getS3LogLevelFromParam(param interface{}) aws.LogLevelType {
return logLevel
}
// getParameterAsInt64 converts parameters[name] to an int64 value (using
// defaultt if nil), verifies it is no smaller than min, and returns it.
func getParameterAsInt64(parameters map[string]interface{}, name string, defaultt int64, min int64, max int64) (int64, error) {
rv := defaultt
param := parameters[name]
switch v := param.(type) {
case string:
vv, err := strconv.ParseInt(v, 0, 64)
if err != nil {
return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, param)
}
rv = vv
case int64:
rv = v
case int, uint, int32, uint32, uint64:
rv = reflect.ValueOf(v).Convert(reflect.TypeOf(rv)).Int()
case nil:
// do nothing
default:
return 0, fmt.Errorf("invalid value for %s: %#v", name, param)
type integer interface{ signed | unsigned }
type signed interface {
~int | ~int8 | ~int16 | ~int32 | ~int64
}
if rv < min || rv > max {
return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, rv, min, max)
type unsigned interface {
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}
return rv, nil
// getParameterAsInteger converts parameters[name] to T (using defaultValue if
// nil) and ensures it is in the range of min and max.
func getParameterAsInteger[T integer](parameters map[string]any, name string, defaultValue, min, max T) (T, error) {
v := defaultValue
if p := parameters[name]; p != nil {
if _, err := fmt.Sscanf(fmt.Sprint(p), "%d", &v); err != nil {
return 0, fmt.Errorf("%s parameter must be an integer, %v invalid", name, p)
}
}
if v < min || v > max {
return 0, fmt.Errorf("the %s %#v parameter should be a number between %d and %d (inclusive)", name, v, min, max)
}
return v, nil
}
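A quick, self-contained illustration of the parsing approach the new helper relies on (names below are made up): `fmt.Sscanf` with `%d` can fill any integer kind behind a pointer, which is what lets one generic function replace the old `int64`-only `getParameterAsInt64`.

```go
package main

import "fmt"

// parseInto mirrors the trick used by getParameterAsInteger above:
// scanning with %d works for any integer type T behind the pointer.
func parseInto[T int | int64 | uint64](s string) (T, error) {
	var v T
	if _, err := fmt.Sscanf(s, "%d", &v); err != nil {
		return 0, fmt.Errorf("%q is not an integer: %w", s, err)
	}
	return v, nil
}

func main() {
	chunk, err := parseInto[int64]("10485760") // e.g. a "chunksize" of 10 MiB
	fmt.Println(chunk, err)                    // 10485760 <nil>
}
```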
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
@@ -592,11 +584,7 @@ func New(ctx context.Context, params DriverParameters) (*Driver, error) {
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
pool: &sync.Pool{
New: func() interface{} {
return &buffer{
data: make([]byte, 0, params.ChunkSize),
}
},
New: func() any { return &bytes.Buffer{} },
},
}
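The pool now hands out plain `*bytes.Buffer` values, so callers are responsible for resetting a buffer before returning it; in this diff the driver does that in `newWriter` and `releaseBuffer`. A minimal standalone sketch of that lifecycle under the same assumption:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pool mirrors the driver's sync.Pool: New allocates an empty, growable buffer.
var pool = sync.Pool{New: func() any { return &bytes.Buffer{} }}

func withPooledBuffer(payload []byte) int {
	buf := pool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset() // drop leftover data so the next borrower starts clean
		pool.Put(buf)
	}()
	buf.Write(payload)
	return buf.Len()
}

func main() {
	fmt.Println(withPooledBuffer([]byte("chunk"))) // 5
}
```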
@@ -903,7 +891,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
func (d *driver) Move(ctx context.Context, sourcePath, destPath string) error {
/* This is terrible, but aws doesn't have an actual move. */
if err := d.copy(ctx, sourcePath, destPath); err != nil {
return err
@@ -912,7 +900,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
}
// copy copies an object stored at sourcePath to destPath.
func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error {
func (d *driver) copy(ctx context.Context, sourcePath, destPath string) error {
// S3 can copy objects up to 5 GB in size with a single PUT Object - Copy
// operation. For larger objects, the multipart upload API must be used.
//
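A hedged sketch of the decision that comment describes; the driver's real `copy` (not shown in this hunk) issues a single copy request below the `multipartcopythresholdsize` and a chunked multipart copy above it, and the names here are illustrative only:

```go
package main

import "fmt"

// copyStrategy shows the size-based branch in the abstract: small objects can
// be copied with one request, larger ones need the multipart copy API.
func copyStrategy(objectSize, threshold int64) string {
	if objectSize <= threshold {
		return "single CopyObject request"
	}
	return "multipart copy via UploadPartCopy, one part per chunk"
}

func main() {
	fmt.Println(copyStrategy(32<<20, 32<<20)) // single CopyObject request
	fmt.Println(copyStrategy(6<<30, 32<<20))  // multipart copy via UploadPartCopy, one part per chunk
}
```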
@@ -1121,7 +1109,7 @@ func (d *driver) Walk(ctx context.Context, from string, f storagedriver.WalkFn,
return nil
}
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from string, startAfter string, f storagedriver.WalkFn) error {
func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, from, startAfter string, f storagedriver.WalkFn) error {
var (
retError error
// the most recent directory walked for de-duping
@@ -1267,16 +1255,10 @@ func directoryDiff(prev, current string) []string {
}
paths = append(paths, parent)
}
reverse(paths)
slices.Reverse(paths)
return paths
}
func reverse(s []string) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}
func (d *driver) s3Path(path string) string {
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
}
@@ -1326,53 +1308,11 @@ func (d *driver) getStorageClass() *string {
return aws.String(d.StorageClass)
}
// buffer is a static size bytes buffer.
type buffer struct {
data []byte
}
// NewBuffer returns a new bytes buffer from driver's memory pool.
// The size of the buffer is static and set to params.ChunkSize.
func (d *driver) NewBuffer() *buffer {
return d.pool.Get().(*buffer)
}
// ReadFrom reads as much data as it can fit in from r without growing its size.
// It returns the number of bytes successfully read from r or error.
func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
for len(b.data) < cap(b.data) && err == nil {
var n int
n, err = r.Read(b.data[len(b.data):cap(b.data)])
offset += int64(n)
b.data = b.data[:len(b.data)+n]
}
// NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF
// See: https://pkg.go.dev/io#ReaderFrom
if err == io.EOF {
err = nil
}
return offset, err
}
// Cap returns the capacity of the buffer's underlying byte slice.
func (b *buffer) Cap() int {
return cap(b.data)
}
// Len returns the length of the data in the buffer
func (b *buffer) Len() int {
return len(b.data)
}
// Clear the buffer data.
func (b *buffer) Clear() {
b.data = b.data[:0]
}
// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
// writer uploads parts to S3 in a buffered fashion where the length of each
// part is [writer.driver.ChunkSize], excluding the last part which may be
// smaller than the configured chunk size and never larger. This allows the
// multipart upload to be cleanly resumed in future. This is violated if
// [writer.Close] is called before at least one chunk is written.
type writer struct {
ctx context.Context
driver *driver
@@ -1380,8 +1320,7 @@ type writer struct {
uploadID string
parts []*s3.Part
size int64
ready *buffer
pending *buffer
buf *bytes.Buffer
closed bool
committed bool
cancelled bool
@@ -1399,8 +1338,7 @@ func (d *driver) newWriter(ctx context.Context, key, uploadID string, parts []*s
uploadID: uploadID,
parts: parts,
size: size,
ready: d.NewBuffer(),
pending: d.NewBuffer(),
buf: d.pool.Get().(*bytes.Buffer),
}
}
@@ -1411,12 +1349,8 @@ func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
func (w *writer) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("already closed")
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
if err := w.done(); err != nil {
return 0, err
}
// If the last written part is smaller than minChunkSize, we need to make a
@@ -1476,17 +1410,11 @@ func (w *writer) Write(p []byte) (int, error) {
}
defer resp.Body.Close()
// reset uploaded parts
w.parts = nil
w.ready.Clear()
w.reset()
n, err := w.ready.ReadFrom(resp.Body)
if err != nil {
if _, err := io.Copy(w.buf, resp.Body); err != nil {
return 0, err
}
if resp.ContentLength != nil && n < *resp.ContentLength {
return 0, io.ErrShortBuffer
}
} else {
// Otherwise we can use the old file as the new first part
copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(w.ctx, &s3.UploadPartCopyInput{
@@ -1499,52 +1427,21 @@ func (w *writer) Write(p []byte) (int, error) {
if err != nil {
return 0, err
}
w.parts = []*s3.Part{
{
w.parts = []*s3.Part{{
ETag: copyPartResp.CopyPartResult.ETag,
PartNumber: aws.Int64(1),
Size: aws.Int64(w.size),
},
}
}}
}
}
var n int
n, _ := w.buf.Write(p)
defer func() { w.size += int64(n) }()
reader := bytes.NewReader(p)
for reader.Len() > 0 {
// NOTE(milosgajdos): we do some seemingly unsafe conversions
// from int64 to int in this for loop. These are fine as the
// offset returned from buffer.ReadFrom can only ever be
// maxChunkSize large which fits in to int. The reason why
// we return int64 is to play nice with Go interfaces where
// the buffer implements io.ReaderFrom interface.
// fill up the ready parts buffer
offset, err := w.ready.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}
// try filling up the pending parts buffer
offset, err = w.pending.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}
// we filled up pending buffer, flush
if w.pending.Len() == w.pending.Cap() {
for w.buf.Len() >= w.driver.ChunkSize {
if err := w.flush(); err != nil {
return n, err
return 0, fmt.Errorf("flush: %w", err)
}
}
}
return n, nil
}
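A worked illustration of the invariant the writer's doc comment describes, using assumed sizes: every part flushed from the buffer is exactly the configured chunk size, and only the final part (flushed by Close or Commit) may be smaller.

```go
package main

import "fmt"

// partSizes splits total bytes into multipart-upload parts the way the writer
// does: full chunkSize parts, then one smaller trailing part if bytes remain.
func partSizes(total, chunkSize int) []int {
	var parts []int
	for total >= chunkSize {
		parts = append(parts, chunkSize)
		total -= chunkSize
	}
	if total > 0 {
		parts = append(parts, total) // the final part may be smaller, never larger
	}
	return parts
}

func main() {
	// 25 MiB written with a 10 MiB chunk size -> two full parts plus a 5 MiB tail.
	fmt.Println(partSizes(25<<20, 10<<20)) // [10485760 10485760 5242880]
}
```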
@@ -1552,28 +1449,38 @@ func (w *writer) Size() int64 {
return w.size
}
// Close flushes any remaining data in the buffer and releases the buffer back
// to the pool.
func (w *writer) Close() error {
if w.closed {
return fmt.Errorf("already closed")
}
w.closed = true
defer func() {
w.ready.Clear()
w.driver.pool.Put(w.ready)
w.pending.Clear()
w.driver.pool.Put(w.pending)
}()
defer w.releaseBuffer()
return w.flush()
}
func (w *writer) Cancel(ctx context.Context) error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
func (w *writer) reset() {
w.buf.Reset()
w.parts = nil
w.size = 0
}
// releaseBuffer resets the buffer and returns it to the pool.
func (w *writer) releaseBuffer() {
w.buf.Reset()
w.driver.pool.Put(w.buf)
}
// Cancel aborts the multipart upload and closes the writer.
func (w *writer) Cancel(ctx context.Context) error {
if err := w.done(); err != nil {
return err
}
w.cancelled = true
_, err := w.driver.S3.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
@@ -1583,17 +1490,14 @@ func (w *writer) Cancel(ctx context.Context) error {
return err
}
// Commit flushes any remaining data in the buffer and completes the multipart
// upload.
func (w *writer) Commit(ctx context.Context) error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
if err := w.done(); err != nil {
return err
}
err := w.flush()
if err != nil {
if err := w.flush(); err != nil {
return err
}
@@ -1634,15 +1538,14 @@ func (w *writer) Commit(ctx context.Context) error {
sort.Sort(completedUploadedParts)
_, err = w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
if _, err := w.driver.S3.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedUploadedParts,
},
})
if err != nil {
}); err != nil {
if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(w.ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
@@ -1655,33 +1558,28 @@ func (w *writer) Commit(ctx context.Context) error {
return nil
}
// flush flushes all buffers to write a part to S3.
// flush is only called by Write (with both buffers full) and Close/Commit (always)
// flush writes at most [w.driver.ChunkSize] of the buffer to S3. flush is only
// called by [writer.Write] if the buffer is full, and always by [writer.Close]
// and [writer.Commit].
func (w *writer) flush() error {
if w.ready.Len() == 0 && w.pending.Len() == 0 {
if w.buf.Len() == 0 {
return nil
}
buf := bytes.NewBuffer(w.ready.data)
if w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize) {
if _, err := buf.Write(w.pending.data); err != nil {
return err
}
w.pending.Clear()
}
r := bytes.NewReader(w.buf.Next(w.driver.ChunkSize))
partSize := buf.Len()
partNumber := aws.Int64(int64(len(w.parts) + 1))
partSize := r.Len()
partNumber := aws.Int64(int64(len(w.parts)) + 1)
resp, err := w.driver.S3.UploadPartWithContext(w.ctx, &s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
PartNumber: partNumber,
UploadId: aws.String(w.uploadID),
Body: bytes.NewReader(buf.Bytes()),
Body: r,
})
if err != nil {
return err
return fmt.Errorf("upload part: %w", err)
}
w.parts = append(w.parts, &s3.Part{
@@ -1690,9 +1588,20 @@ func (w *writer) flush() error {
Size: aws.Int64(int64(partSize)),
})
// reset the flushed buffer and swap buffers
w.ready.Clear()
w.ready, w.pending = w.pending, w.ready
w.size += int64(partSize)
return nil
}
// done returns an error if the writer is in an invalid state.
func (w *writer) done() error {
switch {
case w.closed:
return fmt.Errorf("already closed")
case w.committed:
return fmt.Errorf("already committed")
case w.cancelled:
return fmt.Errorf("already cancelled")
}
return nil
}

@@ -0,0 +1,10 @@
//go:build arm
package s3
import "math"
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB, but this overflows on 32-bit
// platforms.
const maxChunkSize = math.MaxInt32
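The build-tag split exists because the 5 GiB limit cannot be represented in a 32-bit int. A quick check of the arithmetic, using the values from the constants above:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const fiveGiB = int64(5) * 1024 * 1024 * 1024 // 5,368,709,120 bytes
	// math.MaxInt32 is 2,147,483,647, so the S3 maximum part size does not fit
	// in an int on 32-bit (GOARCH=arm) builds; hence the capped constant.
	fmt.Println(fiveGiB > math.MaxInt32) // true
}
```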

@@ -0,0 +1,7 @@
//go:build !arm
package s3
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 * 1024 * 1024 * 1024

@@ -101,30 +101,34 @@ func init() {
}
}
if objectACL == "" {
objectACL = s3.ObjectCannedACLPrivate
}
parameters := DriverParameters{
accessKey,
secretKey,
bucket,
region,
regionEndpoint,
forcePathStyleBool,
encryptBool,
keyID,
secureBool,
skipVerifyBool,
v4Bool,
minChunkSize,
defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize,
rootDirectory,
storageClass,
driverName + "-test",
objectACL,
sessionToken,
useDualStackBool,
accelerateBool,
getS3LogLevelFromParam(logLevel),
AccessKey: accessKey,
SecretKey: secretKey,
Bucket: bucket,
Region: region,
RegionEndpoint: regionEndpoint,
ForcePathStyle: forcePathStyleBool,
Encrypt: encryptBool,
KeyID: keyID,
Secure: secureBool,
SkipVerify: skipVerifyBool,
V4Auth: v4Bool,
ChunkSize: minChunkSize,
MultipartCopyChunkSize: defaultMultipartCopyChunkSize,
MultipartCopyMaxConcurrency: defaultMultipartCopyMaxConcurrency,
MultipartCopyThresholdSize: defaultMultipartCopyThresholdSize,
RootDirectory: rootDirectory,
StorageClass: storageClass,
UserAgent: driverName + "-test",
ObjectACL: objectACL,
SessionToken: sessionToken,
UseDualStack: useDualStackBool,
Accelerate: accelerateBool,
LogLevel: getS3LogLevelFromParam(logLevel),
}
return New(context.Background(), parameters)

@@ -398,31 +398,28 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
filename := randomPath(32)
defer suite.deletePath(firstPart(filename))
contentsChunk1 := randomContents(chunkSize)
contentsChunk2 := randomContents(chunkSize)
contentsChunk3 := randomContents(chunkSize)
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
var fullContents bytes.Buffer
contents := io.TeeReader(newRandReader(chunkSize*3), &fullContents)
writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false)
suite.Require().NoError(err)
nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1))
nn, err := io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err)
suite.Require().Equal(int64(len(contentsChunk1)), nn)
suite.Require().Equal(chunkSize, nn)
err = writer.Close()
suite.Require().NoError(err)
curSize := writer.Size()
suite.Require().Equal(int64(len(contentsChunk1)), curSize)
suite.Require().Equal(chunkSize, curSize)
writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true)
suite.Require().NoError(err)
suite.Require().Equal(curSize, writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2))
nn, err = io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err)
suite.Require().Equal(int64(len(contentsChunk2)), nn)
suite.Require().Equal(chunkSize, nn)
err = writer.Close()
suite.Require().NoError(err)
@@ -434,9 +431,9 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
suite.Require().NoError(err)
suite.Require().Equal(curSize, writer.Size())
nn, err = io.Copy(writer, bytes.NewReader(fullContents[curSize:]))
nn, err = io.CopyN(writer, contents, chunkSize)
suite.Require().NoError(err)
suite.Require().Equal(int64(len(fullContents[curSize:])), nn)
suite.Require().Equal(chunkSize, nn)
err = writer.Commit(context.Background())
suite.Require().NoError(err)
@@ -445,7 +442,7 @@ func (suite *DriverSuite) testContinueStreamAppend(chunkSize int64) {
received, err := suite.StorageDriver.GetContent(suite.ctx, filename)
suite.Require().NoError(err)
suite.Require().Equal(fullContents, received)
suite.Require().Equal(fullContents.Bytes(), received)
}
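The rewritten test streams random data once and keeps a copy for the final comparison via io.TeeReader. A minimal standalone sketch of that pattern, with the suite's newRandReader helper approximated here by a size-limited crypto/rand reader (the real helper may differ):

```go
package main

import (
	"bytes"
	"crypto/rand"
	"fmt"
	"io"
)

func main() {
	const chunkSize = 1024
	var fullContents bytes.Buffer
	// Everything read from contents is mirrored into fullContents, just as the
	// test's TeeReader keeps the bytes it later compares against GetContent.
	contents := io.TeeReader(io.LimitReader(rand.Reader, 3*chunkSize), &fullContents)

	n, err := io.CopyN(io.Discard, contents, chunkSize) // consume one "chunk"
	fmt.Println(n, err, fullContents.Len())             // 1024 <nil> 1024
}
```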
// TestReadNonexistentStream tests that reading a stream for a nonexistent path