feat: add message handler

This commit is contained in:
Kevin Franklin Kim 2024-05-22 11:59:08 +02:00
parent 8419d07ae4
commit 5cb6719df0
No known key found for this signature in database
5 changed files with 72 additions and 45 deletions

View File

@ -0,0 +1,34 @@
package gtag
import (
"encoding/json"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
"github.com/pkg/errors"
)
func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *gtag.Payload
// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal payload")
}
// handle payload
if err := handler(payload, msg); err != nil {
return nil, err
}
// marshal payload
b, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal payload")
}
msg.Payload = b
return []*message.Message{msg}, nil
}
}

View File

@ -1,30 +0,0 @@
package v2
import (
"encoding/json"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
"github.com/pkg/errors"
)
func EventHandler(eventHandler func(event *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
var event *gtag.Payload
if err := json.Unmarshal(msg.Payload, &event); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal event")
}
if err := eventHandler(event, msg); err != nil {
return nil, err
}
b, err := json.Marshal(event)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event")
}
msg.Payload = b
return []*message.Message{msg}, nil
}
}

View File

@ -0,0 +1,34 @@
package mpv2
import (
"encoding/json"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
"github.com/pkg/errors"
)
func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) {
return func(msg *message.Message) ([]*message.Message, error) {
var payload *mpv2.Payload[any]
// unmarshal payload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal payload")
}
// handle payload
if err := handler(payload, msg); err != nil {
return nil, err
}
// marshal payload
b, err := json.Marshal(payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal payload")
}
msg.Payload = b
return []*message.Message{msg}, nil
}
}

View File

@ -1,5 +0,0 @@
package mpv2
const (
MetadataEventName = "X-Event-Name"
)

View File

@ -97,27 +97,21 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Payload[any]) error {
func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error {
// marshal message payload
payload, err := json.Marshal(event)
jsonPayload, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "failed to marshal payload")
}
msg := message.NewMessage(s.uuidFunc(), payload)
msg := message.NewMessage(s.uuidFunc(), jsonPayload)
l = l.With(zap.String("message_id", msg.UUID))
// if labeler, ok := keellog.LabelerFromRequest(r); ok {
// labeler.Add(zap.String("message_id", msg.UUID))
// }
// if event.EventName != nil {
// msg.Metadata.Set(MetadataEventName, gtag.Get(event.EventName).String())
// }
// TODO filter headers?
for name, headers := range r.Header {
msg.Metadata.Set(name, strings.Join(headers, ","))
}
//
// if cookies := r.Cookies(); len(cookies) > 0 {
// values := make([]string, len(cookies))
// for i, cookie := range r.Cookies() {