From 534bb385a67002881f90c4072e40b053ecdc8bda Mon Sep 17 00:00:00 2001 From: Miroslav Cvetic Date: Wed, 19 Feb 2025 16:51:47 +0100 Subject: [PATCH] BKSDEV-508 feat: base api, collection and alias create --- pkg/api/api.go | 290 +++++++++++++++++++++++++++++----- pkg/api/utils.go | 139 +++++++++++++++- pkg/indexing/contentserver.go | 26 +-- pkg/indexing/indexer.go | 56 ++++--- pkg/interface.go | 17 +- pkg/vo.go | 2 +- 6 files changed, 447 insertions(+), 83 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 78edeef..248d4bb 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -2,10 +2,15 @@ package typesenseapi import ( "context" + "encoding/json" "errors" - typesense2 "github.com/foomo/typesense/pkg" + "strconv" + "time" + + pkgtypesense "github.com/foomo/typesense/pkg" "github.com/typesense/typesense-go/v3/typesense" "github.com/typesense/typesense-go/v3/typesense/api" + "github.com/typesense/typesense-go/v3/typesense/api/pointer" "go.uber.org/zap" ) @@ -14,16 +19,16 @@ const defaultSearchPresetName = "default" type BaseAPI[indexDocument any, returnType any] struct { l *zap.Logger client *typesense.Client - collections map[typesense2.IndexID]*api.CollectionSchema + collections map[pkgtypesense.IndexID]*api.CollectionSchema preset *api.PresetUpsertSchema - revisionID typesense2.RevisionID + revisionID pkgtypesense.RevisionID } func NewBaseAPI[indexDocument any, returnType any]( l *zap.Logger, client *typesense.Client, - collections map[typesense2.IndexID]*api.CollectionSchema, + collections map[pkgtypesense.IndexID]*api.CollectionSchema, preset *api.PresetUpsertSchema, ) *BaseAPI[indexDocument, returnType] { return &BaseAPI[indexDocument, returnType]{ @@ -42,12 +47,12 @@ func (b *BaseAPI[indexDocument, returnType]) Healthz(_ context.Context) error { return nil } -// Healthz will check if the revisionID is set -func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, error) { +// Indices returns a list of all configured index IDs +func (b *BaseAPI[indexDocument, returnType]) Indices() ([]pkgtypesense.IndexID, error) { if len(b.collections) == 0 { return nil, errors.New("no collections configured") } - indices := make([]typesense2.IndexID, 0, len(b.collections)) + indices := make([]pkgtypesense.IndexID, 0, len(b.collections)) for index := range b.collections { indices = append(indices, index) } @@ -55,8 +60,8 @@ func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, er } // Initialize -// will check the typesense connection and state of the colllections and aliases -// if the collections and aliases are not in the correct state it will create new collections and aliases +// This function ensures that a new collection is created for each alias on every run +// and updates aliases to point to the latest revision. // // example: // @@ -69,39 +74,145 @@ func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, er // } // } // -// there should be 2 aliases "www-bks-at-de" and "digital-bks-at-de" -// there should be at least 2 collections one for each alias -// the collection names are concatenated with the revisionID: "www-bks-at-de-2021-01-01-12" -// the revisionID is a timestamp in the format "YYYY-MM-DD-HH". If multiple collections are available -// the latest revisionID can be identified by the latest timestamp value +// There should be 2 aliases: "www-bks-at-de" and "digital-bks-at-de". +// There should be at least 2 collections, one for each alias. +// The collection names are concatenated with the revision ID: "www-bks-at-de-2021-01-01-12". +// The revision ID is a timestamp in the format "YYYY-MM-DD-HH". If multiple collections are available, +// the latest revision ID can be identified by the latest timestamp value. // -// Additionally, make sure that the configured search preset is present -// The system is ok if there is one alias for each collection and the collections are linked to the correct alias -// The function will set the revisionID that is currently linked to the aliases internally -func (b *BaseAPI[indexDocument, returnType]) Initialize() (typesense2.RevisionID, error) { - var revisionID typesense2.RevisionID - // use b.client.Health() to check the connection +// Additionally, ensure that the configured search preset is present. +// The system is considered valid if there is one alias for each collection and the collections +// are correctly linked to their respective aliases. +// The function sets the revisionID that is currently linked to the aliases internally. +func (b *BaseAPI[indexDocument, returnType]) Initialize(ctx context.Context) (pkgtypesense.RevisionID, error) { + b.l.Info("Initializing Typesense collections and aliases...") - b.revisionID = revisionID - return "", nil -} + // Step 1: Check Typesense connection + if _, err := b.client.Health(ctx, 5*time.Second); err != nil { + b.l.Error("Typesense health check failed", zap.Error(err)) + return "", err + } -func (b *BaseAPI[indexDocument, returnType]) NewRevision() (typesense2.RevisionID, error) { - var revision typesense2.RevisionID + // Step 2: Retrieve existing aliases and collections + aliases, err := b.client.Aliases().Retrieve(ctx) + if err != nil { + b.l.Error("Failed to retrieve aliases", zap.Error(err)) + return "", err + } - // create a revisionID based on the current time "YYYY-MM-DD-HH" + existingCollections, err := b.fetchExistingCollections(ctx) + if err != nil { + return "", err + } - // for all b.collections - // create a new collection in typesense - IndexID + - + revisionID - return revision, nil + // Step 3: Track latest revisions per alias + latestRevisions := make(map[pkgtypesense.IndexID]pkgtypesense.RevisionID) + aliasMappings := make(map[pkgtypesense.IndexID]string) // Tracks alias-to-collection mappings + + for _, alias := range aliases { + collectionName := alias.CollectionName + indexID := pkgtypesense.IndexID(*alias.Name) + revisionID := extractRevisionID(collectionName, string(indexID)) + + // Ensure alias points to an existing collection + if revisionID != "" && existingCollections[collectionName] { + latestRevisions[indexID] = revisionID + aliasMappings[indexID] = collectionName + } else { + b.l.Warn("Alias points to missing collection, resetting", zap.String("alias", string(indexID))) + } + } + + // Step 4: Ensure all aliases are correctly mapped to collections and create a new revision + newRevisionID := b.generateRevisionID() + b.l.Info("Generated new revision", zap.String("revisionID", string(newRevisionID))) + + for indexID, schema := range b.collections { + collectionName := formatCollectionName(indexID, newRevisionID) + + b.l.Warn("Creating new collection & alias", + zap.String("index", string(indexID)), + zap.String("new_collection", collectionName), + ) + + // Create new collection + if err := b.createCollectionIfNotExists(ctx, schema, collectionName); err != nil { + return "", err + } + + // Update alias to point to new collection + if err := b.ensureAliasMapping(ctx, indexID, collectionName); err != nil { + return "", err + } + } + + // Step 5: Set the latest revision ID and return + b.revisionID = newRevisionID + + // Step 6: Ensure search preset is present + if b.preset != nil { + _, err := b.client.Presets().Upsert(ctx, defaultSearchPresetName, b.preset) + if err != nil { + b.l.Error("Failed to upsert search preset", zap.Error(err)) + return "", err + } + } + + b.l.Info("Initialization completed", zap.String("revisionID", string(b.revisionID))) + + return b.revisionID, nil } func (b *BaseAPI[indexDocument, returnType]) UpsertDocuments( - revisionID typesense2.RevisionID, - indexID typesense2.IndexID, - documents []indexDocument, + ctx context.Context, + revisionID pkgtypesense.RevisionID, + indexID pkgtypesense.IndexID, + documents []*indexDocument, ) error { - // use api to upsert documents + if len(documents) == 0 { + b.l.Warn("No documents provided for upsert", zap.String("index", string(indexID))) + return nil + } + + collectionName := formatCollectionName(indexID, revisionID) + + // Convert []indexDocument to []interface{} to satisfy Import() method + docInterfaces := make([]interface{}, len(documents)) + for i, doc := range documents { + b.l.Info("doc", zap.Any("doc", doc)) + docInterfaces[i] = doc + } + + // Perform bulk upsert using Import() + params := &api.ImportDocumentsParams{ + Action: (*api.IndexAction)(pointer.String("upsert")), + } + + importResults, err := b.client.Collection(collectionName).Documents().Import(ctx, docInterfaces, params) + if err != nil { + b.l.Error("Failed to bulk upsert documents", zap.String("collection", collectionName), zap.Error(err)) + return err + } + + // Log success and failure counts + successCount, failureCount := 0, 0 + for _, result := range importResults { + if result.Success { + successCount++ + } else { + failureCount++ + b.l.Warn("Document failed to upsert", + zap.String("collection", collectionName), + zap.String("error", result.Error), + ) + } + } + + b.l.Info("Bulk upsert completed", + zap.String("collection", collectionName), + zap.Int("successful_documents", successCount), + zap.Int("failed_documents", failureCount), + ) return nil } @@ -109,29 +220,128 @@ func (b *BaseAPI[indexDocument, returnType]) UpsertDocuments( // it will update the aliases to point to the new revision // additionally it will remove all old collections that are not linked to an alias // keeping only the latest revision and the one before -func (b *BaseAPI[indexDocument, returnType]) CommitRevision(revisionID typesense2.RevisionID) error { +func (b *BaseAPI[indexDocument, returnType]) CommitRevision(ctx context.Context, revisionID pkgtypesense.RevisionID) error { + for indexID := range b.collections { + alias := string(indexID) + newCollectionName := formatCollectionName(indexID, revisionID) + + // Step 1: Update the alias to point to the new collection + _, err := b.client.Aliases().Upsert(ctx, alias, + &api.CollectionAliasSchema{ + CollectionName: newCollectionName, + }) + if err != nil { + b.l.Error("Failed to update alias", zap.String("alias", alias), zap.Error(err)) + return err + } + b.l.Info("Updated alias", zap.String("alias", alias), zap.String("collection", newCollectionName)) + + // Step 2: Clean up old collections (keep only the last two) + err = b.pruneOldCollections(ctx, alias, newCollectionName) + if err != nil { + b.l.Error("Failed to clean up old collections", zap.String("alias", alias), zap.Error(err)) + } + } + return nil } // RevertRevision will remove the collections created for the given revisionID -func (b *BaseAPI[indexDocument, returnType]) RevertRevision(revisionID typesense2.RevisionID) error { +func (b *BaseAPI[indexDocument, returnType]) RevertRevision(ctx context.Context, revisionID pkgtypesense.RevisionID) error { + for indexID := range b.collections { + collectionName := formatCollectionName(indexID, revisionID) + + // Step 1: Delete the collection safely + _, err := b.client.Collection(collectionName).Delete(ctx) + if err != nil { + b.l.Error("Failed to delete collection", zap.String("collection", collectionName), zap.Error(err)) + return err + } + + b.l.Info("Reverted and deleted collection", zap.String("collection", collectionName)) + } + return nil } // SimpleSearch will perform a search operation on the given index // it will return the documents and the scores func (b *BaseAPI[indexDocument, returnType]) SimpleSearch( - index typesense2.IndexID, + ctx context.Context, + index pkgtypesense.IndexID, q string, filterBy map[string]string, page, perPage int, sortBy string, -) ([]returnType, typesense2.Scores, error) { - return b.ExpertSearch(index, getSearchCollectionParameters(q, filterBy, page, perPage, sortBy)) +) ([]returnType, pkgtypesense.Scores, error) { + // Call getSearchCollectionParameters but also set QueryBy explicitly + parameters := buildSearchParams(q, filterBy, page, perPage, sortBy) + parameters.QueryBy = pointer.String("title,mimeType") + + return b.ExpertSearch(ctx, index, parameters) } // ExpertSearch will perform a search operation on the given index // it will return the documents and the scores -func (b *BaseAPI[indexDocument, returnType]) ExpertSearch(index typesense2.IndexID, parameters *api.SearchCollectionParams) ([]returnType, typesense2.Scores, error) { - return nil, nil, nil +func (b *BaseAPI[indexDocument, returnType]) ExpertSearch( + ctx context.Context, + indexID pkgtypesense.IndexID, + parameters *api.SearchCollectionParams, +) ([]returnType, pkgtypesense.Scores, error) { + if parameters == nil { + b.l.Error("Search parameters are nil") + return nil, nil, errors.New("search parameters cannot be nil") + } + + collectionName := string(indexID) // digital-bks-at-de + searchResponse, err := b.client.Collection(collectionName).Documents().Search(ctx, parameters) + if err != nil { + b.l.Error("Failed to perform search", zap.String("index", collectionName), zap.Error(err)) + return nil, nil, err + } + + // Parse search results + var results = make([]returnType, 0, len(*searchResponse.Hits)) + scores := make(pkgtypesense.Scores) + + for _, hit := range *searchResponse.Hits { + docMap := *hit.Document + + // Extract document ID safely + docID, ok := docMap["id"].(string) + if !ok { + b.l.Warn("Missing or invalid document ID in search result") + continue + } + + // Convert hit to JSON and then unmarshal into returnType + hitJSON, _ := json.Marshal(docMap) + var doc returnType + if err := json.Unmarshal(hitJSON, &doc); err != nil { + b.l.Warn("Failed to unmarshal search result", zap.String("index", collectionName), zap.Error(err)) + continue + } + + results = append(results, doc) + index := 0 + if hit.TextMatchInfo != nil && hit.TextMatchInfo.Score != nil { + if score, err := strconv.Atoi(*hit.TextMatchInfo.Score); err == nil { + index = score + } else { + b.l.Warn("Invalid score value", zap.String("score", *hit.TextMatchInfo.Score), zap.Error(err)) + } + } + + scores[pkgtypesense.DocumentID(docID)] = pkgtypesense.Score{ + ID: pkgtypesense.DocumentID(docID), + Index: index, + } + } + + b.l.Info("Search completed", + zap.String("index", collectionName), + zap.Int("results_count", len(results)), + ) + + return results, scores, nil } diff --git a/pkg/api/utils.go b/pkg/api/utils.go index b985efa..7a3a504 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -1,16 +1,22 @@ package typesenseapi import ( + "context" + "fmt" + "sort" "strings" + "time" + pkgtypesense "github.com/foomo/typesense/pkg" "github.com/typesense/typesense-go/v3/typesense/api" "github.com/typesense/typesense-go/v3/typesense/api/pointer" + "go.uber.org/zap" ) -// getSearchCollectionParameters will return the search collection parameters +// buildSearchParams will return the search collection parameters // this is meant as a utility function to create the search collection parameters // for the typesense search API without any knowledge of the typesense API -func getSearchCollectionParameters( +func buildSearchParams( q string, filterBy map[string]string, page, perPage int, @@ -18,7 +24,7 @@ func getSearchCollectionParameters( ) *api.SearchCollectionParams { parameters := &api.SearchCollectionParams{} parameters.Q = pointer.String(q) - if filterByString := getFilterByString(filterBy); filterByString != "" { + if filterByString := formatFilterQuery(filterBy); filterByString != "" { parameters.FilterBy = pointer.String(filterByString) } parameters.Page = pointer.Int(page) @@ -30,7 +36,7 @@ func getSearchCollectionParameters( return parameters } -func getFilterByString(filterBy map[string]string) string { +func formatFilterQuery(filterBy map[string]string) string { if filterBy == nil { return "" } @@ -40,3 +46,128 @@ func getFilterByString(filterBy map[string]string) string { } return strings.Join(filterByString, "&&") } + +func (b *BaseAPI[indexDocument, returnType]) generateRevisionID() pkgtypesense.RevisionID { + return pkgtypesense.RevisionID(time.Now().Format("2006-01-02-15-04")) // "YYYY-MM-DD-HH-MM" +} + +func getLatestRevisionID(revisions map[pkgtypesense.IndexID]pkgtypesense.RevisionID) pkgtypesense.RevisionID { + var latest pkgtypesense.RevisionID + for _, rev := range revisions { + if rev > latest { + latest = rev + } + } + return latest +} + +func formatCollectionName(indexID pkgtypesense.IndexID, revisionID pkgtypesense.RevisionID) string { + return fmt.Sprintf("%s-%s", indexID, revisionID) +} + +func extractRevisionID(collectionName, name string) pkgtypesense.RevisionID { + if !strings.HasPrefix(collectionName, name+"-") { + return "" + } + + revisionID := strings.TrimPrefix(collectionName, name+"-") + + // Validate that the extracted revision ID follows YYYY-MM-DD-HH-MM format (16 chars) + if len(revisionID) != 16 { + return "" + } + + return pkgtypesense.RevisionID(revisionID) +} + +// ensureAliasMapping ensures an alias correctly points to the specified collection. +func (b *BaseAPI[indexDocument, returnType]) ensureAliasMapping(ctx context.Context, indexID pkgtypesense.IndexID, collectionName string) error { + _, err := b.client.Aliases().Upsert(ctx, string(indexID), &api.CollectionAliasSchema{ + CollectionName: collectionName, + }) + if err != nil { + b.l.Error("Failed to upsert alias", + zap.String("alias", string(indexID)), + zap.String("collection", collectionName), + zap.Error(err), + ) + } + return err +} + +func (b *BaseAPI[indexDocument, returnType]) pruneOldCollections(ctx context.Context, alias, currentCollection string) error { + // Step 1: Retrieve all collections + collections, err := b.client.Collections().Retrieve(ctx) + if err != nil { + b.l.Error("Failed to retrieve collections", zap.Error(err)) + return err + } + + var oldCollections []string + for _, col := range collections { + if strings.HasPrefix(col.Name, alias+"-") && col.Name != currentCollection { + oldCollections = append(oldCollections, col.Name) + } + } + + // Step 2: Sort collections by timestamp (latest first) + sort.Slice(oldCollections, func(i, j int) bool { + return oldCollections[i] > oldCollections[j] // Reverse order + }) + + // Step 3: Delete all but the latest two collections + if len(oldCollections) > 1 { + toDelete := oldCollections[1:] // Keep only the latest two + for _, col := range toDelete { + _, err := b.client.Collection(col).Delete(ctx) + if err != nil { + b.l.Error("Failed to delete collection", zap.String("collection", col), zap.Error(err)) + } else { + b.l.Info("Deleted old collection", zap.String("collection", col)) + } + } + } + + return nil +} + +// fetchExistingCollections retrieves all existing collections and stores them in a map for quick lookup. +func (b *BaseAPI[indexDocument, returnType]) fetchExistingCollections(ctx context.Context) (map[string]bool, error) { + collections, err := b.client.Collections().Retrieve(ctx) + if err != nil { + b.l.Error("Failed to retrieve collections", zap.Error(err)) + return nil, err + } + + existingCollections := make(map[string]bool) + for _, col := range collections { + existingCollections[col.Name] = true + } + + return existingCollections, nil +} + +// createCollectionIfNotExists ensures that a collection exists before trying to use it. +func (b *BaseAPI[indexDocument, returnType]) createCollectionIfNotExists(ctx context.Context, schema *api.CollectionSchema, collectionName string) error { + // Check if collection already exists + existingCollections, err := b.fetchExistingCollections(ctx) + if err != nil { + return err + } + + if existingCollections[collectionName] { + b.l.Info("Collection already exists, skipping creation", zap.String("collection", collectionName)) + return nil + } + + // Set the collection name and create it + schema.Name = collectionName + _, err = b.client.Collections().Create(ctx, schema) + if err != nil { + b.l.Error("Failed to create collection", zap.String("collection", collectionName), zap.Error(err)) + return err + } + + b.l.Info("Created new collection", zap.String("collection", collectionName)) + return nil +} diff --git a/pkg/indexing/contentserver.go b/pkg/indexing/contentserver.go index 5e8db4b..fe99311 100644 --- a/pkg/indexing/contentserver.go +++ b/pkg/indexing/contentserver.go @@ -3,6 +3,8 @@ package typesenseindexing import ( "context" "fmt" + "slices" + "github.com/foomo/contentserver/client" "github.com/foomo/contentserver/content" typesense "github.com/foomo/typesense/pkg" @@ -13,29 +15,32 @@ type ContentServer[indexDocument any] struct { l *zap.Logger contentserverClient *client.Client documentProviderFuncs map[typesense.DocumentType]typesense.DocumentProviderFunc[indexDocument] + supportedMimeTypes []string } func NewContentServer[indexDocument any]( l *zap.Logger, client *client.Client, documentProviderFuncs map[typesense.DocumentType]typesense.DocumentProviderFunc[indexDocument], + supportedMimeTypes []string, ) *ContentServer[indexDocument] { return &ContentServer[indexDocument]{ l: l, contentserverClient: client, documentProviderFuncs: documentProviderFuncs, + supportedMimeTypes: supportedMimeTypes, } } func (c ContentServer[indexDocument]) Provide( ctx context.Context, indexID typesense.IndexID, -) ([]indexDocument, error) { +) ([]*indexDocument, error) { documentInfos, err := c.getDocumentIDsByIndexID(ctx, indexID) if err != nil { return nil, err } - documents := make([]indexDocument, len(documentInfos)) + documents := make([]*indexDocument, len(documentInfos)) for index, documentInfo := range documentInfos { if documentProvider, ok := c.documentProviderFuncs[documentInfo.DocumentType]; !ok { c.l.Warn("no document provider available for document type", zap.String("documentType", string(documentInfo.DocumentType))) @@ -50,7 +55,9 @@ func (c ContentServer[indexDocument]) Provide( ) continue } - documents[index] = document + if document != nil { + documents[index] = document + } } } return documents, nil @@ -60,9 +67,8 @@ func (c ContentServer[indexDocument]) ProvidePaged( ctx context.Context, indexID typesense.IndexID, offset int, -) ([]indexDocument, int, error) { +) ([]*indexDocument, int, error) { panic("implement me") - return nil, 0, nil } func (c ContentServer[indexDocument]) getDocumentIDsByIndexID( @@ -83,10 +89,12 @@ func (c ContentServer[indexDocument]) getDocumentIDsByIndexID( nodeMap := createFlatRepoNodeMap(rootRepoNode, map[string]*content.RepoNode{}) documentInfos := make([]typesense.DocumentInfo, 0, len(nodeMap)) for _, repoNode := range nodeMap { - documentInfos = append(documentInfos, typesense.DocumentInfo{ - DocumentType: typesense.DocumentType(repoNode.MimeType), - DocumentID: typesense.DocumentID(repoNode.ID), - }) + if slices.Contains(c.supportedMimeTypes, repoNode.MimeType) { + documentInfos = append(documentInfos, typesense.DocumentInfo{ + DocumentType: typesense.DocumentType(repoNode.MimeType), + DocumentID: typesense.DocumentID(repoNode.ID), + }) + } } return documentInfos, nil diff --git a/pkg/indexing/indexer.go b/pkg/indexing/indexer.go index af76883..fbc30c3 100644 --- a/pkg/indexing/indexer.go +++ b/pkg/indexing/indexer.go @@ -2,6 +2,7 @@ package typesenseindexing import ( "context" + typesense "github.com/foomo/typesense/pkg" "go.uber.org/zap" ) @@ -29,59 +30,72 @@ func (b *BaseIndexer[indexDocument, returnType]) Healthz(ctx context.Context) er } func (b *BaseIndexer[indexDocument, returnType]) Run(ctx context.Context) error { - // return error if the health check fails - if err := b.Healthz(ctx); err != nil { + // Step 1: Ensure Typesense is initialized + revisionID, err := b.typesenseAPI.Initialize(ctx) + if err != nil || revisionID == "" { + b.l.Error("Failed to initialize Typesense", zap.Error(err)) return err } - // create a new revision - revisionID, err := b.typesenseAPI.NewRevision() - if err != nil { - return err - } - - // get the configured indices from the typesense API + // Step 2: Retrieve all configured indices indices, err := b.typesenseAPI.Indices() if err != nil { + b.l.Error("Failed to retrieve indices from Typesense", zap.Error(err)) return err } - // set a variable to check if the upserting of documents was successful + // Step 3: Track errors while upserting tainted := false + indexedDocuments := 0 - // for each index, get the documents from the document provider and upsert them for _, indexID := range indices { + // Fetch documents from the provider documents, err := b.documentProvider.Provide(ctx, indexID) if err != nil { - return err + b.l.Error("Failed to fetch documents", zap.String("index", string(indexID)), zap.Error(err)) + tainted = true + continue } - err = b.typesenseAPI.UpsertDocuments(revisionID, indexID, documents) + err = b.typesenseAPI.UpsertDocuments(ctx, revisionID, indexID, documents) if err != nil { b.l.Error( - "failed to upsert documents", - zap.Error(err), + "Failed to upsert documents", zap.String("index", string(indexID)), zap.String("revision", string(revisionID)), zap.Int("documents", len(documents)), + zap.Error(err), ) tainted = true - break + continue } + + indexedDocuments += len(documents) + b.l.Info("Successfully upserted documents", + zap.String("index", string(indexID)), + zap.Int("count", len(documents)), + ) } - if !tainted { - // commit the revision if no errors occurred - err = b.typesenseAPI.CommitRevision(revisionID) + // Step 4: Commit or Revert the Revision + if !tainted && indexedDocuments > 0 { + // No errors encountered, commit the revision + err = b.typesenseAPI.CommitRevision(ctx, revisionID) if err != nil { + b.l.Error("Failed to commit revision", zap.String("revision", string(revisionID)), zap.Error(err)) return err } + b.l.Info("Successfully committed revision", zap.String("revision", string(revisionID))) } else { - // revert the revision if errors occurred - err = b.typesenseAPI.RevertRevision(revisionID) + // If errors occurred, revert the revision + b.l.Warn("Errors detected during upsert, reverting revision", zap.String("revision", string(revisionID))) + + err = b.typesenseAPI.RevertRevision(ctx, revisionID) if err != nil { + b.l.Error("Failed to revert revision", zap.String("revision", string(revisionID)), zap.Error(err)) return err } + b.l.Info("Successfully reverted revision", zap.String("revision", string(revisionID))) } return nil diff --git a/pkg/interface.go b/pkg/interface.go index 2cf1cf5..3865237 100644 --- a/pkg/interface.go +++ b/pkg/interface.go @@ -2,29 +2,30 @@ package typesense import ( "context" + "github.com/typesense/typesense-go/v3/typesense/api" ) type API[indexDocument any, returnType any] interface { // this will prepare new indices with the given schema and the index IDs configured for the API - NewRevision() (RevisionID, error) - CommitRevision(revisionID RevisionID) error - RevertRevision(revisionID RevisionID) error - UpsertDocuments(revisionID RevisionID, indexID IndexID, documents []indexDocument) error + CommitRevision(ctx context.Context, revisionID RevisionID) error + RevertRevision(ctx context.Context, revisionID RevisionID) error + UpsertDocuments(ctx context.Context, revisionID RevisionID, indexID IndexID, documents []*indexDocument) error // this will check the typesense connection and initialize the indices // should be run directly in a main.go or similar to ensure the connection is working - Initialize() (RevisionID, error) + Initialize(ctx context.Context) (RevisionID, error) // perform a search operation on the given index SimpleSearch( + ctx context.Context, index IndexID, q string, filterBy map[string]string, page, perPage int, sortBy string, ) ([]returnType, Scores, error) - ExpertSearch(index IndexID, parameters *api.SearchCollectionParams) ([]returnType, Scores, error) + ExpertSearch(ctx context.Context, index IndexID, parameters *api.SearchCollectionParams) ([]returnType, Scores, error) Healthz(ctx context.Context) error Indices() ([]IndexID, error) } @@ -34,6 +35,6 @@ type IndexerInterface[indexDocument any, returnType any] interface { } type DocumentProvider[indexDocument any] interface { - Provide(ctx context.Context, index IndexID) ([]indexDocument, error) - ProvidePaged(ctx context.Context, index IndexID, offset int) ([]indexDocument, int, error) + Provide(ctx context.Context, index IndexID) ([]*indexDocument, error) + ProvidePaged(ctx context.Context, index IndexID, offset int) ([]*indexDocument, int, error) } diff --git a/pkg/vo.go b/pkg/vo.go index 8f485ae..1783932 100644 --- a/pkg/vo.go +++ b/pkg/vo.go @@ -19,7 +19,7 @@ type DocumentProviderFunc[indexDocument any] func( ctx context.Context, indexID IndexID, documentID DocumentID, -) (indexDocument, error) +) (*indexDocument, error) type DocumentInfo struct { DocumentType DocumentType