diff --git a/configuration/configuration.go b/configuration/configuration.go index 0f8c8bc23..55b9fcba1 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -527,13 +527,14 @@ type Notifications struct { // Endpoint describes the configuration of an http webhook notification // endpoint. type Endpoint struct { - Name string `yaml:"name"` // identifies the endpoint in the registry instance. - Disabled bool `yaml:"disabled"` // disables the endpoint - URL string `yaml:"url"` // post url for the endpoint. - Headers http.Header `yaml:"headers"` // static headers that should be added to all requests - Timeout time.Duration `yaml:"timeout"` // HTTP timeout - Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure - Backoff time.Duration `yaml:"backoff"` // backoff duration + Name string `yaml:"name"` // identifies the endpoint in the registry instance. + Disabled bool `yaml:"disabled"` // disables the endpoint + URL string `yaml:"url"` // post url for the endpoint. + Headers http.Header `yaml:"headers"` // static headers that should be added to all requests + Timeout time.Duration `yaml:"timeout"` // HTTP timeout + Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure + Backoff time.Duration `yaml:"backoff"` // backoff duration + IgnoredMediaTypes []string `yaml:"ignoredmediatypes"` // target media types to ignore } // Reporting defines error reporting methods. diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 08c015ee3..3e1583dd1 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -62,6 +62,7 @@ var configStruct = Configuration{ Headers: http.Header{ "Authorization": []string{"Bearer "}, }, + IgnoredMediaTypes: []string{"application/octet-stream"}, }, }, }, @@ -139,6 +140,8 @@ notifications: url: http://example.com headers: Authorization: [Bearer ] + ignoredmediatypes: + - application/octet-stream reporting: bugsnag: apikey: BugsnagApiKey @@ -165,6 +168,8 @@ notifications: url: http://example.com headers: Authorization: [Bearer ] + ignoredmediatypes: + - application/octet-stream http: headers: X-Content-Type-Options: [nosniff] diff --git a/docs/configuration.md b/docs/configuration.md index 53af5ed82..fb3de48e2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -214,6 +214,8 @@ information about each option that appears later in this page. timeout: 500 threshold: 5 backoff: 1000 + ignoredmediatypes: + - application/octet-stream redis: addr: localhost:6379 password: asecret @@ -1177,6 +1179,8 @@ settings for the registry. timeout: 500 threshold: 5 backoff: 1000 + ignoredmediatypes: + - application/octet-stream The notifications option is **optional** and currently may contain a single option, `endpoints`. @@ -1291,6 +1295,18 @@ The URL to which events should be published. If you omit the suffix, the system interprets the value as nanoseconds. + + + ignoredmediatypes + + + no + + + List of target media types to ignore. An event whose target media type + is present in this list will not be published to the endpoint. + + diff --git a/notifications/endpoint.go b/notifications/endpoint.go index b5ed955d1..29a9e27b5 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -8,11 +8,12 @@ import ( // EndpointConfig covers the optional configuration parameters for an active // endpoint. type EndpointConfig struct { - Headers http.Header - Timeout time.Duration - Threshold int - Backoff time.Duration - Transport *http.Transport + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration + IgnoredMediaTypes []string + Transport *http.Transport } // defaults set any zero-valued fields to a reasonable default. @@ -62,6 +63,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.Transport, endpoint.metrics.httpStatusListener()) endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes) register(&endpoint) return &endpoint diff --git a/notifications/sinks.go b/notifications/sinks.go index dda4a5653..549ba97e2 100644 --- a/notifications/sinks.go +++ b/notifications/sinks.go @@ -210,6 +210,44 @@ func (eq *eventQueue) next() []Event { return block } +// ignoredMediaTypesSink discards events with ignored target media types and +// passes the rest along. +type ignoredMediaTypesSink struct { + Sink + ignored map[string]bool +} + +func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink { + if len(ignored) == 0 { + return sink + } + + ignoredMap := make(map[string]bool) + for _, mediaType := range ignored { + ignoredMap[mediaType] = true + } + + return &ignoredMediaTypesSink{ + Sink: sink, + ignored: ignoredMap, + } +} + +// Write discards events with ignored target media types and passes the rest +// along. +func (imts *ignoredMediaTypesSink) Write(events ...Event) error { + var kept []Event + for _, e := range events { + if !imts.ignored[e.Target.MediaType] { + kept = append(kept, e) + } + } + if len(kept) == 0 { + return nil + } + return imts.Sink.Write(kept...) +} + // retryingSink retries the write until success or an ErrSinkClosed is // returned. Underlying sink must have p > 0 of succeeding or the sink will // block. Internally, it is a circuit breaker retries to manage reset. diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 89756a999..1bfa12c6b 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -3,6 +3,7 @@ package notifications import ( "fmt" "math/rand" + "reflect" "sync" "time" @@ -112,6 +113,38 @@ func TestEventQueue(t *testing.T) { } } +func TestIgnoredMediaTypesSink(t *testing.T) { + blob := createTestEvent("push", "library/test", "blob") + manifest := createTestEvent("push", "library/test", "manifest") + + type testcase struct { + ignored []string + expected []Event + } + + cases := []testcase{ + {nil, []Event{blob, manifest}}, + {[]string{"other"}, []Event{blob, manifest}}, + {[]string{"blob"}, []Event{manifest}}, + {[]string{"blob", "manifest"}, nil}, + } + + for _, c := range cases { + ts := &testSink{} + s := newIgnoredMediaTypesSink(ts, c.ignored) + + if err := s.Write(blob, manifest); err != nil { + t.Fatalf("error writing event: %v", err) + } + + ts.mu.Lock() + if !reflect.DeepEqual(ts.events, c.expected) { + t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected) + } + ts.mu.Unlock() + } +} + func TestRetryingSink(t *testing.T) { // Make a sync that fails most of the time, ensuring that all the events diff --git a/registry/handlers/app.go b/registry/handlers/app.go index cdd88bf12..4df15ae6e 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -427,10 +427,11 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ - Timeout: endpoint.Timeout, - Threshold: endpoint.Threshold, - Backoff: endpoint.Backoff, - Headers: endpoint.Headers, + Timeout: endpoint.Timeout, + Threshold: endpoint.Threshold, + Backoff: endpoint.Backoff, + Headers: endpoint.Headers, + IgnoredMediaTypes: endpoint.IgnoredMediaTypes, }) sinks = append(sinks, endpoint)