feat: update publisher

This commit is contained in:
Kevin Franklin Kim 2024-03-12 13:40:13 +01:00
parent 67a90c8a8d
commit 391f82e2f0
No known key found for this signature in database
2 changed files with 20 additions and 52 deletions

View File

@ -1,6 +0,0 @@
package gtm
const (
HeaderUUID = "Message-Uuid"
ProviderName = "gtm"
)

View File

@ -3,13 +3,15 @@ package gtm
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"strings"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/foomo/keel/log"
mpv2 "github.com/foomo/sesamy-go/measurementprotocol/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var (
@ -19,20 +21,24 @@ var (
type (
Publisher struct {
l *zap.Logger
url string
client *http.Client
marshalMessageFunc MarshalMessageFunc
marshalMessageFunc PublisherMarshalMessageFunc
closed bool
}
PublisherOption func(*Publisher)
// PublisherMarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
PublisherMarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)
)
// ------------------------------------------------------------------------------------------------
// ~ Constructor
// ------------------------------------------------------------------------------------------------
func NewPublisher(url string, opts ...PublisherOption) *Publisher {
func NewPublisher(l *zap.Logger, url string, opts ...PublisherOption) *Publisher {
inst := &Publisher{
l: l,
url: url,
client: http.DefaultClient,
}
@ -52,7 +58,7 @@ func PublisherWithClient(v *http.Client) PublisherOption {
}
}
func PublisherWithMarshalMessageFunc(v MarshalMessageFunc) PublisherOption {
func PublisherWithMarshalMessageFunc(v PublisherMarshalMessageFunc) PublisherOption {
return func(o *Publisher) {
o.marshalMessageFunc = v
}
@ -105,31 +111,27 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
}
}
// logFields := watermill.LogFields{
// "uuid": msg.UUID,
// "provider": ProviderName,
// }
// p.l.Trace("Publishing message", logFields)
r, _ := httputil.DumpRequestOut(req, true)
fmt.Println("--> Outgoing publish")
fmt.Println(string(r))
l := log.WithHTTPRequestOut(p.l, req).With(
zap.String("message_id", msg.UUID),
)
resp, err := p.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to publish message: %s", msg.UUID)
}
defer resp.Body.Close()
if err = p.handleResponseBody(resp); err != nil {
return err
}
l = l.With(log.FHTTPStatusCode(resp.StatusCode))
if resp.StatusCode >= http.StatusBadRequest {
if body, err := io.ReadAll(resp.Body); err == nil {
l = l.With(zap.String("http_response", string(body)))
}
l.Info("server responded with error")
return errors.Wrap(ErrErrorResponse, resp.Status)
}
// p.l.Trace("Message published", logFields)
l.Debug("message published")
}
return nil
@ -143,31 +145,3 @@ func (p *Publisher) Close() error {
p.closed = true
return nil
}
// ------------------------------------------------------------------------------------------------
// ~ Private methods
// ------------------------------------------------------------------------------------------------
func (p *Publisher) handleResponseBody(resp *http.Response) error {
defer resp.Body.Close()
if resp.StatusCode < http.StatusBadRequest {
return nil
}
// body, err := io.ReadAll(resp.Body)
// if err != nil {
// return errors.Wrap(err, "could not read response body")
// }
// logFields = logFields.Add(watermill.LogFields{
// "http_status": resp.StatusCode,
// "http_response": string(body),
// })
// p.l.Info("Server responded with error", logFields)
return nil
}
// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)