From c5248e77c200229ef72ed27dd0c35f99e1753137 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Mon, 18 Nov 2024 14:28:26 +0100 Subject: [PATCH 01/18] revert: internal otel --- go.mod | 2 +- integration/watermill/gtag/publishermiddleware.go | 7 +++---- integration/watermill/gtag/subscribermiddleware.go | 7 +++---- integration/watermill/mpv2/publishermiddleware.go | 7 +++---- integration/watermill/mpv2/subscribermiddleware.go | 7 +++---- 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 02c7d4e..f55d2c1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/pperaltaisern/watermillzap v1.0.0 github.com/prometheus/common v0.60.1 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -113,6 +112,7 @@ require ( go.opentelemetry.io/collector/pdata v1.12.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go4.org/netipx v0.0.0-20230125063823-8449b0a6169f // indirect diff --git a/integration/watermill/gtag/publishermiddleware.go b/integration/watermill/gtag/publishermiddleware.go index 4a64a48..2bd407d 100644 --- a/integration/watermill/gtag/publishermiddleware.go +++ b/integration/watermill/gtag/publishermiddleware.go @@ -2,16 +2,15 @@ package gtag import ( "github.com/ThreeDotsLabs/watermill/message" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { return func(l *zap.Logger, msg *message.Message) error { if err := next(l, msg); err != nil { - if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } + // if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + // } l.With(zap.Error(err)).Warn("ignoring error") } return nil diff --git a/integration/watermill/gtag/subscribermiddleware.go b/integration/watermill/gtag/subscribermiddleware.go index 71fbc0f..c54b12b 100644 --- a/integration/watermill/gtag/subscribermiddleware.go +++ b/integration/watermill/gtag/subscribermiddleware.go @@ -4,7 +4,6 @@ import ( "net/http" "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -21,9 +20,9 @@ func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { - if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } + // if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + // } l = l.With( zap.String("event_name", gtag.GetDefault(payload.EventName, "-").String()), zap.String("event_user_id", gtag.GetDefault(payload.UserID, "-")), diff --git a/integration/watermill/mpv2/publishermiddleware.go b/integration/watermill/mpv2/publishermiddleware.go index 72f8de7..a1e842f 100644 --- a/integration/watermill/mpv2/publishermiddleware.go +++ b/integration/watermill/mpv2/publishermiddleware.go @@ -5,7 +5,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -13,9 +12,9 @@ import ( func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { return func(l *zap.Logger, msg *message.Message) error { if err := next(l, msg); err != nil { - if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } + // if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + // } l.With(zap.Error(err)).Warn("ignoring error") } return nil diff --git a/integration/watermill/mpv2/subscribermiddleware.go b/integration/watermill/mpv2/subscribermiddleware.go index 6f55947..592b4ab 100644 --- a/integration/watermill/mpv2/subscribermiddleware.go +++ b/integration/watermill/mpv2/subscribermiddleware.go @@ -8,7 +8,6 @@ import ( "github.com/foomo/sesamy-go/pkg/encoding/mpv2" "github.com/foomo/sesamy-go/pkg/session" "github.com/pkg/errors" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -81,9 +80,9 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { eventNames[i] = event.Name.String() } - if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - } + // if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + // } l = l.With( zap.String("event_names", strings.Join(eventNames, ",")), From 2770d4f32f50e2ab42a9e5463cc19bba8ab56810 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Mon, 18 Nov 2024 17:32:55 +0100 Subject: [PATCH 02/18] feat: transform document params --- go.mod | 2 +- .../watermill/gtag/publishermiddleware.go | 7 ++++--- .../watermill/gtag/subscribermiddleware.go | 7 ++++--- .../watermill/mpv2/publishermiddleware.go | 7 ++++--- .../watermill/mpv2/subscribermiddleware.go | 7 ++++--- pkg/encoding/gtag/decode_test.go | 2 +- pkg/encoding/gtagencode/mpv2.go | 9 +++++++++ pkg/encoding/mpv2encode/gtag.go | 16 +++++++++++----- 8 files changed, 38 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index f55d2c1..02c7d4e 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/pperaltaisern/watermillzap v1.0.0 github.com/prometheus/common v0.60.1 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -112,7 +113,6 @@ require ( go.opentelemetry.io/collector/pdata v1.12.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect - go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go4.org/netipx v0.0.0-20230125063823-8449b0a6169f // indirect diff --git a/integration/watermill/gtag/publishermiddleware.go b/integration/watermill/gtag/publishermiddleware.go index 2bd407d..4a64a48 100644 --- a/integration/watermill/gtag/publishermiddleware.go +++ b/integration/watermill/gtag/publishermiddleware.go @@ -2,15 +2,16 @@ package gtag import ( "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { return func(l *zap.Logger, msg *message.Message) error { if err := next(l, msg); err != nil { - // if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - // } + if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + } l.With(zap.Error(err)).Warn("ignoring error") } return nil diff --git a/integration/watermill/gtag/subscribermiddleware.go b/integration/watermill/gtag/subscribermiddleware.go index c54b12b..71fbc0f 100644 --- a/integration/watermill/gtag/subscribermiddleware.go +++ b/integration/watermill/gtag/subscribermiddleware.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -20,9 +21,9 @@ func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { - // if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - // } + if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + } l = l.With( zap.String("event_name", gtag.GetDefault(payload.EventName, "-").String()), zap.String("event_user_id", gtag.GetDefault(payload.UserID, "-")), diff --git a/integration/watermill/mpv2/publishermiddleware.go b/integration/watermill/mpv2/publishermiddleware.go index a1e842f..72f8de7 100644 --- a/integration/watermill/mpv2/publishermiddleware.go +++ b/integration/watermill/mpv2/publishermiddleware.go @@ -5,6 +5,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -12,9 +13,9 @@ import ( func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler { return func(l *zap.Logger, msg *message.Message) error { if err := next(l, msg); err != nil { - // if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - // } + if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + } l.With(zap.Error(err)).Warn("ignoring error") } return nil diff --git a/integration/watermill/mpv2/subscribermiddleware.go b/integration/watermill/mpv2/subscribermiddleware.go index 592b4ab..6f55947 100644 --- a/integration/watermill/mpv2/subscribermiddleware.go +++ b/integration/watermill/mpv2/subscribermiddleware.go @@ -8,6 +8,7 @@ import ( "github.com/foomo/sesamy-go/pkg/encoding/mpv2" "github.com/foomo/sesamy-go/pkg/session" "github.com/pkg/errors" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -80,9 +81,9 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { eventNames[i] = event.Name.String() } - // if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { - // l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) - // } + if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { + l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) + } l = l.With( zap.String("event_names", strings.Join(eventNames, ",")), diff --git a/pkg/encoding/gtag/decode_test.go b/pkg/encoding/gtag/decode_test.go index 33f93e0..0b060a7 100644 --- a/pkg/encoding/gtag/decode_test.go +++ b/pkg/encoding/gtag/decode_test.go @@ -40,7 +40,7 @@ func TestDecode(t *testing.T) { }, { name: "add_to_cart", - args: GTagAddToCart, + args: "v=2&tid=G-F9XM71K45T>m=45he45m0v9184715813z89184708445za200zb9184708445&_p=1716795486104&_dbg=1&gcd=13l3l3l2l1&npa=1&dma_cps=sypham&dma=1&cid=179294588.1715353601&ecid=788548699&ul=en-us&sr=2056x1329&_fplc=0&ur=&uaa=arm&uab=64&uafvl=Chromium%3B124.0.6367.201%7CGoogle%2520Chrome%3B124.0.6367.201%7CNot-A.Brand%3B99.0.0.0&uamb=0&uam=&uap=macOS&uapv=14.4.1&uaw=0&are=1&frm=0&pscdl=noapi&sst.gcd=13l3l3l2l1&sst.tft=1716795486104&sst.ude=0&_s=4&cu=USD&sid=1716793773&sct=14&seg=1&dl=https%3A%2F%2Fsesamy.local.bestbytes.net%2F%3Fgtm_debug%3D1716795486020&dr=https%3A%2F%2Ftagassistant.google.com%2F&dt=Home&en=add_to_cart&pr1=idSKU_12345~nmStan%20and%20Friends%20Tee~afGoogle%20Merchandise%20Store~cpSUMMER_FUN~ds2.22~lp0~brGoogle~caApparel~c2Adult~c3Shirts~c4Crew~c5Short%20sleeve~lirelated_products~lnRelated%20Products~vagreen~loChIJIQBpAG2ahYAR_6128GcTUEo~pr10.01~qt3&ep.enable_page_views=false&epn.value=30.03&_et=1255&tfd=145479&richsstsse", want: `{"consent":{"google_consent_default":"13l3l3l2l1"},"campaign":{},"ecommerce":{"currency":"USD","items":[{"affiliation":"Google Merchandise Store","coupon":"SUMMER_FUN","discount":"2.22","item_brand":"Google","item_category":"Apparel","item_category2":"Adult","item_category3":"Shirts","item_category4":"Crew","item_category5":"Short sleeve","item_id":"SKU_12345","item_list_id":"related_products","item_list_name":"Related Products","item_name":"Stan and Friends Tee","item_variant":"green","item_list_position":"0","location_id":"ChIJIQBpAG2ahYAR_6128GcTUEo","price":"10.01","quantity":"3"}]},"client_hints":{"screen_resolution":"2056x1329","user_language":"en-us","user_agent_architecture":"arm","user_agent_bitness":"64","user_agent_full_version_list":"Chromium;124.0.6367.201|Google%20Chrome;124.0.6367.201|Not-A.Brand;99.0.0.0","user_agent_mobile":"0","user_agent_model":"","user_agent_platform":"macOS","user_agent_platform_version":"14.4.1","user_agent_wow_64":"0","user_region":""},"protocol_version":"2","tracking_id":"G-F9XM71K45T","gtmhash_info":"45he45m0v9184715813z89184708445za200zb9184708445","client_id":"179294588.1715353601","richsstsse":"","document_location":"https://sesamy.local.bestbytes.net/?gtm_debug=1716795486020","document_title":"Home","document_referrer":"https://tagassistant.google.com/","is_debug":"1","event_name":"add_to_cart","event_parameter":{"enable_page_views":"false"},"event_parameter_number":{"value":"30.03"},"session_id":"1716793773","non_personalized_ads":"1","sst":{"tft":"1716795486104","gcd":"13l3l3l2l1","ude":"0"}}`, }, { diff --git a/pkg/encoding/gtagencode/mpv2.go b/pkg/encoding/gtagencode/mpv2.go index 06c5ceb..bdecc98 100644 --- a/pkg/encoding/gtagencode/mpv2.go +++ b/pkg/encoding/gtagencode/mpv2.go @@ -52,6 +52,15 @@ func MPv2(source gtag.Payload, target any) error { "name": source.EventName, } targetEventDataParams := map[string]any{} + if value, ok := sourceData["document_title"]; ok { + targetEventDataParams["page_title"] = value + } + if value, ok := sourceData["document_referrer"]; ok { + targetEventDataParams["page_referrer"] = value + } + if value, ok := sourceData["document_location"]; ok { + targetEventDataParams["page_location"] = value + } if node, ok := sourceData["ecommerce"].(map[string]any); ok { maps.Copy(targetEventDataParams, node) } diff --git a/pkg/encoding/mpv2encode/gtag.go b/pkg/encoding/mpv2encode/gtag.go index 3ca1fab..1a9ed96 100644 --- a/pkg/encoding/mpv2encode/gtag.go +++ b/pkg/encoding/mpv2encode/gtag.go @@ -48,17 +48,23 @@ func GTag[P any](source mpv2.Payload[P], target any) error { targetData["event_name"] = sourceData["name"] if params, ok := sourceData["params"].(map[string]any); ok { + targetData["document_title"] = params["page_title"] + delete(params, "page_title") + targetData["document_referrer"] = params["page_referrer"] + delete(params, "page_referrer") + targetData["document_location"] = params["page_location"] + delete(params, "page_location") targetData["currency"] = params["currency"] - targetData["promotion_id"] = params["promotion_id"] - targetData["promotion_name"] = params["promotion_name"] - targetData["location_id"] = params["location_id"] - targetData["is_conversion"] = params["is_conversion"] - targetData["items"] = params["items"] delete(params, "currency") + targetData["promotion_id"] = params["promotion_id"] delete(params, "promotion_id") + targetData["promotion_name"] = params["promotion_name"] delete(params, "promotion_name") + targetData["location_id"] = params["location_id"] delete(params, "location_id") + targetData["is_conversion"] = params["is_conversion"] delete(params, "is_conversion") + targetData["items"] = params["items"] delete(params, "items") { // user_property targetEventProperty := map[string]any{} From fe5b8c9f0f689e7a054c9d61a8bef1121f627007 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 08:22:57 +0100 Subject: [PATCH 03/18] fix: don't use tft timestamp --- integration/loki/loki.go | 9 ++++++++- pkg/encoding/gtag/sst.go | 2 +- pkg/encoding/gtagencode/mpv2.go | 3 --- pkg/event/params/pageview.go | 1 + 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/integration/loki/loki.go b/integration/loki/loki.go index 5ac23f6..bc28fc7 100644 --- a/integration/loki/loki.go +++ b/integration/loki/loki.go @@ -151,9 +151,16 @@ func (l *Loki) Write(payload mpv2.Payload[any]) { l.l.Warn("buffer size reached", zap.Int("size", l.bufferSize)) } + var timestamp time.Time + if payload.TimestampMicros > 0 { + timestamp = time.UnixMicro(payload.TimestampMicros) + } else { + timestamp = time.Now() + } + l.entries <- logproto.Entry{ Line: string(lineBytes), - Timestamp: time.UnixMicro(payload.TimestampMicros), + Timestamp: timestamp, StructuredMetadata: push.LabelsAdapter{ { Name: "event_name", diff --git a/pkg/encoding/gtag/sst.go b/pkg/encoding/gtag/sst.go index 587f175..4712a9b 100644 --- a/pkg/encoding/gtag/sst.go +++ b/pkg/encoding/gtag/sst.go @@ -13,7 +13,7 @@ type SST struct { GCSub *string `json:"gcsub,omitempty" gtag:"gcsub,omitempty"` // Example: DE UC *string `json:"uc,omitempty" gtag:"uc,omitempty"` - // Example: 1708250245344 + // Session start time, time first seen. Example: 1708250245344 TFT *string `json:"tft,omitempty" gtag:"tft,omitempty"` // Example: 13l3l3l3l1 GCD *string `json:"gcd,omitempty" gtag:"gcd,omitempty"` diff --git a/pkg/encoding/gtagencode/mpv2.go b/pkg/encoding/gtagencode/mpv2.go index bdecc98..cbe7f28 100644 --- a/pkg/encoding/gtagencode/mpv2.go +++ b/pkg/encoding/gtagencode/mpv2.go @@ -29,9 +29,6 @@ func MPv2(source gtag.Payload, target any) error { "non_personalized_ads": source.NonPersonalizedAds, "debug_mode": source.IsDebug, } - if source.SST != nil && source.SST.TFT != nil { - targetData["timestamp_micros"] = gtag.Get(source.SST.TFT) + "000" - } // combine user properties targetUserProperties := map[string]any{} diff --git a/pkg/event/params/pageview.go b/pkg/event/params/pageview.go index 89c71a5..0bbc1a5 100644 --- a/pkg/event/params/pageview.go +++ b/pkg/event/params/pageview.go @@ -3,5 +3,6 @@ package params // PageView https://developers.google.com/analytics/devguides/collection/ga4/views?client_type=gtag#manually_send_page_view_events type PageView struct { PageTitle string `json:"page_title,omitempty"` + PageReferrer string `json:"page_referrer,omitempty"` PageLocation string `json:"page_location,omitempty"` } From 615b57f387cd23980dc650b5458583049092869a Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 09:30:12 +0100 Subject: [PATCH 04/18] feat: add consent --- .../watermill/gtag/messagehandler_test.go | 2 +- pkg/encoding/gtag/consent.go | 39 +++++++++++++++++++ pkg/encoding/gtag/payload.go | 5 --- pkg/encoding/gtagencode/mpv2.go | 11 ++++++ pkg/encoding/mpv2/consent.go | 7 ++++ 5 files changed, 58 insertions(+), 6 deletions(-) diff --git a/integration/watermill/gtag/messagehandler_test.go b/integration/watermill/gtag/messagehandler_test.go index 5aa4085..43c00e6 100644 --- a/integration/watermill/gtag/messagehandler_test.go +++ b/integration/watermill/gtag/messagehandler_test.go @@ -99,7 +99,7 @@ func TestMPv2MessageHandler(t *testing.T) { var done atomic.Bool router.AddHandler("gtag", "in", pubSub, "out", pubSub, gtag.MPv2MessageHandler) router.AddNoPublisherHandler("mpv2", "out", pubSub, func(msg *message.Message) error { - expected := `{"client_id":"C123456","events":[{"name":"add_to_cart","params":{}}],"debug_mode":true}` + expected := `{"client_id":"C123456","consent":{"ad_user_data":"GRANTED","ad_personalization":"GRANTED","analytics_storage":"GRANTED"},"events":[{"name":"add_to_cart","params":{"page_location":"https://foomo.org","page_title":"Home"}}],"debug_mode":true}` if !assert.JSONEq(t, expected, string(msg.Payload)) { fmt.Println(string(msg.Payload)) } diff --git a/pkg/encoding/gtag/consent.go b/pkg/encoding/gtag/consent.go index c1ab4d8..7808b38 100644 --- a/pkg/encoding/gtag/consent.go +++ b/pkg/encoding/gtag/consent.go @@ -1,9 +1,11 @@ package gtag import ( + "slices" "strings" ) +// See https://developers.google.com/tag-platform/security/concepts/consent-mode type Consent struct { // Current Google Consent Status. Format 'G1'+'AdsStorageBoolStatus'`+'AnalyticsStorageBoolStatus' // Example: G101 @@ -17,6 +19,11 @@ type Consent struct { // Will be added with the value "1" if the Google Consent had a default value before getting an update // Example: G111 GoogleConsentDefault *string `json:"google_consent_default,omitempty" gtag:"gcd,omitempty"` + // Example: 1 + // DigitalMarketAct *string `json:"digital_market_act,omitempty" gtag:"dma,omitempty"` + // Example: sypham + // DigitalMarketActParameters *string `json:"digital_market_act_parameters,omitempty" gtag:"dma_cps,omitempty"` + // Example: noapi | denied } // ------------------------------------------------------------------------------------------------ @@ -24,6 +31,12 @@ type Consent struct { // ------------------------------------------------------------------------------------------------ func (c Consent) AdStorage() bool { + if c.GoogleConsentDefault != nil { + gcd := strings.Split(*c.GoogleConsentDefault, "") + if len(gcd) > 3 { + return slices.Contains([]string{"l", "t", "r", "n", "u", "v"}, gcd[2]) + } + } if c.GoogleConsentUpdate != nil { gcs := *c.GoogleConsentUpdate if strings.HasPrefix(gcs, "G1") && len(gcs) == 4 { @@ -35,6 +48,12 @@ func (c Consent) AdStorage() bool { } func (c Consent) AnalyticsStorage() bool { + if c.GoogleConsentDefault != nil { + gcd := strings.Split(*c.GoogleConsentDefault, "") + if len(gcd) > 5 { + return slices.Contains([]string{"l", "t", "r", "n", "u", "v"}, gcd[4]) + } + } if c.GoogleConsentUpdate != nil { gcs := *c.GoogleConsentUpdate if strings.HasPrefix(gcs, "G1") && len(gcs) == 4 { @@ -44,3 +63,23 @@ func (c Consent) AnalyticsStorage() bool { } return true } + +func (c Consent) AdUserData() bool { + if c.GoogleConsentDefault != nil { + gcd := strings.Split(*c.GoogleConsentDefault, "") + if len(gcd) > 7 { + return slices.Contains([]string{"l", "t", "r", "n", "u", "v"}, gcd[6]) + } + } + return c.AdStorage() +} + +func (c Consent) AdPersonalization() bool { + if c.GoogleConsentDefault != nil { + gcd := strings.Split(*c.GoogleConsentDefault, "") + if len(gcd) > 9 { + return slices.Contains([]string{"l", "t", "r", "n", "u", "v"}, gcd[8]) + } + } + return c.AdStorage() +} diff --git a/pkg/encoding/gtag/payload.go b/pkg/encoding/gtag/payload.go index bdaffdd..6438ebd 100644 --- a/pkg/encoding/gtag/payload.go +++ b/pkg/encoding/gtag/payload.go @@ -151,11 +151,6 @@ type Payload struct { NonPersonalizedAds *string `json:"non_personalized_ads,omitempty" gtag:"npa,omitempty"` // Example: 1 // ARE *string `json:"are,omitempty" gtag:"are,omitempty"` - // Example: 1 - // DigitalMarketAct *string `json:"digital_market_act,omitempty" gtag:"dma,omitempty"` - // Example: sypham - // DigitalMarketActParameters *string `json:"digital_market_act_parameters,omitempty" gtag:"dma_cps,omitempty"` - // Example: noapi | denied // PrivacySandboxCookieDeprecationLabel *string `json:"privacy_sandbox_cookie_deprecation_label,omitempty" gtag:"pscdl,omitempty"` // A timestamp measuring the difference between the moment this parameter gets populated and the moment the navigation started on that particular page. // TFD *string `json:"tfd,omitempty" gtag:"tfd,omitempty"` diff --git a/pkg/encoding/gtagencode/mpv2.go b/pkg/encoding/gtagencode/mpv2.go index cbe7f28..547b1da 100644 --- a/pkg/encoding/gtagencode/mpv2.go +++ b/pkg/encoding/gtagencode/mpv2.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" ) @@ -26,10 +27,20 @@ func MPv2(source gtag.Payload, target any) error { targetData := map[string]any{ "client_id": source.ClientID, "user_id": source.UserID, + "session_id": source.SessionID, "non_personalized_ads": source.NonPersonalizedAds, "debug_mode": source.IsDebug, } + // consent + targetConsentData := map[string]any{ + "add_storage": mpv2.ConsentText(source.AdStorage()), + "ad_user_data": mpv2.ConsentText(source.AdUserData()), + "ad_personalization": mpv2.ConsentText(source.AdPersonalization()), + "analytics_storage": mpv2.ConsentText(source.AnalyticsStorage()), + } + targetData["consent"] = targetConsentData + // combine user properties targetUserProperties := map[string]any{} if node, ok := sourceData["user_property"].(map[string]string); ok { diff --git a/pkg/encoding/mpv2/consent.go b/pkg/encoding/mpv2/consent.go index 2a78551..a770e01 100644 --- a/pkg/encoding/mpv2/consent.go +++ b/pkg/encoding/mpv2/consent.go @@ -6,3 +6,10 @@ const ( ConsentDenied Consent = "DENIED" ConsentGranted Consent = "GRANTED" ) + +func ConsentText(v bool) Consent { + if v { + return ConsentGranted + } + return ConsentDenied +} From a58dfafaef2825ea40e13d33fdc3ca006d95da16 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 13:48:00 +0100 Subject: [PATCH 05/18] feat: add collect --- integration/loki/middleware.go | 36 ++++ integration/watermill/gtag/subscriber.go | 64 +----- integration/watermill/mpv2/subscriber.go | 39 +--- pkg/collect/collect.go | 187 ++++++++++++++++++ pkg/http/eventhandler.go | 9 + pkg/http/gtag/handler.go | 61 ++++++ .../http/gtag/middleware.go | 19 +- pkg/http/mpv2/handler.go | 37 ++++ .../http/mpv2/middleware.go | 45 +++-- 9 files changed, 383 insertions(+), 114 deletions(-) create mode 100644 integration/loki/middleware.go create mode 100644 pkg/collect/collect.go create mode 100644 pkg/http/eventhandler.go create mode 100644 pkg/http/gtag/handler.go rename integration/watermill/gtag/subscribermiddleware.go => pkg/http/gtag/middleware.go (57%) create mode 100644 pkg/http/mpv2/handler.go rename integration/watermill/mpv2/subscribermiddleware.go => pkg/http/mpv2/middleware.go (52%) diff --git a/integration/loki/middleware.go b/integration/loki/middleware.go new file mode 100644 index 0000000..5543637 --- /dev/null +++ b/integration/loki/middleware.go @@ -0,0 +1,36 @@ +package loki + +import ( + "net/http" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func GTagMiddleware(loki *Loki) gtaghttp.Middleware { + return func(next gtaghttp.MiddlewareHandler) gtaghttp.MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { + // encode to mpv2 + var mpv2Payload mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + loki.Write(mpv2Payload) + return nil + } + } +} + +func MPv2Middleware(loki *Loki) mpv2http.Middleware { + return func(next mpv2http.MiddlewareHandler) mpv2http.MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + loki.Write(*payload) + return nil + } + } +} diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 5f6314f..9bf1b75 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -3,15 +3,13 @@ package gtag import ( "context" "encoding/json" - "fmt" - "io" "net/http" - "net/url" "strings" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/gtag" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -22,12 +20,10 @@ type ( uuidFunc func() string messages chan *message.Message messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []SubscriberMiddleware + middlewares []gtaghttp.Middleware closed bool } - SubscriberOption func(*Subscriber) - SubscriberHandler func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error - SubscriberMiddleware func(next SubscriberHandler) SubscriberHandler + SubscriberOption func(*Subscriber) ) // ------------------------------------------------------------------------------------------------ @@ -46,7 +42,7 @@ func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *messa } } -func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { +func SubscriberWithMiddlewares(v ...gtaghttp.Middleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) } @@ -69,52 +65,8 @@ func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { } func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var values url.Values - - switch r.Method { - case http.MethodGet: - values = r.URL.Query() - case http.MethodPost: - values = r.URL.Query() - - // read request body - out, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, fmt.Sprintf("failed to read body: %s", err.Error()), http.StatusInternalServerError) - return - } - defer r.Body.Close() - - // append request body to query - if len(out) > 0 { - v, err := url.ParseQuery(string(out)) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse extended url: %s", err.Error()), http.StatusInternalServerError) - return - } - for s2, i := range v { - values.Set(s2, i[0]) - } - } else { - values = r.URL.Query() - } - default: - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - - // unmarshal event - var payload *gtag.Payload - if err := gtag.Decode(values, &payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // validate - if payload.EventName == nil || payload.EventName.String() == "" { - http.Error(w, "missing event name", http.StatusBadRequest) - return - } + // retrieve payload + payload := gtaghttp.Handler(w, r) // compose middlewares next := s.handle @@ -123,13 +75,13 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // run handler - if err := next(s.l, r, payload); err != nil { + if err := next(s.l, w, r, payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } -func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { // marshal message payload data, err := json.Marshal(payload) if err != nil { diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index e9b0a8a..2d8bade 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -9,6 +9,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -19,12 +20,10 @@ type ( uuidFunc func() string messages chan *message.Message messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []SubscriberMiddleware + middlewares []mpv2http.Middleware closed bool } - SubscriberOption func(*Subscriber) - SubscriberHandler func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error - SubscriberMiddleware func(next SubscriberHandler) SubscriberHandler + SubscriberOption func(*Subscriber) ) // ------------------------------------------------------------------------------------------------ @@ -43,7 +42,7 @@ func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *messa } } -func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { +func SubscriberWithMiddlewares(v ...mpv2http.Middleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) } @@ -70,30 +69,8 @@ func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { // ------------------------------------------------------------------------------------------------ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - - // read request body - var payload *mpv2.Payload[any] - err := json.NewDecoder(r.Body).Decode(&payload) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // validate required fields - if len(payload.Events) == 0 { - http.Error(w, "missing events", http.StatusBadRequest) - return - } - for _, event := range payload.Events { - if event.Name == "" { - http.Error(w, "missing event name", http.StatusBadRequest) - return - } - } + // retrieve payload + payload := mpv2http.Handler(w, r) // compose middlewares next := s.handle @@ -102,13 +79,13 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // run handler - if err := next(s.l, r, payload); err != nil { + if err := next(s.l, w, r, payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } -func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { // marshal message payload jsonPayload, err := json.Marshal(payload) if err != nil { diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go new file mode 100644 index 0000000..295e50c --- /dev/null +++ b/pkg/collect/collect.go @@ -0,0 +1,187 @@ +package collect + +import ( + "net/http" + "net/http/httputil" + "net/url" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2encode" + sesamyhttp "github.com/foomo/sesamy-go/pkg/http" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" + "github.com/foomo/sesamy-go/pkg/sesamy" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + Collect struct { + l *zap.Logger + gtagProxy *httputil.ReverseProxy + mpv2Proxy *httputil.ReverseProxy + gtagMiddlewares []gtaghttp.Middleware + mpv2Middlewares []mpv2http.Middleware + eventHandlers []sesamyhttp.EventHandler + } + Option func(*Collect) error +) + +// ------------------------------------------------------------------------------------------------ +// ~ Options +// ------------------------------------------------------------------------------------------------ + +func WithGTag(endpoint string) Option { + return func(c *Collect) error { + target, err := url.Parse(endpoint) + if err != nil { + return err + } + proxy := httputil.NewSingleHostReverseProxy(target) + c.gtagProxy = proxy + return nil + } +} + +func WithMPv2(endpoint string) Option { + return func(c *Collect) error { + target, err := url.Parse(endpoint) + if err != nil { + return err + } + proxy := httputil.NewSingleHostReverseProxy(target) + c.mpv2Proxy = proxy + return nil + } +} + +func WithGTagMiddlewares(v ...gtaghttp.Middleware) Option { + return func(c *Collect) error { + c.gtagMiddlewares = append(c.gtagMiddlewares, v...) + return nil + } +} + +func WithMPv2Middlewares(v ...mpv2http.Middleware) Option { + return func(c *Collect) error { + c.mpv2Middlewares = append(c.mpv2Middlewares, v...) + return nil + } +} + +func WithEventHandlers(v ...sesamyhttp.EventHandler) Option { + return func(c *Collect) error { + c.eventHandlers = append(c.eventHandlers, v...) + return nil + } +} + +// ------------------------------------------------------------------------------------------------ +// ~ Constructor +// ------------------------------------------------------------------------------------------------ + +func New(l *zap.Logger, opts ...Option) (*Collect, error) { + inst := &Collect{ + l: l, + } + + for _, opt := range opts { + if opt != nil { + if err := opt(inst); err != nil { + return nil, err + } + } + } + + return inst, nil +} + +// ------------------------------------------------------------------------------------------------ +// ~ Public methods +// ------------------------------------------------------------------------------------------------ + +func (c *Collect) GTagHTTPHandler(w http.ResponseWriter, r *http.Request) { + // retrieve payload + payload := gtaghttp.Handler(w, r) + + // compose middlewares + next := c.gtagHandler + for _, middleware := range c.gtagMiddlewares { + next = middleware(next) + } + + // run handler + if err := next(c.l, w, r, payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { + // retrieve payload + payload := mpv2http.Handler(w, r) + + // compose middlewares + next := c.mpv2Handler + for _, middleware := range c.mpv2Middlewares { + next = middleware(next) + } + + // run handler + if err := next(c.l, w, r, payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ------------------------------------------------------------------------------------------------ +// ~ Private methods +// ------------------------------------------------------------------------------------------------ + +func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { + var mpv2Payload *mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + + for i, event := range mpv2Payload.Events { + if err := c.mpv2EventHandler(r, &event); err != nil { + return err + } + mpv2Payload.Events[i] = event + } + + if err := mpv2encode.GTag[any](*mpv2Payload, &payload); err != nil { + return errors.Wrap(err, "failed to encode mpv2 to gtag") + } + + if c.gtagProxy == nil { + c.gtagProxy.ServeHTTP(w, r) + } + return nil +} + +func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + for i, event := range payload.Events { + if err := c.mpv2EventHandler(r, &event); err != nil { + return err + } + payload.Events[i] = event + } + + if c.mpv2Proxy == nil { + c.mpv2Proxy.ServeHTTP(w, r) + } + return nil +} + +func (c *Collect) mpv2EventHandler(r *http.Request, event *sesamy.Event[any]) error { + for _, handler := range c.eventHandlers { + if err := handler(r, event); err != nil { + return err + } + } + return nil +} diff --git a/pkg/http/eventhandler.go b/pkg/http/eventhandler.go new file mode 100644 index 0000000..48a5ce1 --- /dev/null +++ b/pkg/http/eventhandler.go @@ -0,0 +1,9 @@ +package http + +import ( + "net/http" + + "github.com/foomo/sesamy-go/pkg/sesamy" +) + +type EventHandler func(r *http.Request, event *sesamy.Event[any]) error diff --git a/pkg/http/gtag/handler.go b/pkg/http/gtag/handler.go new file mode 100644 index 0000000..bd214f8 --- /dev/null +++ b/pkg/http/gtag/handler.go @@ -0,0 +1,61 @@ +package gtag + +import ( + "fmt" + "io" + "net/http" + "net/url" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" +) + +func Handler(w http.ResponseWriter, r *http.Request) *gtag.Payload { + var values url.Values + + switch r.Method { + case http.MethodGet: + values = r.URL.Query() + case http.MethodPost: + values = r.URL.Query() + + // read request body + out, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, fmt.Sprintf("failed to read body: %s", err.Error()), http.StatusInternalServerError) + return nil + } + defer r.Body.Close() + + // append request body to query + if len(out) > 0 { + v, err := url.ParseQuery(string(out)) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse extended url: %s", err.Error()), http.StatusInternalServerError) + return nil + } + for s2, i := range v { + values.Set(s2, i[0]) + } + } else { + values = r.URL.Query() + } + default: + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return nil + } + + // unmarshal event + var payload *gtag.Payload + if err := gtag.Decode(values, &payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return nil + } + + // validate + if payload.EventName == nil || payload.EventName.String() == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return nil + } + + return payload +} diff --git a/integration/watermill/gtag/subscribermiddleware.go b/pkg/http/gtag/middleware.go similarity index 57% rename from integration/watermill/gtag/subscribermiddleware.go rename to pkg/http/gtag/middleware.go index 71fbc0f..50e0fbd 100644 --- a/integration/watermill/gtag/subscribermiddleware.go +++ b/pkg/http/gtag/middleware.go @@ -8,19 +8,24 @@ import ( "go.uber.org/zap" ) -func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +type ( + Middleware func(next MiddlewareHandler) MiddlewareHandler + MiddlewareHandler func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error +) + +func MiddlewareUserID(cookieName string) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { if cookie, err := r.Cookie(cookieName); err == nil { payload.UserID = gtag.Set(cookie.Value) } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +func MiddlewareLogger(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) } @@ -29,7 +34,7 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { zap.String("event_user_id", gtag.GetDefault(payload.UserID, "-")), zap.String("event_session_id", gtag.GetDefault(payload.SessionID, "-")), ) - err := next(l, r, payload) + err := next(l, w, r, payload) if err != nil { l.Error("handled event", zap.Error(err)) } else { diff --git a/pkg/http/mpv2/handler.go b/pkg/http/mpv2/handler.go new file mode 100644 index 0000000..2e59827 --- /dev/null +++ b/pkg/http/mpv2/handler.go @@ -0,0 +1,37 @@ +package mpv2 + +import ( + "encoding/json" + "net/http" + + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" +) + +func Handler(w http.ResponseWriter, r *http.Request) *mpv2.Payload[any] { + if r.Method != http.MethodPost { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return nil + } + + // read request body + var payload *mpv2.Payload[any] + err := json.NewDecoder(r.Body).Decode(&payload) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return nil + } + + // validate required fields + if len(payload.Events) == 0 { + http.Error(w, "missing events", http.StatusBadRequest) + return nil + } + for _, event := range payload.Events { + if event.Name == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return nil + } + } + + return payload +} diff --git a/integration/watermill/mpv2/subscribermiddleware.go b/pkg/http/mpv2/middleware.go similarity index 52% rename from integration/watermill/mpv2/subscribermiddleware.go rename to pkg/http/mpv2/middleware.go index 6f55947..87ce917 100644 --- a/integration/watermill/mpv2/subscribermiddleware.go +++ b/pkg/http/mpv2/middleware.go @@ -12,10 +12,15 @@ import ( "go.uber.org/zap" ) -func SubscriberMiddlewareSessionID(measurementID string) SubscriberMiddleware { +type ( + MiddlewareHandler func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error + Middleware func(next MiddlewareHandler) MiddlewareHandler +) + +func MiddlewareSessionID(measurementID string) Middleware { measurementID = strings.Split(measurementID, "-")[1] - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.SessionID == "" { value, err := session.ParseGASessionID(r, measurementID) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -23,13 +28,13 @@ func SubscriberMiddlewareSessionID(measurementID string) SubscriberMiddleware { } payload.SessionID = value } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareClientID(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareClientID(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.ClientID == "" { value, err := session.ParseGAClientID(r) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -37,22 +42,22 @@ func SubscriberMiddlewareClientID(next SubscriberHandler) SubscriberHandler { } payload.ClientID = value } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareDebugMode(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareDebugMode(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if !payload.DebugMode && session.IsGTMDebug(r) { payload.DebugMode = true } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareUserID(cookieName string) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.UserID == "" { value, err := r.Cookie(cookieName) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -60,22 +65,22 @@ func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { } payload.UserID = value.Value } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareTimestamp(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareTimestamp(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.TimestampMicros == 0 { payload.TimestampMicros = time.Now().UnixMicro() } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareLogger(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { eventNames := make([]string, len(payload.Events)) for i, event := range payload.Events { eventNames[i] = event.Name.String() @@ -90,7 +95,7 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { zap.String("event_user_id", payload.UserID), ) - err := next(l, r, payload) + err := next(l, w, r, payload) if err != nil { l.Error("handled event", zap.Error(err)) } else { From 3f644f8c526b53bfc642bdf83986903901b01ac5 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 15:41:56 +0100 Subject: [PATCH 06/18] refactor: add event handler middleware --- pkg/collect/collect.go | 90 +++++++------------------------------ pkg/http/gtag/middleware.go | 28 ++++++++++++ pkg/http/mpv2/middleware.go | 15 +++++++ 3 files changed, 59 insertions(+), 74 deletions(-) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index 295e50c..bda02a4 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -6,25 +6,18 @@ import ( "net/url" "github.com/foomo/sesamy-go/pkg/encoding/gtag" - "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" - "github.com/foomo/sesamy-go/pkg/encoding/mpv2encode" - sesamyhttp "github.com/foomo/sesamy-go/pkg/http" gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" - "github.com/foomo/sesamy-go/pkg/sesamy" - "github.com/pkg/errors" "go.uber.org/zap" ) type ( Collect struct { - l *zap.Logger - gtagProxy *httputil.ReverseProxy - mpv2Proxy *httputil.ReverseProxy - gtagMiddlewares []gtaghttp.Middleware - mpv2Middlewares []mpv2http.Middleware - eventHandlers []sesamyhttp.EventHandler + l *zap.Logger + taggingProxy *httputil.ReverseProxy + gtagHTTPMiddlewares []gtaghttp.Middleware + mpv2HTTPMiddlewares []mpv2http.Middleware } Option func(*Collect) error ) @@ -33,47 +26,28 @@ type ( // ~ Options // ------------------------------------------------------------------------------------------------ -func WithGTag(endpoint string) Option { +func WithTagging(endpoint string) Option { return func(c *Collect) error { target, err := url.Parse(endpoint) if err != nil { return err } proxy := httputil.NewSingleHostReverseProxy(target) - c.gtagProxy = proxy + c.taggingProxy = proxy return nil } } -func WithMPv2(endpoint string) Option { +func WithGTagHTTPMiddlewares(v ...gtaghttp.Middleware) Option { return func(c *Collect) error { - target, err := url.Parse(endpoint) - if err != nil { - return err - } - proxy := httputil.NewSingleHostReverseProxy(target) - c.mpv2Proxy = proxy + c.gtagHTTPMiddlewares = append(c.gtagHTTPMiddlewares, v...) return nil } } -func WithGTagMiddlewares(v ...gtaghttp.Middleware) Option { +func WithMPv2HTTPMiddlewares(v ...mpv2http.Middleware) Option { return func(c *Collect) error { - c.gtagMiddlewares = append(c.gtagMiddlewares, v...) - return nil - } -} - -func WithMPv2Middlewares(v ...mpv2http.Middleware) Option { - return func(c *Collect) error { - c.mpv2Middlewares = append(c.mpv2Middlewares, v...) - return nil - } -} - -func WithEventHandlers(v ...sesamyhttp.EventHandler) Option { - return func(c *Collect) error { - c.eventHandlers = append(c.eventHandlers, v...) + c.mpv2HTTPMiddlewares = append(c.mpv2HTTPMiddlewares, v...) return nil } } @@ -108,7 +82,7 @@ func (c *Collect) GTagHTTPHandler(w http.ResponseWriter, r *http.Request) { // compose middlewares next := c.gtagHandler - for _, middleware := range c.gtagMiddlewares { + for _, middleware := range c.gtagHTTPMiddlewares { next = middleware(next) } @@ -125,7 +99,7 @@ func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { // compose middlewares next := c.mpv2Handler - for _, middleware := range c.mpv2Middlewares { + for _, middleware := range c.mpv2HTTPMiddlewares { next = middleware(next) } @@ -141,47 +115,15 @@ func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { // ------------------------------------------------------------------------------------------------ func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { - var mpv2Payload *mpv2.Payload[any] - if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { - return errors.Wrap(err, "failed to encode gtag to mpv2") - } - - for i, event := range mpv2Payload.Events { - if err := c.mpv2EventHandler(r, &event); err != nil { - return err - } - mpv2Payload.Events[i] = event - } - - if err := mpv2encode.GTag[any](*mpv2Payload, &payload); err != nil { - return errors.Wrap(err, "failed to encode mpv2 to gtag") - } - - if c.gtagProxy == nil { - c.gtagProxy.ServeHTTP(w, r) + if c.taggingProxy == nil { + c.taggingProxy.ServeHTTP(w, r) } return nil } func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { - for i, event := range payload.Events { - if err := c.mpv2EventHandler(r, &event); err != nil { - return err - } - payload.Events[i] = event - } - - if c.mpv2Proxy == nil { - c.mpv2Proxy.ServeHTTP(w, r) - } - return nil -} - -func (c *Collect) mpv2EventHandler(r *http.Request, event *sesamy.Event[any]) error { - for _, handler := range c.eventHandlers { - if err := handler(r, event); err != nil { - return err - } + if c.taggingProxy == nil { + c.taggingProxy.ServeHTTP(w, r) } return nil } diff --git a/pkg/http/gtag/middleware.go b/pkg/http/gtag/middleware.go index 50e0fbd..b6bfa9a 100644 --- a/pkg/http/gtag/middleware.go +++ b/pkg/http/gtag/middleware.go @@ -4,6 +4,11 @@ import ( "net/http" "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2encode" + sesamyhttp "github.com/foomo/sesamy-go/pkg/http" + "github.com/pkg/errors" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -13,6 +18,29 @@ type ( MiddlewareHandler func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error ) +func MiddlewareEventHandler(h sesamyhttp.EventHandler) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { + var mpv2Payload *mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + + for i, event := range mpv2Payload.Events { + if err := h(r, &event); err != nil { + return err + } + mpv2Payload.Events[i] = event + } + + if err := mpv2encode.GTag[any](*mpv2Payload, &payload); err != nil { + return errors.Wrap(err, "failed to encode mpv2 to gtag") + } + return next(l, w, r, payload) + } + } +} + func MiddlewareUserID(cookieName string) Middleware { return func(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { diff --git a/pkg/http/mpv2/middleware.go b/pkg/http/mpv2/middleware.go index 87ce917..0172342 100644 --- a/pkg/http/mpv2/middleware.go +++ b/pkg/http/mpv2/middleware.go @@ -6,6 +6,7 @@ import ( "time" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + sesamyhttp "github.com/foomo/sesamy-go/pkg/http" "github.com/foomo/sesamy-go/pkg/session" "github.com/pkg/errors" "go.opentelemetry.io/otel/trace" @@ -17,6 +18,20 @@ type ( Middleware func(next MiddlewareHandler) MiddlewareHandler ) +func MiddlewareEventHandler(h sesamyhttp.EventHandler) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + for i, event := range payload.Events { + if err := h(r, &event); err != nil { + return err + } + payload.Events[i] = event + } + return next(l, w, r, payload) + } + } +} + func MiddlewareSessionID(measurementID string) Middleware { measurementID = strings.Split(measurementID, "-")[1] return func(next MiddlewareHandler) MiddlewareHandler { From deadd984adeac3630d8691b4b31c4cf2dea6af53 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 16:07:58 +0100 Subject: [PATCH 07/18] fix: nil check --- pkg/collect/collect.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index bda02a4..3e0caa1 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -115,14 +115,14 @@ func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { // ------------------------------------------------------------------------------------------------ func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { - if c.taggingProxy == nil { + if c.taggingProxy != nil { c.taggingProxy.ServeHTTP(w, r) } return nil } func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { - if c.taggingProxy == nil { + if c.taggingProxy != nil { c.taggingProxy.ServeHTTP(w, r) } return nil From 84f5a2f76bfe2fd25758a02df2d8128e70a315bd Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 16:17:42 +0100 Subject: [PATCH 08/18] wip: debug --- pkg/collect/collect.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index 3e0caa1..c1e30c7 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -32,7 +32,9 @@ func WithTagging(endpoint string) Option { if err != nil { return err } + c.l.Info("--->" + endpoint) proxy := httputil.NewSingleHostReverseProxy(target) + proxy.ErrorLog = zap.NewStdLog(c.l) c.taggingProxy = proxy return nil } From 9af3ced441be373ecfed7b85695711b7e0887652 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 16:59:59 +0100 Subject: [PATCH 09/18] wip: use new request --- pkg/collect/collect.go | 100 ++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 20 deletions(-) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index c1e30c7..de29af1 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -1,23 +1,29 @@ package collect import ( + "bytes" + "encoding/json" + "fmt" + "io" "net/http" - "net/http/httputil" - "net/url" + "strings" "github.com/foomo/sesamy-go/pkg/encoding/gtag" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" + "github.com/pkg/errors" "go.uber.org/zap" ) type ( Collect struct { - l *zap.Logger - taggingProxy *httputil.ReverseProxy - gtagHTTPMiddlewares []gtaghttp.Middleware - mpv2HTTPMiddlewares []mpv2http.Middleware + l *zap.Logger + taggingURL string + taggingClient *http.Client + taggingMaxResponseCode int + gtagHTTPMiddlewares []gtaghttp.Middleware + mpv2HTTPMiddlewares []mpv2http.Middleware } Option func(*Collect) error ) @@ -26,16 +32,16 @@ type ( // ~ Options // ------------------------------------------------------------------------------------------------ -func WithTagging(endpoint string) Option { +func WithTagging(v string) Option { return func(c *Collect) error { - target, err := url.Parse(endpoint) - if err != nil { - return err - } - c.l.Info("--->" + endpoint) - proxy := httputil.NewSingleHostReverseProxy(target) - proxy.ErrorLog = zap.NewStdLog(c.l) - c.taggingProxy = proxy + c.taggingURL = v + return nil + } +} + +func WithTaggingClient(v *http.Client) Option { + return func(c *Collect) error { + c.taggingClient = v return nil } } @@ -60,7 +66,9 @@ func WithMPv2HTTPMiddlewares(v ...mpv2http.Middleware) Option { func New(l *zap.Logger, opts ...Option) (*Collect, error) { inst := &Collect{ - l: l, + l: l, + taggingClient: http.DefaultClient, + taggingMaxResponseCode: http.StatusBadRequest, } for _, opt := range opts { @@ -117,15 +125,67 @@ func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { // ------------------------------------------------------------------------------------------------ func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { - if c.taggingProxy != nil { - c.taggingProxy.ServeHTTP(w, r) + values, body, err := gtag.Encode(payload) + if err != nil { + return err } + + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, fmt.Sprintf("%s%s?%s", c.taggingURL, "/g/collect", gtag.EncodeValues(values)), body) + if err != nil { + return errors.Wrap(err, "failed to create request") + } + + // copy headers + req.Header = r.Header.Clone() + + resp, err := c.taggingClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // copy headers + for name, values := range resp.Header { + r.Header.Add(name, strings.Join(values, ",")) + } + + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return nil } func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { - if c.taggingProxy != nil { - c.taggingProxy.ServeHTTP(w, r) + body, err := json.Marshal(payload) + if err != nil { + return err } + + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, fmt.Sprintf("%s%s", c.taggingURL, "/mp/collect"), bytes.NewReader(body)) + if err != nil { + return errors.Wrap(err, "failed to create request") + } + + // copy headers + req.Header = r.Header.Clone() + // copy raw query + req.URL.RawQuery = r.URL.RawQuery + + resp, err := c.taggingClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + // copy headers + for name, values := range resp.Header { + r.Header.Add(name, strings.Join(values, ",")) + } + + if _, err := io.Copy(w, resp.Body); err != nil { + return err + } + return nil } From 0c1db3cad2ea796937ca2f177deaeabdfd1796ca Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 17:05:26 +0100 Subject: [PATCH 10/18] wip: use new request --- pkg/collect/collect.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index de29af1..99e08c0 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "strings" "github.com/foomo/sesamy-go/pkg/encoding/gtag" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" @@ -145,9 +144,7 @@ func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Requ defer resp.Body.Close() // copy headers - for name, values := range resp.Header { - r.Header.Add(name, strings.Join(values, ",")) - } + r.Header = resp.Header.Clone() if _, err := io.Copy(w, resp.Body); err != nil { return err @@ -179,9 +176,7 @@ func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Requ defer resp.Body.Close() // copy headers - for name, values := range resp.Header { - r.Header.Add(name, strings.Join(values, ",")) - } + r.Header = resp.Header.Clone() if _, err := io.Copy(w, resp.Body); err != nil { return err From 9cca227cd372e0904383432ee924febf1918118d Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 17:08:28 +0100 Subject: [PATCH 11/18] fix: loki next --- integration/loki/middleware.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/integration/loki/middleware.go b/integration/loki/middleware.go index 5543637..dbd88cc 100644 --- a/integration/loki/middleware.go +++ b/integration/loki/middleware.go @@ -15,13 +15,16 @@ import ( func GTagMiddleware(loki *Loki) gtaghttp.Middleware { return func(next gtaghttp.MiddlewareHandler) gtaghttp.MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { - // encode to mpv2 - var mpv2Payload mpv2.Payload[any] - if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { - return errors.Wrap(err, "failed to encode gtag to mpv2") + err := next(l, w, r, payload) + if err != nil { + // encode to mpv2 + var mpv2Payload mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + loki.Write(mpv2Payload) } - loki.Write(mpv2Payload) - return nil + return err } } } @@ -29,8 +32,11 @@ func GTagMiddleware(loki *Loki) gtaghttp.Middleware { func MPv2Middleware(loki *Loki) mpv2http.Middleware { return func(next mpv2http.MiddlewareHandler) mpv2http.MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { - loki.Write(*payload) - return nil + err := next(l, w, r, payload) + if err != nil { + loki.Write(*payload) + } + return err } } } From 757a4f63e5ff68a358df99c6b86b94bc1d8d32a8 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 12:36:05 +0100 Subject: [PATCH 12/18] feat: add middlewares --- pkg/collect/collect.go | 16 ++++++------ pkg/http/mpv2/middleware.go | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go index 99e08c0..8746184 100644 --- a/pkg/collect/collect.go +++ b/pkg/collect/collect.go @@ -17,12 +17,11 @@ import ( type ( Collect struct { - l *zap.Logger - taggingURL string - taggingClient *http.Client - taggingMaxResponseCode int - gtagHTTPMiddlewares []gtaghttp.Middleware - mpv2HTTPMiddlewares []mpv2http.Middleware + l *zap.Logger + taggingURL string + taggingClient *http.Client + gtagHTTPMiddlewares []gtaghttp.Middleware + mpv2HTTPMiddlewares []mpv2http.Middleware } Option func(*Collect) error ) @@ -65,9 +64,8 @@ func WithMPv2HTTPMiddlewares(v ...mpv2http.Middleware) Option { func New(l *zap.Logger, opts ...Option) (*Collect, error) { inst := &Collect{ - l: l, - taggingClient: http.DefaultClient, - taggingMaxResponseCode: http.StatusBadRequest, + l: l, + taggingClient: http.DefaultClient, } for _, opt := range opts { diff --git a/pkg/http/mpv2/middleware.go b/pkg/http/mpv2/middleware.go index 0172342..4251f90 100644 --- a/pkg/http/mpv2/middleware.go +++ b/pkg/http/mpv2/middleware.go @@ -94,6 +94,55 @@ func MiddlewareTimestamp(next MiddlewareHandler) MiddlewareHandler { } } +func MiddlewareUserAgentEventParam(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + if userAgent := r.Header.Get("User-Agent"); userAgent != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["user_agent"] = userAgent + } + payload.Events[i] = event + } + } + return next(l, w, r, payload) + } +} + +func MiddlewareIPOverrideEventParam(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + var ipOverride string + for _, key := range []string{"X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { + if value := r.Header.Get(key); value != "" { + ipOverride = value + break + } + } + if ipOverride != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["ip_override"] = ipOverride + } + payload.Events[i] = event + } + } + return next(l, w, r, payload) + } +} + +func MiddlewarePageLocationEventParam(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + if referrer := r.Header.Get("Referer"); referrer != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["page_location"] = referrer + } + payload.Events[i] = event + } + } + return next(l, w, r, payload) + } +} + func MiddlewareLogger(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { eventNames := make([]string, len(payload.Events)) From 28573f3798b613f2cbeb374353d38e24cd0b373f Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 12:38:11 +0100 Subject: [PATCH 13/18] feat: add middlewares --- pkg/client/mpv2middleware.go | 49 ++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/pkg/client/mpv2middleware.go b/pkg/client/mpv2middleware.go index b52db65..2f92860 100644 --- a/pkg/client/mpv2middleware.go +++ b/pkg/client/mpv2middleware.go @@ -71,3 +71,52 @@ func MPv2MiddlewareTimestamp(next MPv2Handler) MPv2Handler { return next(r, payload) } } + +func MiddlewareUserAgent(next MPv2Handler) MPv2Handler { + return func(r *http.Request, payload *mpv2.Payload[any]) error { + if userAgent := r.Header.Get("User-Agent"); userAgent != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["user_agent"] = userAgent + } + payload.Events[i] = event + } + } + return next(r, payload) + } +} + +func MiddlewareIPOverride(next MPv2Handler) MPv2Handler { + return func(r *http.Request, payload *mpv2.Payload[any]) error { + var ipOverride string + for _, key := range []string{"X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { + if value := r.Header.Get(key); value != "" { + ipOverride = value + break + } + } + if ipOverride != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["ip_override"] = ipOverride + } + payload.Events[i] = event + } + } + return next(r, payload) + } +} + +func MiddlewarePageLocation(next MPv2Handler) MPv2Handler { + return func(r *http.Request, payload *mpv2.Payload[any]) error { + if referrer := r.Header.Get("Referer"); referrer != "" { + for i, event := range payload.Events { + if value, ok := event.Params.(map[string]any); ok { + value["page_location"] = referrer + } + payload.Events[i] = event + } + } + return next(r, payload) + } +} From e7f9e2af6740dd30731968616d169aec40e7693a Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 12:52:25 +0100 Subject: [PATCH 14/18] feat: add middlewares --- pkg/client/mpv2middleware.go | 12 +----------- pkg/http/mpv2/middleware.go | 8 ++++---- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/pkg/client/mpv2middleware.go b/pkg/client/mpv2middleware.go index 2f92860..5aec600 100644 --- a/pkg/client/mpv2middleware.go +++ b/pkg/client/mpv2middleware.go @@ -3,7 +3,6 @@ package client import ( "net/http" "strings" - "time" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" "github.com/foomo/sesamy-go/pkg/session" @@ -63,15 +62,6 @@ func MPv2MiddlewareUserID(cookieName string) MPv2Middleware { } } -func MPv2MiddlewareTimestamp(next MPv2Handler) MPv2Handler { - return func(r *http.Request, payload *mpv2.Payload[any]) error { - if payload.TimestampMicros == 0 { - payload.TimestampMicros = time.Now().UnixMicro() - } - return next(r, payload) - } -} - func MiddlewareUserAgent(next MPv2Handler) MPv2Handler { return func(r *http.Request, payload *mpv2.Payload[any]) error { if userAgent := r.Header.Get("User-Agent"); userAgent != "" { @@ -89,7 +79,7 @@ func MiddlewareUserAgent(next MPv2Handler) MPv2Handler { func MiddlewareIPOverride(next MPv2Handler) MPv2Handler { return func(r *http.Request, payload *mpv2.Payload[any]) error { var ipOverride string - for _, key := range []string{"X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { + for _, key := range []string{"CF-Connecting-IP", "X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { if value := r.Header.Get(key); value != "" { ipOverride = value break diff --git a/pkg/http/mpv2/middleware.go b/pkg/http/mpv2/middleware.go index 4251f90..1fcbb38 100644 --- a/pkg/http/mpv2/middleware.go +++ b/pkg/http/mpv2/middleware.go @@ -94,7 +94,7 @@ func MiddlewareTimestamp(next MiddlewareHandler) MiddlewareHandler { } } -func MiddlewareUserAgentEventParam(next MiddlewareHandler) MiddlewareHandler { +func MiddlewareUserAgent(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if userAgent := r.Header.Get("User-Agent"); userAgent != "" { for i, event := range payload.Events { @@ -108,10 +108,10 @@ func MiddlewareUserAgentEventParam(next MiddlewareHandler) MiddlewareHandler { } } -func MiddlewareIPOverrideEventParam(next MiddlewareHandler) MiddlewareHandler { +func MiddlewareIPOverride(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { var ipOverride string - for _, key := range []string{"X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { + for _, key := range []string{"CF-Connecting-IP", "X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { if value := r.Header.Get(key); value != "" { ipOverride = value break @@ -129,7 +129,7 @@ func MiddlewareIPOverrideEventParam(next MiddlewareHandler) MiddlewareHandler { } } -func MiddlewarePageLocationEventParam(next MiddlewareHandler) MiddlewareHandler { +func MiddlewarePageLocation(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if referrer := r.Header.Get("Referer"); referrer != "" { for i, event := range payload.Events { From 2a125cee301555033bef26b0e4922e79775f21fc Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 12:55:07 +0100 Subject: [PATCH 15/18] fix: middleware name --- pkg/client/mpv2middleware.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/client/mpv2middleware.go b/pkg/client/mpv2middleware.go index 5aec600..cd23ed5 100644 --- a/pkg/client/mpv2middleware.go +++ b/pkg/client/mpv2middleware.go @@ -62,7 +62,7 @@ func MPv2MiddlewareUserID(cookieName string) MPv2Middleware { } } -func MiddlewareUserAgent(next MPv2Handler) MPv2Handler { +func MPv2MiddlewareUserAgent(next MPv2Handler) MPv2Handler { return func(r *http.Request, payload *mpv2.Payload[any]) error { if userAgent := r.Header.Get("User-Agent"); userAgent != "" { for i, event := range payload.Events { @@ -76,7 +76,7 @@ func MiddlewareUserAgent(next MPv2Handler) MPv2Handler { } } -func MiddlewareIPOverride(next MPv2Handler) MPv2Handler { +func MPv2MiddlewareIPOverride(next MPv2Handler) MPv2Handler { return func(r *http.Request, payload *mpv2.Payload[any]) error { var ipOverride string for _, key := range []string{"CF-Connecting-IP", "X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { @@ -97,7 +97,7 @@ func MiddlewareIPOverride(next MPv2Handler) MPv2Handler { } } -func MiddlewarePageLocation(next MPv2Handler) MPv2Handler { +func MPv2MiddlewarePageLocation(next MPv2Handler) MPv2Handler { return func(r *http.Request, payload *mpv2.Payload[any]) error { if referrer := r.Header.Get("Referer"); referrer != "" { for i, event := range payload.Events { From e801701b587f5f57f7d75f9954ee49b1863237b5 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 13:20:13 +0100 Subject: [PATCH 16/18] fix: middleware name --- pkg/client/mpv2middleware.go | 49 ------------------------------------ pkg/http/eventhandler.go | 3 ++- pkg/http/gtag/middleware.go | 2 +- pkg/http/mpv2/middleware.go | 11 +++++--- 4 files changed, 10 insertions(+), 55 deletions(-) diff --git a/pkg/client/mpv2middleware.go b/pkg/client/mpv2middleware.go index cd23ed5..5670d99 100644 --- a/pkg/client/mpv2middleware.go +++ b/pkg/client/mpv2middleware.go @@ -61,52 +61,3 @@ func MPv2MiddlewareUserID(cookieName string) MPv2Middleware { } } } - -func MPv2MiddlewareUserAgent(next MPv2Handler) MPv2Handler { - return func(r *http.Request, payload *mpv2.Payload[any]) error { - if userAgent := r.Header.Get("User-Agent"); userAgent != "" { - for i, event := range payload.Events { - if value, ok := event.Params.(map[string]any); ok { - value["user_agent"] = userAgent - } - payload.Events[i] = event - } - } - return next(r, payload) - } -} - -func MPv2MiddlewareIPOverride(next MPv2Handler) MPv2Handler { - return func(r *http.Request, payload *mpv2.Payload[any]) error { - var ipOverride string - for _, key := range []string{"CF-Connecting-IP", "X-Original-Forwarded-For", "X-Forwarded-For", "X-Real-Ip"} { - if value := r.Header.Get(key); value != "" { - ipOverride = value - break - } - } - if ipOverride != "" { - for i, event := range payload.Events { - if value, ok := event.Params.(map[string]any); ok { - value["ip_override"] = ipOverride - } - payload.Events[i] = event - } - } - return next(r, payload) - } -} - -func MPv2MiddlewarePageLocation(next MPv2Handler) MPv2Handler { - return func(r *http.Request, payload *mpv2.Payload[any]) error { - if referrer := r.Header.Get("Referer"); referrer != "" { - for i, event := range payload.Events { - if value, ok := event.Params.(map[string]any); ok { - value["page_location"] = referrer - } - payload.Events[i] = event - } - } - return next(r, payload) - } -} diff --git a/pkg/http/eventhandler.go b/pkg/http/eventhandler.go index 48a5ce1..de10be5 100644 --- a/pkg/http/eventhandler.go +++ b/pkg/http/eventhandler.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/foomo/sesamy-go/pkg/sesamy" + "go.uber.org/zap" ) -type EventHandler func(r *http.Request, event *sesamy.Event[any]) error +type EventHandler func(l *zap.Logger, r *http.Request, event *sesamy.Event[any]) error diff --git a/pkg/http/gtag/middleware.go b/pkg/http/gtag/middleware.go index b6bfa9a..c2ee813 100644 --- a/pkg/http/gtag/middleware.go +++ b/pkg/http/gtag/middleware.go @@ -27,7 +27,7 @@ func MiddlewareEventHandler(h sesamyhttp.EventHandler) Middleware { } for i, event := range mpv2Payload.Events { - if err := h(r, &event); err != nil { + if err := h(l, r, &event); err != nil { return err } mpv2Payload.Events[i] = event diff --git a/pkg/http/mpv2/middleware.go b/pkg/http/mpv2/middleware.go index 1fcbb38..bcb3fb6 100644 --- a/pkg/http/mpv2/middleware.go +++ b/pkg/http/mpv2/middleware.go @@ -22,7 +22,7 @@ func MiddlewareEventHandler(h sesamyhttp.EventHandler) Middleware { return func(next MiddlewareHandler) MiddlewareHandler { return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { for i, event := range payload.Events { - if err := h(r, &event); err != nil { + if err := h(l, r, &event); err != nil { return err } payload.Events[i] = event @@ -100,8 +100,9 @@ func MiddlewareUserAgent(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["user_agent"] = userAgent + event.Params = value + payload.Events[i] = event } - payload.Events[i] = event } } return next(l, w, r, payload) @@ -121,8 +122,9 @@ func MiddlewareIPOverride(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["ip_override"] = ipOverride + event.Params = value + payload.Events[i] = event } - payload.Events[i] = event } } return next(l, w, r, payload) @@ -135,8 +137,9 @@ func MiddlewarePageLocation(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["page_location"] = referrer + event.Params = value + payload.Events[i] = event } - payload.Events[i] = event } } return next(l, w, r, payload) From 478b678a6e574dbd3831ee026d1a0367f77508ac Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 21 Nov 2024 13:56:10 +0100 Subject: [PATCH 17/18] feat: add decode --- pkg/http/mpv2/middleware.go | 3 --- pkg/sesamy/event.go | 4 ++++ pkg/sesamy/event_test.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) create mode 100644 pkg/sesamy/event_test.go diff --git a/pkg/http/mpv2/middleware.go b/pkg/http/mpv2/middleware.go index bcb3fb6..4cc1c24 100644 --- a/pkg/http/mpv2/middleware.go +++ b/pkg/http/mpv2/middleware.go @@ -100,7 +100,6 @@ func MiddlewareUserAgent(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["user_agent"] = userAgent - event.Params = value payload.Events[i] = event } } @@ -122,7 +121,6 @@ func MiddlewareIPOverride(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["ip_override"] = ipOverride - event.Params = value payload.Events[i] = event } } @@ -137,7 +135,6 @@ func MiddlewarePageLocation(next MiddlewareHandler) MiddlewareHandler { for i, event := range payload.Events { if value, ok := event.Params.(map[string]any); ok { value["page_location"] = referrer - event.Params = value payload.Events[i] = event } } diff --git a/pkg/sesamy/event.go b/pkg/sesamy/event.go index e302325..ab50dbf 100644 --- a/pkg/sesamy/event.go +++ b/pkg/sesamy/event.go @@ -36,3 +36,7 @@ func (e Event[P]) Decode(output any) error { func (e Event[P]) DecodeParams(output any) error { return Decode(e.Params, output) } + +func (e Event[P]) EncodeParams(input any) error { + return Decode(input, &e.Params) +} diff --git a/pkg/sesamy/event_test.go b/pkg/sesamy/event_test.go new file mode 100644 index 0000000..a030d8e --- /dev/null +++ b/pkg/sesamy/event_test.go @@ -0,0 +1,35 @@ +package sesamy_test + +import ( + "testing" + + "github.com/foomo/sesamy-go/pkg/sesamy" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDecodeParams(t *testing.T) { + type params struct { + Title string `json:"title"` + } + + event := sesamy.Event[any]{ + Name: "test", + Params: map[string]any{ + "title": "foo", + "description": "foo", + }, + } + + var p params + require.NoError(t, event.DecodeParams(&p)) + assert.Equal(t, "foo", p.Title) + + p.Title = "bar" + + require.NoError(t, event.EncodeParams(p)) + assert.Equal(t, map[string]any{ + "title": "bar", + "description": "foo", + }, event.Params) +} From 6680fbb169e0581443a5df6a6965d71ce240cb8e Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Mon, 25 Nov 2024 13:41:46 +0100 Subject: [PATCH 18/18] feat: add sendpayload --- pkg/client/mpv2.go | 15 ++++++--------- pkg/encoding/mpv2/payload.go | 8 ++++++++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/client/mpv2.go b/pkg/client/mpv2.go index 8bc797a..e8e99e3 100644 --- a/pkg/client/mpv2.go +++ b/pkg/client/mpv2.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "time" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" "github.com/foomo/sesamy-go/pkg/sesamy" @@ -110,16 +109,14 @@ func (c *MPv2) HTTPClient() *http.Client { // ------------------------------------------------------------------------------------------------ func (c *MPv2) Collect(r *http.Request, events ...sesamy.AnyEvent) error { - anyEvents := make([]sesamy.Event[any], len(events)) - for i, event := range events { - anyEvents[i] = event.AnyEvent() - } - - payload := &mpv2.Payload[any]{ - Events: anyEvents, - TimestampMicros: time.Now().UnixMicro(), + payload := mpv2.NewPayload[any]() + for _, event := range events { + payload.Events = append(payload.Events, event.AnyEvent()) } + return c.SendPayload(r, payload) +} +func (c *MPv2) SendPayload(r *http.Request, payload *mpv2.Payload[any]) error { next := c.SendRaw for _, middleware := range c.middlewares { next = middleware(next) diff --git a/pkg/encoding/mpv2/payload.go b/pkg/encoding/mpv2/payload.go index 4abcf25..525276f 100644 --- a/pkg/encoding/mpv2/payload.go +++ b/pkg/encoding/mpv2/payload.go @@ -1,6 +1,8 @@ package mpv2 import ( + "time" + "github.com/foomo/sesamy-go/pkg/sesamy" ) @@ -17,3 +19,9 @@ type Payload[P any] struct { SessionID string `json:"session_id,omitempty"` EngagementTimeMSec int64 `json:"engagement_time_msec,omitempty"` } + +func NewPayload[P any]() *Payload[P] { + return &Payload[P]{ + TimestampMicros: time.Now().UnixMicro(), + } +}