diff --git a/Makefile b/Makefile index 29eec06..d0a3786 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ doc: .PHONY: test ## Run tests test: - @go test -coverprofile=coverage.out -race -json ./... | gotestfmt + @GO_TEST_TAGS=-skip go test -coverprofile=coverage.out -race -json ./... | gotestfmt .PHONY: lint ## Run linter diff --git a/integration/watermill/gtag/errors.go b/integration/watermill/gtag/errors.go new file mode 100644 index 0000000..755bb14 --- /dev/null +++ b/integration/watermill/gtag/errors.go @@ -0,0 +1,12 @@ +package gtag + +import ( + "errors" +) + +var ( + ErrMissingEventName = errors.New("missing event name") + ErrContextCanceled = errors.New("request stopped without ACK received") + ErrMessageNacked = errors.New("message nacked") + ErrClosed = errors.New("subscriber already closed") +) diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 181971c..2129c66 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -16,13 +16,6 @@ import ( "go.uber.org/zap" ) -var ( - ErrMissingEventName = errors.New("missing event name") - ErrContextCanceled = errors.New("request stopped without ACK received") - ErrMessageNacked = errors.New("message nacked") - ErrClosed = errors.New("subscriber already closed") -) - type ( Subscriber struct { l *zap.Logger @@ -52,33 +45,6 @@ func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { } } -func SubscriberWithLogger(fields ...zap.Field) SubscriberOption { - return func(o *Subscriber) { - o.middlewares = append(o.middlewares, func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, event *gtag.Payload) error { - fields := append(fields, zap.String("event_name", gtag.GetDefault(event.EventName, "-").String())) - // if labeler, ok := keellog.LabelerFromRequest(r); ok { - // labeler.Add(fields...) - // } - return next(l.With(fields...), r, event) - } - }) - } -} - -func SubscriberWithRequireEventName() SubscriberOption { - return func(o *Subscriber) { - o.middlewares = append(o.middlewares, func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, event *gtag.Payload) error { - if event.EventName == nil { - return ErrMissingEventName - } - return next(l, r, event) - } - }) - } -} - // ------------------------------------------------------------------------------------------------ // ~ Constructor // ------------------------------------------------------------------------------------------------ @@ -137,6 +103,12 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // validate + if payload.EventName.String() == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return + } + // compose middlewares next := s.handle for _, middleware := range s.middlewares { diff --git a/integration/watermill/gtag/subscribermiddleware.go b/integration/watermill/gtag/subscribermiddleware.go new file mode 100644 index 0000000..866840e --- /dev/null +++ b/integration/watermill/gtag/subscribermiddleware.go @@ -0,0 +1,36 @@ +package gtag + +import ( + "net/http" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "go.uber.org/zap" +) + +func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { + return func(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { + if cookie, err := r.Cookie(cookieName); err == nil { + payload.UserID = gtag.Set(cookie.Value) + } + return next(l, r, payload) + } + } +} + +func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { + l = l.With( + zap.String("event_name", gtag.GetDefault(payload.EventName, "-").String()), + zap.String("event_user_id", gtag.GetDefault(payload.UserID, "-")), + zap.String("event_session_id", gtag.GetDefault(payload.SessionID, "-")), + ) + err := next(l, r, payload) + if err != nil { + l.Error("handled event", zap.Error(err)) + } else { + l.Info("handled event") + } + return err + } +} diff --git a/integration/watermill/mpv2/errors.go b/integration/watermill/mpv2/errors.go new file mode 100644 index 0000000..c919c4d --- /dev/null +++ b/integration/watermill/mpv2/errors.go @@ -0,0 +1,14 @@ +package mpv2 + +import ( + "errors" +) + +var ( + ErrMissingEventName = errors.New("missing event name") + ErrErrorResponse = errors.New("server responded with error status") + ErrPublisherClosed = errors.New("publisher is closed") + ErrContextCanceled = errors.New("request stopped without ACK received") + ErrMessageNacked = errors.New("message nacked") + ErrClosed = errors.New("subscriber already closed") +) diff --git a/integration/watermill/mpv2/publisher.go b/integration/watermill/mpv2/publisher.go index 9979158..adfb406 100644 --- a/integration/watermill/mpv2/publisher.go +++ b/integration/watermill/mpv2/publisher.go @@ -12,11 +12,6 @@ import ( "go.uber.org/zap" ) -var ( - ErrErrorResponse = errors.New("server responded with error status") - ErrPublisherClosed = errors.New("publisher is closed") -) - type ( Publisher struct { l *zap.Logger diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index 68825fc..518e6a3 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" "strings" - "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" @@ -14,13 +13,6 @@ import ( "go.uber.org/zap" ) -var ( - ErrMissingEventName = errors.New("missing event name") - ErrContextCanceled = errors.New("request stopped without ACK received") - ErrMessageNacked = errors.New("message nacked") - ErrClosed = errors.New("subscriber already closed") -) - type ( Subscriber struct { l *zap.Logger @@ -50,37 +42,6 @@ func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { } } -func SubscriberWithLogger(fields ...zap.Field) SubscriberOption { - return func(o *Subscriber) { - o.middlewares = append(o.middlewares, func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { - fields := append(fields, - zap.String("user_id", payload.UserID), - zap.String("client_id", payload.ClientID), - zap.Time("timestamp", time.UnixMicro(payload.TimestampMicros)), - ) - // if labeler, ok := keellog.LabelerFromRequest(r); ok { - // labeler.Add(fields...) - // } - return next(l.With(fields...), r, payload) - } - }) - } -} - -// func SubscriberWithRequireEventName() SubscriberOption { -// return func(o *Subscriber) { -// o.middlewares = append(o.middlewares, func(next SubscriberHandler) SubscriberHandler { -// return func(l *zap.Logger, r *http.Request, event *mpv2.Payload[any]) error { -// if event.EventName == nil { -// return ErrMissingEventName -// } -// return next(l, r, event) -// } -// }) -// } -// } - // ------------------------------------------------------------------------------------------------ // ~ Constructor // ------------------------------------------------------------------------------------------------ @@ -111,6 +72,18 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // validate required fields + if len(payload.Events) == 0 { + http.Error(w, "missing events", http.StatusBadRequest) + return + } + for _, event := range payload.Events { + if event.Name == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return + } + } + // compose middlewares next := s.handle for _, middleware := range s.middlewares { diff --git a/integration/watermill/mpv2/subscribermiddleware.go b/integration/watermill/mpv2/subscribermiddleware.go new file mode 100644 index 0000000..bc51b0d --- /dev/null +++ b/integration/watermill/mpv2/subscribermiddleware.go @@ -0,0 +1,64 @@ +package mpv2 + +import ( + "net/http" + "strings" + "time" + + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/foomo/sesamy-go/pkg/session" + "go.uber.org/zap" +) + +func SubscriberMiddlewareClientID(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + if payload.ClientID == "" { + clientID, err := session.ParseGAClientID(r) + if err != nil { + return err + } + payload.ClientID = clientID + } + return next(l, r, payload) + } +} + +func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { + return func(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + if cookie, err := r.Cookie(cookieName); err == nil { + payload.UserID = cookie.Value + } + return next(l, r, payload) + } + } +} + +func SubscriberMiddlewareTimestamp(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + payload.TimestampMicros = time.Now().UnixMicro() + return next(l, r, payload) + } +} + +func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { + return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + eventNames := make([]string, len(payload.Events)) + for i, event := range payload.Events { + eventNames[i] = event.Name.String() + } + + l = l.With( + zap.String("event_names", strings.Join(eventNames, ",")), + zap.String("event_user_id", payload.UserID), + ) + + err := next(l, r, payload) + if err != nil { + l.Error("handled event", zap.Error(err)) + } else { + l.Info("handled event") + } + return err + } +} diff --git a/pkg/session/ga.go b/pkg/session/ga.go new file mode 100644 index 0000000..bf51b7f --- /dev/null +++ b/pkg/session/ga.go @@ -0,0 +1,40 @@ +package session + +import ( + "net/http" + "strings" + + "github.com/pkg/errors" +) + +func ParseGAClientID(r *http.Request) (string, error) { + cookie, err := r.Cookie("_ga") + if err != nil { + return "", errors.Wrap(err, "failed to retrieve _ga cookie") + } + + parts := strings.Split(cookie.Value, ".") + + // validate + if !strings.HasPrefix(cookie.Value, "GA1.1") || len(parts) < 4 { + return "", errors.New("invalid _ga cookie value") + } + + return parts[2] + "." + parts[3], nil +} + +func ParseGASessionID(r *http.Request, id string) (string, error) { + cookie, err := r.Cookie("_ga_" + id) + if err != nil { + return "", errors.Wrap(err, "failed to retrieve _ga cookie") + } + + parts := strings.Split(cookie.Value, ".") + + // validate + if !strings.HasPrefix(cookie.Value, "GS1.1") || len(parts) < 3 { + return "", errors.New("invalid _ga cookie value") + } + + return parts[2], nil +} diff --git a/pkg/session/gtm.go b/pkg/session/gtm.go new file mode 100644 index 0000000..5ef9507 --- /dev/null +++ b/pkg/session/gtm.go @@ -0,0 +1,15 @@ +package session + +import ( + "net/http" +) + +func IsGTMDebug(r *http.Request) bool { + _, err := r.Cookie("gtm_debug") + return err == nil +} + +func IsGTMPreview(r *http.Request) bool { + _, err := r.Cookie("gtm_preview") + return err == nil +}