mirror of
https://github.com/distribution/distribution
synced 2024-11-12 05:45:51 +01:00
Prevent false sharing in signature fetch
The original implementation wrote to different locations in a shared slice. While this is theoretically okay, we end up thrashing the cpu cache since multiple slice members may be on the same cache line. So, even though each thread has its own memory location, there may be contention over the cache line. This changes the code to aggregate to a slice in a single goroutine. In reality, this change likely won't have any performance impact. The theory proposed above hasn't really even been tested. Either way, we can consider it and possibly go forward. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
5a4f66b2f8
commit
84046e03e0
@ -36,8 +36,13 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
signatures := make([][]byte, len(signaturePaths)) // make space for everything
|
type result struct {
|
||||||
errCh := make(chan error, 1) // buffered chan so one proceeds
|
index int
|
||||||
|
signature []byte
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
ch := make(chan result)
|
||||||
|
|
||||||
for i, sigPath := range signaturePaths {
|
for i, sigPath := range signaturePaths {
|
||||||
// Append the link portion
|
// Append the link portion
|
||||||
sigPath = path.Join(sigPath, "link")
|
sigPath = path.Join(sigPath, "link")
|
||||||
@ -47,33 +52,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
context.GetLogger(s.ctx).
|
context.GetLogger(s.ctx).
|
||||||
Debugf("fetching signature from %q", sigPath)
|
Debugf("fetching signature from %q", sigPath)
|
||||||
p, err := s.blobStore.linked(sigPath)
|
|
||||||
if err != nil {
|
r := result{index: idx}
|
||||||
|
if p, err := s.blobStore.linked(sigPath); err != nil {
|
||||||
context.GetLogger(s.ctx).
|
context.GetLogger(s.ctx).
|
||||||
Errorf("error fetching signature from %q: %v", sigPath, err)
|
Errorf("error fetching signature from %q: %v", sigPath, err)
|
||||||
|
r.err = err
|
||||||
// try to send an error, if it hasn't already been sent.
|
} else {
|
||||||
select {
|
r.signature = p
|
||||||
case errCh <- err:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
signatures[idx] = p
|
|
||||||
|
ch <- r
|
||||||
}(i, sigPath)
|
}(i, sigPath)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
// aggregrate the results
|
||||||
case err := <-errCh:
|
signatures := make([][]byte, len(signaturePaths))
|
||||||
// just return the first error, similar to single threaded code.
|
loop:
|
||||||
return nil, err
|
for {
|
||||||
default:
|
select {
|
||||||
// pass
|
case result := <-ch:
|
||||||
|
signatures[result.index] = result.signature
|
||||||
|
if result.err != nil && err == nil {
|
||||||
|
// only set the first one.
|
||||||
|
err = result.err
|
||||||
|
}
|
||||||
|
case <-done:
|
||||||
|
break loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return signatures, nil
|
return signatures, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
|
func (s *signatureStore) Put(dgst digest.Digest, signatures ...[]byte) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user