mirror of
https://github.com/foomo/gotsrpc.git
synced 2025-10-16 12:35:35 +00:00
265 lines
6.8 KiB
Go
265 lines
6.8 KiB
Go
package gotsrpc
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
"github.com/golang/snappy"
|
|
"github.com/pkg/errors"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
const (
|
|
HeaderServiceToService = "X-Foomo-S2S"
|
|
)
|
|
|
|
type Compressor int
|
|
|
|
const (
|
|
CompressorNone Compressor = iota
|
|
CompressorGZIP
|
|
CompressorSnappy
|
|
)
|
|
|
|
func (c Compressor) String() string {
|
|
switch c {
|
|
case CompressorNone:
|
|
return "none"
|
|
case CompressorGZIP:
|
|
return "gzip"
|
|
case CompressorSnappy:
|
|
return "snappy"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// ClientTransport to use for calls
|
|
// var ClientTransport = &http.Transport{}
|
|
|
|
var _ Client = &BufferedClient{}
|
|
|
|
type Client interface {
|
|
Call(ctx context.Context, url string, endpoint string, method string, args []interface{}, reply []interface{}) (err error)
|
|
}
|
|
|
|
func NewClientWithHttpClient(client *http.Client) Client { //nolint:stylecheck
|
|
return NewBufferedClient(WithHTTPClient(client))
|
|
}
|
|
|
|
func newRequest(ctx context.Context, url string, contentType string, reader io.Reader, headers http.Header) (r *http.Request, err error) {
|
|
request, errRequest := http.NewRequestWithContext(ctx, http.MethodPost, url, reader)
|
|
if errRequest != nil {
|
|
return nil, errors.Wrap(errRequest, "could not create a request")
|
|
}
|
|
if len(headers) > 0 {
|
|
request.Header = headers
|
|
}
|
|
request.Header.Set("Content-Type", contentType)
|
|
request.Header.Set("Accept", contentType)
|
|
request.Header.Set(HeaderServiceToService, "true")
|
|
|
|
return request, nil
|
|
}
|
|
|
|
type BufferedClient struct {
|
|
client *http.Client
|
|
handle *clientHandle
|
|
headers http.Header
|
|
compressor Compressor
|
|
}
|
|
|
|
// ClientOption is a function that configures a BufferedClient.
|
|
type ClientOption func(*BufferedClient)
|
|
|
|
// WithHTTPClient allows you to specify a custom *http.Client.
|
|
func WithHTTPClient(c *http.Client) ClientOption {
|
|
return func(bc *BufferedClient) {
|
|
if c == nil {
|
|
bc.client = defaultHttpFactory()
|
|
} else {
|
|
bc.client = c
|
|
}
|
|
}
|
|
}
|
|
|
|
func WithClientEncoding(encoding ClientEncoding) ClientOption {
|
|
return func(bc *BufferedClient) {
|
|
bc.handle = getHandleForType(encoding)
|
|
}
|
|
}
|
|
|
|
// WithHeaders allows you to specify custom HTTP headers.
|
|
func WithHeaders(h http.Header) ClientOption {
|
|
return func(bc *BufferedClient) {
|
|
bc.headers = h
|
|
}
|
|
}
|
|
|
|
func WithCompressor(compressor Compressor) ClientOption {
|
|
return func(bc *BufferedClient) {
|
|
bc.compressor = compressor
|
|
}
|
|
}
|
|
|
|
func WithSnappyCompression() ClientOption {
|
|
return WithCompressor(CompressorSnappy)
|
|
}
|
|
|
|
func WithGZIPCompression() ClientOption {
|
|
return WithCompressor(CompressorGZIP)
|
|
}
|
|
|
|
func WithNoCompression() ClientOption {
|
|
return WithCompressor(CompressorNone)
|
|
}
|
|
|
|
// NewBufferedClient is the constructor that applies all functional options.
|
|
func NewBufferedClient(opts ...ClientOption) *BufferedClient {
|
|
// Set reasonable defaults here
|
|
bc := &BufferedClient{
|
|
client: defaultHttpFactory(),
|
|
headers: make(http.Header),
|
|
handle: getHandleForType(EncodingMsgpack),
|
|
compressor: CompressorNone,
|
|
}
|
|
|
|
// Apply each option
|
|
for _, opt := range opts {
|
|
opt(bc)
|
|
}
|
|
return bc
|
|
}
|
|
|
|
// Call calls a method on the remove service
|
|
func (c *BufferedClient) Call(ctx context.Context, url string, endpoint string, method string, args []interface{}, reply []interface{}) error {
|
|
// Marshall args
|
|
buffer := &bytes.Buffer{}
|
|
|
|
// If no arguments are set, remove
|
|
|
|
var encodeWriter io.Writer
|
|
switch c.compressor {
|
|
case CompressorGZIP:
|
|
if gzipWriter, ok := globalCompressorWriterPools[CompressorGZIP].Get().(*gzip.Writer); ok {
|
|
gzipWriter.Reset(buffer)
|
|
|
|
defer globalCompressorWriterPools[CompressorGZIP].Put(gzipWriter)
|
|
|
|
encodeWriter = gzipWriter
|
|
}
|
|
case CompressorSnappy:
|
|
if snappyWriter, ok := globalCompressorWriterPools[CompressorSnappy].Get().(*snappy.Writer); ok {
|
|
snappyWriter.Reset(buffer)
|
|
|
|
defer globalCompressorWriterPools[CompressorSnappy].Put(snappyWriter)
|
|
encodeWriter = snappyWriter
|
|
}
|
|
case CompressorNone:
|
|
encodeWriter = buffer
|
|
default:
|
|
encodeWriter = buffer
|
|
}
|
|
|
|
err := codec.NewEncoder(encodeWriter, c.handle.handle).Encode(args)
|
|
if err != nil {
|
|
return errors.Wrap(err, "could not encode data")
|
|
}
|
|
|
|
if writer, ok := encodeWriter.(io.Closer); ok {
|
|
if err = writer.Close(); err != nil {
|
|
return errors.Wrap(err, "failed to write to request body")
|
|
}
|
|
}
|
|
|
|
// Create post url
|
|
postURL := fmt.Sprintf("%s%s/%s", url, endpoint, method)
|
|
req, err := newRequest(ctx, postURL, c.handle.contentType, buffer, c.headers.Clone())
|
|
if err != nil {
|
|
return NewClientError(errors.Wrap(err, "failed to create request"))
|
|
}
|
|
|
|
switch c.compressor {
|
|
case CompressorGZIP:
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
req.Header.Set("Accept-Encoding", "gzip")
|
|
case CompressorSnappy:
|
|
req.Header.Set("Content-Encoding", "snappy")
|
|
req.Header.Set("Accept-Encoding", "snappy")
|
|
case CompressorNone:
|
|
// Disable Automatic Compression
|
|
// https://http.dev/accept-encoding
|
|
req.Header.Set("Accept-Encoding", "identity")
|
|
// uncompressed, nothing to do
|
|
default:
|
|
// uncompressed, nothing to do
|
|
}
|
|
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return NewClientError(errors.Wrap(err, "failed to send request"))
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Check status
|
|
if resp.StatusCode != http.StatusOK {
|
|
var msg string
|
|
if value, err := io.ReadAll(resp.Body); err != nil {
|
|
msg = "failed to read response body: " + err.Error()
|
|
} else {
|
|
msg = string(value)
|
|
}
|
|
return NewClientError(NewHTTPError(msg, resp.StatusCode))
|
|
}
|
|
clientHandle := getHandlerForContentType(resp.Header.Get("Content-Type"))
|
|
|
|
wrappedReply := reply
|
|
if clientHandle.beforeDecodeReply != nil {
|
|
if value, err := clientHandle.beforeDecodeReply(reply); err != nil {
|
|
return NewClientError(errors.Wrap(err, "failed to call beforeDecodeReply hook"))
|
|
} else {
|
|
wrappedReply = value
|
|
}
|
|
}
|
|
|
|
var responseBodyReader io.Reader
|
|
switch resp.Header.Get("Content-Encoding") {
|
|
case "snappy":
|
|
if snappyReader, ok := globalCompressorReaderPools[CompressorSnappy].Get().(*snappy.Reader); ok {
|
|
defer globalCompressorReaderPools[CompressorSnappy].Put(snappyReader)
|
|
|
|
snappyReader.Reset(resp.Body)
|
|
responseBodyReader = snappyReader
|
|
}
|
|
case "gzip":
|
|
if gzipReader, ok := globalCompressorReaderPools[CompressorGZIP].Get().(*gzip.Reader); ok {
|
|
defer globalCompressorReaderPools[CompressorGZIP].Put(gzipReader)
|
|
|
|
err := gzipReader.Reset(resp.Body)
|
|
if err != nil {
|
|
return NewClientError(errors.Wrap(err, "could not create gzip reader"))
|
|
}
|
|
responseBodyReader = gzipReader
|
|
}
|
|
default:
|
|
responseBodyReader = resp.Body
|
|
}
|
|
|
|
if err := codec.NewDecoder(responseBodyReader, clientHandle.handle).Decode(wrappedReply); err != nil {
|
|
return NewClientError(errors.Wrap(err, "failed to decode response"))
|
|
}
|
|
|
|
// replace error
|
|
if clientHandle.afterDecodeReply != nil {
|
|
if err := clientHandle.afterDecodeReply(&reply, wrappedReply); err != nil {
|
|
return NewClientError(errors.Wrap(err, "failed to call afterDecodeReply hook"))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|