mirror of
https://github.com/foomo/keel.git
synced 2025-10-16 12:35:34 +00:00
205 lines
6.9 KiB
Go
205 lines
6.9 KiB
Go
package roundtripware
|
|
|
|
import (
|
|
"errors"
|
|
"net/http"
|
|
"time"
|
|
|
|
keelerrors "github.com/foomo/keel/errors"
|
|
"github.com/foomo/keel/log"
|
|
"github.com/sony/gobreaker"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ErrCircuitBreaker is the single sentinel reported whenever the circuit breaker refuses to forward a request to the
// next RoundTripware. Both gobreaker rejections (ErrTooManyRequests & ErrOpenState) are wrapped with it, so callers
// only need to compare against this one error.
var ErrCircuitBreaker = errors.New("circuit breaker triggered")
|
|
|
|
// CircuitBreakerSettings is a copy of the gobreaker.Settings, except that the IsSuccessful function is omitted since we
|
|
// want to allow access to the request and response. See `CircuitBreakerWithIsSuccessful` for more.
|
|
type CircuitBreakerSettings struct {
|
|
// Name is the name of the CircuitBreaker.
|
|
Name string
|
|
// MaxRequests is the maximum number of requests allowed to pass through
|
|
// when the CircuitBreaker is half-open.
|
|
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
|
|
MaxRequests uint32
|
|
// Interval is the cyclic period of the closed state
|
|
// for the CircuitBreaker to clear the internal Counts.
|
|
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
|
|
Interval time.Duration
|
|
// Timeout is the period of the open state,
|
|
// after which the state of the CircuitBreaker becomes half-open.
|
|
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
|
|
Timeout time.Duration
|
|
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
|
|
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
|
|
// If ReadyToTrip is nil, default ReadyToTrip is used.
|
|
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
|
|
ReadyToTrip func(counts gobreaker.Counts) bool
|
|
// OnStateChange is called whenever the state of the CircuitBreaker changes.
|
|
OnStateChange func(name string, from gobreaker.State, to gobreaker.State)
|
|
}
|
|
|
|
type CircuitBreakerOptions struct {
|
|
Counter metric.Int64Counter
|
|
|
|
IsSuccessful func(err error, req *http.Request, resp *http.Response) error
|
|
CopyReqBody bool
|
|
CopyRespBody bool
|
|
}
|
|
|
|
func NewDefaultCircuitBreakerOptions() *CircuitBreakerOptions {
|
|
return &CircuitBreakerOptions{
|
|
Counter: nil,
|
|
|
|
IsSuccessful: func(err error, req *http.Request, resp *http.Response) error {
|
|
return err
|
|
},
|
|
CopyReqBody: false,
|
|
CopyRespBody: false,
|
|
}
|
|
}
|
|
|
|
// CircuitBreakerOption adjusts the CircuitBreakerOptions it is applied to. Any number of them can be passed to
// CircuitBreaker.
type CircuitBreakerOption func(options *CircuitBreakerOptions)
|
|
|
|
// CircuitBreakerWithMetric adds a metric that counts the (un-)successful requests
|
|
func CircuitBreakerWithMetric(
|
|
meter metric.Meter,
|
|
meterName string,
|
|
meterDescription string,
|
|
) CircuitBreakerOption {
|
|
// intitialize the success counter
|
|
counter, err := meter.Int64Counter(
|
|
meterName,
|
|
metric.WithDescription(meterDescription),
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return func(opts *CircuitBreakerOptions) {
|
|
opts.Counter = counter
|
|
}
|
|
}
|
|
|
|
func CircuitBreakerWithIsSuccessful(
|
|
isSuccessful func(err error, req *http.Request, resp *http.Response) error,
|
|
copyReqBody bool,
|
|
copyRespBody bool,
|
|
) CircuitBreakerOption {
|
|
return func(opts *CircuitBreakerOptions) {
|
|
opts.IsSuccessful = isSuccessful
|
|
opts.CopyReqBody = copyReqBody
|
|
opts.CopyRespBody = copyRespBody
|
|
}
|
|
}
|
|
|
|
// CircuitBreaker returns a RoundTripper which wraps all the following RoundTripwares and the Handler with a circuit
|
|
// breaker. This will prevent further request once a certain number of requests failed.
|
|
// NOTE: It's strongly advised to add this Roundripware before the metric middleware (if both are used). As the measure-
|
|
// ments of the execution time will otherwise be falsified
|
|
func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) RoundTripware {
|
|
// intitialize the options
|
|
o := NewDefaultCircuitBreakerOptions()
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
|
|
// intitialize the gobreaker
|
|
cbrSettings := gobreaker.Settings{
|
|
Name: set.Name,
|
|
MaxRequests: set.MaxRequests,
|
|
Interval: set.Interval,
|
|
Timeout: set.Timeout,
|
|
ReadyToTrip: set.ReadyToTrip,
|
|
OnStateChange: set.OnStateChange,
|
|
}
|
|
circuitBreaker := gobreaker.NewCircuitBreaker(cbrSettings)
|
|
|
|
return func(l *zap.Logger, next Handler) Handler {
|
|
return func(r *http.Request) (*http.Response, error) {
|
|
if r == nil {
|
|
return nil, errors.New("request is nil")
|
|
}
|
|
|
|
// we need to detect the state change by ourselves, because the context does not allow us to hand in a context
|
|
fromState := circuitBreaker.State()
|
|
|
|
// clone the request and the body if wanted
|
|
var errCopy error
|
|
reqCopy, errCopy := copyRequest(r, o.CopyReqBody)
|
|
if errCopy != nil {
|
|
l.Error("unable to copy request", log.FError(errCopy))
|
|
return nil, errCopy
|
|
} else if o.CopyReqBody && reqCopy.Body != nil {
|
|
// make sure the body is closed again - since it is a NopCloser it does not make a difference though
|
|
defer reqCopy.Body.Close()
|
|
}
|
|
|
|
// call the next handler enclosed in the circuit breaker.
|
|
resp, err := circuitBreaker.Execute(func() (interface{}, error) {
|
|
resp, err := next(r) //nolint:bodyclose
|
|
if resp == nil {
|
|
return nil, o.IsSuccessful(err, reqCopy, nil)
|
|
}
|
|
|
|
// clone the response and the body if wanted
|
|
respCopy, errCopy := copyResponse(resp, o.CopyRespBody)
|
|
if errCopy != nil {
|
|
l.Error("unable to copy response", log.FError(errCopy))
|
|
return nil, errCopy
|
|
} else if o.CopyRespBody && respCopy.Body != nil {
|
|
// make sure the body is closed again - since it is a NopCloser it does not make a difference though
|
|
defer respCopy.Body.Close()
|
|
}
|
|
|
|
return resp, o.IsSuccessful(err, reqCopy, respCopy)
|
|
})
|
|
|
|
// detect and log a state change
|
|
toState := circuitBreaker.State()
|
|
if fromState != toState {
|
|
l.Warn("state change occurred",
|
|
zap.String("from", fromState.String()),
|
|
zap.String("to", toState.String()),
|
|
)
|
|
}
|
|
|
|
// wrap the error in case it was produced because of the circuit breaker being (half-)open
|
|
if errors.Is(gobreaker.ErrTooManyRequests, err) || errors.Is(gobreaker.ErrOpenState, err) {
|
|
err = keelerrors.NewWrappedError(ErrCircuitBreaker, err)
|
|
}
|
|
|
|
attributes := []attribute.KeyValue{
|
|
attribute.String("current_state", toState.String()),
|
|
attribute.String("previous_state", fromState.String()),
|
|
attribute.Bool("state_change", fromState != toState),
|
|
}
|
|
if err != nil {
|
|
if o.Counter != nil {
|
|
attributes := append(attributes, attribute.Bool("error", true))
|
|
o.Counter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if o.Counter != nil {
|
|
attributes := append(attributes, attribute.Bool("error", false))
|
|
o.Counter.Add(r.Context(), 1, metric.WithAttributes(attributes...))
|
|
}
|
|
|
|
if res, ok := resp.(*http.Response); ok {
|
|
return res, nil
|
|
} else {
|
|
return nil, errors.New("result is no *http.Response")
|
|
}
|
|
}
|
|
}
|
|
}
|