From 969d72b7032f5e524b048c63d51a9706f9577667 Mon Sep 17 00:00:00 2001 From: Jan Halfar Date: Wed, 9 Mar 2016 12:13:39 +0100 Subject: [PATCH] added a client, further server clean ups --- client/client.go | 100 +++++++++++++++++++++++++++++++++++++++++ client/client_test.go | 48 +++++++++++++++++++- repo/mock/mock.go | 26 ++++++++++- repo/repo_test.go | 33 +++----------- responses/responses.go | 6 +++ server/server.go | 61 ++++++++++++++++--------- 6 files changed, 224 insertions(+), 50 deletions(-) diff --git a/client/client.go b/client/client.go index 76c4637..32d173b 100644 --- a/client/client.go +++ b/client/client.go @@ -1,18 +1,118 @@ package client import ( + "encoding/json" + "errors" + "fmt" + "io" + "net" + "strconv" + "github.com/foomo/contentserver/content" "github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/responses" "github.com/foomo/contentserver/server" ) +type serverResponse struct { + Reply interface{} +} + // Client a content server client type Client struct { Server string + conn net.Conn +} + +func (c *Client) closeConnection() error { + if c.conn != nil { + err := c.conn.Close() + if err != nil { + return err + } + c.conn = nil + } + return nil +} + +func (c *Client) getConnection() (conn net.Conn, err error) { + if c.conn == nil { + conn, err := net.Dial("tcp", c.Server) + if err != nil { + return nil, err + } + c.conn = conn + } + return c.conn, nil } func (c *Client) call(handler string, request interface{}, response interface{}) error { + jsonBytes, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("could not marshal request : %q", err) + } + conn, err := c.getConnection() + if err != nil { + return fmt.Errorf("can not call server - connection error: %q", err) + } + // write header result will be like handler:2{} + jsonBytes = append([]byte(fmt.Sprintf("%s:%d", handler, len(jsonBytes))), jsonBytes...) + + // send request + written := 0 + l := len(jsonBytes) + for written < l { + n, err := conn.Write(jsonBytes[written:]) + if err != nil { + return fmt.Errorf("failed to send request: %q", err) + } + written += n + } + + // read response + responseBytes := []byte{} + buf := make([]byte, 4096) + responseLength := 0 + for { + n, err := conn.Read(buf) + if err != nil && err != io.EOF { + c.closeConnection() + return fmt.Errorf("an error occured while reading the response: %q", err) + } + if n == 0 { + break + } + responseBytes = append(responseBytes, buf[0:n]...) + if responseLength == 0 { + for index, byte := range responseBytes { + if byte == 123 { + // opening bracket + responseLength, err = strconv.Atoi(string(responseBytes[0:index])) + if err != nil { + return errors.New("could not read response length: " + err.Error()) + } + responseBytes = responseBytes[index:] + break + } + } + } + if responseLength > 0 && len(responseBytes) == responseLength { + break + } + } + + // unmarshal response + responseJSONErr := json.Unmarshal(responseBytes, &serverResponse{Reply: response}) + if responseJSONErr != nil { + // is it an error ? + remoteErr := responses.Error{} + remoteErrJSONErr := json.Unmarshal(responseBytes, remoteErr) + if remoteErrJSONErr == nil { + return remoteErr + } + return fmt.Errorf("could not unmarshal response : %q %q", remoteErrJSONErr, string(responseBytes)) + } + //c.closeConnection() return nil } diff --git a/client/client_test.go b/client/client_test.go index c74feb6..b7a2957 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,20 +1,35 @@ package client import ( + "encoding/json" "testing" + "time" + "github.com/foomo/contentserver/content" + "github.com/foomo/contentserver/log" "github.com/foomo/contentserver/repo/mock" "github.com/foomo/contentserver/server" ) var testServerIsRunning = false +func dump(t *testing.T, v interface{}) { + jsonBytes, err := json.MarshalIndent(v, "", " ") + if err != nil { + t.Fatal("could not dump v", v, "err", err) + return + } + t.Log(string(jsonBytes)) +} + func getTestClient(t *testing.T) *Client { + log.SelectedLevel = log.LevelError addr := "127.0.0.1:9999" if !testServerIsRunning { testServerIsRunning = true testServer, varDir := mock.GetMockData(t) go server.Run(testServer.URL+"/repo-two-dimensions.json", addr, varDir) + time.Sleep(time.Millisecond * 100) } return &Client{ Server: addr, @@ -24,5 +39,36 @@ func getTestClient(t *testing.T) *Client { func TestUpdate(t *testing.T) { c := getTestClient(t) response, err := c.Update() - t.Log("test update", response, err) + if err != nil { + t.Fatal("unexpected err", err) + } + if !response.Success { + t.Fatal("update has to return .Sucesss true", response) + } + stats := response.Stats + if !(stats.RepoRuntime > float64(0.0)) || !(stats.OwnRuntime > float64(0.0)) { + t.Fatal("stats invalid") + } +} + +func TestGetContent(t *testing.T) { + c := getTestClient(t) + request := mock.MakeValidContentRequest() + for i := 0; i < 1000; i++ { + response, err := c.GetContent(request) + if err != nil { + t.Fatal("unexpected err", err) + } + if request.URI != response.URI { + dump(t, request) + dump(t, response) + t.Fatal("uri mismatch") + } + + if response.Status != content.StatusOk { + t.Fatal("unexpected status") + } + + } + } diff --git a/repo/mock/mock.go b/repo/mock/mock.go index 2dd5109..2bc7d00 100644 --- a/repo/mock/mock.go +++ b/repo/mock/mock.go @@ -1,7 +1,6 @@ package mock import ( - "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -9,6 +8,8 @@ import ( "runtime" "testing" "time" + + "github.com/foomo/contentserver/requests" ) // GetMockData mock data to run a repo @@ -20,7 +21,6 @@ func GetMockData(t *testing.T) (server *httptest.Server, varDir string) { server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { time.Sleep(time.Millisecond * 50) mockFilename := path.Join(mockDir, req.URL.Path[1:]) - fmt.Println("----------------------------------->", mockFilename) http.ServeFile(w, req, mockFilename) })) varDir, err := ioutil.TempDir("", "content-server-test") @@ -29,3 +29,25 @@ func GetMockData(t *testing.T) (server *httptest.Server, varDir string) { } return server, varDir } + +// MakeValidContentRequest a mock content request +func MakeValidContentRequest() *requests.Content { + dimensions := []string{"dimension_foo"} + return &requests.Content{ + URI: "/a", + Env: &requests.Env{ + Dimensions: dimensions, + Groups: []string{}, + }, + Nodes: map[string]*requests.Node{ + "id-root": &requests.Node{ + ID: "id-root", + Dimension: dimensions[0], + MimeTypes: []string{"application/x-node"}, + Expand: true, + DataFields: []string{}, + }, + }, + } + +} diff --git a/repo/repo_test.go b/repo/repo_test.go index f776a33..658cbfb 100644 --- a/repo/repo_test.go +++ b/repo/repo_test.go @@ -108,7 +108,7 @@ func getTestRepo(path string, t *testing.T) *Repo { func TestResolveContent(t *testing.T) { r := getTestRepo("/repo-two-dimensions.json", t) - contentRequest := makeValidRequest() + contentRequest := mock.MakeValidContentRequest() siteContent, err := r.GetContent(contentRequest) if siteContent.URI != contentRequest.URI { @@ -137,50 +137,29 @@ func TestLinkIds(t *testing.T) { } -func makeValidRequest() *requests.Content { - dimensions := []string{"dimension_foo"} - return &requests.Content{ - URI: "/a", - Env: &requests.Env{ - Dimensions: dimensions, - Groups: []string{}, - }, - Nodes: map[string]*requests.Node{ - "id-root": &requests.Node{ - ID: "id-root", - Dimension: dimensions[0], - MimeTypes: []string{"application/x-node"}, - Expand: true, - DataFields: []string{}, - }, - }, - } - -} - func TestInvalidRequest(t *testing.T) { r := getTestRepo("/repo-two-dimensions.json", t) - if r.validateContentRequest(makeValidRequest()) != nil { + if r.validateContentRequest(mock.MakeValidContentRequest()) != nil { t.Fatal("failed validation a valid request") } tests := map[string]*requests.Content{} - rEmptyURI := makeValidRequest() + rEmptyURI := mock.MakeValidContentRequest() rEmptyURI.URI = "" tests["empty uri"] = rEmptyURI - rEmptyEnv := makeValidRequest() + rEmptyEnv := mock.MakeValidContentRequest() rEmptyEnv.Env = nil tests["empty env"] = rEmptyEnv - rEmptyEnvDimensions := makeValidRequest() + rEmptyEnvDimensions := mock.MakeValidContentRequest() rEmptyEnvDimensions.Env.Dimensions = []string{} tests["empty env dimensions"] = rEmptyEnvDimensions - //rNodesValidID := makeValidRequest() + //rNodesValidID := mock.MakeValidContentRequest() //rNodesValidID.Nodes["id-root"].Id = "" //tests["nodes must have a valid id"] = rNodesValidID diff --git a/responses/responses.go b/responses/responses.go index 8ad9a90..f98010b 100644 --- a/responses/responses.go +++ b/responses/responses.go @@ -1,11 +1,17 @@ package responses +import "fmt" + // Error describes an error for humans and machines type Error struct { Code int `json:"code"` Message string `json:"message"` } +func (e Error) Error() string { + return fmt.Sprintf("code: %q, message: %q", e.Code, e.Message) +} + // NewError - a brand new error func NewError(code int, message string) *Error { return &Error{ diff --git a/server/server.go b/server/server.go index 3bd1bd3..eb76b9f 100644 --- a/server/server.go +++ b/server/server.go @@ -102,10 +102,6 @@ func (s *socketServer) handle(handler Handler, jsonBytes []byte) (replyBytes []b errorResponse := responses.NewError(1, "unknown handler") reply = errorResponse } - return s.reply(reply, jsonErr, apiErr) -} - -func (s *socketServer) reply(reply interface{}, jsonErr error, apiErr error) (replyBytes []byte, err error) { if jsonErr != nil { err = jsonErr log.Error(" could not read incoming json:", jsonErr) @@ -116,6 +112,10 @@ func (s *socketServer) reply(reply interface{}, jsonErr error, apiErr error) (re err = apiErr reply = responses.NewError(3, "internal error "+apiErr.Error()) } + return s.encodeReply(reply) +} + +func (s *socketServer) encodeReply(reply interface{}) (replyBytes []byte, err error) { encodedBytes, jsonReplyErr := json.MarshalIndent(map[string]interface{}{ "reply": reply, }, "", " ") @@ -128,13 +128,19 @@ func (s *socketServer) reply(reply interface{}, jsonErr error, apiErr error) (re return replyBytes, err } -func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int) { +func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int, err error) { headerParts := strings.Split(header, ":") - jsonLength, _ = strconv.Atoi(headerParts[1]) - return Handler(headerParts[0]), jsonLength + if len(headerParts) != 2 { + return "", 0, errors.New("invalid header") + } + jsonLength, err = strconv.Atoi(headerParts[1]) + if err != nil { + err = fmt.Errorf("could not parse length in header: %q", header) + } + return Handler(headerParts[0]), jsonLength, err } -func (s *socketServer) execute(conn net.Conn, handler Handler, jsonBytes []byte) { +func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) { s.stats.countRequest() log.Record("socket.handleSocketRequest(%d): %s", s.stats.requests, handler) if log.SelectedLevel == log.LevelDebug { @@ -143,12 +149,11 @@ func (s *socketServer) execute(conn net.Conn, handler Handler, jsonBytes []byte) reply, handlingError := s.handle(handler, jsonBytes) if handlingError != nil { log.Error("socket.handleConnection handlingError :", handlingError) - if reply == nil { - log.Error("giving up with nil reply") - conn.Close() - return - } } + return reply +} + +func (s *socketServer) writeResponse(conn net.Conn, reply []byte) { headerBytes := []byte(strconv.Itoa(len(reply))) reply = append(headerBytes, reply...) log.Debug(" replying: " + string(reply)) @@ -158,6 +163,7 @@ func (s *socketServer) execute(conn net.Conn, handler Handler, jsonBytes []byte) return } log.Debug(" replied. waiting for next request on open connection") + } func (s *socketServer) handleConnection(conn net.Conn) { @@ -168,16 +174,26 @@ func (s *socketServer) handleConnection(conn net.Conn) { // let us read with 1 byte steps on conn until we find "{" _, readErr := conn.Read(headerBuffer[0:]) if readErr != nil { - log.Debug(" looks like the client closed the connection - this is my readError: " + fmt.Sprint(readErr)) + log.Debug(" looks like the client closed the connection: ", readErr) return } // read next byte current := headerBuffer[0:] if string(current) == "{" { + // json has started + handler, jsonLength, headerErr := extractHandlerAndJSONLentgh(header) // reset header header = "" - // json has started - handler, jsonLength := extractHandlerAndJSONLentgh(header) + if headerErr != nil { + log.Error("invalid request could not read header", headerErr) + encodedErr, encodingErr := s.encodeReply(responses.NewError(4, "invalid header "+headerErr.Error())) + if encodingErr == nil { + s.writeResponse(conn, encodedErr) + } else { + log.Error("could not respond to invalid request", encodingErr) + } + return + } log.Debug(fmt.Sprintf(" found json with %d bytes", jsonLength)) if jsonLength > 0 { // let us try to read some json @@ -192,11 +208,11 @@ func (s *socketServer) handleConnection(conn net.Conn) { if log.SelectedLevel == log.LevelDebug { log.Debug(" read json: " + string(jsonBytes)) } - s.execute(conn, handler, jsonBytes) - return + s.writeResponse(conn, s.execute(handler, jsonBytes)) + // note: connection remains open + continue } log.Error("can not read empty json") - conn.Close() return } // adding to header byte by byte @@ -230,6 +246,11 @@ func Run(server string, address string, varDir string) error { continue } // a goroutine handles conn so that the loop can accept other connections - go s.handleConnection(conn) + go func() { + log.Debug("accepted connection") + s.handleConnection(conn) + conn.Close() + log.Debug("connection closed") + }() } }