diff --git a/errors.go b/errors.go index 02d3e6f..60f162a 100644 --- a/errors.go +++ b/errors.go @@ -6,4 +6,5 @@ import ( var ( ErrServerNotRunning = errors.New("server not running") + ErrServerShutdown = errors.New("server is shutting down") ) diff --git a/examples/graceful/main.go b/examples/graceful/main.go index 0e50152..f1bbe61 100644 --- a/examples/graceful/main.go +++ b/examples/graceful/main.go @@ -3,7 +3,6 @@ package main import ( "context" "net/http" - "sync" "syscall" "time" @@ -12,107 +11,76 @@ import ( "go.uber.org/zap" "github.com/foomo/keel" - "github.com/foomo/keel/log" ) func main() { + service.DefaultHTTPHealthzAddr = "localhost:9400" + + l := zap.NewExample().Named("root") + + l.Info("1. starting readiness checks") + go call(l.Named("readiness"), "http://localhost:9400/healthz/readiness") + svr := keel.NewServer( - //keel.WithLogger(zap.NewExample()), - keel.WithHTTPZapService(true), - keel.WithHTTPViperService(true), - keel.WithHTTPPrometheusService(true), + keel.WithLogger(l.Named("server")), keel.WithHTTPHealthzService(true), ) - l := svr.Logger() - - go waitGroup(svr.CancelContext(), l.With(log.FServiceName("waitGroup"))) - // create demo service svs := http.NewServeMux() svs.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - l.Info("handling request...") - time.Sleep(3 * time.Second) - w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("OK")) - l.Info("... handled request") }) svr.AddService( - service.NewHTTP(l, "demo", "localhost:8080", svs), + service.NewHTTP(l, "http", "localhost:8080", svs), ) - svr.AddCloser(interfaces.CloseFunc(func(ctx context.Context) error { - l.Info("custom closer") + svr.AddCloser(interfaces.CloserFunc(func(ctx context.Context) error { + l := l.Named("closer") + l.Info("closing stuff") + time.Sleep(3 * time.Second) + l.Info("done closing stuff") return nil })) - go svr.Run() - time.Sleep(1 * time.Second) - l.Info("1. starting test") - - { - l.Info("2. checking healthz") - readiness(l, "http://localhost:9400/healthz/readiness") - } - go func() { - l.Info("2. sending request") - if r, err := http.Get("http://localhost:8080"); err != nil { + + l.Info("3. starting http checks") + go call(l.Named("http"), "http://localhost:8080") + + l.Info("4. sleeping for 5 seconds") + time.Sleep(5 * time.Second) + + l.Info("5. sending shutdown signal") + if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil { l.Fatal(err.Error()) - } else { - l.Info(" /", zap.Int("status", r.StatusCode)) } + }() - time.Sleep(100 * time.Millisecond) - l.Info("3. sending shutdown signal") - if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil { - l.Fatal(err.Error()) - } - - { - l.Info("2. checking healthz") - readiness(l, "http://localhost:9400/healthz/readiness") - } - - l.Info("4. waiting for shutdown") - time.Sleep(10 * time.Second) - l.Info(" done") + svr.Run() + l.Info("done") } -func readiness(l *zap.Logger, url string) { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - l.Error(err.Error()) - return - } - resp, err := http.DefaultClient.Do(req) - if err != nil { - l.Error(err.Error()) - return - } - l.Info(url, zap.Int("status", resp.StatusCode)) -} - -func waitGroup(ctx context.Context, l *zap.Logger) { - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - l.Info("Break the loop") +func call(l *zap.Logger, url string) { + l = l.With(zap.String("url", url)) + for { + func() { + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + l.With(zap.Error(err)).Error("failed to create request") return - case <-time.After(3 * time.Second): - l.Info("Hello in a loop") } - } - }() - - wg.Wait() + resp, err := http.DefaultClient.Do(req) + if err != nil { + l.With(zap.Error(err)).Error("failed to send request") + return + } + l.Info("ok", zap.Int("status", resp.StatusCode)) + }() + time.Sleep(time.Second) + } } diff --git a/examples/healthz/main.go b/examples/healthz/main.go index 8c5588e..78c539c 100644 --- a/examples/healthz/main.go +++ b/examples/healthz/main.go @@ -15,12 +15,12 @@ import ( // See k8s for probe documentation // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#types-of-probe func main() { + service.DefaultHTTPHealthzAddr = "localhost:9400" + // you can override the below config by settings env vars _ = os.Setenv("SERVICE_HEALTHZ_ENABLED", "true") svr := keel.NewServer( - keel.WithHTTPZapService(true), - keel.WithHTTPViperService(true), // allows you to use probes for health checks in cluster: // GET :9400/healthz // GET :9400/healthz/readiness diff --git a/option.go b/option.go index b00a486..709513a 100644 --- a/option.go +++ b/option.go @@ -62,6 +62,13 @@ func WithShutdownSignals(shutdownSignals ...os.Signal) Option { } } +// WithGracefulTimeout option +func WithGracefulTimeout(gracefulTimeout time.Duration) Option { + return func(inst *Server) { + inst.gracefulTimeout = gracefulTimeout + } +} + // WithShutdownTimeout option func WithShutdownTimeout(shutdownTimeout time.Duration) Option { return func(inst *Server) { diff --git a/server.go b/server.go index abd55b4..d987ec4 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package keel import ( "context" + "errors" "fmt" "net/http" "os" @@ -13,13 +14,16 @@ import ( "syscall" "time" + "github.com/foomo/keel/config" + "github.com/foomo/keel/env" "github.com/foomo/keel/healthz" "github.com/foomo/keel/interfaces" + "github.com/foomo/keel/log" "github.com/foomo/keel/markdown" "github.com/foomo/keel/metrics" "github.com/foomo/keel/service" + "github.com/foomo/keel/telemetry" "github.com/go-logr/logr" - "github.com/pkg/errors" "github.com/spf13/viper" otelhost "go.opentelemetry.io/contrib/instrumentation/host" otelruntime "go.opentelemetry.io/contrib/instrumentation/runtime" @@ -29,22 +33,21 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/sync/errgroup" - - "github.com/foomo/keel/config" - "github.com/foomo/keel/env" - "github.com/foomo/keel/log" - "github.com/foomo/keel/telemetry" ) // Server struct type Server struct { - services []Service - initServices []Service - meter metric.Meter - meterProvider metric.MeterProvider - tracer trace.Tracer - traceProvider trace.TracerProvider - shutdownSignals []os.Signal + services []Service + initServices []Service + meter metric.Meter + meterProvider metric.MeterProvider + tracer trace.Tracer + traceProvider trace.TracerProvider + shutdown atomic.Bool + shutdownSignals []os.Signal + // gracefulTimeout should equal the readinessProbe's periodSeconds * failureThreshold + gracefulTimeout time.Duration + // shutdownTimeout should equal the readinessProbe's terminationGracePeriodSeconds shutdownTimeout time.Duration running atomic.Bool syncClosers []interface{} @@ -64,6 +67,7 @@ type Server struct { func NewServer(opts ...Option) *Server { inst := &Server{ + gracefulTimeout: 10 * 3 * time.Second, shutdownTimeout: 30 * time.Second, shutdownSignals: []os.Signal{syscall.SIGTERM}, syncReadmers: []interfaces.Readmer{}, @@ -78,21 +82,42 @@ func NewServer(opts ...Option) *Server { } { // setup error group + inst.AddReadinessHealthzers(healthz.NewHealthzerFn(func(ctx context.Context) error { + if inst.shutdown.Load() { + return ErrServerShutdown + } + return nil + })) + inst.ctxCancel, inst.ctxCancelFn = signal.NotifyContext(inst.ctx, inst.shutdownSignals...) inst.g, inst.gCtx = errgroup.WithContext(inst.ctxCancel) // gracefully shutdown inst.g.Go(func() error { <-inst.gCtx.Done() - inst.l.Debug("keel graceful shutdown") defer inst.ctxCancelFn() + inst.l.Info("keel graceful shutdown") - timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctx, inst.shutdownTimeout) + timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctxCancel, inst.shutdownTimeout) defer timeoutCancel() + inst.shutdown.Store(true) + + inst.l.Info("keel pausing graceful shutdown", log.FDuration(inst.gracefulTimeout)) + { + timer := time.NewTimer(inst.gracefulTimeout) + select { + case <-timeoutCtx.Done(): + timer.Stop() + case <-timer.C: + } + } + inst.l.Info("keel resuming graceful shutdown") + // append internal closers closers := append(inst.closers(), inst.traceProvider, inst.meterProvider) + inst.l.Debug("keel iterating closers") for _, closer := range closers { l := inst.l.With(log.FName(fmt.Sprintf("%T", closer))) switch c := closer.(type) { @@ -146,7 +171,10 @@ func NewServer(opts ...Option) *Server { } } } - return inst.gCtx.Err() + + inst.l.Debug("keel done closing") + + return nil }) } @@ -307,9 +335,9 @@ func (s *Server) AddReadinessHealthzers(probes ...interface{}) { } // IsCanceled returns true if the internal errgroup has been canceled -func (s *Server) IsCanceled() bool { - return errors.Is(s.gCtx.Err(), context.Canceled) -} +// func (s *Server) IsCanceled() bool { +// return errors.Is(s.gCtx.Err(), context.Canceled) +// } // Healthz returns true if the server is running func (s *Server) Healthz() error { @@ -321,12 +349,12 @@ func (s *Server) Healthz() error { // Run runs the server func (s *Server) Run() { - if s.IsCanceled() { - s.l.Info("keel server canceled") - return - } + // if s.IsCanceled() { + // s.l.Info("keel server canceled") + // return + // } - defer s.ctxCancelFn() + // defer s.ctxCancelFn() s.l.Info("starting keel server") // start services