chore: finalize implementation

This commit is contained in:
Stefan Martinov 2025-01-17 17:18:52 +01:00
parent d6cf0145f5
commit ebf86659d0
4 changed files with 57 additions and 32 deletions

View File

@ -133,17 +133,17 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string,
var encodeWriter io.Writer
switch c.compressor {
case CompressorGZIP:
gzipWriter := globalCompressorPools[CompressorGZIP].Get().(*gzip.Writer)
gzipWriter := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer)
gzipWriter.Reset(buffer)
defer globalCompressorPools[CompressorGZIP].Put(gzipWriter)
defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter)
encodeWriter = gzipWriter
case CompressorSnappy:
snappyWriter := globalCompressorPools[CompressorSnappy].Get().(*snappy.Writer)
snappyWriter := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer)
snappyWriter.Reset(buffer)
defer globalCompressorPools[CompressorSnappy].Put(snappyWriter)
defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter)
encodeWriter = snappyWriter
case CompressorNone:
encodeWriter = buffer
@ -177,9 +177,9 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string,
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Accept-Encoding", "snappy")
case CompressorNone:
// Dissalow Automatic Compression
req.Header.Set("Content-Encoding", "")
req.Header.Set("Accept-Encoding", "")
// Disable Automatic Compression
// https://http.dev/accept-encoding
req.Header.Set("Accept-Encoding", "identity")
// uncompressed, nothing to do
default:
// uncompressed, nothing to do
@ -215,14 +215,20 @@ func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string,
var responseBodyReader io.Reader
switch resp.Header.Get("Content-Encoding") {
case "snappy":
responseBodyReader = snappy.NewReader(resp.Body)
snappyReader := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader)
defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader)
snappyReader.Reset(resp.Body)
responseBodyReader = snappyReader
case "gzip":
gzipReader, err := gzip.NewReader(resp.Body)
gzipReader := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader)
defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader)
err := gzipReader.Reset(resp.Body)
if err != nil {
return NewClientError(errors.Wrap(err, "could not create gzip reader"))
}
responseBodyReader = gzipReader
defer gzipReader.Close()
default:
responseBodyReader = resp.Body
}

View File

@ -101,6 +101,8 @@ func BenchmarkBufferedClient(b *testing.B) {
require.NoError(b, err)
benchClient := func(b *testing.B, client Client) {
b.ReportAllocs()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var args []map[string]interface{}
err := LoadArgs(&args, nil, r)
@ -109,7 +111,6 @@ func BenchmarkBufferedClient(b *testing.B) {
_ = Reply([]interface{}{"HI"}, nil, r, w)
}))
defer server.Close()
b.ReportAllocs()
if bc, ok := client.(*BufferedClient); ok {
bc.client = server.Client()
@ -121,11 +122,11 @@ func BenchmarkBufferedClient(b *testing.B) {
}
}
benchmarks := map[string]Compressor{
"none": CompressorNone,
"gzip": CompressorGZIP,
"snappy": CompressorSnappy,
"none": CompressorNone,
//"gzip": CompressorGZIP,
//"snappy": CompressorSnappy,
}
runs := 2
runs := 1
for name, compressor := range benchmarks {
b.Run(name, func(b *testing.B) {

View File

@ -15,6 +15,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/golang/snappy"
@ -26,6 +27,18 @@ import (
type contextKey string
var (
// Read-only global compressor pools
globalCompressorWriterPools = map[Compressor]*sync.Pool{
CompressorGZIP: {New: func() interface{} { return gzip.NewWriter(io.Discard) }},
CompressorSnappy: {New: func() interface{} { return snappy.NewBufferedWriter(io.Discard) }},
}
globalCompressorReaderPools = map[Compressor]*sync.Pool{
CompressorGZIP: {New: func() interface{} { return &gzip.Reader{} }},
CompressorSnappy: {New: func() interface{} { return snappy.NewReader(nil) }},
}
)
const contextStatsKey contextKey = "gotsrpcStats"
func GetCalledFunc(r *http.Request, endPoint string) string {
@ -53,20 +66,34 @@ func LoadArgs(args interface{}, callStats *CallStats, r *http.Request) error {
var bodyReader io.Reader = r.Body
switch r.Header.Get("Content-Encoding") {
case "snappy":
bodyReader = snappy.NewReader(r.Body)
snappyReader := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader)
defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader)
snappyReader.Reset(r.Body)
bodyReader = snappyReader
case "gzip":
gzipReader, err := gzip.NewReader(r.Body)
gzipReader := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader)
defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader)
err := gzipReader.Reset(r.Body)
if err != nil {
return errors.Wrap(err, "could not create gzip reader")
return NewClientError(errors.Wrap(err, "could not create gzip reader"))
}
bodyReader = gzipReader
defer gzipReader.Close()
}
handle := getHandlerForContentType(r.Header.Get("Content-Type")).handle
if err := codec.NewDecoder(bodyReader, handle).Decode(args); err != nil {
return errors.Wrap(err, "could not decode arguments")
}
// We need to close the reader at the end of the function
if writer, ok := bodyReader.(io.Closer); ok {
if err := writer.Close(); err != nil {
return errors.Wrap(err, "failed to write to response body")
}
}
if callStats != nil {
callStats.Unmarshalling = time.Since(start)
callStats.RequestSize = int(r.ContentLength)

View File

@ -7,7 +7,6 @@ import (
"net/http"
"reflect"
"slices"
"sync"
"time"
"github.com/golang/snappy"
@ -15,14 +14,6 @@ import (
"github.com/ugorji/go/codec"
)
var (
// Read-only global compressor pools
globalCompressorPools = map[Compressor]*sync.Pool{
CompressorGZIP: {New: func() interface{} { return gzip.NewWriter(nil) }},
CompressorSnappy: {New: func() interface{} { return snappy.NewBufferedWriter(nil) }},
}
)
// Reply despite the fact, that this is a public method - do not call it, it will be called by generated code
func Reply(response []interface{}, stats *CallStats, r *http.Request, w http.ResponseWriter) error {
responseWriter := newResponseWriterWithLength(w)
@ -38,19 +29,19 @@ func Reply(response []interface{}, stats *CallStats, r *http.Request, w http.Res
responseWriter.Header().Set("Content-Encoding", "snappy")
responseWriter.Header().Set("Vary", "Accept-Encoding")
snappyWriter := globalCompressorPools[CompressorSnappy].Get().(*snappy.Writer)
snappyWriter := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer)
snappyWriter.Reset(responseWriter)
defer globalCompressorPools[CompressorSnappy].Put(snappyWriter)
defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter)
responseBody = snappyWriter
case slices.Contains(r.Header.Values("Accept-Encoding"), "gzip"):
responseWriter.Header().Set("Content-Encoding", "gzip")
responseWriter.Header().Set("Vary", "Accept-Encoding")
gzipWriter := globalCompressorPools[CompressorGZIP].Get().(*gzip.Writer)
gzipWriter := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer)
gzipWriter.Reset(responseWriter)
defer globalCompressorPools[CompressorGZIP].Put(gzipWriter)
defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter)
responseBody = gzipWriter
default:
responseBody = responseWriter