contentserver/repo/loader.go

package repo

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/foomo/contentserver/content"
	. "github.com/foomo/contentserver/logger"
	"github.com/foomo/contentserver/status"
	jsoniter "github.com/json-iterator/go"
	"github.com/mgutz/ansi"
	"go.uber.org/zap"
)

var (
	json              = jsoniter.ConfigCompatibleWithStandardLibrary
	errUpdateRejected = errors.New("update rejected: queue full")
)
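
// updateResponse carries the runtime and error of a single repo update back
// to the caller waiting in tryUpdate.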
type updateResponse struct {
	repoRuntime int64
	err         error
}
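
// updateRoutine serializes repo updates: for every response channel received
// on updateInProgressChannel it runs one update, records metrics and sends
// the result back on that channel.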
func (repo *Repo) updateRoutine() {
	for {
		select {
		case resChan := <-repo.updateInProgressChannel:
			Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan)))
			start := time.Now()
			repoRuntime, errUpdate := repo.update()
			if errUpdate != nil {
				status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc()
			}
			resChan <- updateResponse{
				repoRuntime: repoRuntime,
				err:         errUpdate,
			}
			duration := time.Since(start)
			Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan)))
			status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
			status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds())
		}
	}
}
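
// dimensionUpdateRoutine applies the dimension updates received on
// dimensionUpdateChannel one at a time and reports the result on
// dimensionUpdateDoneChannel.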
func (repo *Repo) dimensionUpdateRoutine() {
	for newDimension := range repo.dimensionUpdateChannel {
		Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension))
		err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
		Log.Info("update routine received result")
		if err != nil {
			Log.Debug("update dimension failed", zap.Error(err))
		}
		repo.dimensionUpdateDoneChannel <- err
	}
}
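
// updateDimension hands the new node tree for the given dimension to the
// dimension update routine and blocks until it has been applied.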
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
	Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
	repo.dimensionUpdateChannel <- &repoDimension{
		Dimension: dimension,
		Node:      node,
	}
	Log.Debug("waiting for done signal")
	return <-repo.dimensionUpdateDoneChannel
}

// _updateDimension rebuilds the ID and URI directories for a dimension and
// swaps in a new repo directory. Do not call directly, but only through the
// dimension update channel.
func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode) error {
	newNode.WireParents()

	var (
		newDirectory    = make(map[string]*content.RepoNode)
		newURIDirectory = make(map[string]*content.RepoNode)
		err             = buildDirectory(newNode, newDirectory, newURIDirectory)
	)
	if err != nil {
		return errors.New("update of dimension \"" + dimension + "\" failed when building its directory: " + err.Error())
	}
	err = wireAliases(newDirectory)
	if err != nil {
		return err
	}

	// ---------------------------------------------
	// copy the old data structure to prevent concurrent map access:
	// collect the other dimensions in the Directory
	newRepoDirectory := map[string]*Dimension{}
	for d, D := range repo.Directory {
		if d != dimension {
			newRepoDirectory[d] = D
		}
	}
	// add the new dimension
	newRepoDirectory[dimension] = &Dimension{
		Node:         newNode,
		Directory:    newDirectory,
		URIDirectory: newURIDirectory,
	}
	repo.Directory = newRepoDirectory
	// ---------------------------------------------
	// @TODO: why not update only the dimension that has changed instead?
	// repo.Directory[dimension] = &Dimension{
	// 	Node:         newNode,
	// 	Directory:    newDirectory,
	// 	URIDirectory: newURIDirectory,
	// }
	// ---------------------------------------------
	return nil
}
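
// buildDirectory recursively registers dirNode and all of its children in the
// ID and URI directories and rejects duplicate IDs and URIs.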
func buildDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error {
	// Log.Debug("buildDirectory", zap.String("ID", dirNode.ID))
	existingNode, ok := directory[dirNode.ID]
	if ok {
		return errors.New("duplicate node with id: " + existingNode.ID)
	}
	directory[dirNode.ID] = dirNode
	// TODO: handle duplicate uris
	if _, thereIsAnExistingURINode := uRIDirectory[dirNode.URI]; thereIsAnExistingURINode {
		return errors.New("duplicate uri: " + dirNode.URI + " (bad node id: " + dirNode.ID + ")")
	}
	uRIDirectory[dirNode.URI] = dirNode
	for _, childNode := range dirNode.Nodes {
		err := buildDirectory(childNode, directory, uRIDirectory)
		if err != nil {
			return err
		}
	}
	return nil
}
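
// wireAliases resolves links: every node with a LinkID inherits the URI of
// the node it links to.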
func wireAliases(directory map[string]*content.RepoNode) error {
	for _, repoNode := range directory {
		if len(repoNode.LinkID) > 0 {
			if destinationNode, ok := directory[repoNode.LinkID]; ok {
				repoNode.URI = destinationNode.URI
			} else {
				return errors.New("link id " + repoNode.LinkID + " on node " + repoNode.ID + " points nowhere")
			}
		}
	}
	return nil
}
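
// loadNodesFromJSON unmarshals the buffered repo JSON into a map of dimension
// names to root nodes.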
func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) {
	nodes = make(map[string]*content.RepoNode)
	err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes)
	return nodes, err
}
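
// tryToRestoreCurrent reads the most recent content from the history into the
// JSON buffer and tries to load it.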
func (repo *Repo) tryToRestoreCurrent() (err error) {
	err = repo.history.getCurrent(&repo.jsonBuf)
	if err != nil {
		return err
	}
	return repo.loadJSONBytes()
}
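
// get fetches the repo JSON from the given URL into repo.jsonBuf.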
func (repo *Repo) get(URL string) (err error) {
	response, err := http.Get(URL)
	if err != nil {
		return err
	}
	defer response.Body.Close()
	if response.StatusCode != http.StatusOK {
		return fmt.Errorf("bad HTTP response: %q", response.Status)
	}
	Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset)
	repo.jsonBuf.Reset()
	Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset)
	_, err = io.Copy(&repo.jsonBuf, response.Body)
	return err
}
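
// update fetches the repo data from the configured server and loads it. It
// returns the runtime of the request against the repo server in nanoseconds.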
func (repo *Repo) update() (repoRuntime int64, err error) {
	startTimeRepo := time.Now().UnixNano()
	err = repo.get(repo.server)
	repoRuntime = time.Now().UnixNano() - startTimeRepo
	if err != nil {
		// we have no json to load - the repo server did not reply
		Log.Debug("failed to load json", zap.Error(err))
		return repoRuntime, err
	}
	Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes())))
	nodes, err := repo.loadNodesFromJSON()
	if err != nil {
		// could not load nodes from json
		return repoRuntime, err
	}
	err = repo.loadNodes(nodes)
	if err != nil {
		// repo failed to load nodes
		return repoRuntime, err
	}
	return repoRuntime, nil
}

// tryUpdate limits resource usage by allowing only one update request at a
// time; concurrent requests are rejected with errUpdateRejected.
func (repo *Repo) tryUpdate() (repoRuntime int64, err error) {
	c := make(chan updateResponse)
	select {
	case repo.updateInProgressChannel <- c:
		Log.Info("update request added to queue")
		ur := <-c
		return ur.repoRuntime, ur.err
	default:
		Log.Info("update request rejected, queue is full")
		status.M.UpdatesRejectedCounter.WithLabelValues().Inc()
		return 0, errUpdateRejected
	}
}
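
// loadJSONBytes parses the buffered JSON and loads the resulting nodes; on
// success the JSON is added to the history, which is then cleaned up.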
func (repo *Repo) loadJSONBytes() error {
	nodes, err := repo.loadNodesFromJSON()
	if err != nil {
		data := repo.jsonBuf.Bytes()
		if len(data) > 10 {
			Log.Debug("could not parse json",
				zap.String("jsonStart", string(data[:10])),
				zap.String("jsonEnd", string(data[len(data)-10:])),
			)
		}
		return err
	}
	err = repo.loadNodes(nodes)
	if err == nil {
		historyErr := repo.history.add(repo.jsonBuf.Bytes())
		if historyErr != nil {
			Log.Error("could not add valid json to history", zap.Error(historyErr))
		} else {
			Log.Info("added valid json to history")
		}
		cleanUpErr := repo.history.cleanup()
		if cleanUpErr != nil {
			Log.Error("an error occurred while cleaning up my history", zap.Error(cleanUpErr))
		} else {
			Log.Info("cleaned up history")
		}
	}
	return err
}
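
// loadNodes updates every dimension contained in newNodes and removes
// dimensions from the repo directory that are no longer present.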
func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
	newDimensions := []string{}
	for dimension, newNode := range newNodes {
		newDimensions = append(newDimensions, dimension)
		Log.Debug("loading nodes for dimension", zap.String("dimension", dimension))
		loadErr := repo.updateDimension(dimension, newNode)
		if loadErr != nil {
			Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr))
			return loadErr
		}
	}
	dimensionIsValid := func(dimension string) bool {
		for _, newDimension := range newDimensions {
			if dimension == newDimension {
				return true
			}
		}
		return false
	}
	// we need to throw away orphaned dimensions
	for dimension := range repo.Directory {
		if !dimensionIsValid(dimension) {
			Log.Info("removing orphaned dimension", zap.String("dimension", dimension))
			delete(repo.Directory, dimension)
		}
	}
	return nil
}