2015-01-28 08:27:46 +01:00
|
|
|
package notifications
|
|
|
|
|
|
|
|
import (
|
|
|
|
"net/http"
|
|
|
|
"time"
|
2018-08-06 21:12:07 +02:00
|
|
|
|
2020-08-24 13:18:39 +02:00
|
|
|
"github.com/distribution/distribution/v3/configuration"
|
2018-03-14 01:08:11 +01:00
|
|
|
events "github.com/docker/go-events"
|
2015-01-28 08:27:46 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// EndpointConfig covers the optional configuration parameters for an active
|
|
|
|
// endpoint.
|
|
|
|
type EndpointConfig struct {
|
2016-09-13 00:07:49 +02:00
|
|
|
Headers http.Header
|
|
|
|
Timeout time.Duration
|
|
|
|
Threshold int
|
|
|
|
Backoff time.Duration
|
|
|
|
IgnoredMediaTypes []string
|
2017-02-08 20:38:44 +01:00
|
|
|
Transport *http.Transport `json:"-"`
|
2018-01-18 11:26:54 +01:00
|
|
|
Ignore configuration.Ignore
|
2015-01-28 08:27:46 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// defaults set any zero-valued fields to a reasonable default.
|
|
|
|
func (ec *EndpointConfig) defaults() {
|
|
|
|
if ec.Timeout <= 0 {
|
|
|
|
ec.Timeout = time.Second
|
|
|
|
}
|
|
|
|
|
|
|
|
if ec.Threshold <= 0 {
|
|
|
|
ec.Threshold = 10
|
|
|
|
}
|
|
|
|
|
|
|
|
if ec.Backoff <= 0 {
|
|
|
|
ec.Backoff = time.Second
|
|
|
|
}
|
2016-07-09 21:59:05 +02:00
|
|
|
|
|
|
|
if ec.Transport == nil {
|
|
|
|
ec.Transport = http.DefaultTransport.(*http.Transport)
|
|
|
|
}
|
2015-01-28 08:27:46 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Endpoint is a reliable, queued, thread-safe sink that notify external http
|
|
|
|
// services when events are written. Writes are non-blocking and always
|
|
|
|
// succeed for callers but events may be queued internally.
|
|
|
|
type Endpoint struct {
|
2018-03-14 01:08:11 +01:00
|
|
|
events.Sink
|
2015-01-28 08:27:46 +01:00
|
|
|
url string
|
|
|
|
name string
|
|
|
|
|
|
|
|
EndpointConfig
|
|
|
|
|
|
|
|
metrics *safeMetrics
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewEndpoint returns a running endpoint, ready to receive events.
|
|
|
|
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
|
|
|
var endpoint Endpoint
|
|
|
|
endpoint.name = name
|
|
|
|
endpoint.url = url
|
|
|
|
endpoint.EndpointConfig = config
|
|
|
|
endpoint.defaults()
|
2018-10-11 15:39:02 +02:00
|
|
|
endpoint.metrics = newSafeMetrics(name)
|
2015-01-28 08:27:46 +01:00
|
|
|
|
|
|
|
// Configures the inmemory queue, retry, http pipeline.
|
|
|
|
endpoint.Sink = newHTTPSink(
|
|
|
|
endpoint.url, endpoint.Timeout, endpoint.Headers,
|
2016-07-09 21:59:05 +02:00
|
|
|
endpoint.Transport, endpoint.metrics.httpStatusListener())
|
2018-03-14 01:08:11 +01:00
|
|
|
endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff))
|
2015-01-28 08:27:46 +01:00
|
|
|
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
2018-01-18 11:26:54 +01:00
|
|
|
mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...)
|
|
|
|
endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)
|
2015-01-28 08:27:46 +01:00
|
|
|
|
|
|
|
register(&endpoint)
|
|
|
|
return &endpoint
|
|
|
|
}
|
|
|
|
|
|
|
|
// Name returns the name of the endpoint, generally used for debugging.
|
|
|
|
func (e *Endpoint) Name() string {
|
|
|
|
return e.name
|
|
|
|
}
|
|
|
|
|
|
|
|
// URL returns the url of the endpoint.
|
|
|
|
func (e *Endpoint) URL() string {
|
|
|
|
return e.url
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadMetrics populates em with metrics from the endpoint.
|
|
|
|
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
|
|
|
|
e.metrics.Lock()
|
|
|
|
defer e.metrics.Unlock()
|
|
|
|
|
|
|
|
*em = e.metrics.EndpointMetrics
|
|
|
|
// Map still need to copied in a threadsafe manner.
|
|
|
|
em.Statuses = make(map[string]int)
|
|
|
|
for k, v := range e.metrics.Statuses {
|
|
|
|
em.Statuses[k] = v
|
|
|
|
}
|
|
|
|
}
|