S3 driver: Attempt HeadObject on Stat first, fail over to List (#4401)

This commit is contained in:
Milos Gajdos 2024-07-17 10:25:16 +01:00 committed by GitHub
commit 753d64b677
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 59 additions and 20 deletions

View File

@ -763,37 +763,76 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
return nil, storagedriver.PathNotFoundError{Path: path}
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
func (d *driver) statHead(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
resp, err := d.S3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
})
if err != nil {
return nil, err
}
return &storagedriver.FileInfoFields{
Path: path,
IsDir: false,
Size: *resp.ContentLength,
ModTime: *resp.LastModified,
}, nil
}
func (d *driver) statList(ctx context.Context, path string) (*storagedriver.FileInfoFields, error) {
s3Path := d.s3Path(path)
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Prefix: aws.String(s3Path),
MaxKeys: aws.Int64(1),
})
if err != nil {
return nil, err
}
fi := storagedriver.FileInfoFields{
Path: path,
}
if len(resp.Contents) == 1 {
if *resp.Contents[0].Key != d.s3Path(path) {
fi.IsDir = true
} else {
fi.IsDir = false
fi.Size = *resp.Contents[0].Size
fi.ModTime = *resp.Contents[0].LastModified
if *resp.Contents[0].Key != s3Path {
return &storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}, nil
}
} else if len(resp.CommonPrefixes) == 1 {
fi.IsDir = true
} else {
return nil, storagedriver.PathNotFoundError{Path: path}
return &storagedriver.FileInfoFields{
Path: path,
Size: *resp.Contents[0].Size,
ModTime: *resp.Contents[0].LastModified,
}, nil
}
if len(resp.CommonPrefixes) == 1 {
return &storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}, nil
}
return nil, storagedriver.PathNotFoundError{Path: path}
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
fi, err := d.statHead(ctx, path)
if err != nil {
// For AWS errors, we fail over to ListObjects:
// Though the official docs https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_Errors
// are slightly outdated, the HeadObject actually returns NotFound error
// if querying a key which doesn't exist or a key which has nested keys
// and Forbidden if IAM/ACL permissions do not allow Head but allow List.
var awsErr awserr.Error
if errors.As(err, &awsErr) {
fi, err := d.statList(ctx, path)
if err != nil {
return nil, parseError(path, err)
}
return storagedriver.FileInfoInternal{FileInfoFields: *fi}, nil
}
// For non-AWS errors, return the error directly
return nil, err
}
return storagedriver.FileInfoInternal{FileInfoFields: *fi}, nil
}
// List returns a list of the objects that are direct descendants of the given path.