From 5cb6719df03ce55602026a293174fd4a74c086a6 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 22 May 2024 11:59:08 +0200 Subject: [PATCH] feat: add message handler --- integration/watermill/gtag/messagehandler.go | 34 +++++++++++++++++++ .../measurementprotocol/v2/eventhandler.go | 30 ---------------- integration/watermill/mpv2/messagehandler.go | 34 +++++++++++++++++++ integration/watermill/mpv2/provider.go | 5 --- integration/watermill/mpv2/subscriber.go | 14 +++----- 5 files changed, 72 insertions(+), 45 deletions(-) create mode 100644 integration/watermill/gtag/messagehandler.go delete mode 100644 integration/watermill/measurementprotocol/v2/eventhandler.go create mode 100644 integration/watermill/mpv2/messagehandler.go delete mode 100644 integration/watermill/mpv2/provider.go diff --git a/integration/watermill/gtag/messagehandler.go b/integration/watermill/gtag/messagehandler.go new file mode 100644 index 0000000..25be057 --- /dev/null +++ b/integration/watermill/gtag/messagehandler.go @@ -0,0 +1,34 @@ +package gtag + +import ( + "encoding/json" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/pkg/errors" +) + +func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) { + return func(msg *message.Message) ([]*message.Message, error) { + var payload *gtag.Payload + + // unmarshal payload + if err := json.Unmarshal(msg.Payload, &payload); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal payload") + } + + // handle payload + if err := handler(payload, msg); err != nil { + return nil, err + } + + // marshal payload + b, err := json.Marshal(payload) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal payload") + } + msg.Payload = b + + return []*message.Message{msg}, nil + } +} diff --git a/integration/watermill/measurementprotocol/v2/eventhandler.go b/integration/watermill/measurementprotocol/v2/eventhandler.go deleted file mode 100644 index bf05c07..0000000 --- a/integration/watermill/measurementprotocol/v2/eventhandler.go +++ /dev/null @@ -1,30 +0,0 @@ -package v2 - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/pkg/errors" -) - -func EventHandler(eventHandler func(event *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) { - return func(msg *message.Message) ([]*message.Message, error) { - var event *gtag.Payload - if err := json.Unmarshal(msg.Payload, &event); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal event") - } - - if err := eventHandler(event, msg); err != nil { - return nil, err - } - - b, err := json.Marshal(event) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal event") - } - msg.Payload = b - - return []*message.Message{msg}, nil - } -} diff --git a/integration/watermill/mpv2/messagehandler.go b/integration/watermill/mpv2/messagehandler.go new file mode 100644 index 0000000..3fa4c23 --- /dev/null +++ b/integration/watermill/mpv2/messagehandler.go @@ -0,0 +1,34 @@ +package mpv2 + +import ( + "encoding/json" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/pkg/errors" +) + +func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) { + return func(msg *message.Message) ([]*message.Message, error) { + var payload *mpv2.Payload[any] + + // unmarshal payload + if err := json.Unmarshal(msg.Payload, &payload); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal payload") + } + + // handle payload + if err := handler(payload, msg); err != nil { + return nil, err + } + + // marshal payload + b, err := json.Marshal(payload) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal payload") + } + msg.Payload = b + + return []*message.Message{msg}, nil + } +} diff --git a/integration/watermill/mpv2/provider.go b/integration/watermill/mpv2/provider.go deleted file mode 100644 index 341a0d6..0000000 --- a/integration/watermill/mpv2/provider.go +++ /dev/null @@ -1,5 +0,0 @@ -package mpv2 - -const ( - MetadataEventName = "X-Event-Name" -) diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index 518e6a3..828fc8d 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -97,27 +97,21 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Payload[any]) error { +func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { // marshal message payload - payload, err := json.Marshal(event) + jsonPayload, err := json.Marshal(payload) if err != nil { return errors.Wrap(err, "failed to marshal payload") } - msg := message.NewMessage(s.uuidFunc(), payload) + msg := message.NewMessage(s.uuidFunc(), jsonPayload) l = l.With(zap.String("message_id", msg.UUID)) - // if labeler, ok := keellog.LabelerFromRequest(r); ok { - // labeler.Add(zap.String("message_id", msg.UUID)) - // } - // if event.EventName != nil { - // msg.Metadata.Set(MetadataEventName, gtag.Get(event.EventName).String()) - // } // TODO filter headers? for name, headers := range r.Header { msg.Metadata.Set(name, strings.Join(headers, ",")) } - // + // if cookies := r.Cookies(); len(cookies) > 0 { // values := make([]string, len(cookies)) // for i, cookie := range r.Cookies() {