feat: k8s probes integration

This commit is contained in:
Ognjen 2022-03-10 15:25:54 +01:00
parent d8361766c4
commit 528be63a88
8 changed files with 314 additions and 4 deletions

View File

@ -0,0 +1,20 @@
package handler
import (
"context"
"go.uber.org/zap"
)
type Handler struct {
l *zap.Logger
}
func New(l *zap.Logger) *Handler {
return &Handler{l: l}
}
func (h *Handler) Ping(ctx context.Context) error {
h.l.Info("ping")
return nil
}

44
example/probes/main.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"net/http"
"os"
"github.com/foomo/keel"
"github.com/foomo/keel/example/probes/handler"
)
func main() {
// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_HEALTHZ_ENABLED", "true")
svr := keel.NewServer(
// add probes service listening on 0.0.0.0:9400
// allows you to use probes for health checks in cluster: GET 0.0.0.0:9400/healthz
keel.WithHTTPProbesService(false),
)
l := svr.Logger()
// alternatively you can add them manually
svr.AddServices(keel.NewDefaultServiceHTTPZap())
h := handler.New(l)
// Add probe handlers
svr.AddLivelinessProbes(h)
// svr.AddReadinessProbes(h)
// svr.AddStartupProbes(h)
// create demo service
svs := http.NewServeMux()
svs.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
svr.AddService(
keel.NewServiceHTTP(l, "demo", ":8080", svs),
)
svr.Run()
}

View File

@ -139,3 +139,11 @@ func WithHTTPPrometheusService(enabled bool) Option {
}
}
}
func WithHTTPProbesService(enabled bool) Option {
return func(inst *Server) {
if config.GetBool(inst.Config(), "service.probes.enabled", enabled)() {
inst.AddService(NewDefaultServiceHTTPProbes(inst.probes))
}
}
}

27
probes.go Normal file
View File

@ -0,0 +1,27 @@
package keel
import "context"
type Probes map[ProbeType][]interface{}
// BoolProbeFn interface
type BoolProbeFn func() bool
// ErrorProbeFn interface
type ErrorProbeFn func() error
// BoolProbeWithContextFn interface
type BoolProbeWithContextFn func(context.Context) bool
// ErrorProbeWithContextFn interface
type ErrorProbeWithContextFn func(context.Context) error
// ErrorPingProbe interface
type ErrorPingProbe interface {
Ping() error
}
// ErrorPingProbeWithContext interface
type ErrorPingProbeWithContext interface {
Ping(context.Context) error
}

16
probetype.go Normal file
View File

@ -0,0 +1,16 @@
package keel
type ProbeType string
const probesServiceName = "probes"
const (
ProbeTypeAll ProbeType = "all"
ProbeTypeStartup ProbeType = "startup"
ProbeTypeReadiness ProbeType = "readiness"
ProbeTypeLiveliness ProbeType = "liveliness"
)
func (t ProbeType) String() string {
return string(t)
}

View File

@ -37,6 +37,7 @@ type Server struct {
shutdownSignals []os.Signal
shutdownTimeout time.Duration
closers []interface{}
probes Probes
ctx context.Context
l *zap.Logger
c *viper.Viper
@ -46,6 +47,7 @@ func NewServer(opts ...Option) *Server {
inst := &Server{
shutdownTimeout: 5 * time.Second,
shutdownSignals: []os.Signal{os.Interrupt, syscall.SIGTERM},
probes: Probes{},
ctx: context.Background(),
c: config.Config(),
l: log.Logger(),
@ -111,9 +113,12 @@ func (s *Server) Context() context.Context {
// AddService add a single service
func (s *Server) AddService(service Service) {
for _, value := range s.services {
for index, value := range s.services {
if value == service {
return
} else if value.Name() == service.Name() {
s.services[index] = service
return
}
}
s.services = append(s.services, service)
@ -128,7 +133,7 @@ func (s *Server) AddServices(services ...Service) {
// AddCloser adds a closer to be called on shutdown
func (s *Server) AddCloser(closer interface{}) {
switch closer.(type) {
switch t := closer.(type) {
case Closer,
CloserFn,
ErrorCloser,
@ -147,17 +152,60 @@ func (s *Server) AddCloser(closer interface{}) {
ErrorUnsubscriberWithContext:
s.closers = append(s.closers, closer)
default:
s.l.Warn("unable to add closer")
s.l.Warn("unable to add closer", log.FValue(t))
}
}
// AddClosers adds a closer to be called on shutdown
// AddClosers adds the given closers to be called on shutdown
func (s *Server) AddClosers(closers ...interface{}) {
for _, closer := range closers {
s.AddCloser(closer)
}
}
// AddProbe adds a probe to be called on healthz checks
func (s *Server) AddProbe(typ ProbeType, probe interface{}) {
switch t := probe.(type) {
case BoolProbeFn,
ErrorProbeFn,
BoolProbeWithContextFn,
ErrorProbeWithContextFn,
ErrorPingProbe,
ErrorPingProbeWithContext:
s.probes[typ] = append(s.probes[typ], probe)
default:
s.l.Warn("unable to add probe", log.FValue(t))
}
}
// AddProbes adds the given probes to be called on healthz checks
func (s *Server) AddProbes(probes ...interface{}) {
for _, probe := range probes {
s.AddProbe(ProbeTypeAll, probe)
}
}
// AddLivelinessProbes adds the liveliness probes to be called on healthz checks
func (s *Server) AddLivelinessProbes(probes ...interface{}) {
for _, probe := range probes {
s.AddProbe(ProbeTypeLiveliness, probe)
}
}
// AddReadinessProbes adds the readiness probes to be called on healthz checks
func (s *Server) AddReadinessProbes(probes ...interface{}) {
for _, probe := range probes {
s.AddProbe(ProbeTypeReadiness, probe)
}
}
// AddStartupProbes adds the startup probes to be called on healthz checks
func (s *Server) AddStartupProbes(probes ...interface{}) {
for _, probe := range probes {
s.AddProbe(ProbeTypeStartup, probe)
}
}
// Run runs the server
func (s *Server) Run() {
s.l.Info("starting server")
@ -167,6 +215,10 @@ func (s *Server) Run() {
g, gctx := errgroup.WithContext(ctx)
if len(s.probes) > 0 {
s.AddService(NewDefaultServiceHTTPProbes(s.probes))
}
for _, service := range s.services {
service := service
g.Go(func() error {

133
servicehttpprobes.go Normal file
View File

@ -0,0 +1,133 @@
package keel
import (
"context"
"errors"
"net/http"
"go.uber.org/zap"
"github.com/foomo/keel/log"
httputils "github.com/foomo/keel/utils/net/http"
)
const (
DefaultServiceHTTPProbesName = probesServiceName
DefaultServiceHTTPProbesAddr = ":9400"
DefaultServiceHTTPProbesPath = "/healthz"
)
// Probe's handler function
type ProbeHandler func(w http.ResponseWriter, r *http.Request)
func NewServiceHTTPProbes(l *zap.Logger, name, addr, path string, probes Probes) *ServiceHTTP {
handler := http.NewServeMux()
call := func(probe interface{}) bool {
switch h := probe.(type) {
case BoolProbeFn:
return h()
case ErrorProbeFn:
if err := h(); err != nil {
log.WithError(l, err).Error("failed to use probe ErrorHealthFn")
return false
}
case ErrorPingProbe:
if err := h.Ping(); err != nil {
log.WithError(l, err).Error("failed to use probe ErrorPingProbe")
return false
}
case ErrorPingProbeWithContext:
if err := h.Ping(context.Background()); err != nil {
log.WithError(l, err).Error("failed to use probe ErrorHealth")
return false
}
case BoolProbeWithContextFn:
return h(context.Background())
case ErrorProbeWithContextFn:
if err := h(context.Background()); err != nil {
log.WithError(l, err).Error("failed to use probe ErrorHealthWithContext")
return false
}
}
return true
}
handler.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
for _, values := range probes {
for _, p := range values {
if !call(p) {
httputils.InternalServiceUnavailable(l, w, r, errors.New("not ready yet"))
return
}
}
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
handler.HandleFunc(path+"/"+ProbeTypeLiveliness.String(), func(w http.ResponseWriter, r *http.Request) {
var ps []interface{}
if p, ok := probes[ProbeTypeAll]; ok {
ps = append(ps, p...)
}
if p, ok := probes[ProbeTypeLiveliness]; ok {
ps = append(ps, p...)
}
for _, p := range ps {
if !call(p) {
httputils.InternalServiceUnavailable(l, w, r, errors.New("not ready yet"))
return
}
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
handler.HandleFunc(path+"/"+ProbeTypeReadiness.String(), func(w http.ResponseWriter, r *http.Request) {
var ps []interface{}
if p, ok := probes[ProbeTypeAll]; ok {
ps = append(ps, p...)
}
if p, ok := probes[ProbeTypeReadiness]; ok {
ps = append(ps, p...)
}
for _, p := range ps {
if !call(p) {
httputils.InternalServiceUnavailable(l, w, r, errors.New("not ready yet"))
return
}
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
handler.HandleFunc(path+"/"+ProbeTypeStartup.String(), func(w http.ResponseWriter, r *http.Request) {
var ps []interface{}
if p, ok := probes[ProbeTypeAll]; ok {
ps = append(ps, p...)
}
if p, ok := probes[ProbeTypeStartup]; ok {
ps = append(ps, p...)
}
for _, p := range ps {
if !call(p) {
httputils.InternalServiceUnavailable(l, w, r, errors.New("not ready yet"))
return
}
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
return NewServiceHTTP(l, name, addr, handler)
}
func NewDefaultServiceHTTPProbes(probes Probes) *ServiceHTTP {
return NewServiceHTTPProbes(
log.Logger(),
DefaultServiceHTTPProbesName,
DefaultServiceHTTPProbesAddr,
DefaultServiceHTTPProbesPath,
probes,
)
}

View File

@ -13,6 +13,16 @@ func InternalServerError(l *zap.Logger, w http.ResponseWriter, r *http.Request,
ServerError(l, w, r, http.StatusInternalServerError, err)
}
// InternalServiceUnavailable http response
func InternalServiceUnavailable(l *zap.Logger, w http.ResponseWriter, r *http.Request, err error) {
ServerError(l, w, r, http.StatusServiceUnavailable, err)
}
// InternalServiceTooEarly http response
func InternalServiceTooEarly(l *zap.Logger, w http.ResponseWriter, r *http.Request, err error) {
ServerError(l, w, r, http.StatusTooEarly, err)
}
// UnauthorizedServerError http response
func UnauthorizedServerError(l *zap.Logger, w http.ResponseWriter, r *http.Request, err error) {
ServerError(l, w, r, http.StatusUnauthorized, err)