diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 808d923..16137cd 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -131,12 +131,12 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa msg := message.NewMessage(s.uuidFunc(), data) l = l.With(zap.String("message_id", msg.UUID)) + msg.SetContext(context.WithoutCancel(r.Context())) if payload.EventName != nil { msg.Metadata.Set(MetadataEventName, gtag.Get(payload.EventName).String()) } - // TODO filter headers? for name, headers := range r.Header { msg.Metadata.Set(name, strings.Join(headers, ",")) } @@ -145,11 +145,6 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa l = l.With(zap.String(k, v)) } - // TODO different context? - ctx, cancelCtx := context.WithCancel(r.Context()) - msg.SetContext(ctx) - defer cancelCtx() - // send message s.messages <- msg @@ -162,7 +157,7 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa l.Debug("message nacked") return ErrMessageNacked case <-r.Context().Done(): - l.Debug("message cancled") + l.Debug("message canceled") return ErrContextCanceled } } diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index 195a87c..593d6b7 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -110,29 +110,16 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payloa msg := message.NewMessage(s.uuidFunc(), jsonPayload) l = l.With(zap.String("message_id", msg.UUID)) + msg.SetContext(context.WithoutCancel(r.Context())) - // TODO filter headers? for name, headers := range r.Header { msg.Metadata.Set(name, strings.Join(headers, ",")) } - // if cookies := r.Cookies(); len(cookies) > 0 { - // values := make([]string, len(cookies)) - // for i, cookie := range r.Cookies() { - // values[i] = cookie.String() - // } - // msg.Metadata.Set("Cookie", strings.Join(values, "; ")) - // } - for k, v := range msg.Metadata { l = l.With(zap.String(k, v)) } - // TODO different context? - ctx, cancelCtx := context.WithCancel(r.Context()) - msg.SetContext(ctx) - defer cancelCtx() - // send message s.messages <- msg @@ -145,7 +132,7 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payloa l.Debug("message nacked") return ErrMessageNacked case <-r.Context().Done(): - l.Debug("message cancled") + l.Debug("message canceled") return ErrContextCanceled } }