diff --git a/integration/temporal/service.go b/integration/temporal/service.go index 587379d..cbb54ab 100644 --- a/integration/temporal/service.go +++ b/integration/temporal/service.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "github.com/foomo/keel/log" + "github.com/pkg/errors" ) type service struct { @@ -38,3 +39,42 @@ func (s *service) Close(ctx context.Context) error { s.w.Stop() return nil } + +type toggleableService struct { + l *zap.Logger + newWorker func() worker.Worker + w worker.Worker + name string +} + +func NewToggleableService(l *zap.Logger, name string, newWorker func() worker.Worker) *toggleableService { + if l == nil { + l = log.Logger() + } + // enrich the log + l = log.WithHTTPServerName(l, name) + return &toggleableService{l: l, name: name, newWorker: newWorker} +} + +func (s *toggleableService) Name() string { + return s.name +} + +func (s *toggleableService) Start(ctx context.Context) error { + if s.w != nil { + // to prevent memory leaks, block starting a worker if it has already been started + return errors.New("can not start new worker while another one is running") + } + s.l.Info("creating and starting temporal worker") + s.w = s.newWorker() + return s.w.Start() +} + +func (s *toggleableService) Close(ctx context.Context) error { + if s.w != nil { + s.l.Info("stopping temporal worker") + s.w.Stop() + s.w = nil + } + return nil +} diff --git a/integration/temporal/service_test.go b/integration/temporal/service_test.go new file mode 100644 index 0000000..0ad38b0 --- /dev/null +++ b/integration/temporal/service_test.go @@ -0,0 +1,104 @@ +package keeltemporal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.uber.org/zap" +) + +func TestNewToggleableService(t *testing.T) { + t.Run("NewService_StartCloseStart", func(t *testing.T) { + l := zap.NewNop() + client, err := client.NewLazyClient(client.Options{}) + require.NoError(t, err) + + w := worker.New(client, "test_queue", worker.Options{}) + s := NewService(l, "test", w) + + // start worker + s.Start(t.Context()) + + // close worker + s.Close(t.Context()) + + // starting worker again panics + defer func() { + if e := recover(); e != nil { + if errString, ok := e.(string); ok { + assert.Equal(t, "attempted to start a worker that has been stopped before", errString) + } else { + t.Fatal(e) + } + } else { + t.Fatal("expected a panic") + } + }() + + s.Start(t.Context()) + }) + + t.Run("NewToggleableService_StartCloseStart", func(t *testing.T) { + l := zap.NewNop() + client, err := client.NewLazyClient(client.Options{}) + require.NoError(t, err) + + newWorker := func() worker.Worker { + return worker.New(client, "test_queue", worker.Options{}) + } + s := NewToggleableService(l, "test", newWorker) + + // start worker + s.Start(t.Context()) + + // close worker + s.Close(t.Context()) + + // starting worker again should NOT panic + defer func() { + e := recover() + assert.Nil(t, e) + }() + + s.Start(t.Context()) + }) + + t.Run("NewToggleableService_CloseBeforeStart", func(t *testing.T) { + l := zap.NewNop() + client, err := client.NewLazyClient(client.Options{}) + require.NoError(t, err) + + newWorker := func() worker.Worker { + return worker.New(client, "test_queue", worker.Options{}) + } + s := NewToggleableService(l, "test", newWorker) + + // closing worker before again should NOT panic + defer func() { + e := recover() + assert.Nil(t, e) + }() + s.Close(t.Context()) + }) + + t.Run("NewToggleableService_MultipleCallsToStart", func(t *testing.T) { + l := zap.NewNop() + client, err := client.NewLazyClient(client.Options{}) + require.NoError(t, err) + + newWorker := func() worker.Worker { + return worker.New(client, "test_queue", worker.Options{}) + } + s := NewToggleableService(l, "test", newWorker) + + // start worker + s.Start(t.Context()) + + // attempt to start worker again + err = s.Start(t.Context()) + require.ErrorContains(t, err, "can not start new worker while another one is runnin") + }) +}