mirror of
https://github.com/foomo/sesamy-go.git
synced 2025-10-16 12:35:43 +00:00
feat: remove watermill
This commit is contained in:
parent
9823bd6eed
commit
24e6f55ea9
@ -1,23 +0,0 @@
|
||||
package loki
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func MPv2MessageHandler(loki *Loki) message.NoPublishHandlerFunc {
|
||||
return func(msg *message.Message) error {
|
||||
var payload mpv2.Payload[any]
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
loki.Write(payload)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -1,12 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMissingEventName = errors.New("missing event name")
|
||||
ErrContextCanceled = errors.New("request stopped without ACK received")
|
||||
ErrMessageNacked = errors.New("message nacked")
|
||||
ErrClosed = errors.New("subscriber already closed")
|
||||
)
|
||||
@ -1,60 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/gtagencode"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) message.HandlerFunc {
|
||||
return func(msg *message.Message) ([]*message.Message, error) {
|
||||
var payload *gtag.Payload
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
// handle payload
|
||||
if err := handler(payload, msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// marshal payload
|
||||
b, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to marshal payload")
|
||||
}
|
||||
msg.Payload = b
|
||||
|
||||
return []*message.Message{msg}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func MPv2MessageHandler(msg *message.Message) ([]*message.Message, error) {
|
||||
var payload gtag.Payload
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
// encode to mpv2
|
||||
var mpv2Payload *mpv2.Payload[any]
|
||||
if err := gtagencode.MPv2(payload, &mpv2Payload); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to encode gtag to mpv2")
|
||||
}
|
||||
|
||||
// marshal payload
|
||||
b, err := json.Marshal(mpv2Payload)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to marshal payload")
|
||||
}
|
||||
msg.Payload = b
|
||||
|
||||
return []*message.Message{msg}, nil
|
||||
}
|
||||
@ -1,148 +0,0 @@
|
||||
package gtag_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
|
||||
"github.com/foomo/sesamy-go/integration/watermill/gtag"
|
||||
encoding "github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
"github.com/foomo/sesamy-go/pkg/sesamy"
|
||||
"github.com/pperaltaisern/watermillzap"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestMessageHandler(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
router, err := message.NewRouter(message.RouterConfig{}, watermillzap.NewLogger(l))
|
||||
require.NoError(t, err)
|
||||
defer router.Close()
|
||||
|
||||
// Create pubSub
|
||||
pubSub := gochannel.NewGoChannel(
|
||||
gochannel.Config{},
|
||||
watermillzap.NewLogger(l),
|
||||
)
|
||||
|
||||
var done atomic.Bool
|
||||
router.AddHandler("gtag", "in", pubSub, "out", pubSub, gtag.MessageHandler(func(payload *encoding.Payload, msg *message.Message) error {
|
||||
expected := `{"consent":{},"campaign":{},"ecommerce":{},"client_hints":{},"protocol_version":"2","client_id":"C123456","richsstsse":"1","document_location":"https://foomo.org","document_title":"Home","is_debug":"1","event_name":"add_to_cart"}`
|
||||
if !assert.JSONEq(t, expected, string(msg.Payload)) {
|
||||
fmt.Println(string(msg.Payload))
|
||||
}
|
||||
done.Store(true)
|
||||
return nil
|
||||
}))
|
||||
|
||||
go func() {
|
||||
assert.NoError(t, router.Run(context.TODO()))
|
||||
}()
|
||||
assert.Eventually(t, router.IsRunning, time.Second, 50*time.Millisecond)
|
||||
|
||||
payload := encoding.Payload{
|
||||
Consent: encoding.Consent{},
|
||||
Campaign: encoding.Campaign{},
|
||||
ECommerce: encoding.ECommerce{},
|
||||
ClientHints: encoding.ClientHints{},
|
||||
ProtocolVersion: encoding.Set("2"),
|
||||
TrackingID: nil,
|
||||
GTMHashInfo: nil,
|
||||
ClientID: encoding.Set("C123456"),
|
||||
Richsstsse: encoding.Set("1"),
|
||||
DocumentLocation: encoding.Set("https://foomo.org"),
|
||||
DocumentTitle: encoding.Set("Home"),
|
||||
DocumentReferrer: nil,
|
||||
IsDebug: encoding.Set("1"),
|
||||
EventName: encoding.Set(sesamy.EventNameAddToCart),
|
||||
EventParameter: nil,
|
||||
EventParameterNumber: nil,
|
||||
UserID: nil,
|
||||
SessionID: nil,
|
||||
UserProperty: nil,
|
||||
UserPropertyNumber: nil,
|
||||
NonPersonalizedAds: nil,
|
||||
SST: nil,
|
||||
Remain: nil,
|
||||
}
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, pubSub.Publish("in", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestMPv2MessageHandler(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
router, err := message.NewRouter(message.RouterConfig{}, watermillzap.NewLogger(l))
|
||||
require.NoError(t, err)
|
||||
defer router.Close()
|
||||
|
||||
// Create pubSub
|
||||
pubSub := gochannel.NewGoChannel(
|
||||
gochannel.Config{},
|
||||
watermillzap.NewLogger(l),
|
||||
)
|
||||
|
||||
var done atomic.Bool
|
||||
router.AddHandler("gtag", "in", pubSub, "out", pubSub, gtag.MPv2MessageHandler)
|
||||
router.AddNoPublisherHandler("mpv2", "out", pubSub, func(msg *message.Message) error {
|
||||
expected := `{"client_id":"C123456","consent":{"ad_user_data":"GRANTED","ad_personalization":"GRANTED","analytics_storage":"GRANTED"},"events":[{"name":"add_to_cart","params":{"page_location":"https://foomo.org","page_title":"Home"}}],"debug_mode":true}`
|
||||
if !assert.JSONEq(t, expected, string(msg.Payload)) {
|
||||
fmt.Println(string(msg.Payload))
|
||||
}
|
||||
done.Store(true)
|
||||
return nil
|
||||
})
|
||||
|
||||
go func() {
|
||||
assert.NoError(t, router.Run(context.TODO()))
|
||||
}()
|
||||
assert.Eventually(t, router.IsRunning, time.Second, 50*time.Millisecond)
|
||||
|
||||
payload := encoding.Payload{
|
||||
Consent: encoding.Consent{},
|
||||
Campaign: encoding.Campaign{},
|
||||
ECommerce: encoding.ECommerce{},
|
||||
ClientHints: encoding.ClientHints{},
|
||||
ProtocolVersion: encoding.Set("2"),
|
||||
TrackingID: nil,
|
||||
GTMHashInfo: nil,
|
||||
ClientID: encoding.Set("C123456"),
|
||||
Richsstsse: encoding.Set("1"),
|
||||
DocumentLocation: encoding.Set("https://foomo.org"),
|
||||
DocumentTitle: encoding.Set("Home"),
|
||||
DocumentReferrer: nil,
|
||||
IsDebug: encoding.Set("1"),
|
||||
EventName: encoding.Set(sesamy.EventNameAddToCart),
|
||||
EventParameter: nil,
|
||||
EventParameterNumber: nil,
|
||||
UserID: nil,
|
||||
SessionID: nil,
|
||||
UserProperty: nil,
|
||||
UserPropertyNumber: nil,
|
||||
NonPersonalizedAds: nil,
|
||||
SST: nil,
|
||||
Remain: nil,
|
||||
}
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, pubSub.Publish("in", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
@ -1,23 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func NoPublishMessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) error {
|
||||
return func(msg *message.Message) error {
|
||||
var payload *gtag.Payload
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
// handle payload
|
||||
return handler(payload, msg)
|
||||
}
|
||||
}
|
||||
@ -1,5 +0,0 @@
|
||||
package gtag
|
||||
|
||||
const (
|
||||
MetadataEventName = "X-Event-Name"
|
||||
)
|
||||
@ -1,173 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrErrorResponse = errors.New("server responded with error status")
|
||||
ErrPublisherClosed = errors.New("publisher is closed")
|
||||
)
|
||||
|
||||
type (
|
||||
Publisher struct {
|
||||
l *zap.Logger
|
||||
host string
|
||||
path string
|
||||
client *http.Client
|
||||
closed bool
|
||||
middlewares []PublisherMiddleware
|
||||
maxResponseCode int
|
||||
}
|
||||
PublisherOption func(*Publisher)
|
||||
PublisherHandler func(l *zap.Logger, msg *message.Message) error
|
||||
PublisherMiddleware func(next PublisherHandler) PublisherHandler
|
||||
// PublisherMarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
|
||||
PublisherMarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Constructor
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func NewPublisher(l *zap.Logger, host string, opts ...PublisherOption) *Publisher {
|
||||
inst := &Publisher{
|
||||
l: l,
|
||||
host: host,
|
||||
path: "/g/collect",
|
||||
client: http.DefaultClient,
|
||||
maxResponseCode: http.StatusBadRequest,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(inst)
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Options
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func PublisherWithPath(v string) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.path = v
|
||||
}
|
||||
}
|
||||
|
||||
func PublisherWithClient(v *http.Client) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.client = v
|
||||
}
|
||||
}
|
||||
|
||||
func PublisherWithMiddlewares(v ...PublisherMiddleware) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.middlewares = append(o.middlewares, v...)
|
||||
}
|
||||
}
|
||||
|
||||
func PublisherWithMaxResponseCode(v int) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.maxResponseCode = v
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Getter
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) Client() *http.Client {
|
||||
return p.client
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Public methods
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
|
||||
if p.closed {
|
||||
return ErrPublisherClosed
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
// compose middlewares
|
||||
next := p.handle
|
||||
for _, middleware := range p.middlewares {
|
||||
next = middleware(next)
|
||||
}
|
||||
|
||||
// run handler
|
||||
if err := next(p.l.With(
|
||||
zap.String("message_id", msg.UUID),
|
||||
), msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) Close() error {
|
||||
if p.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Private methods
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) handle(l *zap.Logger, msg *message.Message) error {
|
||||
var event *gtag.Payload
|
||||
if err := json.Unmarshal(msg.Payload, &event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
values, body, err := gtag.Encode(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(msg.Context(), http.MethodPost, fmt.Sprintf("%s%s?%s", p.host, p.path, gtag.EncodeValues(values)), body)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create request")
|
||||
}
|
||||
|
||||
for s, s2 := range msg.Metadata {
|
||||
req.Header.Set(s, s2)
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
resp, err := p.client.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to publish message: %s", msg.UUID)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
l = l.With(zap.Int("http_status_code", resp.StatusCode))
|
||||
|
||||
if p.maxResponseCode > 0 && resp.StatusCode >= p.maxResponseCode {
|
||||
if body, err := io.ReadAll(resp.Body); err == nil {
|
||||
l = l.With(zap.String("http_response", string(body)))
|
||||
}
|
||||
return errors.Wrap(ErrErrorResponse, resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1,68 +0,0 @@
|
||||
package gtag_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/integration/watermill/gtag"
|
||||
encoding "github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
"github.com/foomo/sesamy-go/pkg/sesamy"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestPublisher(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
var done atomic.Bool
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
expected := `_dbg=1&cid=C123456&dl=https%3A%2F%2Ffoomo.org&dt=Home&en=add_to_cart&v=2&richsstsse`
|
||||
assert.Equal(t, expected, r.URL.RawQuery)
|
||||
done.Store(true)
|
||||
}))
|
||||
|
||||
p := gtag.NewPublisher(l, s.URL)
|
||||
|
||||
payload := encoding.Payload{
|
||||
Consent: encoding.Consent{},
|
||||
Campaign: encoding.Campaign{},
|
||||
ECommerce: encoding.ECommerce{},
|
||||
ClientHints: encoding.ClientHints{},
|
||||
ProtocolVersion: encoding.Set("2"),
|
||||
TrackingID: nil,
|
||||
GTMHashInfo: nil,
|
||||
ClientID: encoding.Set("C123456"),
|
||||
Richsstsse: encoding.Set("1"),
|
||||
DocumentLocation: encoding.Set("https://foomo.org"),
|
||||
DocumentTitle: encoding.Set("Home"),
|
||||
DocumentReferrer: nil,
|
||||
IsDebug: encoding.Set("1"),
|
||||
EventName: encoding.Set(sesamy.EventNameAddToCart),
|
||||
EventParameter: nil,
|
||||
EventParameterNumber: nil,
|
||||
UserID: nil,
|
||||
SessionID: nil,
|
||||
UserProperty: nil,
|
||||
UserPropertyNumber: nil,
|
||||
NonPersonalizedAds: nil,
|
||||
SST: nil,
|
||||
Remain: nil,
|
||||
}
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Println(string(jsonPayload))
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, p.Publish("foo", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
@ -1,19 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler {
|
||||
return func(l *zap.Logger, msg *message.Message) error {
|
||||
if err := next(l, msg); err != nil {
|
||||
if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() {
|
||||
l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String()))
|
||||
}
|
||||
l.With(zap.Error(err)).Warn("ignoring error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -1,143 +0,0 @@
|
||||
package gtag
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/gtag"
|
||||
gtaghttp "github.com/foomo/sesamy-go/pkg/http/gtag"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
Subscriber struct {
|
||||
l *zap.Logger
|
||||
uuidFunc func() string
|
||||
messages chan *message.Message
|
||||
messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error
|
||||
middlewares []gtaghttp.Middleware
|
||||
closed bool
|
||||
}
|
||||
SubscriberOption func(*Subscriber)
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Options
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func SubscriberWithUUIDFunc(v func() string) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.uuidFunc = v
|
||||
}
|
||||
}
|
||||
|
||||
func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.messageFunc = v
|
||||
}
|
||||
}
|
||||
|
||||
func SubscriberWithMiddlewares(v ...gtaghttp.Middleware) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.middlewares = append(o.middlewares, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Constructor
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber {
|
||||
inst := &Subscriber{
|
||||
l: l,
|
||||
uuidFunc: watermill.NewUUID,
|
||||
messages: make(chan *message.Message),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(inst)
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// retrieve payload
|
||||
payload := gtaghttp.Handler(w, r)
|
||||
|
||||
// compose middlewares
|
||||
next := s.handle
|
||||
for _, middleware := range s.middlewares {
|
||||
next = middleware(next)
|
||||
}
|
||||
|
||||
// run handler
|
||||
if err := next(s.l, w, r, payload); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *gtag.Payload) error {
|
||||
// marshal message payload
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to marshal payload")
|
||||
}
|
||||
|
||||
msg := message.NewMessage(s.uuidFunc(), data)
|
||||
l = l.With(zap.String("message_id", msg.UUID))
|
||||
msg.SetContext(context.WithoutCancel(r.Context()))
|
||||
|
||||
if payload.EventName != nil {
|
||||
msg.Metadata.Set(MetadataEventName, gtag.Get(payload.EventName).String())
|
||||
}
|
||||
|
||||
for name, headers := range r.Header {
|
||||
msg.Metadata.Set(name, strings.Join(headers, ","))
|
||||
}
|
||||
|
||||
if s.messageFunc != nil {
|
||||
if err := s.messageFunc(l, r, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range msg.Metadata {
|
||||
l = l.With(zap.String(k, v))
|
||||
}
|
||||
|
||||
// send message
|
||||
s.messages <- msg
|
||||
|
||||
// wait for ACK
|
||||
select {
|
||||
case <-msg.Acked():
|
||||
l.Debug("message acked")
|
||||
return nil
|
||||
case <-msg.Nacked():
|
||||
l.Debug("message nacked")
|
||||
return ErrMessageNacked
|
||||
case <-r.Context().Done():
|
||||
l.Debug("message canceled")
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
|
||||
return s.messages, nil
|
||||
}
|
||||
|
||||
// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
|
||||
func (s *Subscriber) Close() error {
|
||||
if s.closed {
|
||||
return ErrClosed
|
||||
}
|
||||
s.closed = true
|
||||
|
||||
close(s.messages)
|
||||
return nil
|
||||
}
|
||||
@ -1,14 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMissingEventName = errors.New("missing event name")
|
||||
ErrErrorResponse = errors.New("server responded with error status")
|
||||
ErrPublisherClosed = errors.New("publisher is closed")
|
||||
ErrContextCanceled = errors.New("request stopped without ACK received")
|
||||
ErrMessageNacked = errors.New("message nacked")
|
||||
ErrClosed = errors.New("subscriber already closed")
|
||||
)
|
||||
@ -1,34 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.HandlerFunc {
|
||||
return func(msg *message.Message) ([]*message.Message, error) {
|
||||
var payload *mpv2.Payload[any]
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
// handle payload
|
||||
if err := handler(payload, msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// marshal payload
|
||||
b, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to marshal payload")
|
||||
}
|
||||
msg.Payload = b
|
||||
|
||||
return []*message.Message{msg}, nil
|
||||
}
|
||||
}
|
||||
@ -1,5 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
const (
|
||||
MetadataRequestQuery = "RequestQuery"
|
||||
)
|
||||
@ -1,23 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func NoPublishMessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.NoPublishHandlerFunc {
|
||||
return func(msg *message.Message) error {
|
||||
var payload *mpv2.Payload[any]
|
||||
|
||||
// unmarshal payload
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return errors.Wrap(err, "failed to unmarshal payload")
|
||||
}
|
||||
|
||||
// handle payload
|
||||
return handler(payload, msg)
|
||||
}
|
||||
}
|
||||
@ -1,161 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
Publisher struct {
|
||||
l *zap.Logger
|
||||
host string
|
||||
path string
|
||||
httpClient *http.Client
|
||||
middlewares []PublisherMiddleware
|
||||
closed bool
|
||||
}
|
||||
PublisherOption func(*Publisher)
|
||||
PublisherHandler func(l *zap.Logger, msg *message.Message) error
|
||||
PublisherMiddleware func(next PublisherHandler) PublisherHandler
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Constructor
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func NewPublisher(l *zap.Logger, host string, opts ...PublisherOption) *Publisher {
|
||||
inst := &Publisher{
|
||||
l: l,
|
||||
host: host,
|
||||
path: "/mp/collect",
|
||||
httpClient: http.DefaultClient,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(inst)
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Options
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func PublisherWithPath(v string) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.path = v
|
||||
}
|
||||
}
|
||||
|
||||
func PublisherWithHTTPClient(v *http.Client) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.httpClient = v
|
||||
}
|
||||
}
|
||||
|
||||
func PublisherWithMiddlewares(v ...PublisherMiddleware) PublisherOption {
|
||||
return func(o *Publisher) {
|
||||
o.middlewares = append(o.middlewares, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Getter
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) HTTPClient() *http.Client {
|
||||
return p.httpClient
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Public methods
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
|
||||
if p.closed {
|
||||
return ErrPublisherClosed
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
// compose middlewares
|
||||
next := p.handle
|
||||
for _, middleware := range p.middlewares {
|
||||
next = middleware(next)
|
||||
}
|
||||
|
||||
// run handler
|
||||
if err := next(p.l.With(
|
||||
zap.String("message_id", msg.UUID),
|
||||
), msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) Close() error {
|
||||
if p.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Private methods
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (p *Publisher) handle(l *zap.Logger, msg *message.Message) error {
|
||||
req, err := http.NewRequestWithContext(msg.Context(), http.MethodPost, fmt.Sprintf("%s%s", p.host, p.path), bytes.NewReader(msg.Payload))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to create request")
|
||||
}
|
||||
|
||||
for key, value := range msg.Metadata {
|
||||
switch key {
|
||||
case "Cookie":
|
||||
for _, s3 := range strings.Split(value, "; ") {
|
||||
val := strings.Split(s3, "=")
|
||||
req.AddCookie(&http.Cookie{
|
||||
Name: val[0],
|
||||
Value: strings.Join(val[1:], "="),
|
||||
})
|
||||
}
|
||||
case MetadataRequestQuery:
|
||||
req.URL.RawQuery = value
|
||||
default:
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if err := func() error {
|
||||
resp, err := p.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to publish message: %s", msg.UUID)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
l = l.With(zap.Int("http_status_code", resp.StatusCode))
|
||||
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
if body, err := io.ReadAll(resp.Body); err == nil {
|
||||
l = l.With(zap.String("http_response", string(body)))
|
||||
}
|
||||
return errors.Wrap(ErrErrorResponse, resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1,67 +0,0 @@
|
||||
package mpv2_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/integration/watermill/mpv2"
|
||||
encoding "github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/foomo/sesamy-go/pkg/event"
|
||||
"github.com/foomo/sesamy-go/pkg/event/params"
|
||||
"github.com/foomo/sesamy-go/pkg/sesamy"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestPublisher(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
var done atomic.Bool
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
out, err := io.ReadAll(r.Body)
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := `{"client_id":"C123456","user_id":"U123456","timestamp_micros":1727701064057701,"events":[{"name":"page_view","params":{"page_title":"Home","page_location":"https://foomo.org"}}],"debug_mode":true,"session_id":"S123456","engagement_time_msec":100}`
|
||||
if !assert.JSONEq(t, expected, string(out)) {
|
||||
fmt.Println(string(out))
|
||||
}
|
||||
done.Store(true)
|
||||
}))
|
||||
|
||||
p := mpv2.NewPublisher(l, s.URL)
|
||||
|
||||
payload := encoding.Payload[params.PageView]{
|
||||
ClientID: "C123456",
|
||||
UserID: "U123456",
|
||||
TimestampMicros: 1727701064057701,
|
||||
UserProperties: nil,
|
||||
Consent: nil,
|
||||
Events: []sesamy.Event[params.PageView]{
|
||||
event.NewPageView(params.PageView{
|
||||
PageTitle: "Home",
|
||||
PageLocation: "https://foomo.org",
|
||||
}),
|
||||
},
|
||||
UserData: nil,
|
||||
DebugMode: true,
|
||||
SessionID: "S123456",
|
||||
EngagementTimeMSec: 100,
|
||||
}
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, p.Publish("foo", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
@ -1,61 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// PublisherMiddlewareIgnoreError ignores error responses from the gtm endpoint to prevent retries.
|
||||
func PublisherMiddlewareIgnoreError(next PublisherHandler) PublisherHandler {
|
||||
return func(l *zap.Logger, msg *message.Message) error {
|
||||
if err := next(l, msg); err != nil {
|
||||
if spanCtx := trace.SpanContextFromContext(msg.Context()); spanCtx.IsValid() && spanCtx.IsSampled() {
|
||||
l = l.With(zap.String("trace_id", spanCtx.TraceID().String()), zap.String("span_id", spanCtx.SpanID().String()))
|
||||
}
|
||||
l.With(zap.Error(err)).Warn("ignoring error")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// PublisherMiddlewareEventParams moves the `debug_mode`, `session_id` & `engagement_time_msec` into the events params
|
||||
// since this is required by the measurement protocol but make coding much more complex. That's why it's part of the payload
|
||||
// in this library.
|
||||
func PublisherMiddlewareEventParams(next PublisherHandler) PublisherHandler {
|
||||
return func(l *zap.Logger, msg *message.Message) error {
|
||||
var payload *mpv2.Payload[any]
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
return err
|
||||
}
|
||||
for i, event := range payload.Events {
|
||||
if params, ok := event.Params.(map[string]any); ok {
|
||||
if payload.DebugMode {
|
||||
params["debug_mode"] = "1"
|
||||
payload.DebugMode = false
|
||||
}
|
||||
if len(payload.SessionID) > 0 {
|
||||
params["session_id"] = payload.SessionID
|
||||
payload.SessionID = ""
|
||||
}
|
||||
if payload.EngagementTimeMSec > 0 {
|
||||
params["engagement_time_msec"] = payload.EngagementTimeMSec
|
||||
payload.EngagementTimeMSec = 0
|
||||
}
|
||||
event.Params = params
|
||||
}
|
||||
payload.Events[i] = event
|
||||
|
||||
out, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg.Payload = out
|
||||
}
|
||||
return next(l, msg)
|
||||
}
|
||||
}
|
||||
@ -1,106 +0,0 @@
|
||||
package mpv2_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/integration/watermill/mpv2"
|
||||
encoding "github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
"github.com/foomo/sesamy-go/pkg/event"
|
||||
"github.com/foomo/sesamy-go/pkg/event/params"
|
||||
"github.com/foomo/sesamy-go/pkg/sesamy"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestPublisherMiddlewareIgnoreError(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
var done atomic.Bool
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
done.Store(true)
|
||||
}))
|
||||
|
||||
p := mpv2.NewPublisher(l, s.URL, mpv2.PublisherWithMiddlewares(mpv2.PublisherMiddlewareIgnoreError))
|
||||
|
||||
payload := encoding.Payload[params.PageView]{
|
||||
ClientID: "C123456",
|
||||
UserID: "U123456",
|
||||
TimestampMicros: 1727701064057701,
|
||||
UserProperties: nil,
|
||||
Consent: nil,
|
||||
Events: []sesamy.Event[params.PageView]{
|
||||
event.NewPageView(params.PageView{
|
||||
PageTitle: "Home",
|
||||
PageLocation: "https://foomo.org",
|
||||
}),
|
||||
},
|
||||
UserData: nil,
|
||||
DebugMode: true,
|
||||
SessionID: "S123456",
|
||||
EngagementTimeMSec: 100,
|
||||
}
|
||||
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, p.Publish("foo", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPublisherMiddlewareEventParams(t *testing.T) {
|
||||
l := zaptest.NewLogger(t)
|
||||
|
||||
var done atomic.Bool
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
out, err := io.ReadAll(r.Body)
|
||||
assert.NoError(t, err)
|
||||
|
||||
expected := `{"client_id":"C123456","user_id":"U123456","timestamp_micros":1727701064057701,"events":[{"name":"page_view","params":{"debug_mode":"1","engagement_time_msec":100,"page_location":"https://foomo.org","page_title":"Home","session_id":"S123456"}}]}`
|
||||
if !assert.JSONEq(t, expected, string(out)) {
|
||||
fmt.Println(string(out))
|
||||
}
|
||||
done.Store(true)
|
||||
}))
|
||||
|
||||
p := mpv2.NewPublisher(l, s.URL, mpv2.PublisherWithMiddlewares(mpv2.PublisherMiddlewareEventParams))
|
||||
|
||||
payload := encoding.Payload[params.PageView]{
|
||||
ClientID: "C123456",
|
||||
UserID: "U123456",
|
||||
TimestampMicros: 1727701064057701,
|
||||
UserProperties: nil,
|
||||
Consent: nil,
|
||||
Events: []sesamy.Event[params.PageView]{
|
||||
event.NewPageView(params.PageView{
|
||||
PageTitle: "Home",
|
||||
PageLocation: "https://foomo.org",
|
||||
}),
|
||||
},
|
||||
UserData: nil,
|
||||
DebugMode: true,
|
||||
SessionID: "S123456",
|
||||
EngagementTimeMSec: 100,
|
||||
}
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := message.NewMessage(watermill.NewUUID(), jsonPayload)
|
||||
|
||||
require.NoError(t, p.Publish("foo", msg))
|
||||
|
||||
assert.Eventually(t, done.Load, time.Second, 50*time.Millisecond)
|
||||
}
|
||||
@ -1,147 +0,0 @@
|
||||
package mpv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/ThreeDotsLabs/watermill"
|
||||
"github.com/ThreeDotsLabs/watermill/message"
|
||||
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
||||
mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
Subscriber struct {
|
||||
l *zap.Logger
|
||||
uuidFunc func() string
|
||||
messages chan *message.Message
|
||||
messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error
|
||||
middlewares []mpv2http.Middleware
|
||||
closed bool
|
||||
}
|
||||
SubscriberOption func(*Subscriber)
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Options
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func SubscriberWithUUIDFunc(v func() string) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.uuidFunc = v
|
||||
}
|
||||
}
|
||||
|
||||
func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.messageFunc = v
|
||||
}
|
||||
}
|
||||
|
||||
func SubscriberWithMiddlewares(v ...mpv2http.Middleware) SubscriberOption {
|
||||
return func(o *Subscriber) {
|
||||
o.middlewares = append(o.middlewares, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Constructor
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber {
|
||||
inst := &Subscriber{
|
||||
l: l,
|
||||
uuidFunc: watermill.NewUUID,
|
||||
messages: make(chan *message.Message),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(inst)
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
// ~ Public methods
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// retrieve payload
|
||||
payload := mpv2http.Handler(w, r)
|
||||
|
||||
// compose middlewares
|
||||
next := s.handle
|
||||
for _, middleware := range s.middlewares {
|
||||
next = middleware(next)
|
||||
}
|
||||
|
||||
// run handler
|
||||
if err := next(s.l, w, r, payload); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error {
|
||||
// marshal message payload
|
||||
jsonPayload, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to marshal payload")
|
||||
}
|
||||
|
||||
msg := message.NewMessage(s.uuidFunc(), jsonPayload)
|
||||
l = l.With(zap.String("message_id", msg.UUID))
|
||||
msg.SetContext(context.WithoutCancel(r.Context()))
|
||||
|
||||
// store query
|
||||
msg.Metadata.Set(MetadataRequestQuery, r.URL.RawQuery)
|
||||
|
||||
// store header
|
||||
for name, headers := range r.Header {
|
||||
msg.Metadata.Set(name, strings.Join(headers, ","))
|
||||
}
|
||||
|
||||
if s.messageFunc != nil {
|
||||
if err := s.messageFunc(l, r, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range msg.Metadata {
|
||||
l = l.With(zap.String(k, v))
|
||||
}
|
||||
|
||||
// send message
|
||||
s.messages <- msg
|
||||
|
||||
// wait for ACK
|
||||
select {
|
||||
case <-msg.Acked():
|
||||
l.Debug("message acked")
|
||||
return nil
|
||||
case <-msg.Nacked():
|
||||
l.Debug("message nacked")
|
||||
return ErrMessageNacked
|
||||
case <-r.Context().Done():
|
||||
l.Debug("message canceled")
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
|
||||
return s.messages, nil
|
||||
}
|
||||
|
||||
// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
|
||||
func (s *Subscriber) Close() error {
|
||||
if s.closed {
|
||||
return ErrClosed
|
||||
}
|
||||
s.closed = true
|
||||
|
||||
close(s.messages)
|
||||
return nil
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user