sesamy-go/integration/watermill/mpv2/subscriber.go
Kevin Franklin Kim a58dfafaef
feat: add collect
2024-11-20 13:48:00 +01:00

148 lines
3.7 KiB
Go

package mpv2
import (
"context"
"encoding/json"
"net/http"
"strings"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
mpv2http "github.com/foomo/sesamy-go/pkg/http/mpv2"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type (
Subscriber struct {
l *zap.Logger
uuidFunc func() string
messages chan *message.Message
messageFunc func(l *zap.Logger, r *http.Request, msg *message.Message) error
middlewares []mpv2http.Middleware
closed bool
}
SubscriberOption func(*Subscriber)
)
// ------------------------------------------------------------------------------------------------
// ~ Options
// ------------------------------------------------------------------------------------------------
func SubscriberWithUUIDFunc(v func() string) SubscriberOption {
return func(o *Subscriber) {
o.uuidFunc = v
}
}
func SubscriberWithMessageFunc(v func(l *zap.Logger, r *http.Request, msg *message.Message) error) SubscriberOption {
return func(o *Subscriber) {
o.messageFunc = v
}
}
func SubscriberWithMiddlewares(v ...mpv2http.Middleware) SubscriberOption {
return func(o *Subscriber) {
o.middlewares = append(o.middlewares, v...)
}
}
// ------------------------------------------------------------------------------------------------
// ~ Constructor
// ------------------------------------------------------------------------------------------------
func NewSubscriber(l *zap.Logger, opts ...SubscriberOption) *Subscriber {
inst := &Subscriber{
l: l,
uuidFunc: watermill.NewUUID,
messages: make(chan *message.Message),
}
for _, opt := range opts {
opt(inst)
}
return inst
}
// ------------------------------------------------------------------------------------------------
// ~ Public methods
// ------------------------------------------------------------------------------------------------
func (s *Subscriber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// retrieve payload
payload := mpv2http.Handler(w, r)
// compose middlewares
next := s.handle
for _, middleware := range s.middlewares {
next = middleware(next)
}
// run handler
if err := next(s.l, w, r, payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func (s *Subscriber) handle(l *zap.Logger, w http.ResponseWriter, r *http.Request, payload *mpv2.Payload[any]) error {
// marshal message payload
jsonPayload, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "failed to marshal payload")
}
msg := message.NewMessage(s.uuidFunc(), jsonPayload)
l = l.With(zap.String("message_id", msg.UUID))
msg.SetContext(context.WithoutCancel(r.Context()))
// store query
msg.Metadata.Set(MetadataRequestQuery, r.URL.RawQuery)
// store header
for name, headers := range r.Header {
msg.Metadata.Set(name, strings.Join(headers, ","))
}
if s.messageFunc != nil {
if err := s.messageFunc(l, r, msg); err != nil {
return err
}
}
for k, v := range msg.Metadata {
l = l.With(zap.String(k, v))
}
// send message
s.messages <- msg
// wait for ACK
select {
case <-msg.Acked():
l.Debug("message acked")
return nil
case <-msg.Nacked():
l.Debug("message nacked")
return ErrMessageNacked
case <-r.Context().Done():
l.Debug("message canceled")
return ErrContextCanceled
}
}
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
return s.messages, nil
}
// Close closes all subscriptions with their output channels and flush offsets etc. when needed.
func (s *Subscriber) Close() error {
if s.closed {
return ErrClosed
}
s.closed = true
close(s.messages)
return nil
}