fix: add mutex

This commit is contained in:
Kevin Franklin Kim 2025-05-19 09:21:56 +02:00
parent a53de5827c
commit 1ac5dab2aa
No known key found for this signature in database
7 changed files with 79 additions and 43 deletions

View File

@ -27,7 +27,7 @@ doc:
.PHONY: test
## Run tests
test:
@GO_TEST_TAGS=-skip go test -v -tags=safe -coverprofile=coverage.out -race ./...
@GO_TEST_TAGS=-skip go test -v -tags=safe -coverprofile=coverage.out -race -count=1 ./...
#@GO_TEST_TAGS=-skip go test -tags=safe -coverprofile=coverage.out -race -json ./... | gotestfmt
.PHONY: test.update

View File

@ -16,6 +16,7 @@ import (
)
func TestUpdate(t *testing.T) {
t.Parallel()
testWithClients(t, func(t *testing.T, c *client.Client) {
t.Helper()
response, err := c.Update(t.Context())
@ -27,6 +28,7 @@ func TestUpdate(t *testing.T) {
}
func TestGetURIs(t *testing.T) {
t.Parallel()
testWithClients(t, func(t *testing.T, c *client.Client) {
t.Helper()
request := mock.MakeValidURIsRequest()
@ -38,6 +40,7 @@ func TestGetURIs(t *testing.T) {
}
func TestGetRepo(t *testing.T) {
t.Parallel()
testWithClients(t, func(t *testing.T, c *client.Client) {
t.Helper()
r, err := c.GetRepo(t.Context())
@ -49,6 +52,7 @@ func TestGetRepo(t *testing.T) {
}
func TestGetNodes(t *testing.T) {
t.Parallel()
testWithClients(t, func(t *testing.T, c *client.Client) {
t.Helper()
nodesRequest := mock.MakeNodesRequest()
@ -69,6 +73,7 @@ func TestGetNodes(t *testing.T) {
}
func TestGetContent(t *testing.T) {
t.Parallel()
testWithClients(t, func(t *testing.T, c *client.Client) {
t.Helper()
request := mock.MakeValidContentRequest()
@ -118,19 +123,26 @@ func benchmarkClientAndServerGetContent(tb testing.TB, numGroups, numCalls int,
func testWithClients(t *testing.T, testFunc func(t *testing.T, c *client.Client)) {
t.Helper()
l := zaptest.NewLogger(t)
httpRepoServer := initHTTPRepoServer(t, l)
socketRepoServer := initSocketRepoServer(t, l)
httpClient := newHTTPClient(t, httpRepoServer)
socketClient := newSocketClient(t, socketRepoServer.Addr().String())
defer func() {
httpClient.Close()
socketClient.Close()
httpRepoServer.Close()
socketRepoServer.Close()
}()
testFunc(t, httpClient)
testFunc(t, socketClient)
t.Run("http", func(t *testing.T) {
l := zaptest.NewLogger(t)
s := initHTTPRepoServer(t, l)
c := newHTTPClient(t, s)
defer func() {
s.Close()
c.Close()
}()
testFunc(t, c)
})
t.Run("socket", func(t *testing.T) {
l := zaptest.NewLogger(t)
s := initSocketRepoServer(t, l)
c := newSocketClient(t, s.Addr().String())
defer func() {
s.Close()
c.Close()
}()
testFunc(t, c)
})
}
func initRepo(tb testing.TB, l *zap.Logger) *repo.Repo {

View File

@ -1,6 +1,7 @@
package client_test
import (
"context"
"net"
"testing"
"time"
@ -41,7 +42,7 @@ func initSocketRepoServer(tb testing.TB, l *zap.Logger) net.Listener {
for {
// this blocks until connection or error
conn, err := ln.Accept()
if errors.Is(err, net.ErrClosed) {
if errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) {
return
} else if err != nil {
tb.Error("runSocketServer: could not accept connection", err.Error())
@ -50,7 +51,7 @@ func initSocketRepoServer(tb testing.TB, l *zap.Logger) net.Listener {
// a goroutine handles conn so that the loop can accept other connections
go func() {
l.Debug("accepted connection", zap.String("source", conn.RemoteAddr().String()))
// l.Debug("accepted connection", zap.String("source", conn.RemoteAddr().String()))
h.Serve(conn)
}()
}

View File

@ -56,7 +56,7 @@ func (h *Socket) Serve(conn net.Conn) {
}
}()
h.l.Debug("socketServer.handleConnection")
// h.l.Debug("socketServer.handleConnection")
metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Inc()
var (
@ -66,10 +66,13 @@ func (h *Socket) Serve(conn net.Conn) {
for {
// let us read with 1 byte steps on conn until we find "{"
_, readErr := conn.Read(headerBuffer[0:])
if readErr != nil {
// h.l.Debug("looks like the client closed the connection", zap.Error(readErr))
if errors.Is(readErr, io.EOF) {
// client closed the connection
metrics.NumSocketsGauge.WithLabelValues(conn.RemoteAddr().String()).Dec()
return
} else if readErr != nil {
h.l.Error("failed to read from connection", zap.Error(readErr))
return
}
// read next byte
current := headerBuffer[0:]

View File

@ -8,6 +8,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
@ -21,9 +22,10 @@ const (
type (
History struct {
l *zap.Logger
historyDir string
historyLimit int
l *zap.Logger
historyDir string
historyLimit int
currentMutext sync.RWMutex
}
HistoryOption func(*History)
)
@ -67,20 +69,26 @@ func NewHistory(l *zap.Logger, opts ...HistoryOption) *History {
// ------------------------------------------------------------------------------------------------
func (h *History) Add(jsonBytes []byte) error {
var filename = path.Join(h.historyDir, HistoryRepoJSONPrefix+time.Now().Format(time.RFC3339Nano)+HistoryRepoJSONSuffix)
backupFilename := path.Join(h.historyDir, HistoryRepoJSONPrefix+time.Now().Format(time.RFC3339Nano)+HistoryRepoJSONSuffix)
currentFilename := h.GetCurrentFilename()
if err := os.MkdirAll(path.Dir(filename), 0700); err != nil {
if err := os.MkdirAll(path.Dir(backupFilename), 0700); err != nil {
return errors.Wrap(err, "failed to create history dir")
}
if err := os.WriteFile(filename, jsonBytes, 0600); err != nil {
return errors.Wrap(err, "failed to write history")
if err := os.WriteFile(backupFilename, jsonBytes, 0600); err != nil {
return errors.Wrap(err, "failed to write backup history file")
}
h.l.Debug("adding content backup", zap.String("file", filename))
h.l.Debug("writing files",
zap.String("backup", backupFilename),
zap.String("current", currentFilename),
)
// current filename
if err := os.WriteFile(h.GetCurrentFilename(), jsonBytes, 0600); err != nil {
h.currentMutext.Lock()
defer h.currentMutext.Unlock()
if err := os.WriteFile(currentFilename, jsonBytes, 0600); err != nil {
return errors.Wrap(err, "failed to write current history")
}
@ -96,6 +104,8 @@ func (h *History) GetCurrentFilename() string {
}
func (h *History) GetCurrent(buf *bytes.Buffer) error {
h.currentMutext.RLock()
defer h.currentMutext.RUnlock()
f, err := os.Open(h.GetCurrentFilename())
if err != nil {
return err

View File

@ -222,17 +222,27 @@ func (r *Repo) GetRepo() map[string]*content.RepoNode {
// reads the JSON history file from the Filesystem and copies it directly in to the supplied buffer
// the result is wrapped as service response, e.g: {"reply": <contentData>}
func (r *Repo) WriteRepoBytes(w io.Writer) {
filename := r.history.GetCurrentFilename()
r.history.currentMutext.RLock()
defer r.history.currentMutext.RUnlock()
f, err := os.Open(r.history.GetCurrentFilename())
if err != nil {
r.l.Error("Failed to open Repo JSON", zap.Error(err))
r.l.Error("failed to open repo JSON", zap.Error(err), zap.String("history", filename))
return
}
_, _ = w.Write([]byte("{\"reply\":"))
_, err = io.Copy(w, f)
if err != nil {
r.l.Error("Failed to serve Repo JSON", zap.Error(err))
if _, err := w.Write([]byte("{\"reply\":")); err != nil {
r.l.Error("failed to write repo JSON prefix", zap.Error(err))
return
}
if _, err := io.Copy(w, f); err != nil {
r.l.Error("failed to serve repo JSON", zap.Error(err))
return
}
if _, err := w.Write([]byte("}")); err != nil {
r.l.Error("failed to write repo JSON suffix", zap.Error(err))
return
}
_, _ = w.Write([]byte("}"))
}
func (r *Repo) Update() (updateResponse *responses.Update) {

View File

@ -13,7 +13,7 @@ import (
"go.uber.org/zap/zaptest"
)
func newTestRepo(ctx context.Context, l *zap.Logger, url, varDir string) *Repo {
func NewTestRepo(ctx context.Context, l *zap.Logger, url, varDir string) *Repo {
h := NewHistory(l, HistoryWithHistoryLimit(2), HistoryWithHistoryDir(varDir))
r := New(l, url, h)
go r.Start(ctx) //nolint:errcheck
@ -39,7 +39,7 @@ func TestLoad404(t *testing.T) {
l = zaptest.NewLogger(t)
mockServer, varDir = mock.GetMockData(t)
url = mockServer.URL + "/repo-no-have"
r = newTestRepo(t.Context(), l, url, varDir)
r = NewTestRepo(t.Context(), l, url, varDir)
)
response := r.Update()
@ -53,7 +53,7 @@ func TestLoadBrokenRepo(t *testing.T) {
l = zaptest.NewLogger(t)
mockServer, varDir = mock.GetMockData(t)
server = mockServer.URL + "/repo-broken-json.json"
r = newTestRepo(t.Context(), l, server, varDir)
r = NewTestRepo(t.Context(), l, server, varDir)
)
response := r.Update()
@ -67,7 +67,7 @@ func TestLoadRepo(t *testing.T) {
l = zaptest.NewLogger(t)
mockServer, varDir = mock.GetMockData(t)
server = mockServer.URL + "/repo-ok.json"
r = newTestRepo(t.Context(), l, server, varDir)
r = NewTestRepo(t.Context(), l, server, varDir)
)
assertRepoIsEmpty(t, r, false)
@ -95,7 +95,7 @@ func BenchmarkLoadRepo(b *testing.B) {
t = &testing.T{}
mockServer, varDir = mock.GetMockData(t)
server = mockServer.URL + "/repo-ok.json"
r = newTestRepo(b.Context(), l, server, varDir)
r = NewTestRepo(b.Context(), l, server, varDir)
)
b.ReportAllocs()
@ -117,7 +117,7 @@ func TestLoadRepoDuplicateUris(t *testing.T) {
l = zaptest.NewLogger(t)
mockServer, varDir = mock.GetMockData(t)
server = mockServer.URL + "/repo-duplicate-uris.json"
r = newTestRepo(t.Context(), l, server, varDir)
r = NewTestRepo(t.Context(), l, server, varDir)
)
response := r.Update()
@ -131,7 +131,7 @@ func TestDimensionHygiene(t *testing.T) {
mockServer, varDir := mock.GetMockData(t)
server := mockServer.URL + "/repo-two-dimensions.json"
r := newTestRepo(t.Context(), l, server, varDir)
r := NewTestRepo(t.Context(), l, server, varDir)
response := r.Update()
require.True(t, response.Success, "well those two dimension should be fine")
@ -149,7 +149,7 @@ func getTestRepo(t *testing.T, path string) *Repo {
mockServer, varDir := mock.GetMockData(t)
server := mockServer.URL + path
r := newTestRepo(t.Context(), l, server, varDir)
r := NewTestRepo(t.Context(), l, server, varDir)
response := r.Update()
require.True(t, response.Success, "well those two dimension should be fine")
@ -200,7 +200,7 @@ func TestLinkIds(t *testing.T) {
var (
mockServer, varDir = mock.GetMockData(t)
server = mockServer.URL + "/repo-link-ok.json"
r = newTestRepo(t.Context(), l, server, varDir)
r = NewTestRepo(t.Context(), l, server, varDir)
response = r.Update()
)