feat: add poll interval flag

This commit is contained in:
Kevin Franklin Kim 2024-03-22 12:38:43 +01:00
parent 3f8561359c
commit 52c1a67b7a
No known key found for this signature in database
10 changed files with 127 additions and 104 deletions

View File

@ -126,7 +126,7 @@ func initRepo(tb testing.TB, l *zap.Logger) *repo.Repo {
r := repo.New(l,
testRepoServer.URL+"/repo-two-dimensions.json",
repo.NewHistory(l,
repo.HistoryWithVarDir(varDir),
repo.HistoryWithHistoryDir(varDir),
),
)
up := make(chan bool, 1)

View File

@ -3,7 +3,6 @@ package cmd
import (
"time"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
@ -32,9 +31,9 @@ func addressFlag(v *viper.Viper) string {
return v.GetString("address")
}
func addAddressFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().String("address", ":8080", "Address to bind to (host:port)")
_ = v.BindPFlag("address", cmd.Flags().Lookup("address"))
func addAddressFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.String("address", ":8080", "Address to bind to (host:port)")
_ = v.BindPFlag("address", flags.Lookup("address"))
_ = v.BindEnv("address", "CONTENT_SERVER_ADDRESS")
}
@ -42,39 +41,49 @@ func basePathFlag(v *viper.Viper) string {
return v.GetString("base_path")
}
func addBasePathFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().String("base-path", "/contentserver", "Base path to export the webserver on")
_ = v.BindPFlag("base_path", cmd.Flags().Lookup("base_path"))
func addBasePathFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.String("base-path", "/contentserver", "Base path to export the webserver on")
_ = v.BindPFlag("base_path", flags.Lookup("base_path"))
_ = v.BindEnv("base_path", "CONTENT_SERVER_BASE_PATH")
}
func pollFlag(v *viper.Viper) bool {
return v.GetBool("poll")
return v.GetBool("poll.enabled")
}
func addPollFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Bool("poll", false, "If true, the address arg will be used to periodically poll the content url")
_ = v.BindPFlag("poll", cmd.Flags().Lookup("poll"))
_ = v.BindEnv("poll", "CONTENT_SERVER_POLL")
func addPollFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Bool("poll", false, "If true, the address arg will be used to periodically poll the content url")
_ = v.BindPFlag("poll.enabled", flags.Lookup("poll"))
_ = v.BindEnv("poll.enabled", "CONTENT_SERVER_POLL")
}
func pollIntevalFlag(v *viper.Viper) time.Duration {
return v.GetDuration("poll.interval")
}
func addPollIntervalFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Duration("poll-interval", time.Minute, "Specifies the poll interval")
_ = v.BindPFlag("poll", flags.Lookup("poll-interval"))
_ = v.BindEnv("poll", "CONTENT_SERVER_POLL_INTERVAL")
}
func historyDirFlag(v *viper.Viper) string {
return v.GetString("history.dir")
}
func addHistoryDirFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().String("history-dir", "/var/lib/contentserver", "Where to put my data")
_ = v.BindPFlag("history.dir", cmd.Flags().Lookup("history-dir"))
func addHistoryDirFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.String("history-dir", "/var/lib/contentserver", "Where to put my data")
_ = v.BindPFlag("history.dir", flags.Lookup("history-dir"))
_ = v.BindEnv("history.dir", "CONTENT_SERVER_HISTORY_DIR")
}
func historyLimitFlag(v *viper.Viper) string {
return v.GetString("history.limit")
func historyLimitFlag(v *viper.Viper) int {
return v.GetInt("history.limit")
}
func addHistoryLimitFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Int("history-limit", 2, "Number of history records to keep")
_ = v.BindPFlag("history.limit", cmd.Flags().Lookup("history-limit"))
func addHistoryLimitFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Int("history-limit", 2, "Number of history records to keep")
_ = v.BindPFlag("history.limit", flags.Lookup("history-limit"))
_ = v.BindEnv("history.limit", "CONTENT_SERVER_HISTORY_LIMIT")
}
@ -82,9 +91,9 @@ func gracefulTimeoutFlag(v *viper.Viper) time.Duration {
return v.GetDuration("graceful_timeout")
}
func addGracefulTimeoutFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Duration("graceful-timeout", 0, "Timeout duration for graceful shutdown")
_ = v.BindPFlag("graceful_timeout", cmd.Flags().Lookup("graceful-timeout"))
func addGracefulTimeoutFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Duration("graceful-timeout", 0, "Timeout duration for graceful shutdown")
_ = v.BindPFlag("graceful_timeout", flags.Lookup("graceful-timeout"))
_ = v.BindEnv("graceful_timeout", "CONTENT_SERVER_GRACEFUL_TIMEOUT")
}
@ -92,9 +101,9 @@ func shutdownTimeoutFlag(v *viper.Viper) time.Duration {
return v.GetDuration("shutdown_timeout")
}
func addShutdownTimeoutFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Duration("shutdown-timeout", 0, "Timeout duration for shutdown")
_ = v.BindPFlag("shutdown_timeout", cmd.Flags().Lookup("shutdown-timeout"))
func addShutdownTimeoutFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Duration("shutdown-timeout", 0, "Timeout duration for shutdown")
_ = v.BindPFlag("shutdown_timeout", flags.Lookup("shutdown-timeout"))
_ = v.BindEnv("shutdown_timeout", "CONTENT_SERVER_SHUTDOWN_TIMEOUT")
}
@ -102,26 +111,26 @@ func serviceHealthzEnabledFlag(v *viper.Viper) bool {
return v.GetBool("service.healthz.enabled")
}
func addServiceHealthzEnabledFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Bool("service-healthz-enabled", false, "Enable healthz service")
_ = v.BindPFlag("service.healthz.enabled", cmd.Flags().Lookup("service-healthz-enabled"))
func addServiceHealthzEnabledFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Bool("service-healthz-enabled", false, "Enable healthz service")
_ = v.BindPFlag("service.healthz.enabled", flags.Lookup("service-healthz-enabled"))
}
func servicePrometheusEnabledFlag(v *viper.Viper) bool {
return v.GetBool("service.prometheus.enabled")
}
func addServicePrometheusEnabledFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Bool("service-prometheus-enabled", false, "Enable prometheus service")
_ = v.BindPFlag("service.prometheus.enabled", cmd.Flags().Lookup("service-prometheus-enabled"))
func addServicePrometheusEnabledFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Bool("service-prometheus-enabled", false, "Enable prometheus service")
_ = v.BindPFlag("service.prometheus.enabled", flags.Lookup("service-prometheus-enabled"))
}
func otelEnabledFlag(v *viper.Viper) bool {
return v.GetBool("otel.enabled")
}
func addOtelEnabledFlag(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().Bool("otel-enabled", false, "Enable otel service")
_ = v.BindPFlag("otel.enabled", cmd.Flags().Lookup("otel-enabled"))
func addOtelEnabledFlag(flags *pflag.FlagSet, v *viper.Viper) {
flags.Bool("otel-enabled", false, "Enable otel service")
_ = v.BindPFlag("otel.enabled", flags.Lookup("otel-enabled"))
_ = v.BindEnv("otel.enabled", "OTEL_ENABLED")
}

View File

@ -33,12 +33,12 @@ func NewHTTPCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
svr := keel.NewServer(
keel.WithHTTPReadmeService(true),
keel.WithHTTPPrometheusService(v.GetBool("service.prometheus.enabled")),
keel.WithHTTPHealthzService(v.GetBool("service.healthz.enabled")),
keel.WithPrometheusMeter(v.GetBool("service.prometheus.enabled")),
keel.WithOTLPGRPCTracer(v.GetBool("otel.enabled")),
keel.WithGracefulTimeout(v.GetDuration("graceful.timeout")),
keel.WithShutdownTimeout(v.GetDuration("shutdown.timeout")),
keel.WithHTTPPrometheusService(servicePrometheusEnabledFlag(v)),
keel.WithHTTPHealthzService(serviceHealthzEnabledFlag(v)),
keel.WithPrometheusMeter(servicePrometheusEnabledFlag(v)),
keel.WithOTLPGRPCTracer(otelEnabledFlag(v)),
keel.WithGracefulTimeout(gracefulTimeoutFlag(v)),
keel.WithShutdownTimeout(shutdownTimeoutFlag(v)),
)
l := svr.Logger()
@ -48,15 +48,16 @@ func NewHTTPCommand() *cobra.Command {
r := repo.New(l,
args[0],
repo.NewHistory(l,
repo.HistoryWithVarDir(v.GetString("history.dir")),
repo.HistoryWithMax(v.GetInt("history.limit")),
repo.HistoryWithHistoryDir(historyDirFlag(v)),
repo.HistoryWithHistoryLimit(historyLimitFlag(v)),
),
repo.WithHTTPClient(
keelhttp.NewHTTPClient(
keelhttp.HTTPClientWithTelemetry(),
),
),
repo.WithPollForUpdates(v.GetBool("poll")),
repo.WithPollInterval(pollIntevalFlag(v)),
repo.WithPoll(pollFlag(v)),
)
// start initial update and handle error
@ -68,8 +69,8 @@ func NewHTTPCommand() *cobra.Command {
}))
svr.AddServices(
service.NewHTTP(l, "http", v.GetString("address"),
handler.NewHTTP(l, r, handler.WithPath(v.GetString("path"))),
service.NewHTTP(l, "http", addressFlag(v),
handler.NewHTTP(l, r, handler.WithBasePath(basePathFlag(v))),
middleware.Telemetry(),
middleware.Logger(),
middleware.Recover(),
@ -84,16 +85,18 @@ func NewHTTPCommand() *cobra.Command {
},
}
addAddressFlag(cmd, v)
addBasePathFlag(cmd, v)
addPollFlag(cmd, v)
addHistoryDirFlag(cmd, v)
addHistoryLimitFlag(cmd, v)
addGracefulTimeoutFlag(cmd, v)
addShutdownTimeoutFlag(cmd, v)
addOtelEnabledFlag(cmd, v)
addServiceHealthzEnabledFlag(cmd, v)
addServicePrometheusEnabledFlag(cmd, v)
flags := cmd.Flags()
addAddressFlag(flags, v)
addBasePathFlag(flags, v)
addPollFlag(flags, v)
addPollIntervalFlag(flags, v)
addHistoryDirFlag(flags, v)
addHistoryLimitFlag(flags, v)
addGracefulTimeoutFlag(flags, v)
addShutdownTimeoutFlag(flags, v)
addOtelEnabledFlag(flags, v)
addServiceHealthzEnabledFlag(flags, v)
addServicePrometheusEnabledFlag(flags, v)
return cmd
}

View File

@ -34,22 +34,23 @@ func NewSocketCommand() *cobra.Command {
r := repo.New(l,
args[0],
repo.NewHistory(l,
repo.HistoryWithVarDir(v.GetString("history.dir")),
repo.HistoryWithMax(v.GetInt("history.limit")),
repo.HistoryWithHistoryDir(historyDirFlag(v)),
repo.HistoryWithHistoryLimit(historyLimitFlag(v)),
),
repo.WithHTTPClient(
keelhttp.NewHTTPClient(
keelhttp.HTTPClientWithTelemetry(),
),
),
repo.WithPollForUpdates(v.GetBool("poll")),
repo.WithPoll(pollFlag(v)),
repo.WithPollInterval(pollIntevalFlag(v)),
)
// create socket server
handle := handler.NewSocket(l, r)
// listen on socket
ln, err := net.Listen("tcp", v.GetString("address"))
ln, err := net.Listen("tcp", addressFlag(v))
if err != nil {
return err
}
@ -62,7 +63,7 @@ func NewSocketCommand() *cobra.Command {
go r.Start(context.Background()) //nolint:errcheck
<-up
l.Info("started listening", zap.String("address", v.GetString("address")))
l.Info("started listening", zap.String("address", addressFlag(v)))
for {
// this blocks until connection or error
@ -84,10 +85,12 @@ func NewSocketCommand() *cobra.Command {
},
}
addAddressFlag(cmd, v)
addPollFlag(cmd, v)
addHistoryDirFlag(cmd, v)
addHistoryLimitFlag(cmd, v)
flags := cmd.Flags()
addAddressFlag(flags, v)
addPollFlag(flags, v)
addPollIntervalFlag(flags, v)
addHistoryDirFlag(flags, v)
addHistoryLimitFlag(flags, v)
return cmd
}

View File

@ -17,9 +17,9 @@ import (
type (
HTTP struct {
l *zap.Logger
path string
repo *repo.Repo
l *zap.Logger
basePath string
repo *repo.Repo
}
HTTPOption func(*HTTP)
)
@ -31,9 +31,9 @@ type (
// NewHTTP returns a shiny new web server
func NewHTTP(l *zap.Logger, repo *repo.Repo, opts ...HTTPOption) http.Handler {
inst := &HTTP{
l: l.Named("http"),
path: "/contentserver",
repo: repo,
l: l.Named("http"),
basePath: "/contentserver",
repo: repo,
}
for _, opt := range opts {
@ -47,9 +47,9 @@ func NewHTTP(l *zap.Logger, repo *repo.Repo, opts ...HTTPOption) http.Handler {
// ~ Options
// ------------------------------------------------------------------------------------------------
func WithPath(v string) HTTPOption {
func WithBasePath(v string) HTTPOption {
return func(o *HTTP) {
o.path = v
o.basePath = v
}
}
@ -73,7 +73,7 @@ func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
route := Route(strings.TrimPrefix(r.URL.Path, h.path+"/"))
route := Route(strings.TrimPrefix(r.URL.Path, h.basePath+"/"))
if route == RouteGetRepo {
h.repo.WriteRepoBytes(w)
w.Header().Set("Content-Type", "application/json")

View File

@ -21,9 +21,9 @@ const (
type (
History struct {
l *zap.Logger
max int
varDir string
l *zap.Logger
historyDir string
historyLimit int
}
HistoryOption func(*History)
)
@ -32,15 +32,15 @@ type (
// ~ Options
// ------------------------------------------------------------------------------------------------
func HistoryWithMax(v int) HistoryOption {
func HistoryWithHistoryLimit(v int) HistoryOption {
return func(o *History) {
o.max = v
o.historyLimit = v
}
}
func HistoryWithVarDir(v string) HistoryOption {
func HistoryWithHistoryDir(v string) HistoryOption {
return func(o *History) {
o.varDir = v
o.historyDir = v
}
}
@ -50,9 +50,9 @@ func HistoryWithVarDir(v string) HistoryOption {
func NewHistory(l *zap.Logger, opts ...HistoryOption) *History {
inst := &History{
l: l,
max: 2,
varDir: "/var/lib/contentserver",
l: l,
historyDir: "/var/lib/contentserver",
historyLimit: 2,
}
for _, opt := range opts {
@ -69,7 +69,7 @@ func NewHistory(l *zap.Logger, opts ...HistoryOption) *History {
func (h *History) Add(jsonBytes []byte) error {
var (
// historiy file name
filename = path.Join(h.varDir, HistoryRepoJSONPrefix+time.Now().Format(time.RFC3339Nano)+HistoryRepoJSONSuffix)
filename = path.Join(h.historyDir, HistoryRepoJSONPrefix+time.Now().Format(time.RFC3339Nano)+HistoryRepoJSONSuffix)
err = os.WriteFile(filename, jsonBytes, 0600)
)
if err != nil {
@ -94,7 +94,7 @@ func (h *History) Add(jsonBytes []byte) error {
}
func (h *History) GetCurrentFilename() string {
return path.Join(h.varDir, HistoryRepoJSONPrefix+"current"+HistoryRepoJSONSuffix)
return path.Join(h.historyDir, HistoryRepoJSONPrefix+"current"+HistoryRepoJSONSuffix)
}
func (h *History) GetCurrent(buf *bytes.Buffer) (err error) {
@ -112,7 +112,7 @@ func (h *History) GetCurrent(buf *bytes.Buffer) (err error) {
// ------------------------------------------------------------------------------------------------
func (h *History) getHistory() (files []string, err error) {
fileInfos, err := os.ReadDir(h.varDir)
fileInfos, err := os.ReadDir(h.historyDir)
if err != nil {
return
}
@ -121,7 +121,7 @@ func (h *History) getHistory() (files []string, err error) {
if !f.IsDir() {
filename := f.Name()
if filename != currentName && (strings.HasPrefix(filename, HistoryRepoJSONPrefix) && strings.HasSuffix(filename, HistoryRepoJSONSuffix)) {
files = append(files, path.Join(h.varDir, filename))
files = append(files, path.Join(h.historyDir, filename))
}
}
}
@ -130,7 +130,7 @@ func (h *History) getHistory() (files []string, err error) {
}
func (h *History) cleanup() error {
files, err := h.getFilesForCleanup(h.max)
files, err := h.getFilesForCleanup(h.historyLimit)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ func TestHistoryCleanup(t *testing.T) {
func TestHistoryOrder(t *testing.T) {
h := testHistory(t)
h.varDir = "testdata/order"
h.historyDir = "testdata/order"
files, err := h.getHistory()
require.NoError(t, err)
@ -59,7 +59,7 @@ func TestHistoryOrder(t *testing.T) {
func TestGetFilesForCleanup(t *testing.T) {
h := testHistory(t)
h.varDir = "testdata/order"
h.historyDir = "testdata/order"
files, err := h.getFilesForCleanup(2)
require.NoError(t, err)
@ -71,5 +71,5 @@ func testHistory(t *testing.T) *History {
l := zaptest.NewLogger(t)
tempDir, err := os.MkdirTemp(os.TempDir(), "contentserver-history-test")
require.NoError(t, err)
return NewHistory(l, HistoryWithMax(2), HistoryWithVarDir(tempDir))
return NewHistory(l, HistoryWithHistoryLimit(2), HistoryWithHistoryDir(tempDir))
}

View File

@ -28,7 +28,7 @@ type updateResponse struct {
func (r *Repo) PollRoutine(ctx context.Context) error {
l := r.l.Named("routine.poll")
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(r.pollInterval)
for {
select {
case <-ctx.Done():

View File

@ -26,14 +26,15 @@ const maxGetURIForNodeRecursionLevel = 1000
// Repo content repository
type (
Repo struct {
l *zap.Logger
url string
poll bool
pollVersion string
onStart func()
loaded *atomic.Bool
history *History
httpClient *http.Client
l *zap.Logger
url string
poll bool
pollInterval time.Duration
pollVersion string
onStart func()
loaded *atomic.Bool
history *History
httpClient *http.Client
// updateLock sync.Mutex
dimensionUpdateChannel chan *RepoDimension
dimensionUpdateDoneChannel chan error
@ -56,6 +57,7 @@ func New(l *zap.Logger, url string, history *History, opts ...Option) *Repo {
url: url,
poll: false,
loaded: &atomic.Bool{},
pollInterval: time.Minute,
history: history,
httpClient: http.DefaultClient,
directory: map[string]*Dimension{},
@ -81,12 +83,18 @@ func WithHTTPClient(v *http.Client) Option {
}
}
func WithPollForUpdates(v bool) Option {
func WithPoll(v bool) Option {
return func(o *Repo) {
o.poll = v
}
}
func WithPollInterval(v time.Duration) Option {
return func(o *Repo) {
o.pollInterval = v
}
}
// ------------------------------------------------------------------------------------------------
// ~ Getter
// ------------------------------------------------------------------------------------------------

View File

@ -15,7 +15,7 @@ import (
)
func NewTestRepo(l *zap.Logger, url, varDir string) *Repo {
h := NewHistory(l, HistoryWithMax(2), HistoryWithVarDir(varDir))
h := NewHistory(l, HistoryWithHistoryLimit(2), HistoryWithHistoryDir(varDir))
r := New(l, url, h)
go r.Start(context.Background()) //nolint:errcheck
time.Sleep(100 * time.Millisecond)