feat: adds UpsertMany & InsertMany and fixes insert

This commit is contained in:
franklin 2023-01-27 15:56:45 +01:00
parent 4535e78be1
commit d9d53b9699
3 changed files with 219 additions and 29 deletions

View File

@ -4,8 +4,11 @@ import (
"context"
"time"
keelpersistence "github.com/foomo/keel/persistence"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/foomo/keel"
@ -62,17 +65,45 @@ func main() {
newEntity := &store.Dummy{
Entity: store.NewEntity(uuid.New().String()),
}
log.Must(l, repo.Upsert(context.Background(), newEntity), "failed to insert")
log.Must(l, repo.Insert(context.Background(), newEntity), "failed to insert")
// fail insert for duplicate entity
duplicateEntity := &store.Dummy{
l.Info("Try to insert with duplicate key")
if err := repo.Insert(context.Background(), &store.Dummy{
Entity: store.NewEntity(newEntity.ID),
}
if err := repo.Upsert(context.Background(), duplicateEntity); err != nil {
}); mongo.IsDuplicateKeyError(err) {
l.Info("OK: expected error", log.FValue(err.Error()))
} else if err != nil {
l.Error("unexpected error", log.FValue(err.Error()))
} else {
l.Error("unexpected success")
}
// fail insert for duplicate entity
l.Info("Try to upsert with duplicate key")
if err := repo.Upsert(context.Background(), &store.Dummy{
Entity: store.NewEntity(newEntity.ID),
}); mongo.IsDuplicateKeyError(err) {
l.Info("OK: expected error", log.FValue(err.Error()))
} else if err != nil {
l.Error("unexpected error", log.FValue(err.Error()))
} else {
l.Error("unexpected success")
}
l.Info("Try to upsert many with duplicate key")
if err := repo.UpsertMany(context.Background(), []*store.Dummy{{
Entity: store.NewEntity(newEntity.ID),
}}); mongo.IsDuplicateKeyError(err) {
l.Info("OK: expected error", log.FValue(err.Error()))
} else if err != nil {
l.Error("unexpected error", log.FValue(err.Error()))
} else {
l.Error("unexpected success")
}
// get entity x2
l.Info("Try to upsert with dirty write")
newEntityA, err := repo.Get(context.Background(), newEntity.ID)
log.Must(l, err, "failed to load new entity")
@ -80,12 +111,34 @@ func main() {
log.Must(l, err, "failed to load new entity")
// update entity A
if err := repo.Upsert(context.Background(), newEntityA); err != nil {
l.Error("ERROR: failed to load new entity")
}
log.Must(l, repo.Upsert(context.Background(), newEntityA), "ERROR: failed to load new entity")
// update entity B
if err := repo.Upsert(context.Background(), newEntityB); err != nil {
if err := repo.Upsert(context.Background(), newEntityB); errors.Is(err, keelpersistence.ErrDirtyWrite) {
l.Info("OK: expected error", log.FValue(err.Error()))
} else if err != nil {
l.Error("unexpected error", log.FValue(err.Error()))
} else {
l.Error("unexpected success")
}
l.Info("Try to upsert many with dirty write")
newEntityA, err = repo.Get(context.Background(), newEntity.ID)
log.Must(l, err, "failed to load new entity")
newEntityB, err = repo.Get(context.Background(), newEntity.ID)
log.Must(l, err, "failed to load new entity")
// update entity A
log.Must(l, repo.UpsertMany(context.Background(), []*store.Dummy{newEntityA}), "ERROR: failed to load new entity")
l.Info("Try to upsert many with dirty write")
if err := repo.UpsertMany(context.Background(), []*store.Dummy{newEntityB}); errors.Is(err, keelpersistence.ErrDirtyWrite) {
l.Info("OK: expected error", log.FValue(err.Error()))
} else if err != nil {
l.Error("unexpected error", log.FValue(err.Error()))
} else {
l.Error("unexpected success")
}
svr.Run()

View File

@ -20,7 +20,6 @@ func NewDummyRepository(collection *keelmongo.Collection) *DummyRepository {
}
}
// Get entity
func (r *DummyRepository) Get(ctx context.Context, id string, opts ...*options.FindOneOptions) (*store.Dummy, error) {
var ret store.Dummy
if err := r.collection.Get(ctx, id, &ret, opts...); err != nil {
@ -29,7 +28,13 @@ func (r *DummyRepository) Get(ctx context.Context, id string, opts ...*options.F
return &ret, nil
}
// Upsert entity
func (r *DummyRepository) Insert(ctx context.Context, entity *store.Dummy) error {
if err := r.collection.Insert(ctx, entity); err != nil {
return err
}
return nil
}
func (r *DummyRepository) Upsert(ctx context.Context, entity *store.Dummy) error {
if err := r.collection.Upsert(ctx, entity.GetID(), entity); err != nil {
return err
@ -37,7 +42,17 @@ func (r *DummyRepository) Upsert(ctx context.Context, entity *store.Dummy) error
return nil
}
// Delete entity
func (r *DummyRepository) UpsertMany(ctx context.Context, entities []*store.Dummy) error {
v := make([]keelmongo.Entity, len(entities))
for i, entity := range entities {
v[i] = entity
}
if err := r.collection.UpsertMany(ctx, v); err != nil {
return err
}
return nil
}
func (r *DummyRepository) Delete(ctx context.Context, id string) error {
return r.collection.Delete(ctx, id)
}

View File

@ -4,6 +4,9 @@ import (
"context"
"time"
keelerrors "github.com/foomo/keel/errors"
keelpersistence "github.com/foomo/keel/persistence"
keeltime "github.com/foomo/keel/time"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
@ -12,10 +15,6 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
keelerrors "github.com/foomo/keel/errors"
keelpersistence "github.com/foomo/keel/persistence"
keeltime "github.com/foomo/keel/time"
)
type (
@ -38,6 +37,10 @@ type (
CollectionOption func(*CollectionOptions)
)
// ------------------------------------------------------------------------------------------------
// ~ Options
// ------------------------------------------------------------------------------------------------
func DefaultCollectionOptions() CollectionOptions {
return CollectionOptions{
CollectionOptions: options.Collection(),
@ -106,6 +109,10 @@ func CollectionWithIndexesCommitQuorumVotingMembers(v context.Context) Collectio
}
}
// ------------------------------------------------------------------------------------------------
// ~ Constructor
// ------------------------------------------------------------------------------------------------
func NewCollection(db *mongo.Database, name string, opts ...CollectionOption) (*Collection, error) {
o := DefaultCollectionOptions()
for _, opt := range opts {
@ -126,6 +133,10 @@ func NewCollection(db *mongo.Database, name string, opts ...CollectionOption) (*
}, nil
}
// ------------------------------------------------------------------------------------------------
// ~ Getter
// ------------------------------------------------------------------------------------------------
func (c *Collection) DB() *mongo.Database {
return c.db
}
@ -134,7 +145,10 @@ func (c *Collection) Col() *mongo.Collection {
return c.collection
}
// Get ...
// ------------------------------------------------------------------------------------------------
// ~ Public methods
// ------------------------------------------------------------------------------------------------
func (c *Collection) Get(ctx context.Context, id string, result interface{}, opts ...*options.FindOneOptions) error {
if id == "" {
return keelpersistence.ErrNotFound
@ -142,7 +156,6 @@ func (c *Collection) Get(ctx context.Context, id string, result interface{}, opt
return c.FindOne(ctx, bson.M{"id": id}, result, opts...)
}
// Exists ...
func (c *Collection) Exists(ctx context.Context, id string) (bool, error) {
if id == "" {
return false, nil
@ -151,8 +164,6 @@ func (c *Collection) Exists(ctx context.Context, id string) (bool, error) {
return ret > 0, err
}
// Upsert inserts/updates with protection against dirty-writes
// requires an unique index on id AND id + version
func (c *Collection) Upsert(ctx context.Context, id string, entity Entity) error {
if id == "" {
return errors.New("id must not be empty")
@ -198,13 +209,133 @@ func (c *Collection) Upsert(ctx context.Context, id string, entity Entity) error
return nil
}
// UpsertMany - NOTE: upsert many does NOT through an explicit error on dirty write so we can only assume it.
func (c *Collection) UpsertMany(ctx context.Context, entities []Entity) error {
var versionUpserts int64
var operations []mongo.WriteModel
for _, entity := range entities {
if entity == nil {
return errors.New("entity must not be nil")
} else if entity.GetID() == "" {
return errors.New("id must not be empty")
}
if v, ok := entity.(EntityWithTimestamps); ok {
now := keeltime.Now()
if ct := v.GetCreatedAt(); ct.IsZero() {
v.SetCreatedAt(now)
}
v.SetUpdatedAt(now)
}
if v, ok := entity.(EntityWithVersion); ok {
currentVersion := v.GetVersion()
// increment version
v.IncreaseVersion()
if currentVersion == 0 {
operations = append(operations,
mongo.NewInsertOneModel().SetDocument(entity),
)
} else {
versionUpserts++
operations = append(operations,
mongo.NewUpdateOneModel().
SetFilter(bson.D{{Key: "id", Value: entity.GetID()}, {Key: "version", Value: currentVersion}}).
SetUpdate(bson.D{{Key: "$set", Value: entity}}).
SetUpsert(false),
)
}
} else {
operations = append(operations,
mongo.NewUpdateOneModel().
SetFilter(bson.D{{Key: "id", Value: entity.GetID()}}).
SetUpdate(bson.D{{Key: "$set", Value: entity}}).
SetUpsert(true),
)
}
}
// Specify an option to turn the bulk insertion in order of operation
bulkOption := options.BulkWriteOptions{}
bulkOption.SetOrdered(false)
res, err := c.Col().BulkWrite(ctx, operations, &bulkOption)
if err != nil {
return err
} else if versionUpserts > 0 && (res.MatchedCount < versionUpserts || res.ModifiedCount != res.MatchedCount) {
// log.Logger().Info("missing upserts",
// zap.Int64("MatchedCount", res.MatchedCount),
// zap.Int64("InsertedCount", res.InsertedCount),
// zap.Int64("UpsertedCount", res.UpsertedCount),
// zap.Int64("ModifiedCount", res.ModifiedCount),
// zap.Any("UpsertedIDs", res.UpsertedIDs),
// zap.Any("versionUpserts", versionUpserts),
// )
return keelpersistence.ErrDirtyWrite
}
return nil
}
func (c *Collection) Insert(ctx context.Context, entity Entity) error {
if entity == nil {
return errors.New("entity must not be nil")
} else if entity.GetID() == "" {
return errors.New("id must not be empty")
}
if v, ok := entity.(EntityWithTimestamps); ok {
now := keeltime.Now()
if ct := v.GetCreatedAt(); ct.IsZero() {
v.SetCreatedAt(now)
}
v.SetUpdatedAt(now)
}
if v, ok := entity.(EntityWithVersion); ok {
// increment version
v.IncreaseVersion()
}
if _, err := c.collection.InsertOne(ctx, entity); err != nil {
return err
}
return nil
}
func (c *Collection) InsertMany(ctx context.Context, entities []Entity) error {
inserts := make([]interface{}, len(entities))
for i, entity := range entities {
if entity == nil {
return errors.New("entity must not be nil")
} else if entity.GetID() == "" {
return errors.New("id must not be empty")
}
if v, ok := entity.(EntityWithTimestamps); ok {
now := keeltime.Now()
if ct := v.GetCreatedAt(); ct.IsZero() {
v.SetCreatedAt(now)
}
v.SetUpdatedAt(now)
}
if v, ok := entity.(EntityWithVersion); ok {
// increment version
v.IncreaseVersion()
}
inserts[i] = entity
}
if _, err := c.collection.InsertMany(ctx, inserts); err != nil {
return err
}
return nil
}
func (c *Collection) Delete(ctx context.Context, id string) error {
if id == "" {
return keelpersistence.ErrNotFound
@ -217,7 +348,6 @@ func (c *Collection) Delete(ctx context.Context, id string) error {
return nil
}
// Find ...
func (c *Collection) Find(ctx context.Context, filter, results interface{}, opts ...*options.FindOptions) error {
cursor, err := c.collection.Find(ctx, filter, opts...)
if errors.Is(err, mongo.ErrNoDocuments) {
@ -230,7 +360,6 @@ func (c *Collection) Find(ctx context.Context, filter, results interface{}, opts
return err
}
// FindOne ...
func (c *Collection) FindOne(ctx context.Context, filter, result interface{}, opts ...*options.FindOneOptions) error {
res := c.collection.FindOne(ctx, filter, opts...)
if errors.Is(res.Err(), mongo.ErrNoDocuments) {
@ -241,7 +370,6 @@ func (c *Collection) FindOne(ctx context.Context, filter, result interface{}, op
return res.Decode(result)
}
// FindIterate ...
func (c *Collection) FindIterate(ctx context.Context, filter interface{}, handler IterateHandlerFn, opts ...*options.FindOptions) error {
cursor, err := c.collection.Find(ctx, filter, opts...)
if errors.Is(err, mongo.ErrNoDocuments) {
@ -258,7 +386,6 @@ func (c *Collection) FindIterate(ctx context.Context, filter interface{}, handle
return err
}
// Aggregate ...
func (c *Collection) Aggregate(ctx context.Context, pipeline mongo.Pipeline, results interface{}, opts ...*options.AggregateOptions) error {
cursor, err := c.collection.Aggregate(ctx, pipeline, opts...)
if err != nil {
@ -269,12 +396,7 @@ func (c *Collection) Aggregate(ctx context.Context, pipeline mongo.Pipeline, res
return err
}
func (c *Collection) AggregateIterate(
ctx context.Context,
pipeline mongo.Pipeline,
handler IterateHandlerFn,
opts ...*options.AggregateOptions,
) error {
func (c *Collection) AggregateIterate(ctx context.Context, pipeline mongo.Pipeline, handler IterateHandlerFn, opts ...*options.AggregateOptions) error {
cursor, err := c.collection.Aggregate(ctx, pipeline, opts...)
if err != nil {
return err