From 24e6f55ea9c849b23b42107307a204825a79ac60 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 13 Mar 2025 07:58:41 +0100 Subject: [PATCH] feat: remove watermill --- integration/loki/messagehandler.go | 23 --- integration/watermill/gtag/errors.go | 12 -- integration/watermill/gtag/messagehandler.go | 60 ------ .../watermill/gtag/messagehandler_test.go | 148 --------------- .../watermill/gtag/nopublishmessagehandler.go | 23 --- integration/watermill/gtag/provider.go | 5 - integration/watermill/gtag/publisher.go | 173 ------------------ integration/watermill/gtag/publisher_test.go | 68 ------- .../watermill/gtag/publishermiddleware.go | 19 -- integration/watermill/gtag/subscriber.go | 143 --------------- integration/watermill/mpv2/errors.go | 14 -- integration/watermill/mpv2/messagehandler.go | 34 ---- integration/watermill/mpv2/metadata.go | 5 - .../watermill/mpv2/nopublishmessagehandler.go | 23 --- integration/watermill/mpv2/publisher.go | 161 ---------------- integration/watermill/mpv2/publisher_test.go | 67 ------- .../watermill/mpv2/publishermiddleware.go | 61 ------ .../mpv2/publishermiddleware_test.go | 106 ----------- integration/watermill/mpv2/subscriber.go | 147 --------------- 19 files changed, 1292 deletions(-) delete mode 100644 integration/loki/messagehandler.go delete mode 100644 integration/watermill/gtag/errors.go delete mode 100644 integration/watermill/gtag/messagehandler.go delete mode 100644 integration/watermill/gtag/messagehandler_test.go delete mode 100644 integration/watermill/gtag/nopublishmessagehandler.go delete mode 100644 integration/watermill/gtag/provider.go delete mode 100644 integration/watermill/gtag/publisher.go delete mode 100644 integration/watermill/gtag/publisher_test.go delete mode 100644 integration/watermill/gtag/publishermiddleware.go delete mode 100644 integration/watermill/gtag/subscriber.go delete mode 100644 integration/watermill/mpv2/errors.go delete mode 100644 integration/watermill/mpv2/messagehandler.go delete mode 100644 integration/watermill/mpv2/metadata.go delete mode 100644 integration/watermill/mpv2/nopublishmessagehandler.go delete mode 100644 integration/watermill/mpv2/publisher.go delete mode 100644 integration/watermill/mpv2/publisher_test.go delete mode 100644 integration/watermill/mpv2/publishermiddleware.go delete mode 100644 integration/watermill/mpv2/publishermiddleware_test.go delete mode 100644 integration/watermill/mpv2/subscriber.go diff --git a/integration/loki/messagehandler.go b/integration/loki/messagehandler.go deleted file mode 100644 index 9c468ad..0000000 --- a/integration/loki/messagehandler.go +++ /dev/null @@ -1,23 +0,0 @@ -package loki - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/pkg/errors" -) - -func MPv2MessageHandler(loki *Loki) message.NoPublishHandlerFunc { - return func(msg *message.Message) error { - var payload mpv2.Payload[any] - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return errors.Wrap(err, "failed to unmarshal payload") - } - - loki.Write(payload) - return nil - } -} diff --git a/integration/watermill/gtag/errors.go b/integration/watermill/gtag/errors.go deleted file mode 100644 index 755bb14..0000000 --- a/integration/watermill/gtag/errors.go +++ /dev/null @@ -1,12 +0,0 @@ -package gtag - -import ( - "errors" -) - -var ( - ErrMissingEventName = errors.New("missing event name") - ErrContextCanceled = errors.New("request stopped without ACK received") - ErrMessageNacked = errors.New("message nacked") - ErrClosed = errors.New("subscriber already closed") -) diff --git a/integration/watermill/gtag/messagehandler.go b/integration/watermill/gtag/messagehandler.go deleted file mode 100644 index 4af3c46..0000000 --- a/integration/watermill/gtag/messagehandler.go +++ /dev/null @@ -1,60 +0,0 @@ -package gtag - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/pkg/errors" -) - -func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) message.HandlerFunc { - return func(msg *message.Message) ([]*message.Message, error) { - var payload *gtag.Payload - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal payload") - } - - // handle payload - if err := handler(payload, msg); err != nil { - return nil, err - } - - // marshal payload - b, err := json.Marshal(payload) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal payload") - } - msg.Payload = b - - return []*message.Message{msg}, nil - } -} - -func MPv2MessageHandler(msg *message.Message) ([]*message.Message, error) { - var payload gtag.Payload - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal payload") - } - - // encode to mpv2 - var mpv2Payload *mpv2.Payload[any] - if err := gtagencode.MPv2(payload, &mpv2Payload); err != nil { - return nil, errors.Wrap(err, "failed to encode gtag to mpv2") - } - - // marshal payload - b, err := json.Marshal(mpv2Payload) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal payload") - } - msg.Payload = b - - return []*message.Message{msg}, nil -} diff --git a/integration/watermill/gtag/messagehandler_test.go b/integration/watermill/gtag/messagehandler_test.go deleted file mode 100644 index 43c00e6..0000000 --- a/integration/watermill/gtag/messagehandler_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package gtag_test - -import ( - "context" - "encoding/json" - "fmt" - "sync/atomic" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" - "github.com/foomo/sesamy-go/integration/watermill/gtag" - encoding "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/foomo/sesamy-go/pkg/sesamy" - "github.com/pperaltaisern/watermillzap" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestMessageHandler(t *testing.T) { - l := zaptest.NewLogger(t) - - router, err := message.NewRouter(message.RouterConfig{}, watermillzap.NewLogger(l)) - require.NoError(t, err) - defer router.Close() - - // Create pubSub - pubSub := gochannel.NewGoChannel( - gochannel.Config{}, - watermillzap.NewLogger(l), - ) - - var done atomic.Bool - router.AddHandler("gtag", "in", pubSub, "out", pubSub, gtag.MessageHandler(func(payload *encoding.Payload, msg *message.Message) error { - expected := `{"consent":{},"campaign":{},"ecommerce":{},"client_hints":{},"protocol_version":"2","client_id":"C123456","richsstsse":"1","document_location":"https://foomo.org","document_title":"Home","is_debug":"1","event_name":"add_to_cart"}` - if !assert.JSONEq(t, expected, string(msg.Payload)) { - fmt.Println(string(msg.Payload)) - } - done.Store(true) - return nil - })) - - go func() { - assert.NoError(t, router.Run(context.TODO())) - }() - assert.Eventually(t, router.IsRunning, time.Second, 50*time.Millisecond) - - payload := encoding.Payload{ - Consent: encoding.Consent{}, - Campaign: encoding.Campaign{}, - ECommerce: encoding.ECommerce{}, - ClientHints: encoding.ClientHints{}, - ProtocolVersion: encoding.Set("2"), - TrackingID: nil, - GTMHashInfo: nil, - ClientID: encoding.Set("C123456"), - Richsstsse: encoding.Set("1"), - DocumentLocation: encoding.Set("https://foomo.org"), - DocumentTitle: encoding.Set("Home"), - DocumentReferrer: nil, - IsDebug: encoding.Set("1"), - EventName: encoding.Set(sesamy.EventNameAddToCart), - EventParameter: nil, - EventParameterNumber: nil, - UserID: nil, - SessionID: nil, - UserProperty: nil, - UserPropertyNumber: nil, - NonPersonalizedAds: nil, - SST: nil, - Remain: nil, - } - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, pubSub.Publish("in", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} - -func TestMPv2MessageHandler(t *testing.T) { - l := zaptest.NewLogger(t) - - router, err := message.NewRouter(message.RouterConfig{}, watermillzap.NewLogger(l)) - require.NoError(t, err) - defer router.Close() - - // Create pubSub - pubSub := gochannel.NewGoChannel( - gochannel.Config{}, - watermillzap.NewLogger(l), - ) - - var done atomic.Bool - router.AddHandler("gtag", "in", pubSub, "out", pubSub, gtag.MPv2MessageHandler) - router.AddNoPublisherHandler("mpv2", "out", pubSub, func(msg *message.Message) error { - expected := `{"client_id":"C123456","consent":{"ad_user_data":"GRANTED","ad_personalization":"GRANTED","analytics_storage":"GRANTED"},"events":[{"name":"add_to_cart","params":{"page_location":"https://foomo.org","page_title":"Home"}}],"debug_mode":true}` - if !assert.JSONEq(t, expected, string(msg.Payload)) { - fmt.Println(string(msg.Payload)) - } - done.Store(true) - return nil - }) - - go func() { - assert.NoError(t, router.Run(context.TODO())) - }() - assert.Eventually(t, router.IsRunning, time.Second, 50*time.Millisecond) - - payload := encoding.Payload{ - Consent: encoding.Consent{}, - Campaign: encoding.Campaign{}, - ECommerce: encoding.ECommerce{}, - ClientHints: encoding.ClientHints{}, - ProtocolVersion: encoding.Set("2"), - TrackingID: nil, - GTMHashInfo: nil, - ClientID: encoding.Set("C123456"), - Richsstsse: encoding.Set("1"), - DocumentLocation: encoding.Set("https://foomo.org"), - DocumentTitle: encoding.Set("Home"), - DocumentReferrer: nil, - IsDebug: encoding.Set("1"), - EventName: encoding.Set(sesamy.EventNameAddToCart), - EventParameter: nil, - EventParameterNumber: nil, - UserID: nil, - SessionID: nil, - UserProperty: nil, - UserPropertyNumber: nil, - NonPersonalizedAds: nil, - SST: nil, - Remain: nil, - } - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, pubSub.Publish("in", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} diff --git a/integration/watermill/gtag/nopublishmessagehandler.go b/integration/watermill/gtag/nopublishmessagehandler.go deleted file mode 100644 index 6107462..0000000 --- a/integration/watermill/gtag/nopublishmessagehandler.go +++ /dev/null @@ -1,23 +0,0 @@ -package gtag - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/pkg/errors" -) - -func NoPublishMessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) error { - return func(msg *message.Message) error { - var payload *gtag.Payload - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return errors.Wrap(err, "failed to unmarshal payload") - } - - // handle payload - return handler(payload, msg) - } -} diff --git a/integration/watermill/gtag/provider.go b/integration/watermill/gtag/provider.go deleted file mode 100644 index 2e6b23a..0000000 --- a/integration/watermill/gtag/provider.go +++ /dev/null @@ -1,5 +0,0 @@ -package gtag - -const ( - MetadataEventName = "X-Event-Name" -) diff --git a/integration/watermill/gtag/publisher.go b/integration/watermill/gtag/publisher.go deleted file mode 100644 index a0a81f9..0000000 --- a/integration/watermill/gtag/publisher.go +++ /dev/null @@ -1,173 +0,0 @@ -package gtag - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -var ( - ErrErrorResponse = errors.New("server responded with error status") - ErrPublisherClosed = errors.New("publisher is closed") -) - -type ( - Publisher struct { - l *zap.Logger - host string - path string - client *http.Client - closed bool - middlewares []PublisherMiddleware - maxResponseCode int - } - PublisherOption func(*Publisher) - PublisherHandler func(l *zap.Logger, msg *message.Message) error - PublisherMiddleware func(next PublisherHandler) PublisherHandler - // PublisherMarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url. - PublisherMarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error) -) - -// ------------------------------------------------------------------------------------------------ -// ~ Constructor -// ------------------------------------------------------------------------------------------------ - -func NewPublisher(l *zap.Logger, host string, opts ...PublisherOption) *Publisher { - inst := &Publisher{ - l: l, - host: host, - path: "/g/collect", - client: http.DefaultClient, - maxResponseCode: http.StatusBadRequest, - } - for _, opt := range opts { - opt(inst) - } - return inst -} - -// ------------------------------------------------------------------------------------------------ -// ~ Options -// ------------------------------------------------------------------------------------------------ - -func PublisherWithPath(v string) PublisherOption { - return func(o *Publisher) { - o.path = v - } -} - -func PublisherWithClient(v *http.Client) PublisherOption { - return func(o *Publisher) { - o.client = v - } -} - -func PublisherWithMiddlewares(v ...PublisherMiddleware) PublisherOption { - return func(o *Publisher) { - o.middlewares = append(o.middlewares, v...) - } -} - -func PublisherWithMaxResponseCode(v int) PublisherOption { - return func(o *Publisher) { - o.maxResponseCode = v - } -} - -// ------------------------------------------------------------------------------------------------ -// ~ Getter -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) Client() *http.Client { - return p.client -} - -// ------------------------------------------------------------------------------------------------ -// ~ Public methods -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) Publish(topic string, messages ...*message.Message) error { - if p.closed { - return ErrPublisherClosed - } - - for _, msg := range messages { - // compose middlewares - next := p.handle - for _, middleware := range p.middlewares { - next = middleware(next) - } - - // run handler - if err := next(p.l.With( - zap.String("message_id", msg.UUID), - ), msg); err != nil { - return err - } - } - - return nil -} - -func (p *Publisher) Close() error { - if p.closed { - return nil - } - - p.closed = true - return nil -} - -// ------------------------------------------------------------------------------------------------ -// ~ Private methods -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) handle(l *zap.Logger, msg *message.Message) error { - var event *gtag.Payload - if err := json.Unmarshal(msg.Payload, &event); err != nil { - return err - } - - values, body, err := gtag.Encode(event) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(msg.Context(), http.MethodPost, fmt.Sprintf("%s%s?%s", p.host, p.path, gtag.EncodeValues(values)), body) - if err != nil { - return errors.Wrap(err, "failed to create request") - } - - for s, s2 := range msg.Metadata { - req.Header.Set(s, s2) - } - - if err := func() error { - resp, err := p.client.Do(req) - if err != nil { - return errors.Wrapf(err, "failed to publish message: %s", msg.UUID) - } - defer resp.Body.Close() - - l = l.With(zap.Int("http_status_code", resp.StatusCode)) - - if p.maxResponseCode > 0 && resp.StatusCode >= p.maxResponseCode { - if body, err := io.ReadAll(resp.Body); err == nil { - l = l.With(zap.String("http_response", string(body))) - } - return errors.Wrap(ErrErrorResponse, resp.Status) - } - - return nil - }(); err != nil { - return err - } - - return nil -} diff --git a/integration/watermill/gtag/publisher_test.go b/integration/watermill/gtag/publisher_test.go deleted file mode 100644 index e38f4a0..0000000 --- a/integration/watermill/gtag/publisher_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package gtag_test - -import ( - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "sync/atomic" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/integration/watermill/gtag" - encoding "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/foomo/sesamy-go/pkg/sesamy" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestPublisher(t *testing.T) { - l := zaptest.NewLogger(t) - - var done atomic.Bool - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - expected := `_dbg=1&cid=C123456&dl=https%3A%2F%2Ffoomo.org&dt=Home&en=add_to_cart&v=2&richsstsse` - assert.Equal(t, expected, r.URL.RawQuery) - done.Store(true) - })) - - p := gtag.NewPublisher(l, s.URL) - - payload := encoding.Payload{ - Consent: encoding.Consent{}, - Campaign: encoding.Campaign{}, - ECommerce: encoding.ECommerce{}, - ClientHints: encoding.ClientHints{}, - ProtocolVersion: encoding.Set("2"), - TrackingID: nil, - GTMHashInfo: nil, - ClientID: encoding.Set("C123456"), - Richsstsse: encoding.Set("1"), - DocumentLocation: encoding.Set("https://foomo.org"), - DocumentTitle: encoding.Set("Home"), - DocumentReferrer: nil, - IsDebug: encoding.Set("1"), - EventName: encoding.Set(sesamy.EventNameAddToCart), - EventParameter: nil, - EventParameterNumber: nil, - UserID: nil, - SessionID: nil, - UserProperty: nil, - UserPropertyNumber: nil, - NonPersonalizedAds: nil, - SST: nil, - Remain: nil, - } - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - fmt.Println(string(jsonPayload)) - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, p.Publish("foo", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} diff --git a/integration/watermill/gtag/publishermiddleware.go b/integration/watermill/gtag/publishermiddleware.go deleted file mode 100644 index 4a64a48..0000000 --- a/integration/watermill/gtag/publishermiddleware.go +++ /dev/null @@ -1,19 +0,0 @@ -package gtag - -import ( - "github.com/ThreeDotsLabs/watermill/message" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" -) - -func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { - return func(l *zap.Logger, msg *message.Message) error { - if err := next(l, msg); err != nil { - if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } - l.With(zap.Error(err)).Warn("ignoring error") - } - return nil - } -} diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go deleted file mode 100644 index 9bf1b75..0000000 --- a/integration/watermill/gtag/subscriber.go +++ /dev/null @@ -1,143 +0,0 @@ -package gtag - -import ( - "context" - "encoding/json" - "net/http" - "strings" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/gtag" - gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type ( - Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []gtaghttp.Middleware - closed bool - } - SubscriberOption func(*Subscriber) -) - -// ------------------------------------------------------------------------------------------------ -// ~ Options -// ------------------------------------------------------------------------------------------------ - -func SubscriberWithUUIDFunc(v func() string) SubscriberOption { - return func(o *Subscriber) { - o.uuidFunc = v - } -} - -func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption { - return func(o *Subscriber) { - o.messageFunc = v - } -} - -func SubscriberWithMiddlewares(v ...gtaghttp.Middleware) SubscriberOption { - return func(o *Subscriber) { - o.middlewares = append(o.middlewares, v...) - } -} - -// ------------------------------------------------------------------------------------------------ -// ~ Constructor -// ------------------------------------------------------------------------------------------------ - -func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { - inst := &Subscriber{ - l: l, - uuidFunc: watermill.NewUUID, - messages: make(chan *message.Message), - } - for _, opt := range opts { - opt(inst) - } - return inst -} - -func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // retrieve payload - payload := gtaghttp.Handler(w, r) - - // compose middlewares - next := s.handle - for _, middleware := range s.middlewares { - next = middleware(next) - } - - // run handler - if err := next(s.l, w, r, payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { - // marshal message payload - data, err := json.Marshal(payload) - if err != nil { - return errors.Wrap(err, "failed to marshal payload") - } - - msg := message.NewMessage(s.uuidFunc(), data) - l = l.With(zap.String("message_id", msg.UUID)) - msg.SetContext(context.WithoutCancel(r.Context())) - - if payload.EventName != nil { - msg.Metadata.Set(MetadataEventName, gtag.Get(payload.EventName).String()) - } - - for name, headers := range r.Header { - msg.Metadata.Set(name, strings.Join(headers, ",")) - } - - if s.messageFunc != nil { - if err := s.messageFunc(l, r, msg); err != nil { - return err - } - } - - for k, v := range msg.Metadata { - l = l.With(zap.String(k, v)) - } - - // send message - s.messages <- msg - - // wait for ACK - select { - case <-msg.Acked(): - l.Debug("message acked") - return nil - case <-msg.Nacked(): - l.Debug("message nacked") - return ErrMessageNacked - case <-r.Context().Done(): - l.Debug("message canceled") - return ErrContextCanceled - } -} - -func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { - return s.messages, nil -} - -// Close closes all subscriptions with their output channels and flush offsets etc. when needed. -func (s *Subscriber) Close() error { - if s.closed { - return ErrClosed - } - s.closed = true - - close(s.messages) - return nil -} diff --git a/integration/watermill/mpv2/errors.go b/integration/watermill/mpv2/errors.go deleted file mode 100644 index c919c4d..0000000 --- a/integration/watermill/mpv2/errors.go +++ /dev/null @@ -1,14 +0,0 @@ -package mpv2 - -import ( - "errors" -) - -var ( - ErrMissingEventName = errors.New("missing event name") - ErrErrorResponse = errors.New("server responded with error status") - ErrPublisherClosed = errors.New("publisher is closed") - ErrContextCanceled = errors.New("request stopped without ACK received") - ErrMessageNacked = errors.New("message nacked") - ErrClosed = errors.New("subscriber already closed") -) diff --git a/integration/watermill/mpv2/messagehandler.go b/integration/watermill/mpv2/messagehandler.go deleted file mode 100644 index 3cc2cd7..0000000 --- a/integration/watermill/mpv2/messagehandler.go +++ /dev/null @@ -1,34 +0,0 @@ -package mpv2 - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/pkg/errors" -) - -func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.HandlerFunc { - return func(msg *message.Message) ([]*message.Message, error) { - var payload *mpv2.Payload[any] - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal payload") - } - - // handle payload - if err := handler(payload, msg); err != nil { - return nil, err - } - - // marshal payload - b, err := json.Marshal(payload) - if err != nil { - return nil, errors.Wrap(err, "failed to marshal payload") - } - msg.Payload = b - - return []*message.Message{msg}, nil - } -} diff --git a/integration/watermill/mpv2/metadata.go b/integration/watermill/mpv2/metadata.go deleted file mode 100644 index c174b14..0000000 --- a/integration/watermill/mpv2/metadata.go +++ /dev/null @@ -1,5 +0,0 @@ -package mpv2 - -const ( - MetadataRequestQuery = "RequestQuery" -) diff --git a/integration/watermill/mpv2/nopublishmessagehandler.go b/integration/watermill/mpv2/nopublishmessagehandler.go deleted file mode 100644 index 5777c4a..0000000 --- a/integration/watermill/mpv2/nopublishmessagehandler.go +++ /dev/null @@ -1,23 +0,0 @@ -package mpv2 - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/pkg/errors" -) - -func NoPublishMessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.NoPublishHandlerFunc { - return func(msg *message.Message) error { - var payload *mpv2.Payload[any] - - // unmarshal payload - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return errors.Wrap(err, "failed to unmarshal payload") - } - - // handle payload - return handler(payload, msg) - } -} diff --git a/integration/watermill/mpv2/publisher.go b/integration/watermill/mpv2/publisher.go deleted file mode 100644 index 7140f85..0000000 --- a/integration/watermill/mpv2/publisher.go +++ /dev/null @@ -1,161 +0,0 @@ -package mpv2 - -import ( - "bytes" - "fmt" - "io" - "net/http" - "strings" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type ( - Publisher struct { - l *zap.Logger - host string - path string - httpClient *http.Client - middlewares []PublisherMiddleware - closed bool - } - PublisherOption func(*Publisher) - PublisherHandler func(l *zap.Logger, msg *message.Message) error - PublisherMiddleware func(next PublisherHandler) PublisherHandler -) - -// ------------------------------------------------------------------------------------------------ -// ~ Constructor -// ------------------------------------------------------------------------------------------------ - -func NewPublisher(l *zap.Logger, host string, opts ...PublisherOption) *Publisher { - inst := &Publisher{ - l: l, - host: host, - path: "/mp/collect", - httpClient: http.DefaultClient, - } - for _, opt := range opts { - opt(inst) - } - return inst -} - -// ------------------------------------------------------------------------------------------------ -// ~ Options -// ------------------------------------------------------------------------------------------------ - -func PublisherWithPath(v string) PublisherOption { - return func(o *Publisher) { - o.path = v - } -} - -func PublisherWithHTTPClient(v *http.Client) PublisherOption { - return func(o *Publisher) { - o.httpClient = v - } -} - -func PublisherWithMiddlewares(v ...PublisherMiddleware) PublisherOption { - return func(o *Publisher) { - o.middlewares = append(o.middlewares, v...) - } -} - -// ------------------------------------------------------------------------------------------------ -// ~ Getter -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) HTTPClient() *http.Client { - return p.httpClient -} - -// ------------------------------------------------------------------------------------------------ -// ~ Public methods -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) Publish(topic string, messages ...*message.Message) error { - if p.closed { - return ErrPublisherClosed - } - - for _, msg := range messages { - // compose middlewares - next := p.handle - for _, middleware := range p.middlewares { - next = middleware(next) - } - - // run handler - if err := next(p.l.With( - zap.String("message_id", msg.UUID), - ), msg); err != nil { - return err - } - } - - return nil -} - -func (p *Publisher) Close() error { - if p.closed { - return nil - } - - p.closed = true - return nil -} - -// ------------------------------------------------------------------------------------------------ -// ~ Private methods -// ------------------------------------------------------------------------------------------------ - -func (p *Publisher) handle(l *zap.Logger, msg *message.Message) error { - req, err := http.NewRequestWithContext(msg.Context(), http.MethodPost, fmt.Sprintf("%s%s", p.host, p.path), bytes.NewReader(msg.Payload)) - if err != nil { - return errors.Wrap(err, "failed to create request") - } - - for key, value := range msg.Metadata { - switch key { - case "Cookie": - for _, s3 := range strings.Split(value, "; ") { - val := strings.Split(s3, "=") - req.AddCookie(&http.Cookie{ - Name: val[0], - Value: strings.Join(val[1:], "="), - }) - } - case MetadataRequestQuery: - req.URL.RawQuery = value - default: - req.Header.Set(key, value) - } - } - - if err := func() error { - resp, err := p.httpClient.Do(req) - if err != nil { - return errors.Wrapf(err, "failed to publish message: %s", msg.UUID) - } - defer resp.Body.Close() - - l = l.With(zap.Int("http_status_code", resp.StatusCode)) - - if resp.StatusCode >= http.StatusBadRequest { - if body, err := io.ReadAll(resp.Body); err == nil { - l = l.With(zap.String("http_response", string(body))) - } - return errors.Wrap(ErrErrorResponse, resp.Status) - } - - return nil - }(); err != nil { - return err - } - - return nil -} diff --git a/integration/watermill/mpv2/publisher_test.go b/integration/watermill/mpv2/publisher_test.go deleted file mode 100644 index 7aa289d..0000000 --- a/integration/watermill/mpv2/publisher_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package mpv2_test - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "sync/atomic" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/integration/watermill/mpv2" - encoding "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/foomo/sesamy-go/pkg/event" - "github.com/foomo/sesamy-go/pkg/event/params" - "github.com/foomo/sesamy-go/pkg/sesamy" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestPublisher(t *testing.T) { - l := zaptest.NewLogger(t) - - var done atomic.Bool - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - out, err := io.ReadAll(r.Body) - assert.NoError(t, err) - - expected := `{"client_id":"C123456","user_id":"U123456","timestamp_micros":1727701064057701,"events":[{"name":"page_view","params":{"page_title":"Home","page_location":"https://foomo.org"}}],"debug_mode":true,"session_id":"S123456","engagement_time_msec":100}` - if !assert.JSONEq(t, expected, string(out)) { - fmt.Println(string(out)) - } - done.Store(true) - })) - - p := mpv2.NewPublisher(l, s.URL) - - payload := encoding.Payload[params.PageView]{ - ClientID: "C123456", - UserID: "U123456", - TimestampMicros: 1727701064057701, - UserProperties: nil, - Consent: nil, - Events: []sesamy.Event[params.PageView]{ - event.NewPageView(params.PageView{ - PageTitle: "Home", - PageLocation: "https://foomo.org", - }), - }, - UserData: nil, - DebugMode: true, - SessionID: "S123456", - EngagementTimeMSec: 100, - } - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, p.Publish("foo", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} diff --git a/integration/watermill/mpv2/publishermiddleware.go b/integration/watermill/mpv2/publishermiddleware.go deleted file mode 100644 index 72f8de7..0000000 --- a/integration/watermill/mpv2/publishermiddleware.go +++ /dev/null @@ -1,61 +0,0 @@ -package mpv2 - -import ( - "encoding/json" - - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" -) - -// PublisherMiddlewareIgnoreError ignores error responses from the gtm endpoint to prevent retries. -func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { - return func(l *zap.Logger, msg *message.Message) error { - if err := next(l, msg); err != nil { - if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } - l.With(zap.Error(err)).Warn("ignoring error") - } - return nil - } -} - -// PublisherMiddlewareEventParams moves the `debug_mode`, `session_id` & `engagement_time_msec` into the events params -// since this is required by the measurement protocol but make coding much more complex. That's why it's part of the payload -// in this library. -func PublisherMiddlewareEventParams(next PublisherHandler) PublisherHandler { - return func(l *zap.Logger, msg *message.Message) error { - var payload *mpv2.Payload[any] - if err := json.Unmarshal(msg.Payload, &payload); err != nil { - return err - } - for i, event := range payload.Events { - if params, ok := event.Params.(map[string]any); ok { - if payload.DebugMode { - params["debug_mode"] = "1" - payload.DebugMode = false - } - if len(payload.SessionID) > 0 { - params["session_id"] = payload.SessionID - payload.SessionID = "" - } - if payload.EngagementTimeMSec > 0 { - params["engagement_time_msec"] = payload.EngagementTimeMSec - payload.EngagementTimeMSec = 0 - } - event.Params = params - } - payload.Events[i] = event - - out, err := json.Marshal(payload) - if err != nil { - return err - } - - msg.Payload = out - } - return next(l, msg) - } -} diff --git a/integration/watermill/mpv2/publishermiddleware_test.go b/integration/watermill/mpv2/publishermiddleware_test.go deleted file mode 100644 index 89e9704..0000000 --- a/integration/watermill/mpv2/publishermiddleware_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package mpv2_test - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "sync/atomic" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/integration/watermill/mpv2" - encoding "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/foomo/sesamy-go/pkg/event" - "github.com/foomo/sesamy-go/pkg/event/params" - "github.com/foomo/sesamy-go/pkg/sesamy" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" -) - -func TestPublisherMiddlewareIgnoreError(t *testing.T) { - l := zaptest.NewLogger(t) - - var done atomic.Bool - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - done.Store(true) - })) - - p := mpv2.NewPublisher(l, s.URL, mpv2.PublisherWithMiddlewares(mpv2.PublisherMiddlewareIgnoreError)) - - payload := encoding.Payload[params.PageView]{ - ClientID: "C123456", - UserID: "U123456", - TimestampMicros: 1727701064057701, - UserProperties: nil, - Consent: nil, - Events: []sesamy.Event[params.PageView]{ - event.NewPageView(params.PageView{ - PageTitle: "Home", - PageLocation: "https://foomo.org", - }), - }, - UserData: nil, - DebugMode: true, - SessionID: "S123456", - EngagementTimeMSec: 100, - } - - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, p.Publish("foo", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} - -func TestPublisherMiddlewareEventParams(t *testing.T) { - l := zaptest.NewLogger(t) - - var done atomic.Bool - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - out, err := io.ReadAll(r.Body) - assert.NoError(t, err) - - expected := `{"client_id":"C123456","user_id":"U123456","timestamp_micros":1727701064057701,"events":[{"name":"page_view","params":{"debug_mode":"1","engagement_time_msec":100,"page_location":"https://foomo.org","page_title":"Home","session_id":"S123456"}}]}` - if !assert.JSONEq(t, expected, string(out)) { - fmt.Println(string(out)) - } - done.Store(true) - })) - - p := mpv2.NewPublisher(l, s.URL, mpv2.PublisherWithMiddlewares(mpv2.PublisherMiddlewareEventParams)) - - payload := encoding.Payload[params.PageView]{ - ClientID: "C123456", - UserID: "U123456", - TimestampMicros: 1727701064057701, - UserProperties: nil, - Consent: nil, - Events: []sesamy.Event[params.PageView]{ - event.NewPageView(params.PageView{ - PageTitle: "Home", - PageLocation: "https://foomo.org", - }), - }, - UserData: nil, - DebugMode: true, - SessionID: "S123456", - EngagementTimeMSec: 100, - } - jsonPayload, err := json.Marshal(payload) - require.NoError(t, err) - - msg := message.NewMessage(watermill.NewUUID(), jsonPayload) - - require.NoError(t, p.Publish("foo", msg)) - - assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond) -} diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go deleted file mode 100644 index 2d8bade..0000000 --- a/integration/watermill/mpv2/subscriber.go +++ /dev/null @@ -1,147 +0,0 @@ -package mpv2 - -import ( - "context" - "encoding/json" - "net/http" - "strings" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type ( - Subscriber struct { - l *zap.Logger - uuidFunc func() string - messages chan *message.Message - messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []mpv2http.Middleware - closed bool - } - SubscriberOption func(*Subscriber) -) - -// ------------------------------------------------------------------------------------------------ -// ~ Options -// ------------------------------------------------------------------------------------------------ - -func SubscriberWithUUIDFunc(v func() string) SubscriberOption { - return func(o *Subscriber) { - o.uuidFunc = v - } -} - -func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption { - return func(o *Subscriber) { - o.messageFunc = v - } -} - -func SubscriberWithMiddlewares(v ...mpv2http.Middleware) SubscriberOption { - return func(o *Subscriber) { - o.middlewares = append(o.middlewares, v...) - } -} - -// ------------------------------------------------------------------------------------------------ -// ~ Constructor -// ------------------------------------------------------------------------------------------------ - -func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { - inst := &Subscriber{ - l: l, - uuidFunc: watermill.NewUUID, - messages: make(chan *message.Message), - } - for _, opt := range opts { - opt(inst) - } - return inst -} - -// ------------------------------------------------------------------------------------------------ -// ~ Public methods -// ------------------------------------------------------------------------------------------------ - -func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // retrieve payload - payload := mpv2http.Handler(w, r) - - // compose middlewares - next := s.handle - for _, middleware := range s.middlewares { - next = middleware(next) - } - - // run handler - if err := next(s.l, w, r, payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { - // marshal message payload - jsonPayload, err := json.Marshal(payload) - if err != nil { - return errors.Wrap(err, "failed to marshal payload") - } - - msg := message.NewMessage(s.uuidFunc(), jsonPayload) - l = l.With(zap.String("message_id", msg.UUID)) - msg.SetContext(context.WithoutCancel(r.Context())) - - // store query - msg.Metadata.Set(MetadataRequestQuery, r.URL.RawQuery) - - // store header - for name, headers := range r.Header { - msg.Metadata.Set(name, strings.Join(headers, ",")) - } - - if s.messageFunc != nil { - if err := s.messageFunc(l, r, msg); err != nil { - return err - } - } - - for k, v := range msg.Metadata { - l = l.With(zap.String(k, v)) - } - - // send message - s.messages <- msg - - // wait for ACK - select { - case <-msg.Acked(): - l.Debug("message acked") - return nil - case <-msg.Nacked(): - l.Debug("message nacked") - return ErrMessageNacked - case <-r.Context().Done(): - l.Debug("message canceled") - return ErrContextCanceled - } -} - -func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { - return s.messages, nil -} - -// Close closes all subscriptions with their output channels and flush offsets etc. when needed. -func (s *Subscriber) Close() error { - if s.closed { - return ErrClosed - } - s.closed = true - - close(s.messages) - return nil -}