diff --git a/client/client_test.go b/client/client_test.go index b31b421..2a252ee 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -87,6 +87,7 @@ func initTestServer(t testing.TB) (socketAddr, webserverAddr string) { pathContentserver, varDir, server.DefaultRepositoryTimeout, + false, ) if err != nil { t.Fatal("test server crashed: ", err) diff --git a/contentserver.go b/contentserver.go index ad5138a..26edacb 100644 --- a/contentserver.go +++ b/contentserver.go @@ -29,6 +29,8 @@ var ( flagPrometheusListener = flag.String("prometheus-listener", getenv("PROMETHEUS_LISTENER", DefaultPrometheusListener), "address for the prometheus listener") flagRepositoryTimeoutDuration = flag.Duration("repository-timeout-duration", server.DefaultRepositoryTimeout, "timeout duration for the contentserver") + flagPoll = flag.Bool("poll", false, "if true, the address arg will be used to periodically poll the content url") + // debugging / profiling flagDebug = flag.Bool("debug", false, "toggle debug mode") flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes") @@ -90,7 +92,15 @@ func main() { go metrics.RunPrometheusHandler(*flagPrometheusListener) go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName) - err := server.RunServerSocketAndWebServer(flag.Arg(0), *flagAddress, *flagWebserverAddress, *flagWebserverPath, *flagVarDir, *flagRepositoryTimeoutDuration) + err := server.RunServerSocketAndWebServer( + flag.Arg(0), + *flagAddress, + *flagWebserverAddress, + *flagWebserverPath, + *flagVarDir, + *flagRepositoryTimeoutDuration, + *flagPoll, + ) if err != nil { fmt.Println("exiting with error", err) os.Exit(1) diff --git a/repo/loader.go b/repo/loader.go index 26f01f8..10a8065 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -3,6 +3,7 @@ package repo import ( "context" "io" + "io/ioutil" "net/http" "time" @@ -203,14 +204,45 @@ func (repo *Repo) get(URL string) error { func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { startTimeRepo := time.Now().UnixNano() - err = repo.get(repo.server) + + repoURL := repo.server + if repo.pollForUpdates { + resp, err := repo.httpClient.Get(repo.server) + if err != nil { + return repoRuntime, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return repoRuntime, errors.New("could not poll latest repo download url - non 200 response") + } + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return repoRuntime, errors.New("could not poll latest repo download url, could not read body") + } + repoURL = string(responseBytes) + if repoURL == repo.pollVersion { + logger.Log.Info( + "repo is up to date", + zap.String("pollVersion", repo.pollVersion), + ) + // already up to date + return repoRuntime, nil + } else { + logger.Log.Info( + "new repo poll version", + zap.String("pollVersion", repo.pollVersion), + ) + } + } + + err = repo.get(repoURL) repoRuntime = time.Now().UnixNano() - startTimeRepo if err != nil { // we have no json to load - the repo server did not reply logger.Log.Debug("Failed to load json", zap.Error(err)) return repoRuntime, err } - logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) + logger.Log.Debug("loading json", zap.String("server", repoURL), zap.Int("length", len(repo.jsonBuf.Bytes()))) nodes, err := repo.loadNodesFromJSON() if err != nil { // could not load nodes from json @@ -221,6 +253,9 @@ func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { // repo failed to load nodes return repoRuntime, err } + if repo.pollForUpdates { + repo.pollVersion = repoURL + } return repoRuntime, nil } diff --git a/repo/repo.go b/repo/repo.go index c23c807..6cfabae 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -32,9 +32,11 @@ type Dimension struct { // Repo content repositiory type Repo struct { - server string - recovered bool - Directory map[string]*Dimension + pollForUpdates bool + pollVersion string + server string + recovered bool + Directory map[string]*Dimension // updateLock sync.Mutex dimensionUpdateChannel chan *repoDimension dimensionUpdateDoneChannel chan error @@ -54,13 +56,14 @@ type repoDimension struct { } // NewRepo constructor -func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Repo { +func NewRepo(server string, varDir string, repositoryTimeout time.Duration, pollForUpdates bool) *Repo { logger.Log.Info("creating new repo", zap.String("server", server), zap.String("varDir", varDir), ) repo := &Repo{ + pollForUpdates: pollForUpdates, recovered: false, server: server, Directory: map[string]*Dimension{}, @@ -73,6 +76,21 @@ func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Rep go repo.updateRoutine() go repo.dimensionUpdateRoutine() + if pollForUpdates { + go func() { + ticker := time.NewTicker(10 * time.Second) + for range ticker.C { + chanReponse := make(chan updateResponse) + repo.updateInProgressChannel <- chanReponse + response := <-chanReponse + if response.err == nil { + logger.Log.Info("poll update success", zap.String("revision", repo.pollVersion)) + } else { + logger.Log.Info("poll error", zap.Error(response.err)) + } + } + }() + } logger.Log.Info("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() diff --git a/repo/repo_test.go b/repo/repo_test.go index ccfb9d3..4f450d2 100644 --- a/repo/repo_test.go +++ b/repo/repo_test.go @@ -18,7 +18,7 @@ func init() { func NewTestRepo(server, varDir string) *Repo { - r := NewRepo(server, varDir, 2*time.Minute) + r := NewRepo(server, varDir, 2*time.Minute, false) // because the travis CI VMs are very slow, // we need to add some delay to allow the server to startup diff --git a/server/server.go b/server/server.go index 4421c4d..dde526e 100644 --- a/server/server.go +++ b/server/server.go @@ -38,8 +38,8 @@ const ( ) // Run - let it run and enjoy on a socket near you -func Run(server string, address string, varDir string) error { - return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout) +func Run(server string, address string, varDir string, pollUpdates bool) error { + return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout, pollUpdates) } func RunServerSocketAndWebServer( @@ -49,33 +49,36 @@ func RunServerSocketAndWebServer( webserverPath string, varDir string, repositoryTimeout time.Duration, + pollForUpdates bool, ) error { if address == "" && webserverAddress == "" { return errors.New("one of the addresses needs to be set") } Log.Info("building repo with content", zap.String("server", server)) - r := repo.NewRepo(server, varDir, repositoryTimeout) + r := repo.NewRepo(server, varDir, repositoryTimeout, pollForUpdates) // start initial update and handle error - go func() { - resp := r.Update() - if !resp.Success { - Log.Error("failed to update", - zap.String("error", resp.ErrorMessage), - zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), - zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), - zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), - zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), - ) + if !pollForUpdates { + go func() { + resp := r.Update() + if !resp.Success { + Log.Error("failed to update", + zap.String("error", resp.ErrorMessage), + zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), + zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), + zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), + zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), + ) - //Exit only if it hasn't recovered - if !r.Recovered() { - os.Exit(1) + //Exit only if it hasn't recovered + if !r.Recovered() { + os.Exit(1) + } } - } - }() + }() + } // update can run in bg chanErr := make(chan error)