From c9641539713932e48d786e41e4af3782c9e6b052 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Mon, 11 Sep 2023 10:37:04 +0200 Subject: [PATCH] feat: add nats readmer --- interfaces/readmer.go | 14 ++++++++ markdown/markdown.go | 1 + net/stream/jetstream/readme.go | 62 +++++++++++++++++++++++++++++++++ net/stream/jetstream/stream.go | 29 +++++++++++++++ persistence/mongo/collection.go | 5 --- persistence/mongo/readme.go | 5 +++ server.go | 18 +++++----- service/goroutine.go | 10 ++++-- service/goroutine_test.go | 2 ++ service/httpreadme_test.go | 24 ------------- 10 files changed, 129 insertions(+), 41 deletions(-) create mode 100644 net/stream/jetstream/readme.go diff --git a/interfaces/readmer.go b/interfaces/readmer.go index 64e9a42..a16ba1f 100644 --- a/interfaces/readmer.go +++ b/interfaces/readmer.go @@ -4,3 +4,17 @@ package interfaces type Readmer interface { Readme() string } + +type ReadmeHandler struct { + Value func() string +} + +func (r ReadmeHandler) Readme() string { + return r.Value() +} + +func ReadmeFunc(v func() string) ReadmeHandler { + return ReadmeHandler{ + Value: v, + } +} diff --git a/markdown/markdown.go b/markdown/markdown.go index fd9aa1b..f195964 100644 --- a/markdown/markdown.go +++ b/markdown/markdown.go @@ -6,6 +6,7 @@ import ( markdowntable "github.com/fbiville/markdown-table-formatter/pkg/markdown" ) +// Markdown output helper type Markdown struct { value string } diff --git a/net/stream/jetstream/readme.go b/net/stream/jetstream/readme.go new file mode 100644 index 0000000..75140d9 --- /dev/null +++ b/net/stream/jetstream/readme.go @@ -0,0 +1,62 @@ +package jetstream + +import ( + "github.com/foomo/keel/markdown" +) + +type ( + publisher struct { + Namespace string + Stream string + Subject string + } + subscriber struct { + Namespace string + Stream string + Subject string + } +) + +var ( + publishers []publisher + subscribers []subscriber +) + +func Readme() string { + if len(publishers) == 0 && len(subscribers) == 0 { + return "" + } + + var rows [][]string + md := &markdown.Markdown{} + md.Println("### NATS") + md.Println("") + md.Println("List of all registered nats publishers & subscribers.") + md.Println("") + + if len(publishers) > 0 { + for _, value := range publishers { + rows = append(rows, []string{ + markdown.Code(value.Namespace), + markdown.Code(value.Stream), + markdown.Code(value.Subject), + markdown.Code("publish"), + }) + } + } + + if len(subscribers) > 0 { + for _, value := range subscribers { + rows = append(rows, []string{ + markdown.Code(value.Namespace), + markdown.Code(value.Stream), + markdown.Code(value.Subject), + markdown.Code("subscribe"), + }) + } + } + + md.Table([]string{"Namespace", "Stream", "Subject", "Type"}, rows) + + return md.String() +} diff --git a/net/stream/jetstream/stream.go b/net/stream/jetstream/stream.go index 37048fd..129d330 100644 --- a/net/stream/jetstream/stream.go +++ b/net/stream/jetstream/stream.go @@ -2,6 +2,7 @@ package jetstream import ( "encoding/json" + "slices" "time" "github.com/nats-io/nats.go" @@ -280,6 +281,20 @@ func (s *Stream) Publisher(subject string, opts ...PublisherOption) *Publisher { opt(pub) } } + + { // append to recoreded publishers + value := publisher{ + Stream: s.name, + Namespace: s.namespace, + Subject: subject, + } + if !slices.ContainsFunc(publishers, func(p publisher) bool { + return p.Stream == value.Stream && p.Namespace == value.Namespace && p.Subject == value.Subject + }) { + publishers = append(publishers, value) + } + } + return pub } @@ -295,6 +310,20 @@ func (s *Stream) Subscriber(subject string, opts ...SubscriberOption) *Subscribe opt(sub) } } + + { // append to recoreded publishers + value := subscriber{ + Stream: s.name, + Namespace: s.namespace, + Subject: subject, + } + if !slices.ContainsFunc(subscribers, func(p subscriber) bool { + return p.Stream == value.Stream && p.Namespace == value.Namespace && p.Subject == value.Subject + }) { + subscribers = append(subscribers, value) + } + } + return sub } diff --git a/persistence/mongo/collection.go b/persistence/mongo/collection.go index 4b39597..c50a21c 100644 --- a/persistence/mongo/collection.go +++ b/persistence/mongo/collection.go @@ -114,11 +114,6 @@ func CollectionWithIndexesCommitQuorumVotingMembers(v context.Context) Collectio // ~ Constructor // ------------------------------------------------------------------------------------------------ -var ( - dbs = map[string][]string{} - indices = map[string]map[string][]string{} -) - func NewCollection(db *mongo.Database, name string, opts ...CollectionOption) (*Collection, error) { o := DefaultCollectionOptions() for _, opt := range opts { diff --git a/persistence/mongo/readme.go b/persistence/mongo/readme.go index 6ad2580..21a51f9 100644 --- a/persistence/mongo/readme.go +++ b/persistence/mongo/readme.go @@ -6,6 +6,11 @@ import ( "github.com/foomo/keel/markdown" ) +var ( + dbs = map[string][]string{} + indices = map[string]map[string][]string{} +) + func Readme() string { var rows [][]string md := &markdown.Markdown{} diff --git a/server.go b/server.go index d616d9b..d87be31 100644 --- a/server.go +++ b/server.go @@ -17,7 +17,6 @@ import ( "github.com/foomo/keel/interfaces" "github.com/foomo/keel/markdown" "github.com/foomo/keel/metrics" - keelmongo "github.com/foomo/keel/persistence/mongo" "github.com/foomo/keel/service" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -179,7 +178,12 @@ func NewServer(opts ...Option) *Server { // add probe inst.AddAlwaysHealthzers(inst) - inst.AddReadmer(inst) + inst.AddReadmers( + interfaces.ReadmeFunc(env.Readme), + interfaces.ReadmeFunc(config.Readme), + inst, + interfaces.ReadmeFunc(metrics.Readme), + ) // start init services inst.startService(inst.initServices...) @@ -258,15 +262,13 @@ func (s *Server) AddClosers(closers ...interface{}) { // AddReadmer adds a readmer to be added to the exposed readme func (s *Server) AddReadmer(readmer interfaces.Readmer) { - if !slices.Contains(s.readmers, readmer) { - s.readmers = append(s.readmers, readmer) - } + s.readmers = append(s.readmers, readmer) } // AddReadmers adds readmers to be added to the exposed readme func (s *Server) AddReadmers(readmers ...interfaces.Readmer) { for _, readmer := range readmers { - s.AddCloser(readmer) + s.AddReadmer(readmer) } } @@ -355,13 +357,9 @@ func (s *Server) Run() { func (s *Server) Readme() string { md := &markdown.Markdown{} - md.Print(env.Readme()) - md.Print(config.Readme()) md.Println(s.readmeServices()) md.Println(s.readmeHealthz()) md.Print(s.readmeCloser()) - md.Print(keelmongo.Readme()) - md.Print(metrics.Readme()) return md.String() } diff --git a/service/goroutine.go b/service/goroutine.go index 74c625e..ef895b7 100644 --- a/service/goroutine.go +++ b/service/goroutine.go @@ -28,7 +28,7 @@ type ( GoRoutineFn func(ctx context.Context, l *zap.Logger) error ) -func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn) *GoRoutine { +func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn, opts ...GoRoutineOption) *GoRoutine { if l == nil { l = log.Logger() } @@ -38,12 +38,18 @@ func NewGoRoutine(l *zap.Logger, name string, handler GoRoutineFn) *GoRoutine { log.KeelServiceNameKey.String(name), ) - return &GoRoutine{ + inst := &GoRoutine{ handler: handler, name: name, parallel: 1, l: l, } + + for _, opt := range opts { + opt(inst) + } + + return inst } // ------------------------------------------------------------------------------------------------ diff --git a/service/goroutine_test.go b/service/goroutine_test.go index 339b9c4..656a955 100644 --- a/service/goroutine_test.go +++ b/service/goroutine_test.go @@ -20,12 +20,14 @@ func ExampleNewGoRoutine() { svr.AddService( service.NewGoRoutine(svr.Logger(), "demo", func(ctx context.Context, l *zap.Logger) error { for { + // handle graceful shutdowns if err := ctx.Err(); errors.Is(context.Cause(ctx), service.ErrServiceShutdown) { l.Info("context has been canceled du to graceful shutdow") return nil } else if err != nil { return errors.Wrap(err, "unexpected context error") } + l.Info("ping") time.Sleep(time.Second) } diff --git a/service/httpreadme_test.go b/service/httpreadme_test.go index c7a6236..e94f9e5 100644 --- a/service/httpreadme_test.go +++ b/service/httpreadme_test.go @@ -6,14 +6,10 @@ import ( "io" "net/http" "os" - "time" "github.com/foomo/keel" "github.com/foomo/keel/config" "github.com/foomo/keel/env" - "github.com/foomo/keel/examples/persistence/mongo/store" - "github.com/foomo/keel/log" - keelmongo "github.com/foomo/keel/persistence/mongo" "github.com/foomo/keel/service" "go.uber.org/zap" ) @@ -44,26 +40,6 @@ func ExampleNewHTTPReadme() { _ = config.MustGetBool(c, "example.required.bool") _ = config.MustGetString(c, "example.required.string") - // create persistor - persistor, err := keelmongo.New(svr.Context(), "mongodb://localhost:27017/dummy") - log.Must(l, err, "failed to create persistor") - - // ensure to add the persistor to the closers - svr.AddClosers(persistor) - - // create repositories - _, err = persistor.Collection( - "dummy", - // define indexes but beware of changes on large dbs - keelmongo.CollectionWithIndexes( - store.EntityIndex, - store.EntityWithVersionsIndex, - ), - // define max time for index creation - keelmongo.CollectionWithIndexesMaxTime(time.Minute), - ) - log.Must(l, err, "failed to create collection") - // add http service svr.AddService(service.NewHTTP(l, "demp-http", "localhost:8080", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK)