mirror of
https://github.com/foomo/keel.git
synced 2025-10-16 12:35:34 +00:00
feat: code clean up
This commit is contained in:
parent
690c55482b
commit
94116ea1d2
@ -1,19 +1,14 @@
|
||||
package roundtripware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
keelerrors "github.com/foomo/keel/errors"
|
||||
"github.com/foomo/keel/log"
|
||||
"github.com/sony/gobreaker"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
@ -54,39 +49,31 @@ type CircuitBreakerSettings struct {
|
||||
OnStateChange func(name string, from gobreaker.State, to gobreaker.State)
|
||||
}
|
||||
|
||||
type circuitBreakerOptions struct {
|
||||
stateMeter metric.Meter
|
||||
stateMeterName string
|
||||
stateMeterDescription string
|
||||
type CircuitBreakerOptions struct {
|
||||
StateCounter syncint64.Counter
|
||||
|
||||
successMeter metric.Meter
|
||||
successMeterName string
|
||||
successMeterDescription string
|
||||
SuccessCounter syncint64.Counter
|
||||
|
||||
isSuccessful func(err error, req *http.Request, resp *http.Response) error
|
||||
copyReqBody bool
|
||||
copyRespBody bool
|
||||
IsSuccessful func(err error, req *http.Request, resp *http.Response) error
|
||||
CopyReqBody bool
|
||||
CopyRespBody bool
|
||||
}
|
||||
|
||||
func newDefaultCircuitBreakerOptions() *circuitBreakerOptions {
|
||||
return &circuitBreakerOptions{
|
||||
stateMeter: nil,
|
||||
stateMeterName: "",
|
||||
stateMeterDescription: "",
|
||||
func NewDefaultCircuitBreakerOptions() *CircuitBreakerOptions {
|
||||
return &CircuitBreakerOptions{
|
||||
StateCounter: nil,
|
||||
|
||||
successMeter: nil,
|
||||
successMeterName: "",
|
||||
successMeterDescription: "",
|
||||
SuccessCounter: nil,
|
||||
|
||||
isSuccessful: func(err error, req *http.Request, resp *http.Response) error {
|
||||
IsSuccessful: func(err error, req *http.Request, resp *http.Response) error {
|
||||
return err
|
||||
},
|
||||
copyReqBody: false,
|
||||
copyRespBody: false,
|
||||
CopyReqBody: false,
|
||||
CopyRespBody: false,
|
||||
}
|
||||
}
|
||||
|
||||
type CircuitBreakerOption func(opts *circuitBreakerOptions)
|
||||
type CircuitBreakerOption func(opts *CircuitBreakerOptions)
|
||||
|
||||
// CircuitBreakerWithSuccessMetric adds a metric that counts the state changes of the circuit breaker
|
||||
func CircuitBreakerWithStateChangeMetric(
|
||||
@ -94,10 +81,17 @@ func CircuitBreakerWithStateChangeMetric(
|
||||
stateMeterName string,
|
||||
stateMeterDescription string,
|
||||
) CircuitBreakerOption {
|
||||
return func(opts *circuitBreakerOptions) {
|
||||
opts.stateMeter = stateMeter
|
||||
opts.stateMeterName = stateMeterName
|
||||
opts.stateMeterDescription = stateMeterDescription
|
||||
// intitialize the state change counter
|
||||
stateCounter, err := stateMeter.SyncInt64().Counter(
|
||||
stateMeterName,
|
||||
instrument.WithDescription(stateMeterDescription),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return func(opts *CircuitBreakerOptions) {
|
||||
opts.StateCounter = stateCounter
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,10 +101,17 @@ func CircuitBreakerWithSuccessMetric(
|
||||
successMeterName string,
|
||||
successMeterDescription string,
|
||||
) CircuitBreakerOption {
|
||||
return func(opts *circuitBreakerOptions) {
|
||||
opts.successMeter = successMeter
|
||||
opts.successMeterName = successMeterName
|
||||
opts.successMeterDescription = successMeterDescription
|
||||
// intitialize the success counter
|
||||
successCounter, err := successMeter.SyncInt64().Counter(
|
||||
successMeterName,
|
||||
instrument.WithDescription(successMeterDescription),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return func(opts *CircuitBreakerOptions) {
|
||||
opts.SuccessCounter = successCounter
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,10 +120,10 @@ func CircuitBreakerWithIsSuccessful(
|
||||
copyReqBody bool,
|
||||
copyRespBody bool,
|
||||
) CircuitBreakerOption {
|
||||
return func(opts *circuitBreakerOptions) {
|
||||
opts.isSuccessful = isSuccessful
|
||||
opts.copyReqBody = copyReqBody
|
||||
opts.copyRespBody = copyRespBody
|
||||
return func(opts *CircuitBreakerOptions) {
|
||||
opts.IsSuccessful = isSuccessful
|
||||
opts.CopyReqBody = copyReqBody
|
||||
opts.CopyRespBody = copyRespBody
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,37 +133,11 @@ func CircuitBreakerWithIsSuccessful(
|
||||
// ments of the execution time will otherwise be falsified
|
||||
func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) RoundTripware {
|
||||
// intitialize the options
|
||||
o := newDefaultCircuitBreakerOptions()
|
||||
o := NewDefaultCircuitBreakerOptions()
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
// intitialize the state change counter
|
||||
var stateCounter syncint64.Counter
|
||||
if o.stateMeter != nil {
|
||||
var err error
|
||||
stateCounter, err = o.stateMeter.SyncInt64().Counter(
|
||||
o.stateMeterName,
|
||||
instrument.WithDescription(o.stateMeterDescription),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// intitialize the state (un-)success counter
|
||||
var successCounter syncint64.Counter
|
||||
if o.successMeter != nil {
|
||||
var err error
|
||||
successCounter, err = o.successMeter.SyncInt64().Counter(
|
||||
o.successMeterName,
|
||||
instrument.WithDescription(o.successMeterDescription),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// intitialize the gobreaker
|
||||
cbrSettings := gobreaker.Settings{
|
||||
Name: set.Name,
|
||||
@ -176,24 +151,16 @@ func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) R
|
||||
|
||||
return func(l *zap.Logger, next Handler) Handler {
|
||||
return func(r *http.Request) (*http.Response, error) {
|
||||
var (
|
||||
ctx context.Context
|
||||
labeler *otelhttp.Labeler
|
||||
)
|
||||
if stateCounter != nil || successCounter != nil {
|
||||
ctx, labeler = LabelerFromContext(r.Context())
|
||||
r = r.WithContext(ctx)
|
||||
}
|
||||
|
||||
// we need to detect the state change by ourselves, because the context does not allow us to hand in a context
|
||||
fromState := circuitBreaker.State()
|
||||
|
||||
// clone the request and the body if wanted
|
||||
reqCopy, errCopy := copyRequest(r, o.copyReqBody)
|
||||
reqCopy, errCopy := copyRequest(r, o.CopyReqBody)
|
||||
if errCopy != nil {
|
||||
l.Error("unable to copy request", log.FError(errCopy))
|
||||
return nil, errCopy
|
||||
} else if o.copyReqBody && reqCopy.Body != nil {
|
||||
} else if o.CopyReqBody && reqCopy.Body != nil {
|
||||
// make sure the body is closed again - since it is a NopCloser it does not make a difference though
|
||||
defer reqCopy.Body.Close()
|
||||
}
|
||||
@ -203,16 +170,16 @@ func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) R
|
||||
resp, err := next(r)
|
||||
|
||||
// clone the response and the body if wanted
|
||||
respCopy, errCopy := copyResponse(resp, o.copyRespBody)
|
||||
respCopy, errCopy := copyResponse(resp, o.CopyRespBody)
|
||||
if errCopy != nil {
|
||||
l.Error("unable to copy response", log.FError(errCopy))
|
||||
return nil, errCopy
|
||||
} else if o.copyRespBody && respCopy.Body != nil {
|
||||
} else if o.CopyRespBody && respCopy.Body != nil {
|
||||
// make sure the body is closed again - since it is a NopCloser it does not make a difference though
|
||||
defer respCopy.Body.Close()
|
||||
}
|
||||
|
||||
return resp, o.isSuccessful(err, reqCopy, respCopy)
|
||||
return resp, o.IsSuccessful(err, reqCopy, respCopy)
|
||||
})
|
||||
|
||||
// detect and log a state change
|
||||
@ -224,12 +191,11 @@ func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) R
|
||||
)
|
||||
|
||||
// recording the metric if desired
|
||||
if stateCounter != nil {
|
||||
attributes := append(
|
||||
labeler.Get(),
|
||||
if o.StateCounter != nil {
|
||||
attributes := []attribute.KeyValue{
|
||||
attribute.String("state_change", fmt.Sprintf("%s -> %s", fromState.String(), toState.String())),
|
||||
)
|
||||
stateCounter.Add(ctx, 1, attributes...)
|
||||
}
|
||||
o.StateCounter.Add(r.Context(), 1, attributes...)
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,22 +205,20 @@ func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) R
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if successCounter != nil {
|
||||
attributes := append(
|
||||
labeler.Get(),
|
||||
if o.SuccessCounter != nil {
|
||||
attributes := []attribute.KeyValue{
|
||||
attribute.Bool("success", false),
|
||||
)
|
||||
successCounter.Add(ctx, 1, attributes...)
|
||||
}
|
||||
o.SuccessCounter.Add(r.Context(), 1, attributes...)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if successCounter != nil {
|
||||
attributes := append(
|
||||
labeler.Get(),
|
||||
if o.SuccessCounter != nil {
|
||||
attributes := []attribute.KeyValue{
|
||||
attribute.Bool("success", true),
|
||||
)
|
||||
successCounter.Add(ctx, 1, attributes...)
|
||||
}
|
||||
o.SuccessCounter.Add(r.Context(), 1, attributes...)
|
||||
}
|
||||
|
||||
if res, ok := resp.(*http.Response); ok {
|
||||
@ -265,92 +229,3 @@ func CircuitBreaker(set *CircuitBreakerSettings, opts ...CircuitBreakerOption) R
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// errNoBody is a sentinel error value used by failureToReadBody so we
|
||||
// can detect that the lack of body was intentional.
|
||||
var errNoBody = errors.New("sentinel error value")
|
||||
|
||||
// failureToReadBody is an io.ReadCloser that just returns errNoBody on
|
||||
// Read. It's swapped in when we don't actually want to consume
|
||||
// the body, but need a non-nil one, and want to distinguish the
|
||||
// error from reading the dummy body.
|
||||
type failureToReadBody struct{}
|
||||
|
||||
func (failureToReadBody) Read([]byte) (int, error) { return 0, errNoBody }
|
||||
func (failureToReadBody) Close() error { return nil }
|
||||
|
||||
// emptyBody is an instance of empty reader.
|
||||
var emptyBody = io.NopCloser(strings.NewReader(""))
|
||||
|
||||
func copyRequest(req *http.Request, body bool) (*http.Request, error) {
|
||||
// we don't care about the context, since it is only used for the isSuccessful check
|
||||
out := req.Clone(context.Background())
|
||||
|
||||
// duplicate the body
|
||||
if !body {
|
||||
// For content length of zero. Make sure the body is an empty
|
||||
// reader, instead of returning error through failureToReadBody{}.
|
||||
if req.ContentLength == 0 {
|
||||
out.Body = emptyBody
|
||||
} else {
|
||||
// if it is attempted to read from the body in isSuccessful we actually want the read to fail
|
||||
out.Body = failureToReadBody{}
|
||||
}
|
||||
} else if req.Body == nil {
|
||||
req.Body = nil
|
||||
out.Body = nil
|
||||
} else {
|
||||
var err error
|
||||
out.Body, req.Body, err = drainBody(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func copyResponse(resp *http.Response, body bool) (*http.Response, error) {
|
||||
// we don't care about the context, since it is only used for the isSuccessful check
|
||||
out := new(http.Response)
|
||||
*out = *resp
|
||||
|
||||
// duplicate the body
|
||||
if !body {
|
||||
// For content length of zero. Make sure the body is an empty
|
||||
// reader, instead of returning error through failureToReadBody{}.
|
||||
if resp.ContentLength == 0 {
|
||||
out.Body = emptyBody
|
||||
} else {
|
||||
// if it is attempted to read from the body in isSuccessful we actually want the read to fail
|
||||
out.Body = failureToReadBody{}
|
||||
}
|
||||
} else if resp.Body == nil {
|
||||
out.Body = nil
|
||||
} else {
|
||||
var err error
|
||||
out.Body, resp.Body, err = drainBody(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// copied from httputil
|
||||
func drainBody(b io.ReadCloser) (io.ReadCloser, io.ReadCloser, error) {
|
||||
var err error
|
||||
if b == nil || b == http.NoBody {
|
||||
// No copying needed. Preserve the magic sentinel meaning of NoBody.
|
||||
return http.NoBody, http.NoBody, nil
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
if _, err = buf.ReadFrom(b); err != nil {
|
||||
return nil, b, err
|
||||
}
|
||||
if err = b.Close(); err != nil {
|
||||
return nil, b, err
|
||||
}
|
||||
return io.NopCloser(&buf), io.NopCloser(bytes.NewReader(buf.Bytes())), nil
|
||||
}
|
||||
|
||||
@ -2,8 +2,11 @@ package roundtripware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
@ -42,3 +45,92 @@ func readBodyPretty(contentType string, original io.ReadCloser) (io.ReadCloser,
|
||||
// return copy of the original
|
||||
return io.NopCloser(strings.NewReader(bs.String())), body
|
||||
}
|
||||
|
||||
// errNoBody is a sentinel error value used by failureToReadBody so we
|
||||
// can detect that the lack of body was intentional.
|
||||
var errNoBody = errors.New("sentinel error value")
|
||||
|
||||
// failureToReadBody is an io.ReadCloser that just returns errNoBody on
|
||||
// Read. It's swapped in when we don't actually want to consume
|
||||
// the body, but need a non-nil one, and want to distinguish the
|
||||
// error from reading the dummy body.
|
||||
type failureToReadBody struct{}
|
||||
|
||||
func (failureToReadBody) Read([]byte) (int, error) { return 0, errNoBody }
|
||||
func (failureToReadBody) Close() error { return nil }
|
||||
|
||||
// emptyBody is an instance of empty reader.
|
||||
var emptyBody = io.NopCloser(strings.NewReader(""))
|
||||
|
||||
func copyRequest(req *http.Request, body bool) (*http.Request, error) {
|
||||
// we don't care about the context, since it is only used for the isSuccessful check
|
||||
out := req.Clone(context.Background())
|
||||
|
||||
// duplicate the body
|
||||
if !body {
|
||||
// For content length of zero. Make sure the body is an empty
|
||||
// reader, instead of returning error through failureToReadBody{}.
|
||||
if req.ContentLength == 0 {
|
||||
out.Body = emptyBody
|
||||
} else {
|
||||
// if it is attempted to read from the body in isSuccessful we actually want the read to fail
|
||||
out.Body = failureToReadBody{}
|
||||
}
|
||||
} else if req.Body == nil {
|
||||
req.Body = nil
|
||||
out.Body = nil
|
||||
} else {
|
||||
var err error
|
||||
out.Body, req.Body, err = drainBody(req.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func copyResponse(resp *http.Response, body bool) (*http.Response, error) {
|
||||
// we don't care about the context, since it is only used for the isSuccessful check
|
||||
out := new(http.Response)
|
||||
*out = *resp
|
||||
|
||||
// duplicate the body
|
||||
if !body {
|
||||
// For content length of zero. Make sure the body is an empty
|
||||
// reader, instead of returning error through failureToReadBody{}.
|
||||
if resp.ContentLength == 0 {
|
||||
out.Body = emptyBody
|
||||
} else {
|
||||
// if it is attempted to read from the body in isSuccessful we actually want the read to fail
|
||||
out.Body = failureToReadBody{}
|
||||
}
|
||||
} else if resp.Body == nil {
|
||||
out.Body = nil
|
||||
} else {
|
||||
var err error
|
||||
out.Body, resp.Body, err = drainBody(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// copied from httputil
|
||||
func drainBody(b io.ReadCloser) (io.ReadCloser, io.ReadCloser, error) {
|
||||
var err error
|
||||
if b == nil || b == http.NoBody {
|
||||
// No copying needed. Preserve the magic sentinel meaning of NoBody.
|
||||
return http.NoBody, http.NoBody, nil
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
if _, err = buf.ReadFrom(b); err != nil {
|
||||
return nil, b, err
|
||||
}
|
||||
if err = b.Close(); err != nil {
|
||||
return nil, b, err
|
||||
}
|
||||
return io.NopCloser(&buf), io.NopCloser(bytes.NewReader(buf.Bytes())), nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user