feat: add postgres persistance

This commit is contained in:
franklin 2021-10-01 16:18:51 +02:00
parent 8d4e9c6b14
commit f96f3fab04
4 changed files with 223 additions and 0 deletions

View File

@ -0,0 +1,55 @@
package main
import (
"github.com/davecgh/go-spew/spew"
"github.com/foomo/keel"
"github.com/foomo/keel/example/persistence/postgres/repository"
"github.com/foomo/keel/log"
keelpostgres "github.com/foomo/keel/persistence/postgres"
)
// docker run -it --rm -p 5432:5432 -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres postgres:11-alpine
func main() {
svr := keel.NewServer()
// get the logger
l := svr.Logger()
// create persistor
persistor, err := keelpostgres.New(
svr.Context(),
"postgres://postgres:postgres@localhost:5432/postgres",
keelpostgres.WithInit(`
create table if not exists tasks (
id serial primary key,
description text not null
);
`),
)
// use log must helper to exit on error
log.Must(l, err, "failed to create persistor")
// ensure to add the persistor to the closers
svr.AddClosers(persistor)
repo := repository.NewTaskRepository(persistor.Conn())
if value, err := repo.List(svr.Context()); err != nil {
l.Error(err.Error())
} else {
spew.Dump(value)
}
if err := repo.Insert(svr.Context(), "one"); err != nil {
l.Error(err.Error())
}
if value, err := repo.List(svr.Context()); err != nil {
l.Error(err.Error())
} else {
spew.Dump(value)
}
svr.Run()
}

View File

@ -0,0 +1,50 @@
package repository
import (
"context"
"github.com/jackc/pgx/v4"
)
type TaskRepository struct {
conn *pgx.Conn
}
// NewTaskRepository constructor
func NewTaskRepository(conn *pgx.Conn) *TaskRepository {
return &TaskRepository{
conn: conn,
}
}
func (r *TaskRepository) List(ctx context.Context) (map[int32]string, error) {
rows, err := r.conn.Query(ctx, "select * from tasks")
if err != nil {
return nil, err
}
ret := map[int32]string{}
for rows.Next() {
var id int32
var description string
err := rows.Scan(&id, &description)
if err != nil {
return nil, err
}
ret[id] = description
}
return ret, rows.Err()
}
func (r *TaskRepository) Insert(ctx context.Context, description string) error {
_, err := r.conn.Exec(context.Background(), "insert into tasks(description) values($1)", description)
return err
}
func (r *TaskRepository) Drop(ctx context.Context) error {
if _, err := r.conn.Exec(ctx, `DROP TABLE IF EXISTS order_numbers;`); err != nil {
return err
}
return nil
}

10
persistence/error.go Normal file
View File

@ -0,0 +1,10 @@
package keelpersistence
import (
"github.com/pkg/errors"
)
var (
ErrNotFound = errors.New("not found error")
ErrDirtyWrite = errors.New("dirty write error")
)

View File

@ -0,0 +1,108 @@
package keelpostgres
import (
"context"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/zapadapter"
"go.uber.org/zap"
"github.com/foomo/keel/log"
"github.com/pkg/errors"
)
// Persistor exported to used also for embedding into other types in foreign packages.
type (
Persistor struct {
db *pgx.Conn
l *zap.Logger
}
Options struct {
Init string
Logger *zap.Logger
}
Option func(*Options)
)
func WithInit(v string) Option {
return func(o *Options) {
o.Init = v
}
}
func WithLogger(v *zap.Logger) Option {
return func(o *Options) {
o.Logger = v
}
}
func DefaultOptions() Options {
return Options{
Logger: log.Logger(),
}
}
func New(ctx context.Context, uri string, opts ...Option) (*Persistor, error) {
// urlExample := "postgres://username:password@localhost:5432/database_name"
o := DefaultOptions()
for _, opt := range opts {
opt(&o)
}
config, err := pgx.ParseConfig(uri)
if err != nil {
return nil, errors.Wrap(err, "failed to parse uri")
}
config.Logger = zapadapter.NewLogger(o.Logger)
// TODO @franklin performance config
db, err := pgx.ConnectConfig(ctx, config)
if err != nil {
return nil, errors.Wrap(err, "failed to connect")
}
// TODO @franklin add telemetry
if err := db.Ping(ctx); err != nil {
return nil, errors.Wrap(err, "failed to ping database")
}
p := &Persistor{
db: db,
l: o.Logger,
}
// initialize
if o.Init != "" {
if _, err := p.db.Exec(ctx, o.Init); err != nil {
return nil, err
}
}
return p, nil
}
func (p *Persistor) TableExists(ctx context.Context, name string) (bool, error) {
var n int64
if err := p.db.QueryRow(ctx, `select 1 from information_schema.tables where table_name=$1`, name).Scan(&n); errors.Is(err, pgx.ErrNoRows) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
func (p *Persistor) Ping(ctx context.Context) error {
return p.db.Ping(ctx)
}
func (p *Persistor) Conn() *pgx.Conn {
return p.db
}
func (p *Persistor) Close(ctx context.Context) error {
return p.db.Close(ctx)
}