diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..f9d28eb --- /dev/null +++ b/errors.go @@ -0,0 +1,10 @@ +package keel + +import ( + "errors" +) + +var ( + ErrServerNotRunning = errors.New("server not running") + ErrServiceNotRunning = errors.New("service not running") +) diff --git a/example/probes/main.go b/example/probes/main.go index 189a40e..67e216a 100644 --- a/example/probes/main.go +++ b/example/probes/main.go @@ -3,6 +3,7 @@ package main import ( "net/http" "os" + "time" "github.com/foomo/keel" "github.com/foomo/keel/example/probes/handler" @@ -33,6 +34,11 @@ func main() { _, _ = w.Write([]byte("OK")) }) + l.Info("doing some initialization") + time.Sleep(10 * time.Second) + l.Info("initialization done") + + // TODO wait for services to be started svr.AddService( keel.NewServiceHTTP(l, "demo", "localhost:8080", svs), ) diff --git a/option.go b/option.go index 88802ba..41d7ca2 100644 --- a/option.go +++ b/option.go @@ -143,7 +143,10 @@ func WithHTTPPrometheusService(enabled bool) Option { func WithHTTPProbesService(enabled bool) Option { return func(inst *Server) { if config.GetBool(inst.Config(), "service.probes.enabled", enabled)() { - inst.AddService(NewDefaultServiceHTTPProbes(inst.probes)) + service := NewDefaultServiceHTTPProbes(inst.probes) + inst.initServices = append(inst.initServices, service) + inst.AddAnyProbes(service) + inst.AddCloser(service) } } } diff --git a/server.go b/server.go index 7ec988c..32647c1 100644 --- a/server.go +++ b/server.go @@ -30,22 +30,27 @@ import ( // Server struct type Server struct { services []Service + initServices []Service meter metric.MeterMust meterProvider metric.MeterProvider tracer trace.Tracer traceProvider trace.TracerProvider shutdownSignals []os.Signal shutdownTimeout time.Duration + running bool closers []interface{} probes Probes ctx context.Context + ctxCancel context.CancelFunc + g *errgroup.Group + gCtx context.Context l *zap.Logger c *viper.Viper } func NewServer(opts ...Option) *Server { inst := &Server{ - shutdownTimeout: 5 * time.Second, + shutdownTimeout: 30 * time.Second, shutdownSignals: []os.Signal{os.Interrupt, syscall.SIGTERM}, probes: Probes{}, ctx: context.Background(), @@ -57,31 +62,99 @@ func NewServer(opts ...Option) *Server { opt(inst) } - var err error + { // setup error group + var ctx context.Context + ctx, inst.ctxCancel = signal.NotifyContext(inst.ctx, inst.shutdownSignals...) + inst.g, inst.gCtx = errgroup.WithContext(ctx) - // set otel error handler - otel.SetLogger(logr.New(telemetry.NewLogger(inst.l))) - otel.SetErrorHandler(telemetry.NewErrorHandler(inst.l)) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + // gracefully shutdown + inst.g.Go(func() error { + <-inst.gCtx.Done() + inst.l.Debug("keel graceful shutdown") + defer inst.ctxCancel() - if inst.meterProvider == nil { - inst.meterProvider, err = telemetry.NewNoopMeterProvider() - log.Must(inst.l, err, "failed to create meter provider") - } else if env.GetBool("OTEL_ENABLED", false) { - if env.GetBool("OTEL_METRICS_HOST_ENABLED", false) { - log.Must(inst.l, otelhost.Start(), "failed to start otel host metrics") - } - if env.GetBool("OTEL_METRICS_RUNTIME_ENABLED", false) { - log.Must(inst.l, otelruntime.Start(), "failed to start otel runtime metrics") - } + timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctx, inst.shutdownTimeout) + defer timeoutCancel() + + // append internal closers + closers := append(inst.closers, inst.traceProvider, inst.meterProvider) //nolint:gocritic + + for _, closer := range closers { + l := inst.l.With(log.FName(fmt.Sprintf("%T", closer))) + switch c := closer.(type) { + case Closer: + c.Close() + case ErrorCloser: + if err := c.Close(); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorCloser") + } + case CloserWithContext: + c.Close(timeoutCtx) + case ErrorCloserWithContext: + if err := c.Close(timeoutCtx); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorCloserWithContext") + } + case Shutdowner: + c.Shutdown() + case ErrorShutdowner: + if err := c.Shutdown(); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorShutdowner") + } + case ShutdownerWithContext: + c.Shutdown(timeoutCtx) + case ErrorShutdownerWithContext: + if err := c.Shutdown(timeoutCtx); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorShutdownerWithContext") + } + case Unsubscriber: + c.Unsubscribe() + case ErrorUnsubscriber: + if err := c.Unsubscribe(); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriber") + } + case UnsubscriberWithContext: + c.Unsubscribe(timeoutCtx) + case ErrorUnsubscriberWithContext: + if err := c.Unsubscribe(timeoutCtx); err != nil { + log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriberWithContext") + } + } + } + return inst.gCtx.Err() + }) } - inst.meter = telemetry.MustMeter() - if inst.traceProvider == nil { - inst.traceProvider, err = telemetry.NewNoopTraceProvider() - log.Must(inst.l, err, "failed to create tracer provider") + { // setup telemetry + var err error + otel.SetLogger(logr.New(telemetry.NewLogger(inst.l))) + otel.SetErrorHandler(telemetry.NewErrorHandler(inst.l)) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + + if inst.meterProvider == nil { + inst.meterProvider, err = telemetry.NewNoopMeterProvider() + log.Must(inst.l, err, "failed to create meter provider") + } else if env.GetBool("OTEL_ENABLED", false) { + if env.GetBool("OTEL_METRICS_HOST_ENABLED", false) { + log.Must(inst.l, otelhost.Start(), "failed to start otel host metrics") + } + if env.GetBool("OTEL_METRICS_RUNTIME_ENABLED", false) { + log.Must(inst.l, otelruntime.Start(), "failed to start otel runtime metrics") + } + } + inst.meter = telemetry.MustMeter() + + if inst.traceProvider == nil { + inst.traceProvider, err = telemetry.NewNoopTraceProvider() + log.Must(inst.l, err, "failed to create tracer provider") + } + inst.tracer = telemetry.Tracer() } - inst.tracer = telemetry.Tracer() + + // add probe + inst.AddAnyProbes(inst) + + // start init services + inst.startService(inst.initServices...) return inst } @@ -113,15 +186,14 @@ func (s *Server) Context() context.Context { // AddService add a single service func (s *Server) AddService(service Service) { - for index, value := range s.services { + for _, value := range s.services { if value == service { return - } else if value.Name() == service.Name() { - s.services[index] = service - return } } s.services = append(s.services, service) + s.AddAnyProbes(service) + s.AddCloser(service) } // AddServices adds multiple service @@ -133,6 +205,11 @@ func (s *Server) AddServices(services ...Service) { // AddCloser adds a closer to be called on shutdown func (s *Server) AddCloser(closer interface{}) { + for _, value := range s.closers { + if value == closer { + return + } + } switch closer.(type) { case Closer, ErrorCloser, @@ -170,7 +247,7 @@ func (s *Server) AddProbe(typ ProbeType, probe interface{}) { ErrorPingProbeWithContext: s.probes[typ] = append(s.probes[typ], probe) default: - s.l.Warn("unable to add probe", log.FValue(fmt.Sprintf("%T", probe))) + s.l.Debug("not a probe", log.FValue(fmt.Sprintf("%T", probe))) } } @@ -201,22 +278,51 @@ func (s *Server) AddReadinessProbes(probes ...interface{}) { s.AddProbes(ProbeTypeReadiness, probes...) } +// IsCanceled returns true if the internal errgroup has been canceled +func (s *Server) IsCanceled() bool { + return errors.Is(s.gCtx.Err(), context.Canceled) +} + +// Healthz returns true if the server is running +func (s *Server) Healthz() error { + if !s.running { + return ErrServerNotRunning + } + return nil +} + // Run runs the server func (s *Server) Run() { - s.l.Info("starting server") - - ctx, stop := signal.NotifyContext(s.ctx, s.shutdownSignals...) - defer stop() - - g, gctx := errgroup.WithContext(ctx) - - if len(s.probes) > 0 { - s.AddService(NewDefaultServiceHTTPProbes(s.probes)) + if s.IsCanceled() { + s.l.Info("keel server canceled") + return } - for _, service := range s.services { + defer s.ctxCancel() + s.l.Info("starting keel server") + + // start services + s.startService(s.services...) + + // set running + defer func() { + s.running = false + }() + s.running = true + + // wait for shutdown + if err := s.g.Wait(); err != nil && !errors.Is(err, context.Canceled) { + log.WithError(s.l, err).Error("service error") + } + + s.l.Info("keel server stopped") +} + +// startService starts the given services +func (s *Server) startService(services ...Service) { + for _, service := range services { service := service - g.Go(func() error { + s.g.Go(func() error { if err := service.Start(s.ctx); errors.Is(err, http.ErrServerClosed) { log.WithError(s.l, err).Debug("server has closed") } else if err != nil { @@ -225,78 +331,5 @@ func (s *Server) Run() { } return nil }) - // register started service - s.AddCloser(service) } - - // gracefully shutdown servers - g.Go(func() error { - <-gctx.Done() - s.l.Debug("gracefully stopping closers...") - - timeoutCtx, timeoutCancel := context.WithTimeout( - context.Background(), - s.shutdownTimeout, - ) - defer timeoutCancel() - - // append internal closers - closers := append(s.closers, s.traceProvider, s.meterProvider) //nolint:gocritic - - for _, closer := range closers { - switch c := closer.(type) { - case Closer: - c.Close() - case ErrorCloser: - if err := c.Close(); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorCloser") - continue - } - case CloserWithContext: - c.Close(timeoutCtx) - case ErrorCloserWithContext: - if err := c.Close(timeoutCtx); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorCloserWithContext") - continue - } - case Shutdowner: - c.Shutdown() - case ErrorShutdowner: - if err := c.Shutdown(); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorShutdowner") - continue - } - case ShutdownerWithContext: - c.Shutdown(timeoutCtx) - case ErrorShutdownerWithContext: - if err := c.Shutdown(timeoutCtx); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorShutdownerWithContext") - continue - } - case Unsubscriber: - c.Unsubscribe() - case ErrorUnsubscriber: - if err := c.Unsubscribe(); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorUnsubscriber") - continue - } - case UnsubscriberWithContext: - c.Unsubscribe(timeoutCtx) - case ErrorUnsubscriberWithContext: - if err := c.Unsubscribe(timeoutCtx); err != nil { - log.WithError(s.l, err).Error("failed to gracefully stop ErrorUnsubscriberWithContext") - continue - } - } - s.l.Info("stopped registered closer", log.FName(fmt.Sprintf("%T", closer))) - } - return gctx.Err() - }) - - // wait for shutdown - if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) { - log.WithError(s.l, err).Error("service error") - } - - s.l.Info("graceful shutdown complete") } diff --git a/servicehttp.go b/servicehttp.go index 4308169..0f2428b 100644 --- a/servicehttp.go +++ b/servicehttp.go @@ -14,9 +14,10 @@ import ( // ServiceHTTP struct type ServiceHTTP struct { - server *http.Server - name string - l *zap.Logger + running bool + server *http.Server + name string + l *zap.Logger } func NewServiceHTTP(l *zap.Logger, name, addr string, handler http.Handler, middlewares ...middleware.Middleware) *ServiceHTTP { @@ -41,6 +42,13 @@ func (s *ServiceHTTP) Name() string { return s.name } +func (s *ServiceHTTP) Healthz() error { + if !s.running { + return ErrServiceNotRunning + } + return nil +} + func (s *ServiceHTTP) Start(ctx context.Context) error { var fields []zap.Field if value := strings.Split(s.server.Addr, ":"); len(value) == 2 { @@ -52,6 +60,10 @@ func (s *ServiceHTTP) Start(ctx context.Context) error { } s.l.Info("starting http service", fields...) s.server.BaseContext = func(_ net.Listener) context.Context { return ctx } + s.server.RegisterOnShutdown(func() { + s.running = false + }) + s.running = true if err := s.server.ListenAndServe(); err != http.ErrServerClosed { log.WithError(s.l, err).Error("service error") return err @@ -60,6 +72,6 @@ func (s *ServiceHTTP) Start(ctx context.Context) error { } func (s *ServiceHTTP) Close(ctx context.Context) error { - s.l.Info("shutting down http service") + s.l.Info("stopping http service") return s.server.Shutdown(ctx) } diff --git a/servicehttpprobes.go b/servicehttpprobes.go index e9a02bc..5cefaa8 100644 --- a/servicehttpprobes.go +++ b/servicehttpprobes.go @@ -17,9 +17,6 @@ const ( DefaultServiceHTTPProbesPath = "/healthz" ) -// ProbeHandler type -type ProbeHandler func(w http.ResponseWriter, r *http.Request) - func NewServiceHTTPProbes(l *zap.Logger, name, addr, path string, probes Probes) *ServiceHTTP { handler := http.NewServeMux() diff --git a/test/servicehttp.go b/test/servicehttp.go index 98e9aae..2250560 100644 --- a/test/servicehttp.go +++ b/test/servicehttp.go @@ -58,7 +58,7 @@ func (s *ServiceHTTP) Start(ctx context.Context) error { } fields = append(fields, log.FNetHostIP(ip), log.FNetHostPort(port)) } - s.l.Info("starting http service", fields...) + s.l.Info("starting http test service", fields...) s.server.Config.BaseContext = func(_ net.Listener) context.Context { return ctx } s.server.Start() @@ -66,7 +66,7 @@ func (s *ServiceHTTP) Start(ctx context.Context) error { } func (s *ServiceHTTP) Close(_ context.Context) error { - s.l.Info("shutting down http service") + s.l.Info("stopping http test service") s.server.Close() return nil }