diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 16137cd..48ccc0b 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -18,11 +18,12 @@ import ( type ( Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - middlewares []SubscriberMiddleware - closed bool + l *zap.Logger + uuidFunc func() string + messages chan *message.Message + metadataFunc func(r *http.Request) map[string]string + middlewares []SubscriberMiddleware + closed bool } SubscriberOption func(*Subscriber) SubscriberHandler func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error @@ -39,6 +40,12 @@ func SubscriberWithUUIDFunc(v func() string) SubscriberOption { } } +func SubscriberWithMetadataFunc(v func(r *http.Request) map[string]string) SubscriberOption { + return func(o *Subscriber) { + o.metadataFunc = v + } +} + func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) @@ -141,6 +148,12 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa msg.Metadata.Set(name, strings.Join(headers, ",")) } + if s.metadataFunc != nil { + for k, v := range s.metadataFunc(r) { + msg.Metadata.Set(k, v) + } + } + for k, v := range msg.Metadata { l = l.With(zap.String(k, v)) } diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index 593d6b7..7b057b0 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -15,11 +15,12 @@ import ( type ( Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - middlewares []SubscriberMiddleware - closed bool + l *zap.Logger + uuidFunc func() string + messages chan *message.Message + metadataFunc func(r *http.Request) map[string]string + middlewares []SubscriberMiddleware + closed bool } SubscriberOption func(*Subscriber) SubscriberHandler func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error @@ -36,6 +37,11 @@ func SubscriberWithUUIDFunc(v func() string) SubscriberOption { } } +func SubscriberWithMetadataFunc(v func(r *http.Request) map[string]string) SubscriberOption { + return func(o *Subscriber) { + o.metadataFunc = v + } +} func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) @@ -116,6 +122,12 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payloa msg.Metadata.Set(name, strings.Join(headers, ",")) } + if s.metadataFunc != nil { + for k, v := range s.metadataFunc(r) { + msg.Metadata.Set(k, v) + } + } + for k, v := range msg.Metadata { l = l.With(zap.String(k, v)) }