diff --git a/client/client_test.go b/client/client_test.go index b31b421..2a252ee 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -87,6 +87,7 @@ func initTestServer(t testing.TB) (socketAddr, webserverAddr string) { pathContentserver, varDir, server.DefaultRepositoryTimeout, + false, ) if err != nil { t.Fatal("test server crashed: ", err) diff --git a/contentserver.go b/contentserver.go index 1e89f20..1c90559 100644 --- a/contentserver.go +++ b/contentserver.go @@ -36,6 +36,8 @@ var ( flagPrometheusListener = flag.String("prometheus-listener", getenv("PROMETHEUS_LISTENER", DefaultPrometheusListener), "address for the prometheus listener") flagRepositoryTimeoutDuration = flag.Duration("repository-timeout-duration", server.DefaultRepositoryTimeout, "timeout duration for the contentserver") + flagPoll = flag.Bool("poll", false, "if true, the address arg will be used to periodically poll the content url") + // debugging / profiling flagDebug = flag.Bool("debug", false, "toggle debug mode") flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes") @@ -63,12 +65,10 @@ func main() { if *flagFreeOSMem > 0 { Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem)) go func() { - for { - select { - case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute): - log.Info("FreeOSMemory") - debug.FreeOSMemory() - } + ticker := time.NewTicker(time.Duration(*flagFreeOSMem) * time.Minute) + for range ticker.C { + log.Info("FreeOSMemory") + debug.FreeOSMemory() } }() } @@ -76,19 +76,17 @@ func main() { if *flagHeapDump > 0 { Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump)) go func() { - for { - select { - case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute): - log.Info("HeapDump") - f, err := os.Create("heapdump") - if err != nil { - panic("failed to create heap dump file") - } - debug.WriteHeapDump(f.Fd()) - err = f.Close() - if err != nil { - panic("failed to create heap dump file") - } + ticker := time.NewTicker(time.Duration(*flagFreeOSMem) * time.Minute) + for range ticker.C { + log.Info("HeapDump") + f, err := os.Create("heapdump") + if err != nil { + panic("failed to create heap dump file") + } + debug.WriteHeapDump(f.Fd()) + err = f.Close() + if err != nil { + panic("failed to create heap dump file") } } }() @@ -101,7 +99,15 @@ func main() { go metrics.RunPrometheusHandler(*flagPrometheusListener) go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName) - err := server.RunServerSocketAndWebServer(flag.Arg(0), *flagAddress, *flagWebserverAddress, *flagWebserverPath, *flagVarDir, *flagRepositoryTimeoutDuration) + err := server.RunServerSocketAndWebServer( + flag.Arg(0), + *flagAddress, + *flagWebserverAddress, + *flagWebserverPath, + *flagVarDir, + *flagRepositoryTimeoutDuration, + *flagPoll, + ) if err != nil { fmt.Println("exiting with error", err) os.Exit(1) diff --git a/go.mod b/go.mod index a080ad9..8650f69 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,25 @@ require ( github.com/apex/log v1.1.0 github.com/davecgh/go-spew v1.1.1 github.com/json-iterator/go v1.1.6 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect - github.com/pkg/errors v0.8.1 // indirect github.com/prometheus/client_golang v0.9.2 github.com/stretchr/testify v1.5.1 - go.uber.org/atomic v1.4.0 // indirect - go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 ) -go 1.14 +require ( + github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect + github.com/golang/protobuf v1.2.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/pkg/errors v0.8.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect + github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 // indirect + github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect +) + +go 1.18 diff --git a/repo/loader.go b/repo/loader.go index 9833d3f..63d41ef 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "time" @@ -26,30 +27,27 @@ type updateResponse struct { } func (repo *Repo) updateRoutine() { - for { - select { - case resChan := <-repo.updateInProgressChannel: - log := logger.Log.With(zap.String("chan", fmt.Sprintf("%p", resChan))) - log.Info("Waiting for update to complete") - start := time.Now() + for resChan := range repo.updateInProgressChannel { + log := logger.Log.With(zap.String("chan", fmt.Sprintf("%p", resChan))) + log.Info("Waiting for update to complete") + start := time.Now() - repoRuntime, errUpdate := repo.update(context.Background()) - if errUpdate != nil { - log.Error("Failed to update content server from routine", zap.Error(errUpdate)) - status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() - } else { - status.M.UpdatesCompletedCounter.WithLabelValues().Inc() - } - - resChan <- updateResponse{ - repoRuntime: repoRuntime, - err: errUpdate, - } - - duration := time.Since(start) - log.Info("Update completed", zap.Duration("duration", duration)) - status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) + repoRuntime, errUpdate := repo.update(context.Background()) + if errUpdate != nil { + log.Error("Failed to update content server from routine", zap.Error(errUpdate)) + status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() + } else { + status.M.UpdatesCompletedCounter.WithLabelValues().Inc() } + + resChan <- updateResponse{ + repoRuntime: repoRuntime, + err: errUpdate, + } + + duration := time.Since(start) + log.Info("Update completed", zap.Duration("duration", duration)) + status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) } } @@ -208,14 +206,45 @@ func (repo *Repo) get(URL string) error { func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { startTimeRepo := time.Now().UnixNano() - err = repo.get(repo.server) + + repoURL := repo.server + if repo.pollForUpdates { + resp, err := repo.httpClient.Get(repo.server) + if err != nil { + return repoRuntime, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return repoRuntime, errors.New("could not poll latest repo download url - non 200 response") + } + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return repoRuntime, errors.New("could not poll latest repo download url, could not read body") + } + repoURL = string(responseBytes) + if repoURL == repo.pollVersion { + logger.Log.Info( + "repo is up to date", + zap.String("pollVersion", repo.pollVersion), + ) + // already up to date + return repoRuntime, nil + } else { + logger.Log.Info( + "new repo poll version", + zap.String("pollVersion", repo.pollVersion), + ) + } + } + + err = repo.get(repoURL) repoRuntime = time.Now().UnixNano() - startTimeRepo if err != nil { // we have no json to load - the repo server did not reply logger.Log.Debug("Failed to load json", zap.Error(err)) return repoRuntime, err } - logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) + logger.Log.Debug("loading json", zap.String("server", repoURL), zap.Int("length", len(repo.jsonBuf.Bytes()))) nodes, err := repo.loadNodesFromJSON() if err != nil { // could not load nodes from json @@ -226,6 +255,9 @@ func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { // repo failed to load nodes return repoRuntime, err } + if repo.pollForUpdates { + repo.pollVersion = repoURL + } return repoRuntime, nil } diff --git a/repo/repo.go b/repo/repo.go index 3de3879..50c5ae3 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -32,9 +32,11 @@ type Dimension struct { // Repo content repositiory type Repo struct { - server string - recovered bool - Directory map[string]*Dimension + pollForUpdates bool + pollVersion string + server string + recovered bool + Directory map[string]*Dimension // updateLock sync.Mutex dimensionUpdateChannel chan *repoDimension dimensionUpdateDoneChannel chan error @@ -54,13 +56,14 @@ type repoDimension struct { } // NewRepo constructor -func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Repo { +func NewRepo(server string, varDir string, repositoryTimeout time.Duration, pollForUpdates bool) *Repo { logger.Log.Info("creating new repo", zap.String("server", server), zap.String("varDir", varDir), ) repo := &Repo{ + pollForUpdates: pollForUpdates, recovered: false, server: server, Directory: map[string]*Dimension{}, @@ -68,11 +71,26 @@ func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Rep dimensionUpdateChannel: make(chan *repoDimension), dimensionUpdateDoneChannel: make(chan error), httpClient: getDefaultHTTPClient(repositoryTimeout), - updateInProgressChannel: make(chan chan updateResponse, 0), + updateInProgressChannel: make(chan chan updateResponse), } go repo.updateRoutine() go repo.dimensionUpdateRoutine() + if pollForUpdates { + go func() { + ticker := time.NewTicker(10 * time.Second) + for range ticker.C { + chanReponse := make(chan updateResponse) + repo.updateInProgressChannel <- chanReponse + response := <-chanReponse + if response.err == nil { + logger.Log.Info("poll update success", zap.String("revision", repo.pollVersion)) + } else { + logger.Log.Info("poll error", zap.Error(response.err)) + } + } + }() + } logger.Log.Info("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() diff --git a/repo/repo_test.go b/repo/repo_test.go index ccfb9d3..4f450d2 100644 --- a/repo/repo_test.go +++ b/repo/repo_test.go @@ -18,7 +18,7 @@ func init() { func NewTestRepo(server, varDir string) *Repo { - r := NewRepo(server, varDir, 2*time.Minute) + r := NewRepo(server, varDir, 2*time.Minute, false) // because the travis CI VMs are very slow, // we need to add some delay to allow the server to startup diff --git a/server/server.go b/server/server.go index 4421c4d..dde526e 100644 --- a/server/server.go +++ b/server/server.go @@ -38,8 +38,8 @@ const ( ) // Run - let it run and enjoy on a socket near you -func Run(server string, address string, varDir string) error { - return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout) +func Run(server string, address string, varDir string, pollUpdates bool) error { + return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout, pollUpdates) } func RunServerSocketAndWebServer( @@ -49,33 +49,36 @@ func RunServerSocketAndWebServer( webserverPath string, varDir string, repositoryTimeout time.Duration, + pollForUpdates bool, ) error { if address == "" && webserverAddress == "" { return errors.New("one of the addresses needs to be set") } Log.Info("building repo with content", zap.String("server", server)) - r := repo.NewRepo(server, varDir, repositoryTimeout) + r := repo.NewRepo(server, varDir, repositoryTimeout, pollForUpdates) // start initial update and handle error - go func() { - resp := r.Update() - if !resp.Success { - Log.Error("failed to update", - zap.String("error", resp.ErrorMessage), - zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), - zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), - zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), - zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), - ) + if !pollForUpdates { + go func() { + resp := r.Update() + if !resp.Success { + Log.Error("failed to update", + zap.String("error", resp.ErrorMessage), + zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), + zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), + zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), + zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), + ) - //Exit only if it hasn't recovered - if !r.Recovered() { - os.Exit(1) + //Exit only if it hasn't recovered + if !r.Recovered() { + os.Exit(1) + } } - } - }() + }() + } // update can run in bg chanErr := make(chan error)