diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 48ccc0b..5f6314f 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -18,12 +18,12 @@ import ( type ( Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - metadataFunc func(r *http.Request) map[string]string - middlewares []SubscriberMiddleware - closed bool + l *zap.Logger + uuidFunc func() string + messages chan *message.Message + messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error + middlewares []SubscriberMiddleware + closed bool } SubscriberOption func(*Subscriber) SubscriberHandler func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error @@ -40,9 +40,9 @@ func SubscriberWithUUIDFunc(v func() string) SubscriberOption { } } -func SubscriberWithMetadataFunc(v func(r *http.Request) map[string]string) SubscriberOption { +func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption { return func(o *Subscriber) { - o.metadataFunc = v + o.messageFunc = v } } @@ -148,9 +148,9 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa msg.Metadata.Set(name, strings.Join(headers, ",")) } - if s.metadataFunc != nil { - for k, v := range s.metadataFunc(r) { - msg.Metadata.Set(k, v) + if s.messageFunc != nil { + if err := s.messageFunc(l, r, msg); err != nil { + return err } } diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index 7b057b0..543e66e 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -15,12 +15,12 @@ import ( type ( Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - metadataFunc func(r *http.Request) map[string]string - middlewares []SubscriberMiddleware - closed bool + l *zap.Logger + uuidFunc func() string + messages chan *message.Message + messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error + middlewares []SubscriberMiddleware + closed bool } SubscriberOption func(*Subscriber) SubscriberHandler func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error @@ -37,11 +37,12 @@ func SubscriberWithUUIDFunc(v func() string) SubscriberOption { } } -func SubscriberWithMetadataFunc(v func(r *http.Request) map[string]string) SubscriberOption { +func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption { return func(o *Subscriber) { - o.metadataFunc = v + o.messageFunc = v } } + func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) @@ -122,9 +123,9 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payloa msg.Metadata.Set(name, strings.Join(headers, ",")) } - if s.metadataFunc != nil { - for k, v := range s.metadataFunc(r) { - msg.Metadata.Set(k, v) + if s.messageFunc != nil { + if err := s.messageFunc(l, r, msg); err != nil { + return err } }