mirror of
https://github.com/foomo/contentserver.git
synced 2025-10-16 12:25:44 +00:00
233 lines
6.9 KiB
Go
233 lines
6.9 KiB
Go
package repo
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/foomo/contentserver/content"
|
|
"github.com/foomo/contentserver/log"
|
|
"github.com/foomo/contentserver/responses"
|
|
)
|
|
|
|
func (repo *Repo) updateRoutine() {
|
|
go func() {
|
|
for {
|
|
log.Debug("update routine is about to select")
|
|
select {
|
|
case newDimension := <-repo.updateChannel:
|
|
log.Debug("update routine received a new dimension: " + newDimension.Dimension)
|
|
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
|
|
log.Debug("update routine received result")
|
|
if err != nil {
|
|
log.Debug(" update routine error: " + err.Error())
|
|
}
|
|
repo.updateDoneChannel <- err
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
|
|
repo.updateChannel <- &repoDimension{
|
|
Dimension: dimension,
|
|
Node: node,
|
|
}
|
|
return <-repo.updateDoneChannel
|
|
}
|
|
|
|
// do not call directly, but only through channel
|
|
func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode) error {
|
|
newNode.WireParents()
|
|
newDirectory := make(map[string]*content.RepoNode)
|
|
newURIDirectory := make(map[string]*content.RepoNode)
|
|
|
|
err := builDirectory(newNode, newDirectory, newURIDirectory)
|
|
if err != nil {
|
|
return errors.New("update dimension \"" + dimension + "\" failed when building its directory:: " + err.Error())
|
|
}
|
|
err = wireAliases(newDirectory)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
repo.Directory[dimension] = &Dimension{
|
|
Node: newNode,
|
|
Directory: newDirectory,
|
|
URIDirectory: newURIDirectory,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error {
|
|
log.Debug("repo.buildDirectory: " + dirNode.ID)
|
|
existingNode, ok := directory[dirNode.ID]
|
|
if ok {
|
|
return errors.New("duplicate node with id:" + existingNode.ID)
|
|
}
|
|
directory[dirNode.ID] = dirNode
|
|
//todo handle duplicate uris
|
|
if _, thereIsAnExistingURINode := uRIDirectory[dirNode.URI]; thereIsAnExistingURINode {
|
|
return errors.New("duplicate uri: " + dirNode.URI + " (bad node id: " + dirNode.ID + ")")
|
|
}
|
|
uRIDirectory[dirNode.URI] = dirNode
|
|
for _, childNode := range dirNode.Nodes {
|
|
err := builDirectory(childNode, directory, uRIDirectory)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func wireAliases(directory map[string]*content.RepoNode) error {
|
|
for _, repoNode := range directory {
|
|
if len(repoNode.LinkID) > 0 {
|
|
if destinationNode, ok := directory[repoNode.LinkID]; ok {
|
|
repoNode.URI = destinationNode.URI
|
|
} else {
|
|
return errors.New("that link id points nowhere " + repoNode.LinkID + " from " + repoNode.ID)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, err error) {
|
|
nodes = make(map[string]*content.RepoNode)
|
|
err = json.Unmarshal(jsonBytes, &nodes)
|
|
return nodes, err
|
|
}
|
|
|
|
// Update - reload contents of repository with json from repo.server
|
|
func (repo *Repo) Update() (updateResponse *responses.Update) {
|
|
floatSeconds := func(nanoSeconds int64) float64 {
|
|
return float64(float64(nanoSeconds) / float64(1000000000.0))
|
|
}
|
|
startTime := time.Now().UnixNano()
|
|
updateRepotime, jsonBytes, updateErr := repo.update()
|
|
updateResponse = &responses.Update{}
|
|
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
|
|
|
|
if updateErr != nil {
|
|
updateResponse.Success = false
|
|
updateResponse.Stats.NumberOfNodes = -1
|
|
updateResponse.Stats.NumberOfURIs = -1
|
|
// let us try to restore the world from a file
|
|
log.Error("could not update repository:" + updateErr.Error())
|
|
updateResponse.ErrorMessage = updateErr.Error()
|
|
restoreErr := repo.tryToRestoreCurrent()
|
|
if restoreErr != nil {
|
|
log.Error("failed to restore preceding repo version: " + restoreErr.Error())
|
|
} else {
|
|
log.Record("restored current repo from local history")
|
|
}
|
|
} else {
|
|
updateResponse.Success = true
|
|
// persist the currently loaded one
|
|
historyErr := repo.history.add(jsonBytes)
|
|
if historyErr != nil {
|
|
log.Warning("could not persist current repo in history: " + historyErr.Error())
|
|
}
|
|
// add some stats
|
|
for dimension := range repo.Directory {
|
|
updateResponse.Stats.NumberOfNodes += len(repo.Directory[dimension].Directory)
|
|
updateResponse.Stats.NumberOfURIs += len(repo.Directory[dimension].URIDirectory)
|
|
}
|
|
}
|
|
updateResponse.Stats.OwnRuntime = floatSeconds(time.Now().UnixNano()-startTime) - updateResponse.Stats.RepoRuntime
|
|
return updateResponse
|
|
}
|
|
|
|
func (repo *Repo) tryToRestoreCurrent() error {
|
|
currentJSONBytes, err := repo.history.getCurrent()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return repo.loadJSONBytes(currentJSONBytes)
|
|
}
|
|
|
|
func get(URL string) (data []byte, err error) {
|
|
response, err := http.Get(URL)
|
|
if err != nil {
|
|
return data, err
|
|
}
|
|
defer response.Body.Close()
|
|
if response.StatusCode != http.StatusOK {
|
|
return data, fmt.Errorf("Bad HTTP Response: %q", response.Status)
|
|
}
|
|
return ioutil.ReadAll(response.Body)
|
|
}
|
|
|
|
func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
|
|
startTimeRepo := time.Now().UnixNano()
|
|
jsonBytes, err = get(repo.server)
|
|
repoRuntime = time.Now().UnixNano() - startTimeRepo
|
|
if err != nil {
|
|
// we have no json to load - the repo server did not reply
|
|
log.Debug("we have no json to load - the repo server did not reply", err)
|
|
return repoRuntime, jsonBytes, err
|
|
}
|
|
log.Debug("loading json from: "+repo.server, string(jsonBytes))
|
|
nodes, err := loadNodesFromJSON(jsonBytes)
|
|
if err != nil {
|
|
// could not load nodes from json
|
|
return repoRuntime, jsonBytes, err
|
|
}
|
|
err = repo.loadNodes(nodes)
|
|
if err != nil {
|
|
// repo failed to load nodes
|
|
return repoRuntime, jsonBytes, err
|
|
}
|
|
return repoRuntime, jsonBytes, nil
|
|
}
|
|
|
|
func (repo *Repo) loadJSONBytes(jsonBytes []byte) error {
|
|
nodes, err := loadNodesFromJSON(jsonBytes)
|
|
if err != nil {
|
|
log.Debug("could not parse json", string(jsonBytes))
|
|
return err
|
|
}
|
|
err = repo.loadNodes(nodes)
|
|
if err == nil {
|
|
historyErr := repo.history.add(jsonBytes)
|
|
if historyErr != nil {
|
|
log.Warning("could not add valid json to history:" + historyErr.Error())
|
|
} else {
|
|
log.Record("added valid json to history")
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
|
|
newDimensions := []string{}
|
|
for dimension, newNode := range newNodes {
|
|
newDimensions = append(newDimensions, dimension)
|
|
log.Debug("loading nodes for dimension " + dimension)
|
|
loadErr := repo.updateDimension(dimension, newNode)
|
|
if loadErr != nil {
|
|
log.Debug(" failed to load " + dimension + ": " + loadErr.Error())
|
|
return loadErr
|
|
}
|
|
}
|
|
dimensionIsValid := func(dimension string) bool {
|
|
for _, newDimension := range newDimensions {
|
|
if dimension == newDimension {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
// we need to throw away orphaned dimensions
|
|
for dimension := range repo.Directory {
|
|
if !dimensionIsValid(dimension) {
|
|
log.Notice("removing orphaned dimension:" + dimension)
|
|
delete(repo.Directory, dimension)
|
|
}
|
|
}
|
|
return nil
|
|
}
|