mirror of
https://github.com/foomo/keel.git
synced 2025-10-16 12:35:34 +00:00
137 lines
4.3 KiB
Go
137 lines
4.3 KiB
Go
package keeltemporal
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.opentelemetry.io/otel"
|
|
"go.temporal.io/api/enums/v1"
|
|
"go.temporal.io/api/namespace/v1"
|
|
"go.temporal.io/api/replication/v1"
|
|
"go.temporal.io/api/serviceerror"
|
|
"go.temporal.io/api/workflowservice/v1"
|
|
"go.temporal.io/sdk/client"
|
|
"go.temporal.io/sdk/contrib/opentelemetry"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/foomo/keel/env"
|
|
"github.com/foomo/keel/log"
|
|
"github.com/foomo/keel/telemetry"
|
|
)
|
|
|
|
type (
|
|
ClientOptions struct {
|
|
Logger *zap.Logger
|
|
Namespace string
|
|
RegisterNamespace *workflowservice.RegisterNamespaceRequest
|
|
OtelEnabled bool
|
|
}
|
|
ClientOption func(o *ClientOptions)
|
|
)
|
|
|
|
func ClientWithOtelEnabled(v bool) ClientOption {
|
|
return func(o *ClientOptions) {
|
|
o.OtelEnabled = v
|
|
}
|
|
}
|
|
|
|
func ClientWithNamespace(v string) ClientOption {
|
|
return func(o *ClientOptions) {
|
|
o.Namespace = v
|
|
}
|
|
}
|
|
|
|
func ClientWithRegisterNamespace(v *workflowservice.RegisterNamespaceRequest) ClientOption {
|
|
return func(o *ClientOptions) {
|
|
o.RegisterNamespace = v
|
|
}
|
|
}
|
|
|
|
func DefaultClientOptions() ClientOptions {
|
|
return ClientOptions{
|
|
Logger: log.Logger(),
|
|
Namespace: "default",
|
|
RegisterNamespace: nil,
|
|
OtelEnabled: env.GetBool("OTEL_TEMPORAL_ENABLED", env.GetBool("OTEL_ENABLED", false)),
|
|
}
|
|
}
|
|
|
|
func NewClient(ctx context.Context, endpoint string, opts ...ClientOption) (client.Client, error) {
|
|
o := DefaultClientOptions()
|
|
|
|
// apply options
|
|
for _, opt := range opts {
|
|
opt(&o)
|
|
}
|
|
|
|
clientOpts := client.Options{
|
|
HostPort: endpoint,
|
|
Namespace: o.Namespace,
|
|
Logger: NewLogger(o.Logger),
|
|
}
|
|
|
|
nsc, err := client.NewNamespaceClient(clientOpts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create temporal namespace client")
|
|
}
|
|
|
|
// setup namespace
|
|
if o.RegisterNamespace != nil {
|
|
var notFoundErr *serviceerror.NotFound
|
|
if ns, err := nsc.Describe(ctx, o.RegisterNamespace.Namespace); errors.As(err, ¬FoundErr) {
|
|
if err := nsc.Register(ctx, o.RegisterNamespace); err != nil {
|
|
return nil, errors.Wrap(err, "failed to register temporal namespace")
|
|
}
|
|
} else if err != nil {
|
|
return nil, errors.Wrap(err, "failed to retrieve temporal namespace info")
|
|
} else if nsInfo := ns.GetNamespaceInfo(); nsInfo.State != enums.NAMESPACE_STATE_REGISTERED { //nolint:nosnakecase
|
|
return nil, errors.New("Could not register namespace due to existing state: " + nsInfo.State.String())
|
|
} else if err := nsc.Update(ctx, &workflowservice.UpdateNamespaceRequest{
|
|
Namespace: o.RegisterNamespace.Namespace,
|
|
UpdateInfo: &namespace.UpdateNamespaceInfo{
|
|
Description: o.RegisterNamespace.Description,
|
|
OwnerEmail: o.RegisterNamespace.OwnerEmail,
|
|
Data: o.RegisterNamespace.Data,
|
|
State: nsInfo.State,
|
|
},
|
|
Config: &namespace.NamespaceConfig{
|
|
WorkflowExecutionRetentionTtl: o.RegisterNamespace.WorkflowExecutionRetentionPeriod,
|
|
BadBinaries: ns.Config.BadBinaries,
|
|
HistoryArchivalState: o.RegisterNamespace.HistoryArchivalState,
|
|
HistoryArchivalUri: o.RegisterNamespace.HistoryArchivalUri,
|
|
VisibilityArchivalState: o.RegisterNamespace.VisibilityArchivalState,
|
|
VisibilityArchivalUri: o.RegisterNamespace.VisibilityArchivalUri,
|
|
},
|
|
ReplicationConfig: &replication.NamespaceReplicationConfig{
|
|
ActiveClusterName: o.RegisterNamespace.ActiveClusterName,
|
|
Clusters: o.RegisterNamespace.Clusters,
|
|
State: ns.ReplicationConfig.State,
|
|
},
|
|
SecurityToken: o.RegisterNamespace.SecurityToken,
|
|
DeleteBadBinary: "",
|
|
PromoteNamespace: false,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "failed to register temporal namespace")
|
|
}
|
|
clientOpts.Namespace = o.RegisterNamespace.Namespace
|
|
}
|
|
|
|
// setup otel
|
|
if o.OtelEnabled {
|
|
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{
|
|
Tracer: telemetry.Tracer(),
|
|
TextMapPropagator: otel.GetTextMapPropagator(),
|
|
SpanContextKey: nil,
|
|
HeaderKey: "",
|
|
SpanStarter: nil,
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to create new opentracing interceptor")
|
|
}
|
|
clientOpts.Interceptors = append(clientOpts.Interceptors, tracingInterceptor)
|
|
clientOpts.MetricsHandler = NewMetricsHandler(telemetry.Meter())
|
|
}
|
|
|
|
return client.Dial(clientOpts)
|
|
}
|