feat: polling support (#27)

This commit is contained in:
Jan Halfar 2022-05-26 15:28:22 +02:00 committed by GitHub
parent 0e44ca809d
commit 2fa98dad5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 26 deletions

View File

@ -87,6 +87,7 @@ func initTestServer(t testing.TB) (socketAddr, webserverAddr string) {
pathContentserver, pathContentserver,
varDir, varDir,
server.DefaultRepositoryTimeout, server.DefaultRepositoryTimeout,
false,
) )
if err != nil { if err != nil {
t.Fatal("test server crashed: ", err) t.Fatal("test server crashed: ", err)

View File

@ -29,6 +29,8 @@ var (
flagPrometheusListener = flag.String("prometheus-listener", getenv("PROMETHEUS_LISTENER", DefaultPrometheusListener), "address for the prometheus listener") flagPrometheusListener = flag.String("prometheus-listener", getenv("PROMETHEUS_LISTENER", DefaultPrometheusListener), "address for the prometheus listener")
flagRepositoryTimeoutDuration = flag.Duration("repository-timeout-duration", server.DefaultRepositoryTimeout, "timeout duration for the contentserver") flagRepositoryTimeoutDuration = flag.Duration("repository-timeout-duration", server.DefaultRepositoryTimeout, "timeout duration for the contentserver")
flagPoll = flag.Bool("poll", false, "if true, the address arg will be used to periodically poll the content url")
// debugging / profiling // debugging / profiling
flagDebug = flag.Bool("debug", false, "toggle debug mode") flagDebug = flag.Bool("debug", false, "toggle debug mode")
flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes") flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes")
@ -90,7 +92,15 @@ func main() {
go metrics.RunPrometheusHandler(*flagPrometheusListener) go metrics.RunPrometheusHandler(*flagPrometheusListener)
go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName) go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName)
err := server.RunServerSocketAndWebServer(flag.Arg(0), *flagAddress, *flagWebserverAddress, *flagWebserverPath, *flagVarDir, *flagRepositoryTimeoutDuration) err := server.RunServerSocketAndWebServer(
flag.Arg(0),
*flagAddress,
*flagWebserverAddress,
*flagWebserverPath,
*flagVarDir,
*flagRepositoryTimeoutDuration,
*flagPoll,
)
if err != nil { if err != nil {
fmt.Println("exiting with error", err) fmt.Println("exiting with error", err)
os.Exit(1) os.Exit(1)

View File

@ -3,6 +3,7 @@ package repo
import ( import (
"context" "context"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"time" "time"
@ -203,14 +204,45 @@ func (repo *Repo) get(URL string) error {
func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) {
startTimeRepo := time.Now().UnixNano() startTimeRepo := time.Now().UnixNano()
err = repo.get(repo.server)
repoURL := repo.server
if repo.pollForUpdates {
resp, err := repo.httpClient.Get(repo.server)
if err != nil {
return repoRuntime, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return repoRuntime, errors.New("could not poll latest repo download url - non 200 response")
}
responseBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return repoRuntime, errors.New("could not poll latest repo download url, could not read body")
}
repoURL = string(responseBytes)
if repoURL == repo.pollVersion {
logger.Log.Info(
"repo is up to date",
zap.String("pollVersion", repo.pollVersion),
)
// already up to date
return repoRuntime, nil
} else {
logger.Log.Info(
"new repo poll version",
zap.String("pollVersion", repo.pollVersion),
)
}
}
err = repo.get(repoURL)
repoRuntime = time.Now().UnixNano() - startTimeRepo repoRuntime = time.Now().UnixNano() - startTimeRepo
if err != nil { if err != nil {
// we have no json to load - the repo server did not reply // we have no json to load - the repo server did not reply
logger.Log.Debug("Failed to load json", zap.Error(err)) logger.Log.Debug("Failed to load json", zap.Error(err))
return repoRuntime, err return repoRuntime, err
} }
logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) logger.Log.Debug("loading json", zap.String("server", repoURL), zap.Int("length", len(repo.jsonBuf.Bytes())))
nodes, err := repo.loadNodesFromJSON() nodes, err := repo.loadNodesFromJSON()
if err != nil { if err != nil {
// could not load nodes from json // could not load nodes from json
@ -221,6 +253,9 @@ func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) {
// repo failed to load nodes // repo failed to load nodes
return repoRuntime, err return repoRuntime, err
} }
if repo.pollForUpdates {
repo.pollVersion = repoURL
}
return repoRuntime, nil return repoRuntime, nil
} }

View File

@ -32,9 +32,11 @@ type Dimension struct {
// Repo content repositiory // Repo content repositiory
type Repo struct { type Repo struct {
server string pollForUpdates bool
recovered bool pollVersion string
Directory map[string]*Dimension server string
recovered bool
Directory map[string]*Dimension
// updateLock sync.Mutex // updateLock sync.Mutex
dimensionUpdateChannel chan *repoDimension dimensionUpdateChannel chan *repoDimension
dimensionUpdateDoneChannel chan error dimensionUpdateDoneChannel chan error
@ -54,13 +56,14 @@ type repoDimension struct {
} }
// NewRepo constructor // NewRepo constructor
func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Repo { func NewRepo(server string, varDir string, repositoryTimeout time.Duration, pollForUpdates bool) *Repo {
logger.Log.Info("creating new repo", logger.Log.Info("creating new repo",
zap.String("server", server), zap.String("server", server),
zap.String("varDir", varDir), zap.String("varDir", varDir),
) )
repo := &Repo{ repo := &Repo{
pollForUpdates: pollForUpdates,
recovered: false, recovered: false,
server: server, server: server,
Directory: map[string]*Dimension{}, Directory: map[string]*Dimension{},
@ -73,6 +76,21 @@ func NewRepo(server string, varDir string, repositoryTimeout time.Duration) *Rep
go repo.updateRoutine() go repo.updateRoutine()
go repo.dimensionUpdateRoutine() go repo.dimensionUpdateRoutine()
if pollForUpdates {
go func() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
chanReponse := make(chan updateResponse)
repo.updateInProgressChannel <- chanReponse
response := <-chanReponse
if response.err == nil {
logger.Log.Info("poll update success", zap.String("revision", repo.pollVersion))
} else {
logger.Log.Info("poll error", zap.Error(response.err))
}
}
}()
}
logger.Log.Info("trying to restore previous state") logger.Log.Info("trying to restore previous state")
restoreErr := repo.tryToRestoreCurrent() restoreErr := repo.tryToRestoreCurrent()

View File

@ -18,7 +18,7 @@ func init() {
func NewTestRepo(server, varDir string) *Repo { func NewTestRepo(server, varDir string) *Repo {
r := NewRepo(server, varDir, 2*time.Minute) r := NewRepo(server, varDir, 2*time.Minute, false)
// because the travis CI VMs are very slow, // because the travis CI VMs are very slow,
// we need to add some delay to allow the server to startup // we need to add some delay to allow the server to startup

View File

@ -38,8 +38,8 @@ const (
) )
// Run - let it run and enjoy on a socket near you // Run - let it run and enjoy on a socket near you
func Run(server string, address string, varDir string) error { func Run(server string, address string, varDir string, pollUpdates bool) error {
return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout) return RunServerSocketAndWebServer(server, address, "", "", varDir, DefaultRepositoryTimeout, pollUpdates)
} }
func RunServerSocketAndWebServer( func RunServerSocketAndWebServer(
@ -49,33 +49,36 @@ func RunServerSocketAndWebServer(
webserverPath string, webserverPath string,
varDir string, varDir string,
repositoryTimeout time.Duration, repositoryTimeout time.Duration,
pollForUpdates bool,
) error { ) error {
if address == "" && webserverAddress == "" { if address == "" && webserverAddress == "" {
return errors.New("one of the addresses needs to be set") return errors.New("one of the addresses needs to be set")
} }
Log.Info("building repo with content", zap.String("server", server)) Log.Info("building repo with content", zap.String("server", server))
r := repo.NewRepo(server, varDir, repositoryTimeout) r := repo.NewRepo(server, varDir, repositoryTimeout, pollForUpdates)
// start initial update and handle error // start initial update and handle error
go func() { if !pollForUpdates {
resp := r.Update() go func() {
if !resp.Success { resp := r.Update()
Log.Error("failed to update", if !resp.Success {
zap.String("error", resp.ErrorMessage), Log.Error("failed to update",
zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), zap.String("error", resp.ErrorMessage),
zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes),
zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs),
zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), zap.Float64("OwnRuntime", resp.Stats.OwnRuntime),
) zap.Float64("RepoRuntime", resp.Stats.RepoRuntime),
)
//Exit only if it hasn't recovered //Exit only if it hasn't recovered
if !r.Recovered() { if !r.Recovered() {
os.Exit(1) os.Exit(1)
}
} }
}
}() }()
}
// update can run in bg // update can run in bg
chanErr := make(chan error) chanErr := make(chan error)