diff --git a/client.go b/client.go index 5fb6108..015b7a1 100644 --- a/client.go +++ b/client.go @@ -133,17 +133,17 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string, var encodeWriter io.Writer switch c.compressor { case CompressorGZIP: - gzipWriter := globalCompressorPools[CompressorGZIP].Get().(*gzip.Writer) + gzipWriter := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer) gzipWriter.Reset(buffer) - defer globalCompressorPools[CompressorGZIP].Put(gzipWriter) + defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter) encodeWriter = gzipWriter case CompressorSnappy: - snappyWriter := globalCompressorPools[CompressorSnappy].Get().(*snappy.Writer) + snappyWriter := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer) snappyWriter.Reset(buffer) - defer globalCompressorPools[CompressorSnappy].Put(snappyWriter) + defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter) encodeWriter = snappyWriter case CompressorNone: encodeWriter = buffer @@ -177,9 +177,9 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string, req.Header.Set("Content-Encoding", "snappy") req.Header.Set("Accept-Encoding", "snappy") case CompressorNone: - // Dissalow Automatic Compression - req.Header.Set("Content-Encoding", "") - req.Header.Set("Accept-Encoding", "") + // Disable Automatic Compression + // https://http.dev/accept-encoding + req.Header.Set("Accept-Encoding", "identity") // uncompressed, nothing to do default: // uncompressed, nothing to do @@ -215,14 +215,20 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string, var responseBodyReader io.Reader switch resp.Header.Get("Content-Encoding") { case "snappy": - responseBodyReader = snappy.NewReader(resp.Body) + snappyReader := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader) + defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader) + + snappyReader.Reset(resp.Body) + responseBodyReader = snappyReader case "gzip": - gzipReader, err := gzip.NewReader(resp.Body) + gzipReader := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader) + defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader) + + err := gzipReader.Reset(resp.Body) if err != nil { return NewClientError(errors.Wrap(err, "could not create gzip reader")) } responseBodyReader = gzipReader - defer gzipReader.Close() default: responseBodyReader = resp.Body } diff --git a/client_test.go b/client_test.go index e02f0b6..7d345b9 100644 --- a/client_test.go +++ b/client_test.go @@ -101,6 +101,8 @@ func BenchmarkBufferedClient(b *testing.B) { require.NoError(b, err) benchClient := func(b *testing.B, client Client) { + b.ReportAllocs() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var args []map[string]interface{} err := LoadArgs(&args, nil, r) @@ -109,7 +111,6 @@ func BenchmarkBufferedClient(b *testing.B) { _ = Reply([]interface{}{"HI"}, nil, r, w) })) defer server.Close() - b.ReportAllocs() if bc, ok := client.(*BufferedClient); ok { bc.client = server.Client() @@ -121,11 +122,11 @@ func BenchmarkBufferedClient(b *testing.B) { } } benchmarks := map[string]Compressor{ - "none": CompressorNone, - "gzip": CompressorGZIP, - "snappy": CompressorSnappy, + "none": CompressorNone, + //"gzip": CompressorGZIP, + //"snappy": CompressorSnappy, } - runs := 2 + runs := 1 for name, compressor := range benchmarks { b.Run(name, func(b *testing.B) { diff --git a/gotsrpc.go b/gotsrpc.go index 28c84a6..eacbb0f 100644 --- a/gotsrpc.go +++ b/gotsrpc.go @@ -15,6 +15,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/golang/snappy" @@ -26,6 +27,18 @@ import ( type contextKey string +var ( + // Read-only global compressor pools + globalCompressorWriterPools = map[Compressor]*sync.Pool{ + CompressorGZIP: {New: func() interface{} { return gzip.NewWriter(io.Discard) }}, + CompressorSnappy: {New: func() interface{} { return snappy.NewBufferedWriter(io.Discard) }}, + } + globalCompressorReaderPools = map[Compressor]*sync.Pool{ + CompressorGZIP: {New: func() interface{} { return &gzip.Reader{} }}, + CompressorSnappy: {New: func() interface{} { return snappy.NewReader(nil) }}, + } +) + const contextStatsKey contextKey = "gotsrpcStats" func GetCalledFunc(r *http.Request, endPoint string) string { @@ -53,20 +66,34 @@ func LoadArgs(args interface{}, callStats *CallStats, r *http.Request) error { var bodyReader io.Reader = r.Body switch r.Header.Get("Content-Encoding") { case "snappy": - bodyReader = snappy.NewReader(r.Body) + snappyReader := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader) + defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader) + + snappyReader.Reset(r.Body) + bodyReader = snappyReader case "gzip": - gzipReader, err := gzip.NewReader(r.Body) + gzipReader := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader) + defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader) + + err := gzipReader.Reset(r.Body) if err != nil { - return errors.Wrap(err, "could not create gzip reader") + return NewClientError(errors.Wrap(err, "could not create gzip reader")) } bodyReader = gzipReader - defer gzipReader.Close() } handle := getHandlerForContentType(r.Header.Get("Content-Type")).handle if err := codec.NewDecoder(bodyReader, handle).Decode(args); err != nil { return errors.Wrap(err, "could not decode arguments") } + + // We need to close the reader at the end of the function + if writer, ok := bodyReader.(io.Closer); ok { + if err := writer.Close(); err != nil { + return errors.Wrap(err, "failed to write to response body") + } + } + if callStats != nil { callStats.Unmarshalling = time.Since(start) callStats.RequestSize = int(r.ContentLength) diff --git a/response.go b/response.go index a69cf76..22cdb04 100644 --- a/response.go +++ b/response.go @@ -7,7 +7,6 @@ import ( "net/http" "reflect" "slices" - "sync" "time" "github.com/golang/snappy" @@ -15,14 +14,6 @@ import ( "github.com/ugorji/go/codec" ) -var ( - // Read-only global compressor pools - globalCompressorPools = map[Compressor]*sync.Pool{ - CompressorGZIP: {New: func() interface{} { return gzip.NewWriter(nil) }}, - CompressorSnappy: {New: func() interface{} { return snappy.NewBufferedWriter(nil) }}, - } -) - // Reply despite the fact, that this is a public method - do not call it, it will be called by generated code func Reply(response []interface{}, stats *CallStats, r *http.Request, w http.ResponseWriter) error { responseWriter := newResponseWriterWithLength(w) @@ -38,19 +29,19 @@ func Reply(response []interface{}, stats *CallStats, r *http.Request, w http.Res responseWriter.Header().Set("Content-Encoding", "snappy") responseWriter.Header().Set("Vary", "Accept-Encoding") - snappyWriter := globalCompressorPools[CompressorSnappy].Get().(*snappy.Writer) + snappyWriter := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer) snappyWriter.Reset(responseWriter) - defer globalCompressorPools[CompressorSnappy].Put(snappyWriter) + defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter) responseBody = snappyWriter case slices.Contains(r.Header.Values("Accept-Encoding"), "gzip"): responseWriter.Header().Set("Content-Encoding", "gzip") responseWriter.Header().Set("Vary", "Accept-Encoding") - gzipWriter := globalCompressorPools[CompressorGZIP].Get().(*gzip.Writer) + gzipWriter := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer) gzipWriter.Reset(responseWriter) - defer globalCompressorPools[CompressorGZIP].Put(gzipWriter) + defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter) responseBody = gzipWriter default: responseBody = responseWriter