This commit is contained in:
Al Moataz Rizk 2025-09-30 14:14:15 +02:00 committed by GitHub
commit 7f8d7b1384
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 144 additions and 0 deletions

View File

@ -7,6 +7,7 @@ import (
"go.uber.org/zap"
"github.com/foomo/keel/log"
"github.com/pkg/errors"
)
type service struct {
@ -38,3 +39,42 @@ func (s *service) Close(ctx context.Context) error {
s.w.Stop()
return nil
}
type toggleableService struct {
l *zap.Logger
newWorker func() worker.Worker
w worker.Worker
name string
}
func NewToggleableService(l *zap.Logger, name string, newWorker func() worker.Worker) *toggleableService {
if l == nil {
l = log.Logger()
}
// enrich the log
l = log.WithHTTPServerName(l, name)
return &toggleableService{l: l, name: name, newWorker: newWorker}
}
func (s *toggleableService) Name() string {
return s.name
}
func (s *toggleableService) Start(ctx context.Context) error {
if s.w != nil {
// to prevent memory leaks, block starting a worker if it has already been started
return errors.New("can not start new worker while another one is running")
}
s.l.Info("creating and starting temporal worker")
s.w = s.newWorker()
return s.w.Start()
}
func (s *toggleableService) Close(ctx context.Context) error {
if s.w != nil {
s.l.Info("stopping temporal worker")
s.w.Stop()
s.w = nil
}
return nil
}

View File

@ -0,0 +1,104 @@
package keeltemporal
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.uber.org/zap"
)
func TestNewToggleableService(t *testing.T) {
t.Run("NewService_StartCloseStart", func(t *testing.T) {
l := zap.NewNop()
client, err := client.NewLazyClient(client.Options{})
require.NoError(t, err)
w := worker.New(client, "test_queue", worker.Options{})
s := NewService(l, "test", w)
// start worker
s.Start(t.Context())
// close worker
s.Close(t.Context())
// starting worker again panics
defer func() {
if e := recover(); e != nil {
if errString, ok := e.(string); ok {
assert.Equal(t, "attempted to start a worker that has been stopped before", errString)
} else {
t.Fatal(e)
}
} else {
t.Fatal("expected a panic")
}
}()
s.Start(t.Context())
})
t.Run("NewToggleableService_StartCloseStart", func(t *testing.T) {
l := zap.NewNop()
client, err := client.NewLazyClient(client.Options{})
require.NoError(t, err)
newWorker := func() worker.Worker {
return worker.New(client, "test_queue", worker.Options{})
}
s := NewToggleableService(l, "test", newWorker)
// start worker
s.Start(t.Context())
// close worker
s.Close(t.Context())
// starting worker again should NOT panic
defer func() {
e := recover()
assert.Nil(t, e)
}()
s.Start(t.Context())
})
t.Run("NewToggleableService_CloseBeforeStart", func(t *testing.T) {
l := zap.NewNop()
client, err := client.NewLazyClient(client.Options{})
require.NoError(t, err)
newWorker := func() worker.Worker {
return worker.New(client, "test_queue", worker.Options{})
}
s := NewToggleableService(l, "test", newWorker)
// closing worker before again should NOT panic
defer func() {
e := recover()
assert.Nil(t, e)
}()
s.Close(t.Context())
})
t.Run("NewToggleableService_MultipleCallsToStart", func(t *testing.T) {
l := zap.NewNop()
client, err := client.NewLazyClient(client.Options{})
require.NoError(t, err)
newWorker := func() worker.Worker {
return worker.New(client, "test_queue", worker.Options{})
}
s := NewToggleableService(l, "test", newWorker)
// start worker
s.Start(t.Context())
// attempt to start worker again
err = s.Start(t.Context())
require.ErrorContains(t, err, "can not start new worker while another one is runnin")
})
}