From a58dfafaef2825ea40e13d33fdc3ca006d95da16 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Wed, 20 Nov 2024 13:48:00 +0100 Subject: [PATCH] feat: add collect --- integration/loki/middleware.go | 36 ++++ integration/watermill/gtag/subscriber.go | 64 +----- integration/watermill/mpv2/subscriber.go | 39 +--- pkg/collect/collect.go | 187 ++++++++++++++++++ pkg/http/eventhandler.go | 9 + pkg/http/gtag/handler.go | 61 ++++++ .../http/gtag/middleware.go | 19 +- pkg/http/mpv2/handler.go | 37 ++++ .../http/mpv2/middleware.go | 45 +++-- 9 files changed, 383 insertions(+), 114 deletions(-) create mode 100644 integration/loki/middleware.go create mode 100644 pkg/collect/collect.go create mode 100644 pkg/http/eventhandler.go create mode 100644 pkg/http/gtag/handler.go rename integration/watermill/gtag/subscribermiddleware.go => pkg/http/gtag/middleware.go (57%) create mode 100644 pkg/http/mpv2/handler.go rename integration/watermill/mpv2/subscribermiddleware.go => pkg/http/mpv2/middleware.go (52%) diff --git a/integration/loki/middleware.go b/integration/loki/middleware.go new file mode 100644 index 0000000..5543637 --- /dev/null +++ b/integration/loki/middleware.go @@ -0,0 +1,36 @@ +package loki + +import ( + "net/http" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +func GTagMiddleware(loki *Loki) gtaghttp.Middleware { + return func(next gtaghttp.MiddlewareHandler) gtaghttp.MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { + // encode to mpv2 + var mpv2Payload mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + loki.Write(mpv2Payload) + return nil + } + } +} + +func MPv2Middleware(loki *Loki) mpv2http.Middleware { + return func(next mpv2http.MiddlewareHandler) mpv2http.MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + loki.Write(*payload) + return nil + } + } +} diff --git a/integration/watermill/gtag/subscriber.go b/integration/watermill/gtag/subscriber.go index 5f6314f..9bf1b75 100644 --- a/integration/watermill/gtag/subscriber.go +++ b/integration/watermill/gtag/subscriber.go @@ -3,15 +3,13 @@ package gtag import ( "context" "encoding/json" - "fmt" - "io" "net/http" - "net/url" "strings" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/gtag" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -22,12 +20,10 @@ type ( uuidFunc func() string messages chan *message.Message messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []SubscriberMiddleware + middlewares []gtaghttp.Middleware closed bool } - SubscriberOption func(*Subscriber) - SubscriberHandler func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error - SubscriberMiddleware func(next SubscriberHandler) SubscriberHandler + SubscriberOption func(*Subscriber) ) // ------------------------------------------------------------------------------------------------ @@ -46,7 +42,7 @@ func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *messa } } -func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { +func SubscriberWithMiddlewares(v ...gtaghttp.Middleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) } @@ -69,52 +65,8 @@ func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { } func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var values url.Values - - switch r.Method { - case http.MethodGet: - values = r.URL.Query() - case http.MethodPost: - values = r.URL.Query() - - // read request body - out, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, fmt.Sprintf("failed to read body: %s", err.Error()), http.StatusInternalServerError) - return - } - defer r.Body.Close() - - // append request body to query - if len(out) > 0 { - v, err := url.ParseQuery(string(out)) - if err != nil { - http.Error(w, fmt.Sprintf("failed to parse extended url: %s", err.Error()), http.StatusInternalServerError) - return - } - for s2, i := range v { - values.Set(s2, i[0]) - } - } else { - values = r.URL.Query() - } - default: - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - - // unmarshal event - var payload *gtag.Payload - if err := gtag.Decode(values, &payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // validate - if payload.EventName == nil || payload.EventName.String() == "" { - http.Error(w, "missing event name", http.StatusBadRequest) - return - } + // retrieve payload + payload := gtaghttp.Handler(w, r) // compose middlewares next := s.handle @@ -123,13 +75,13 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // run handler - if err := next(s.l, r, payload); err != nil { + if err := next(s.l, w, r, payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } -func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { // marshal message payload data, err := json.Marshal(payload) if err != nil { diff --git a/integration/watermill/mpv2/subscriber.go b/integration/watermill/mpv2/subscriber.go index e9b0a8a..2d8bade 100644 --- a/integration/watermill/mpv2/subscriber.go +++ b/integration/watermill/mpv2/subscriber.go @@ -9,6 +9,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -19,12 +20,10 @@ type ( uuidFunc func() string messages chan *message.Message messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error - middlewares []SubscriberMiddleware + middlewares []mpv2http.Middleware closed bool } - SubscriberOption func(*Subscriber) - SubscriberHandler func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error - SubscriberMiddleware func(next SubscriberHandler) SubscriberHandler + SubscriberOption func(*Subscriber) ) // ------------------------------------------------------------------------------------------------ @@ -43,7 +42,7 @@ func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *messa } } -func SubscriberWithMiddlewares(v ...SubscriberMiddleware) SubscriberOption { +func SubscriberWithMiddlewares(v ...mpv2http.Middleware) SubscriberOption { return func(o *Subscriber) { o.middlewares = append(o.middlewares, v...) } @@ -70,30 +69,8 @@ func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber { // ------------------------------------------------------------------------------------------------ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - - // read request body - var payload *mpv2.Payload[any] - err := json.NewDecoder(r.Body).Decode(&payload) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // validate required fields - if len(payload.Events) == 0 { - http.Error(w, "missing events", http.StatusBadRequest) - return - } - for _, event := range payload.Events { - if event.Name == "" { - http.Error(w, "missing event name", http.StatusBadRequest) - return - } - } + // retrieve payload + payload := mpv2http.Handler(w, r) // compose middlewares next := s.handle @@ -102,13 +79,13 @@ func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // run handler - if err := next(s.l, r, payload); err != nil { + if err := next(s.l, w, r, payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } -func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { // marshal message payload jsonPayload, err := json.Marshal(payload) if err != nil { diff --git a/pkg/collect/collect.go b/pkg/collect/collect.go new file mode 100644 index 0000000..295e50c --- /dev/null +++ b/pkg/collect/collect.go @@ -0,0 +1,187 @@ +package collect + +import ( + "net/http" + "net/http/httputil" + "net/url" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/foomo/sesamy-go/pkg/encoding/gtagencode" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2encode" + sesamyhttp "github.com/foomo/sesamy-go/pkg/http" + gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag" + mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2" + "github.com/foomo/sesamy-go/pkg/sesamy" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + Collect struct { + l *zap.Logger + gtagProxy *httputil.ReverseProxy + mpv2Proxy *httputil.ReverseProxy + gtagMiddlewares []gtaghttp.Middleware + mpv2Middlewares []mpv2http.Middleware + eventHandlers []sesamyhttp.EventHandler + } + Option func(*Collect) error +) + +// ------------------------------------------------------------------------------------------------ +// ~ Options +// ------------------------------------------------------------------------------------------------ + +func WithGTag(endpoint string) Option { + return func(c *Collect) error { + target, err := url.Parse(endpoint) + if err != nil { + return err + } + proxy := httputil.NewSingleHostReverseProxy(target) + c.gtagProxy = proxy + return nil + } +} + +func WithMPv2(endpoint string) Option { + return func(c *Collect) error { + target, err := url.Parse(endpoint) + if err != nil { + return err + } + proxy := httputil.NewSingleHostReverseProxy(target) + c.mpv2Proxy = proxy + return nil + } +} + +func WithGTagMiddlewares(v ...gtaghttp.Middleware) Option { + return func(c *Collect) error { + c.gtagMiddlewares = append(c.gtagMiddlewares, v...) + return nil + } +} + +func WithMPv2Middlewares(v ...mpv2http.Middleware) Option { + return func(c *Collect) error { + c.mpv2Middlewares = append(c.mpv2Middlewares, v...) + return nil + } +} + +func WithEventHandlers(v ...sesamyhttp.EventHandler) Option { + return func(c *Collect) error { + c.eventHandlers = append(c.eventHandlers, v...) + return nil + } +} + +// ------------------------------------------------------------------------------------------------ +// ~ Constructor +// ------------------------------------------------------------------------------------------------ + +func New(l *zap.Logger, opts ...Option) (*Collect, error) { + inst := &Collect{ + l: l, + } + + for _, opt := range opts { + if opt != nil { + if err := opt(inst); err != nil { + return nil, err + } + } + } + + return inst, nil +} + +// ------------------------------------------------------------------------------------------------ +// ~ Public methods +// ------------------------------------------------------------------------------------------------ + +func (c *Collect) GTagHTTPHandler(w http.ResponseWriter, r *http.Request) { + // retrieve payload + payload := gtaghttp.Handler(w, r) + + // compose middlewares + next := c.gtagHandler + for _, middleware := range c.gtagMiddlewares { + next = middleware(next) + } + + // run handler + if err := next(c.l, w, r, payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (c *Collect) MPv2HTTPHandler(w http.ResponseWriter, r *http.Request) { + // retrieve payload + payload := mpv2http.Handler(w, r) + + // compose middlewares + next := c.mpv2Handler + for _, middleware := range c.mpv2Middlewares { + next = middleware(next) + } + + // run handler + if err := next(c.l, w, r, payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +// ------------------------------------------------------------------------------------------------ +// ~ Private methods +// ------------------------------------------------------------------------------------------------ + +func (c *Collect) gtagHandler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { + var mpv2Payload *mpv2.Payload[any] + if err := gtagencode.MPv2(*payload, &mpv2Payload); err != nil { + return errors.Wrap(err, "failed to encode gtag to mpv2") + } + + for i, event := range mpv2Payload.Events { + if err := c.mpv2EventHandler(r, &event); err != nil { + return err + } + mpv2Payload.Events[i] = event + } + + if err := mpv2encode.GTag[any](*mpv2Payload, &payload); err != nil { + return errors.Wrap(err, "failed to encode mpv2 to gtag") + } + + if c.gtagProxy == nil { + c.gtagProxy.ServeHTTP(w, r) + } + return nil +} + +func (c *Collect) mpv2Handler(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { + for i, event := range payload.Events { + if err := c.mpv2EventHandler(r, &event); err != nil { + return err + } + payload.Events[i] = event + } + + if c.mpv2Proxy == nil { + c.mpv2Proxy.ServeHTTP(w, r) + } + return nil +} + +func (c *Collect) mpv2EventHandler(r *http.Request, event *sesamy.Event[any]) error { + for _, handler := range c.eventHandlers { + if err := handler(r, event); err != nil { + return err + } + } + return nil +} diff --git a/pkg/http/eventhandler.go b/pkg/http/eventhandler.go new file mode 100644 index 0000000..48a5ce1 --- /dev/null +++ b/pkg/http/eventhandler.go @@ -0,0 +1,9 @@ +package http + +import ( + "net/http" + + "github.com/foomo/sesamy-go/pkg/sesamy" +) + +type EventHandler func(r *http.Request, event *sesamy.Event[any]) error diff --git a/pkg/http/gtag/handler.go b/pkg/http/gtag/handler.go new file mode 100644 index 0000000..bd214f8 --- /dev/null +++ b/pkg/http/gtag/handler.go @@ -0,0 +1,61 @@ +package gtag + +import ( + "fmt" + "io" + "net/http" + "net/url" + + "github.com/foomo/sesamy-go/pkg/encoding/gtag" +) + +func Handler(w http.ResponseWriter, r *http.Request) *gtag.Payload { + var values url.Values + + switch r.Method { + case http.MethodGet: + values = r.URL.Query() + case http.MethodPost: + values = r.URL.Query() + + // read request body + out, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, fmt.Sprintf("failed to read body: %s", err.Error()), http.StatusInternalServerError) + return nil + } + defer r.Body.Close() + + // append request body to query + if len(out) > 0 { + v, err := url.ParseQuery(string(out)) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse extended url: %s", err.Error()), http.StatusInternalServerError) + return nil + } + for s2, i := range v { + values.Set(s2, i[0]) + } + } else { + values = r.URL.Query() + } + default: + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return nil + } + + // unmarshal event + var payload *gtag.Payload + if err := gtag.Decode(values, &payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return nil + } + + // validate + if payload.EventName == nil || payload.EventName.String() == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return nil + } + + return payload +} diff --git a/integration/watermill/gtag/subscribermiddleware.go b/pkg/http/gtag/middleware.go similarity index 57% rename from integration/watermill/gtag/subscribermiddleware.go rename to pkg/http/gtag/middleware.go index 71fbc0f..50e0fbd 100644 --- a/integration/watermill/gtag/subscribermiddleware.go +++ b/pkg/http/gtag/middleware.go @@ -8,19 +8,24 @@ import ( "go.uber.org/zap" ) -func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +type ( + Middleware func(next MiddlewareHandler) MiddlewareHandler + MiddlewareHandler func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error +) + +func MiddlewareUserID(cookieName string) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { if cookie, err := r.Cookie(cookieName); err == nil { payload.UserID = gtag.Set(cookie.Value) } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *gtag.Payload) error { +func MiddlewareLogger(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error { if spanCtx := trace.SpanContextFromContext(r.Context()); spanCtx.IsValid() && spanCtx.IsSampled() { l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String())) } @@ -29,7 +34,7 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { zap.String("event_user_id", gtag.GetDefault(payload.UserID, "-")), zap.String("event_session_id", gtag.GetDefault(payload.SessionID, "-")), ) - err := next(l, r, payload) + err := next(l, w, r, payload) if err != nil { l.Error("handled event", zap.Error(err)) } else { diff --git a/pkg/http/mpv2/handler.go b/pkg/http/mpv2/handler.go new file mode 100644 index 0000000..2e59827 --- /dev/null +++ b/pkg/http/mpv2/handler.go @@ -0,0 +1,37 @@ +package mpv2 + +import ( + "encoding/json" + "net/http" + + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" +) + +func Handler(w http.ResponseWriter, r *http.Request) *mpv2.Payload[any] { + if r.Method != http.MethodPost { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return nil + } + + // read request body + var payload *mpv2.Payload[any] + err := json.NewDecoder(r.Body).Decode(&payload) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return nil + } + + // validate required fields + if len(payload.Events) == 0 { + http.Error(w, "missing events", http.StatusBadRequest) + return nil + } + for _, event := range payload.Events { + if event.Name == "" { + http.Error(w, "missing event name", http.StatusBadRequest) + return nil + } + } + + return payload +} diff --git a/integration/watermill/mpv2/subscribermiddleware.go b/pkg/http/mpv2/middleware.go similarity index 52% rename from integration/watermill/mpv2/subscribermiddleware.go rename to pkg/http/mpv2/middleware.go index 6f55947..87ce917 100644 --- a/integration/watermill/mpv2/subscribermiddleware.go +++ b/pkg/http/mpv2/middleware.go @@ -12,10 +12,15 @@ import ( "go.uber.org/zap" ) -func SubscriberMiddlewareSessionID(measurementID string) SubscriberMiddleware { +type ( + MiddlewareHandler func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error + Middleware func(next MiddlewareHandler) MiddlewareHandler +) + +func MiddlewareSessionID(measurementID string) Middleware { measurementID = strings.Split(measurementID, "-")[1] - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.SessionID == "" { value, err := session.ParseGASessionID(r, measurementID) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -23,13 +28,13 @@ func SubscriberMiddlewareSessionID(measurementID string) SubscriberMiddleware { } payload.SessionID = value } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareClientID(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareClientID(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.ClientID == "" { value, err := session.ParseGAClientID(r) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -37,22 +42,22 @@ func SubscriberMiddlewareClientID(next SubscriberHandler) SubscriberHandler { } payload.ClientID = value } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareDebugMode(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareDebugMode(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if !payload.DebugMode && session.IsGTMDebug(r) { payload.DebugMode = true } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { - return func(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareUserID(cookieName string) Middleware { + return func(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.UserID == "" { value, err := r.Cookie(cookieName) if err != nil && !errors.Is(err, http.ErrNoCookie) { @@ -60,22 +65,22 @@ func SubscriberMiddlewareUserID(cookieName string) SubscriberMiddleware { } payload.UserID = value.Value } - return next(l, r, payload) + return next(l, w, r, payload) } } } -func SubscriberMiddlewareTimestamp(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareTimestamp(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { if payload.TimestampMicros == 0 { payload.TimestampMicros = time.Now().UnixMicro() } - return next(l, r, payload) + return next(l, w, r, payload) } } -func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { - return func(l *zap.Logger, r *http.Request, payload *mpv2.Payload[any]) error { +func MiddlewareLogger(next MiddlewareHandler) MiddlewareHandler { + return func(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error { eventNames := make([]string, len(payload.Events)) for i, event := range payload.Events { eventNames[i] = event.Name.String() @@ -90,7 +95,7 @@ func SubscriberMiddlewareLogger(next SubscriberHandler) SubscriberHandler { zap.String("event_user_id", payload.UserID), ) - err := next(l, r, payload) + err := next(l, w, r, payload) if err != nil { l.Error("handled event", zap.Error(err)) } else {