From 9c4ec4a25b4f0e41444d430edc0e2638959da4b1 Mon Sep 17 00:00:00 2001 From: Kevin Franklin Kim Date: Thu, 30 May 2024 10:30:23 +0200 Subject: [PATCH] feat: add no publish handler --- integration/watermill/gtag/messagehandler.go | 2 +- .../watermill/gtag/nopublishmessagehandler.go | 23 +++++++++++++++++++ integration/watermill/mpv2/messagehandler.go | 2 +- .../watermill/mpv2/nopublishmessagehandler.go | 23 +++++++++++++++++++ 4 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 integration/watermill/gtag/nopublishmessagehandler.go create mode 100644 integration/watermill/mpv2/nopublishmessagehandler.go diff --git a/integration/watermill/gtag/messagehandler.go b/integration/watermill/gtag/messagehandler.go index 25be057..eeb8be7 100644 --- a/integration/watermill/gtag/messagehandler.go +++ b/integration/watermill/gtag/messagehandler.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" ) -func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) { +func MessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { var payload *gtag.Payload diff --git a/integration/watermill/gtag/nopublishmessagehandler.go b/integration/watermill/gtag/nopublishmessagehandler.go new file mode 100644 index 0000000..6107462 --- /dev/null +++ b/integration/watermill/gtag/nopublishmessagehandler.go @@ -0,0 +1,23 @@ +package gtag + +import ( + "encoding/json" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/foomo/sesamy-go/pkg/encoding/gtag" + "github.com/pkg/errors" +) + +func NoPublishMessageHandler(handler func(payload *gtag.Payload, msg *message.Message) error) func(msg *message.Message) error { + return func(msg *message.Message) error { + var payload *gtag.Payload + + // unmarshal payload + if err := json.Unmarshal(msg.Payload, &payload); err != nil { + return errors.Wrap(err, "failed to unmarshal payload") + } + + // handle payload + return handler(payload, msg) + } +} diff --git a/integration/watermill/mpv2/messagehandler.go b/integration/watermill/mpv2/messagehandler.go index 3fa4c23..3cc2cd7 100644 --- a/integration/watermill/mpv2/messagehandler.go +++ b/integration/watermill/mpv2/messagehandler.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" ) -func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) func(msg *message.Message) ([]*message.Message, error) { +func MessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { var payload *mpv2.Payload[any] diff --git a/integration/watermill/mpv2/nopublishmessagehandler.go b/integration/watermill/mpv2/nopublishmessagehandler.go new file mode 100644 index 0000000..5777c4a --- /dev/null +++ b/integration/watermill/mpv2/nopublishmessagehandler.go @@ -0,0 +1,23 @@ +package mpv2 + +import ( + "encoding/json" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/foomo/sesamy-go/pkg/encoding/mpv2" + "github.com/pkg/errors" +) + +func NoPublishMessageHandler(handler func(payload *mpv2.Payload[any], msg *message.Message) error) message.NoPublishHandlerFunc { + return func(msg *message.Message) error { + var payload *mpv2.Payload[any] + + // unmarshal payload + if err := json.Unmarshal(msg.Payload, &payload); err != nil { + return errors.Wrap(err, "failed to unmarshal payload") + } + + // handle payload + return handler(payload, msg) + } +}