diff --git a/registry/client/repository.go b/registry/client/repository.go index fc709ded9..6fc2bf727 100644 --- a/registry/client/repository.go +++ b/registry/client/repository.go @@ -391,17 +391,18 @@ func (bs *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { } func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { - stat, err := bs.statter.Stat(ctx, dgst) + blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst) if err != nil { return nil, err } - blobURL, err := bs.ub.BuildBlobURL(bs.name, stat.Digest) - if err != nil { - return nil, err - } - - return transport.NewHTTPReadSeeker(bs.client, blobURL, stat.Size), nil + return transport.NewHTTPReadSeeker(bs.client, blobURL, + func(resp *http.Response) error { + if resp.StatusCode == http.StatusNotFound { + return distribution.ErrBlobUnknown + } + return handleErrorResponse(resp) + }), nil } func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { diff --git a/registry/client/transport/http_reader.go b/registry/client/transport/http_reader.go index b2e74ddb8..b27b6c237 100644 --- a/registry/client/transport/http_reader.go +++ b/registry/client/transport/http_reader.go @@ -2,11 +2,9 @@ package transport import ( "bufio" - "bytes" "errors" "fmt" "io" - "io/ioutil" "net/http" "os" ) @@ -21,11 +19,11 @@ type ReadSeekCloser interface { // request. When seeking and starting a read from a non-zero offset // the a "Range" header will be added which sets the offset. // TODO(dmcgowan): Move this into a separate utility package -func NewHTTPReadSeeker(client *http.Client, url string, size int64) ReadSeekCloser { +func NewHTTPReadSeeker(client *http.Client, url string, errorHandler func(*http.Response) error) ReadSeekCloser { return &httpReadSeeker{ - client: client, - url: url, - size: size, + client: client, + url: url, + errorHandler: errorHandler, } } @@ -33,12 +31,26 @@ type httpReadSeeker struct { client *http.Client url string + // errorHandler creates an error from an unsuccessful HTTP response. + // This allows the error to be created with the HTTP response body + // without leaking the body through a returned error. + errorHandler func(*http.Response) error + size int64 - rc io.ReadCloser // remote read closer - brd *bufio.Reader // internal buffered io - offset int64 - err error + // rc is the remote read closer. + rc io.ReadCloser + // brd is a buffer for internal buffered io. + brd *bufio.Reader + // readerOffset tracks the offset as of the last read. + readerOffset int64 + // seekOffset allows Seek to override the offset. Seek changes + // seekOffset instead of changing readOffset directly so that + // connection resets can be delayed and possibly avoided if the + // seek is undone (i.e. seeking to the end and then back to the + // beginning). + seekOffset int64 + err error } func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { @@ -46,16 +58,29 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { return 0, hrs.err } + // If we seeked to a different position, we need to reset the + // connection. This logic is here instead of Seek so that if + // a seek is undone before the next read, the connection doesn't + // need to be closed and reopened. A common example of this is + // seeking to the end to determine the length, and then seeking + // back to the original position. + if hrs.readerOffset != hrs.seekOffset { + hrs.reset() + } + + hrs.readerOffset = hrs.seekOffset + rd, err := hrs.reader() if err != nil { return 0, err } n, err = rd.Read(p) - hrs.offset += int64(n) + hrs.seekOffset += int64(n) + hrs.readerOffset += int64(n) // Simulate io.EOF error if we reach filesize. - if err == nil && hrs.offset >= hrs.size { + if err == nil && hrs.size >= 0 && hrs.readerOffset >= hrs.size { err = io.EOF } @@ -67,13 +92,20 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { return 0, hrs.err } - var err error - newOffset := hrs.offset + _, err := hrs.reader() + if err != nil { + return 0, err + } + + newOffset := hrs.seekOffset switch whence { case os.SEEK_CUR: newOffset += int64(offset) case os.SEEK_END: + if hrs.size < 0 { + return 0, errors.New("content length not known") + } newOffset = hrs.size + int64(offset) case os.SEEK_SET: newOffset = int64(offset) @@ -82,15 +114,10 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { if newOffset < 0 { err = errors.New("cannot seek to negative position") } else { - if hrs.offset != newOffset { - hrs.reset() - } - - // No problems, set the offset. - hrs.offset = newOffset + hrs.seekOffset = newOffset } - return hrs.offset, err + return hrs.seekOffset, err } func (hrs *httpReadSeeker) Close() error { @@ -130,17 +157,12 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { return hrs.brd, nil } - // If the offset is great than or equal to size, return a empty, noop reader. - if hrs.offset >= hrs.size { - return ioutil.NopCloser(bytes.NewReader([]byte{})), nil - } - req, err := http.NewRequest("GET", hrs.url, nil) if err != nil { return nil, err } - if hrs.offset > 0 { + if hrs.readerOffset > 0 { // TODO(stevvooe): Get this working correctly. // If we are at different offset, issue a range request from there. @@ -158,8 +180,16 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) { // import if resp.StatusCode >= 200 && resp.StatusCode <= 399 { hrs.rc = resp.Body + if resp.StatusCode == http.StatusOK { + hrs.size = resp.ContentLength + } else { + hrs.size = -1 + } } else { defer resp.Body.Close() + if hrs.errorHandler != nil { + return nil, hrs.errorHandler(resp) + } return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) }