mirror of
https://github.com/foomo/typesense.git
synced 2025-10-16 12:45:37 +00:00
feat: api and indexer implementation
feat: api and indexer implementation
This commit is contained in:
commit
50b4b69567
290
pkg/api/api.go
290
pkg/api/api.go
@ -2,10 +2,15 @@ package typesenseapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
typesense2 "github.com/foomo/typesense/pkg"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
pkgtypesense "github.com/foomo/typesense/pkg"
|
||||
"github.com/typesense/typesense-go/v3/typesense"
|
||||
"github.com/typesense/typesense-go/v3/typesense/api"
|
||||
"github.com/typesense/typesense-go/v3/typesense/api/pointer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -14,16 +19,16 @@ const defaultSearchPresetName = "default"
|
||||
type BaseAPI[indexDocument any, returnType any] struct {
|
||||
l *zap.Logger
|
||||
client *typesense.Client
|
||||
collections map[typesense2.IndexID]*api.CollectionSchema
|
||||
collections map[pkgtypesense.IndexID]*api.CollectionSchema
|
||||
preset *api.PresetUpsertSchema
|
||||
|
||||
revisionID typesense2.RevisionID
|
||||
revisionID pkgtypesense.RevisionID
|
||||
}
|
||||
|
||||
func NewBaseAPI[indexDocument any, returnType any](
|
||||
l *zap.Logger,
|
||||
client *typesense.Client,
|
||||
collections map[typesense2.IndexID]*api.CollectionSchema,
|
||||
collections map[pkgtypesense.IndexID]*api.CollectionSchema,
|
||||
preset *api.PresetUpsertSchema,
|
||||
) *BaseAPI[indexDocument, returnType] {
|
||||
return &BaseAPI[indexDocument, returnType]{
|
||||
@ -42,12 +47,12 @@ func (b *BaseAPI[indexDocument, returnType]) Healthz(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Healthz will check if the revisionID is set
|
||||
func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, error) {
|
||||
// Indices returns a list of all configured index IDs
|
||||
func (b *BaseAPI[indexDocument, returnType]) Indices() ([]pkgtypesense.IndexID, error) {
|
||||
if len(b.collections) == 0 {
|
||||
return nil, errors.New("no collections configured")
|
||||
}
|
||||
indices := make([]typesense2.IndexID, 0, len(b.collections))
|
||||
indices := make([]pkgtypesense.IndexID, 0, len(b.collections))
|
||||
for index := range b.collections {
|
||||
indices = append(indices, index)
|
||||
}
|
||||
@ -55,8 +60,8 @@ func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, er
|
||||
}
|
||||
|
||||
// Initialize
|
||||
// will check the typesense connection and state of the colllections and aliases
|
||||
// if the collections and aliases are not in the correct state it will create new collections and aliases
|
||||
// This function ensures that a new collection is created for each alias on every run
|
||||
// and updates aliases to point to the latest revision.
|
||||
//
|
||||
// example:
|
||||
//
|
||||
@ -69,39 +74,145 @@ func (b *BaseAPI[indexDocument, returnType]) Indices() ([]typesense2.IndexID, er
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// there should be 2 aliases "www-bks-at-de" and "digital-bks-at-de"
|
||||
// there should be at least 2 collections one for each alias
|
||||
// the collection names are concatenated with the revisionID: "www-bks-at-de-2021-01-01-12"
|
||||
// the revisionID is a timestamp in the format "YYYY-MM-DD-HH". If multiple collections are available
|
||||
// the latest revisionID can be identified by the latest timestamp value
|
||||
// There should be 2 aliases: "www-bks-at-de" and "digital-bks-at-de".
|
||||
// There should be at least 2 collections, one for each alias.
|
||||
// The collection names are concatenated with the revision ID: "www-bks-at-de-2021-01-01-12-30".
|
||||
// The revision ID is a timestamp in the format "YYYY-MM-DD-HH-MM". If multiple collections are available,
|
||||
// the latest revision ID can be identified by the latest timestamp value.
|
||||
//
|
||||
// Additionally, make sure that the configured search preset is present
|
||||
// The system is ok if there is one alias for each collection and the collections are linked to the correct alias
|
||||
// The function will set the revisionID that is currently linked to the aliases internally
|
||||
func (b *BaseAPI[indexDocument, returnType]) Initialize() (typesense2.RevisionID, error) {
|
||||
var revisionID typesense2.RevisionID
|
||||
// use b.client.Health() to check the connection
|
||||
// Additionally, ensure that the configured search preset is present.
|
||||
// The system is considered valid if there is one alias for each collection and the collections
|
||||
// are correctly linked to their respective aliases.
|
||||
// The function sets the revisionID that is currently linked to the aliases internally.
|
||||
func (b *BaseAPI[indexDocument, returnType]) Initialize(ctx context.Context) (pkgtypesense.RevisionID, error) {
|
||||
b.l.Info("Initializing Typesense collections and aliases...")
|
||||
|
||||
b.revisionID = revisionID
|
||||
return "", nil
|
||||
}
|
||||
// Step 1: Check Typesense connection
|
||||
if _, err := b.client.Health(ctx, 5*time.Second); err != nil {
|
||||
b.l.Error("Typesense health check failed", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
|
||||
func (b *BaseAPI[indexDocument, returnType]) NewRevision() (typesense2.RevisionID, error) {
|
||||
var revision typesense2.RevisionID
|
||||
// Step 2: Retrieve existing aliases and collections
|
||||
aliases, err := b.client.Aliases().Retrieve(ctx)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to retrieve aliases", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
|
||||
// create a revisionID based on the current time "YYYY-MM-DD-HH"
|
||||
existingCollections, err := b.fetchExistingCollections(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// for all b.collections
|
||||
// create a new collection in typesense - IndexID + - + revisionID
|
||||
return revision, nil
|
||||
// Step 3: Track latest revisions per alias
|
||||
latestRevisions := make(map[pkgtypesense.IndexID]pkgtypesense.RevisionID)
|
||||
aliasMappings := make(map[pkgtypesense.IndexID]string) // Tracks alias-to-collection mappings
|
||||
|
||||
for _, alias := range aliases {
|
||||
collectionName := alias.CollectionName
|
||||
indexID := pkgtypesense.IndexID(*alias.Name)
|
||||
revisionID := extractRevisionID(collectionName, string(indexID))
|
||||
|
||||
// Ensure alias points to an existing collection
|
||||
if revisionID != "" && existingCollections[collectionName] {
|
||||
latestRevisions[indexID] = revisionID
|
||||
aliasMappings[indexID] = collectionName
|
||||
} else {
|
||||
b.l.Warn("Alias points to missing collection, resetting", zap.String("alias", string(indexID)))
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Ensure all aliases are correctly mapped to collections and create a new revision
|
||||
newRevisionID := b.generateRevisionID()
|
||||
b.l.Info("Generated new revision", zap.String("revisionID", string(newRevisionID)))
|
||||
|
||||
for indexID, schema := range b.collections {
|
||||
collectionName := formatCollectionName(indexID, newRevisionID)
|
||||
|
||||
b.l.Warn("Creating new collection & alias",
|
||||
zap.String("index", string(indexID)),
|
||||
zap.String("new_collection", collectionName),
|
||||
)
|
||||
|
||||
// Create new collection
|
||||
if err := b.createCollectionIfNotExists(ctx, schema, collectionName); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Update alias to point to new collection
|
||||
if err := b.ensureAliasMapping(ctx, indexID, collectionName); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5: Set the latest revision ID and return
|
||||
b.revisionID = newRevisionID
|
||||
|
||||
// Step 6: Ensure search preset is present
|
||||
if b.preset != nil {
|
||||
_, err := b.client.Presets().Upsert(ctx, defaultSearchPresetName, b.preset)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to upsert search preset", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
b.l.Info("Initialization completed", zap.String("revisionID", string(b.revisionID)))
|
||||
|
||||
return b.revisionID, nil
|
||||
}
|
||||
|
||||
func (b *BaseAPI[indexDocument, returnType]) UpsertDocuments(
|
||||
revisionID typesense2.RevisionID,
|
||||
indexID typesense2.IndexID,
|
||||
documents []indexDocument,
|
||||
ctx context.Context,
|
||||
revisionID pkgtypesense.RevisionID,
|
||||
indexID pkgtypesense.IndexID,
|
||||
documents []*indexDocument,
|
||||
) error {
|
||||
// use api to upsert documents
|
||||
if len(documents) == 0 {
|
||||
b.l.Warn("No documents provided for upsert", zap.String("index", string(indexID)))
|
||||
return nil
|
||||
}
|
||||
|
||||
collectionName := formatCollectionName(indexID, revisionID)
|
||||
|
||||
// Convert []indexDocument to []interface{} to satisfy Import() method
|
||||
docInterfaces := make([]interface{}, len(documents))
|
||||
for i, doc := range documents {
|
||||
b.l.Info("doc", zap.Any("doc", doc))
|
||||
docInterfaces[i] = doc
|
||||
}
|
||||
|
||||
// Perform bulk upsert using Import()
|
||||
params := &api.ImportDocumentsParams{
|
||||
Action: (*api.IndexAction)(pointer.String("upsert")),
|
||||
}
|
||||
|
||||
importResults, err := b.client.Collection(collectionName).Documents().Import(ctx, docInterfaces, params)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to bulk upsert documents", zap.String("collection", collectionName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Log success and failure counts
|
||||
successCount, failureCount := 0, 0
|
||||
for _, result := range importResults {
|
||||
if result.Success {
|
||||
successCount++
|
||||
} else {
|
||||
failureCount++
|
||||
b.l.Warn("Document failed to upsert",
|
||||
zap.String("collection", collectionName),
|
||||
zap.String("error", result.Error),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
b.l.Info("Bulk upsert completed",
|
||||
zap.String("collection", collectionName),
|
||||
zap.Int("successful_documents", successCount),
|
||||
zap.Int("failed_documents", failureCount),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -109,29 +220,128 @@ func (b *BaseAPI[indexDocument, returnType]) UpsertDocuments(
|
||||
// it will update the aliases to point to the new revision
|
||||
// additionally it will remove all old collections that are not linked to an alias
|
||||
// keeping only the latest revision and the one before
|
||||
func (b *BaseAPI[indexDocument, returnType]) CommitRevision(revisionID typesense2.RevisionID) error {
|
||||
func (b *BaseAPI[indexDocument, returnType]) CommitRevision(ctx context.Context, revisionID pkgtypesense.RevisionID) error {
|
||||
for indexID := range b.collections {
|
||||
alias := string(indexID)
|
||||
newCollectionName := formatCollectionName(indexID, revisionID)
|
||||
|
||||
// Step 1: Update the alias to point to the new collection
|
||||
_, err := b.client.Aliases().Upsert(ctx, alias,
|
||||
&api.CollectionAliasSchema{
|
||||
CollectionName: newCollectionName,
|
||||
})
|
||||
if err != nil {
|
||||
b.l.Error("Failed to update alias", zap.String("alias", alias), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
b.l.Info("Updated alias", zap.String("alias", alias), zap.String("collection", newCollectionName))
|
||||
|
||||
// Step 2: Clean up old collections (keep only the last two)
|
||||
err = b.pruneOldCollections(ctx, alias, newCollectionName)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to clean up old collections", zap.String("alias", alias), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RevertRevision will remove the collections created for the given revisionID
|
||||
func (b *BaseAPI[indexDocument, returnType]) RevertRevision(revisionID typesense2.RevisionID) error {
|
||||
func (b *BaseAPI[indexDocument, returnType]) RevertRevision(ctx context.Context, revisionID pkgtypesense.RevisionID) error {
|
||||
for indexID := range b.collections {
|
||||
collectionName := formatCollectionName(indexID, revisionID)
|
||||
|
||||
// Step 1: Delete the collection safely
|
||||
_, err := b.client.Collection(collectionName).Delete(ctx)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to delete collection", zap.String("collection", collectionName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
b.l.Info("Reverted and deleted collection", zap.String("collection", collectionName))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SimpleSearch will perform a search operation on the given index
|
||||
// it will return the documents and the scores
|
||||
func (b *BaseAPI[indexDocument, returnType]) SimpleSearch(
|
||||
index typesense2.IndexID,
|
||||
ctx context.Context,
|
||||
index pkgtypesense.IndexID,
|
||||
q string,
|
||||
filterBy map[string]string,
|
||||
page, perPage int,
|
||||
sortBy string,
|
||||
) ([]returnType, typesense2.Scores, error) {
|
||||
return b.ExpertSearch(index, getSearchCollectionParameters(q, filterBy, page, perPage, sortBy))
|
||||
) ([]returnType, pkgtypesense.Scores, error) {
|
||||
// Call buildSearchParams but also set QueryBy explicitly
|
||||
parameters := buildSearchParams(q, filterBy, page, perPage, sortBy)
|
||||
parameters.QueryBy = pointer.String("title")
|
||||
|
||||
return b.ExpertSearch(ctx, index, parameters)
|
||||
}
|
||||
|
||||
// ExpertSearch will perform a search operation on the given index
|
||||
// it will return the documents and the scores
|
||||
func (b *BaseAPI[indexDocument, returnType]) ExpertSearch(index typesense2.IndexID, parameters *api.SearchCollectionParams) ([]returnType, typesense2.Scores, error) {
|
||||
return nil, nil, nil
|
||||
func (b *BaseAPI[indexDocument, returnType]) ExpertSearch(
|
||||
ctx context.Context,
|
||||
indexID pkgtypesense.IndexID,
|
||||
parameters *api.SearchCollectionParams,
|
||||
) ([]returnType, pkgtypesense.Scores, error) {
|
||||
if parameters == nil {
|
||||
b.l.Error("Search parameters are nil")
|
||||
return nil, nil, errors.New("search parameters cannot be nil")
|
||||
}
|
||||
|
||||
collectionName := string(indexID) // digital-bks-at-de
|
||||
searchResponse, err := b.client.Collection(collectionName).Documents().Search(ctx, parameters)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to perform search", zap.String("index", collectionName), zap.Error(err))
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Parse search results
|
||||
var results = make([]returnType, 0, len(*searchResponse.Hits))
|
||||
scores := make(pkgtypesense.Scores)
|
||||
|
||||
for _, hit := range *searchResponse.Hits {
|
||||
docMap := *hit.Document
|
||||
|
||||
// Extract document ID safely
|
||||
docID, ok := docMap["id"].(string)
|
||||
if !ok {
|
||||
b.l.Warn("Missing or invalid document ID in search result")
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert hit to JSON and then unmarshal into returnType
|
||||
hitJSON, _ := json.Marshal(docMap)
|
||||
var doc returnType
|
||||
if err := json.Unmarshal(hitJSON, &doc); err != nil {
|
||||
b.l.Warn("Failed to unmarshal search result", zap.String("index", collectionName), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
results = append(results, doc)
|
||||
index := 0
|
||||
if hit.TextMatchInfo != nil && hit.TextMatchInfo.Score != nil {
|
||||
if score, err := strconv.Atoi(*hit.TextMatchInfo.Score); err == nil {
|
||||
index = score
|
||||
} else {
|
||||
b.l.Warn("Invalid score value", zap.String("score", *hit.TextMatchInfo.Score), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
scores[pkgtypesense.DocumentID(docID)] = pkgtypesense.Score{
|
||||
ID: pkgtypesense.DocumentID(docID),
|
||||
Index: index,
|
||||
}
|
||||
}
|
||||
|
||||
b.l.Info("Search completed",
|
||||
zap.String("index", collectionName),
|
||||
zap.Int("results_count", len(results)),
|
||||
)
|
||||
|
||||
return results, scores, nil
|
||||
}
|
||||
|
||||
139
pkg/api/utils.go
139
pkg/api/utils.go
@ -1,16 +1,22 @@
|
||||
package typesenseapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pkgtypesense "github.com/foomo/typesense/pkg"
|
||||
"github.com/typesense/typesense-go/v3/typesense/api"
|
||||
"github.com/typesense/typesense-go/v3/typesense/api/pointer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// getSearchCollectionParameters will return the search collection parameters
|
||||
// buildSearchParams will return the search collection parameters
|
||||
// this is meant as a utility function to create the search collection parameters
|
||||
// for the typesense search API without any knowledge of the typesense API
|
||||
func getSearchCollectionParameters(
|
||||
func buildSearchParams(
|
||||
q string,
|
||||
filterBy map[string]string,
|
||||
page, perPage int,
|
||||
@ -18,7 +24,7 @@ func getSearchCollectionParameters(
|
||||
) *api.SearchCollectionParams {
|
||||
parameters := &api.SearchCollectionParams{}
|
||||
parameters.Q = pointer.String(q)
|
||||
if filterByString := getFilterByString(filterBy); filterByString != "" {
|
||||
if filterByString := formatFilterQuery(filterBy); filterByString != "" {
|
||||
parameters.FilterBy = pointer.String(filterByString)
|
||||
}
|
||||
parameters.Page = pointer.Int(page)
|
||||
@ -30,7 +36,7 @@ func getSearchCollectionParameters(
|
||||
return parameters
|
||||
}
|
||||
|
||||
func getFilterByString(filterBy map[string]string) string {
|
||||
func formatFilterQuery(filterBy map[string]string) string {
|
||||
if filterBy == nil {
|
||||
return ""
|
||||
}
|
||||
@ -40,3 +46,128 @@ func getFilterByString(filterBy map[string]string) string {
|
||||
}
|
||||
return strings.Join(filterByString, "&&")
|
||||
}
|
||||
|
||||
func (b *BaseAPI[indexDocument, returnType]) generateRevisionID() pkgtypesense.RevisionID {
|
||||
return pkgtypesense.RevisionID(time.Now().Format("2006-01-02-15-04")) // "YYYY-MM-DD-HH-MM"
|
||||
}
|
||||
|
||||
func getLatestRevisionID(revisions map[pkgtypesense.IndexID]pkgtypesense.RevisionID) pkgtypesense.RevisionID {
|
||||
var latest pkgtypesense.RevisionID
|
||||
for _, rev := range revisions {
|
||||
if rev > latest {
|
||||
latest = rev
|
||||
}
|
||||
}
|
||||
return latest
|
||||
}
|
||||
|
||||
func formatCollectionName(indexID pkgtypesense.IndexID, revisionID pkgtypesense.RevisionID) string {
|
||||
return fmt.Sprintf("%s-%s", indexID, revisionID)
|
||||
}
|
||||
|
||||
func extractRevisionID(collectionName, name string) pkgtypesense.RevisionID {
|
||||
if !strings.HasPrefix(collectionName, name+"-") {
|
||||
return ""
|
||||
}
|
||||
|
||||
revisionID := strings.TrimPrefix(collectionName, name+"-")
|
||||
|
||||
// Validate that the extracted revision ID follows YYYY-MM-DD-HH-MM format (16 chars)
|
||||
if len(revisionID) != 16 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return pkgtypesense.RevisionID(revisionID)
|
||||
}
|
||||
|
||||
// ensureAliasMapping ensures an alias correctly points to the specified collection.
|
||||
func (b *BaseAPI[indexDocument, returnType]) ensureAliasMapping(ctx context.Context, indexID pkgtypesense.IndexID, collectionName string) error {
|
||||
_, err := b.client.Aliases().Upsert(ctx, string(indexID), &api.CollectionAliasSchema{
|
||||
CollectionName: collectionName,
|
||||
})
|
||||
if err != nil {
|
||||
b.l.Error("Failed to upsert alias",
|
||||
zap.String("alias", string(indexID)),
|
||||
zap.String("collection", collectionName),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *BaseAPI[indexDocument, returnType]) pruneOldCollections(ctx context.Context, alias, currentCollection string) error {
|
||||
// Step 1: Retrieve all collections
|
||||
collections, err := b.client.Collections().Retrieve(ctx)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to retrieve collections", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var oldCollections []string
|
||||
for _, col := range collections {
|
||||
if strings.HasPrefix(col.Name, alias+"-") && col.Name != currentCollection {
|
||||
oldCollections = append(oldCollections, col.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Sort collections by timestamp (latest first)
|
||||
sort.Slice(oldCollections, func(i, j int) bool {
|
||||
return oldCollections[i] > oldCollections[j] // Reverse order
|
||||
})
|
||||
|
||||
// Step 3: Delete all but the latest two collections
|
||||
if len(oldCollections) > 1 {
|
||||
toDelete := oldCollections[1:] // Keep only the latest two
|
||||
for _, col := range toDelete {
|
||||
_, err := b.client.Collection(col).Delete(ctx)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to delete collection", zap.String("collection", col), zap.Error(err))
|
||||
} else {
|
||||
b.l.Info("Deleted old collection", zap.String("collection", col))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchExistingCollections retrieves all existing collections and stores them in a map for quick lookup.
|
||||
func (b *BaseAPI[indexDocument, returnType]) fetchExistingCollections(ctx context.Context) (map[string]bool, error) {
|
||||
collections, err := b.client.Collections().Retrieve(ctx)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to retrieve collections", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
existingCollections := make(map[string]bool)
|
||||
for _, col := range collections {
|
||||
existingCollections[col.Name] = true
|
||||
}
|
||||
|
||||
return existingCollections, nil
|
||||
}
|
||||
|
||||
// createCollectionIfNotExists ensures that a collection exists before trying to use it.
|
||||
func (b *BaseAPI[indexDocument, returnType]) createCollectionIfNotExists(ctx context.Context, schema *api.CollectionSchema, collectionName string) error {
|
||||
// Check if collection already exists
|
||||
existingCollections, err := b.fetchExistingCollections(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if existingCollections[collectionName] {
|
||||
b.l.Info("Collection already exists, skipping creation", zap.String("collection", collectionName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set the collection name and create it
|
||||
schema.Name = collectionName
|
||||
_, err = b.client.Collections().Create(ctx, schema)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to create collection", zap.String("collection", collectionName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
b.l.Info("Created new collection", zap.String("collection", collectionName))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3,6 +3,8 @@ package typesenseindexing
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"github.com/foomo/contentserver/client"
|
||||
"github.com/foomo/contentserver/content"
|
||||
typesense "github.com/foomo/typesense/pkg"
|
||||
@ -13,29 +15,32 @@ type ContentServer[indexDocument any] struct {
|
||||
l *zap.Logger
|
||||
contentserverClient *client.Client
|
||||
documentProviderFuncs map[typesense.DocumentType]typesense.DocumentProviderFunc[indexDocument]
|
||||
supportedMimeTypes []string
|
||||
}
|
||||
|
||||
func NewContentServer[indexDocument any](
|
||||
l *zap.Logger,
|
||||
client *client.Client,
|
||||
documentProviderFuncs map[typesense.DocumentType]typesense.DocumentProviderFunc[indexDocument],
|
||||
supportedMimeTypes []string,
|
||||
) *ContentServer[indexDocument] {
|
||||
return &ContentServer[indexDocument]{
|
||||
l: l,
|
||||
contentserverClient: client,
|
||||
documentProviderFuncs: documentProviderFuncs,
|
||||
supportedMimeTypes: supportedMimeTypes,
|
||||
}
|
||||
}
|
||||
|
||||
func (c ContentServer[indexDocument]) Provide(
|
||||
ctx context.Context,
|
||||
indexID typesense.IndexID,
|
||||
) ([]indexDocument, error) {
|
||||
) ([]*indexDocument, error) {
|
||||
documentInfos, err := c.getDocumentIDsByIndexID(ctx, indexID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
documents := make([]indexDocument, len(documentInfos))
|
||||
documents := make([]*indexDocument, len(documentInfos))
|
||||
for index, documentInfo := range documentInfos {
|
||||
if documentProvider, ok := c.documentProviderFuncs[documentInfo.DocumentType]; !ok {
|
||||
c.l.Warn("no document provider available for document type", zap.String("documentType", string(documentInfo.DocumentType)))
|
||||
@ -50,7 +55,9 @@ func (c ContentServer[indexDocument]) Provide(
|
||||
)
|
||||
continue
|
||||
}
|
||||
documents[index] = document
|
||||
if document != nil {
|
||||
documents[index] = document
|
||||
}
|
||||
}
|
||||
}
|
||||
return documents, nil
|
||||
@ -60,9 +67,8 @@ func (c ContentServer[indexDocument]) ProvidePaged(
|
||||
ctx context.Context,
|
||||
indexID typesense.IndexID,
|
||||
offset int,
|
||||
) ([]indexDocument, int, error) {
|
||||
) ([]*indexDocument, int, error) {
|
||||
panic("implement me")
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
func (c ContentServer[indexDocument]) getDocumentIDsByIndexID(
|
||||
@ -83,10 +89,12 @@ func (c ContentServer[indexDocument]) getDocumentIDsByIndexID(
|
||||
nodeMap := createFlatRepoNodeMap(rootRepoNode, map[string]*content.RepoNode{})
|
||||
documentInfos := make([]typesense.DocumentInfo, 0, len(nodeMap))
|
||||
for _, repoNode := range nodeMap {
|
||||
documentInfos = append(documentInfos, typesense.DocumentInfo{
|
||||
DocumentType: typesense.DocumentType(repoNode.MimeType),
|
||||
DocumentID: typesense.DocumentID(repoNode.ID),
|
||||
})
|
||||
if slices.Contains(c.supportedMimeTypes, repoNode.MimeType) {
|
||||
documentInfos = append(documentInfos, typesense.DocumentInfo{
|
||||
DocumentType: typesense.DocumentType(repoNode.MimeType),
|
||||
DocumentID: typesense.DocumentID(repoNode.ID),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return documentInfos, nil
|
||||
|
||||
@ -2,6 +2,7 @@ package typesenseindexing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
typesense "github.com/foomo/typesense/pkg"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -29,59 +30,72 @@ func (b *BaseIndexer[indexDocument, returnType]) Healthz(ctx context.Context) er
|
||||
}
|
||||
|
||||
func (b *BaseIndexer[indexDocument, returnType]) Run(ctx context.Context) error {
|
||||
// return error if the health check fails
|
||||
if err := b.Healthz(ctx); err != nil {
|
||||
// Step 1: Ensure Typesense is initialized
|
||||
revisionID, err := b.typesenseAPI.Initialize(ctx)
|
||||
if err != nil || revisionID == "" {
|
||||
b.l.Error("Failed to initialize Typesense", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// create a new revision
|
||||
revisionID, err := b.typesenseAPI.NewRevision()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// get the configured indices from the typesense API
|
||||
// Step 2: Retrieve all configured indices
|
||||
indices, err := b.typesenseAPI.Indices()
|
||||
if err != nil {
|
||||
b.l.Error("Failed to retrieve indices from Typesense", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// set a variable to check if the upserting of documents was successful
|
||||
// Step 3: Track errors while upserting
|
||||
tainted := false
|
||||
indexedDocuments := 0
|
||||
|
||||
// for each index, get the documents from the document provider and upsert them
|
||||
for _, indexID := range indices {
|
||||
// Fetch documents from the provider
|
||||
documents, err := b.documentProvider.Provide(ctx, indexID)
|
||||
if err != nil {
|
||||
return err
|
||||
b.l.Error("Failed to fetch documents", zap.String("index", string(indexID)), zap.Error(err))
|
||||
tainted = true
|
||||
continue
|
||||
}
|
||||
|
||||
err = b.typesenseAPI.UpsertDocuments(revisionID, indexID, documents)
|
||||
err = b.typesenseAPI.UpsertDocuments(ctx, revisionID, indexID, documents)
|
||||
if err != nil {
|
||||
b.l.Error(
|
||||
"failed to upsert documents",
|
||||
zap.Error(err),
|
||||
"Failed to upsert documents",
|
||||
zap.String("index", string(indexID)),
|
||||
zap.String("revision", string(revisionID)),
|
||||
zap.Int("documents", len(documents)),
|
||||
zap.Error(err),
|
||||
)
|
||||
tainted = true
|
||||
break
|
||||
continue
|
||||
}
|
||||
|
||||
indexedDocuments += len(documents)
|
||||
b.l.Info("Successfully upserted documents",
|
||||
zap.String("index", string(indexID)),
|
||||
zap.Int("count", len(documents)),
|
||||
)
|
||||
}
|
||||
|
||||
if !tainted {
|
||||
// commit the revision if no errors occurred
|
||||
err = b.typesenseAPI.CommitRevision(revisionID)
|
||||
// Step 4: Commit or Revert the Revision
|
||||
if !tainted && indexedDocuments > 0 {
|
||||
// No errors encountered, commit the revision
|
||||
err = b.typesenseAPI.CommitRevision(ctx, revisionID)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to commit revision", zap.String("revision", string(revisionID)), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
b.l.Info("Successfully committed revision", zap.String("revision", string(revisionID)))
|
||||
} else {
|
||||
// revert the revision if errors occurred
|
||||
err = b.typesenseAPI.RevertRevision(revisionID)
|
||||
// If errors occurred, revert the revision
|
||||
b.l.Warn("Errors detected during upsert, reverting revision", zap.String("revision", string(revisionID)))
|
||||
|
||||
err = b.typesenseAPI.RevertRevision(ctx, revisionID)
|
||||
if err != nil {
|
||||
b.l.Error("Failed to revert revision", zap.String("revision", string(revisionID)), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
b.l.Info("Successfully reverted revision", zap.String("revision", string(revisionID)))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -2,29 +2,30 @@ package typesense
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/typesense/typesense-go/v3/typesense/api"
|
||||
)
|
||||
|
||||
type API[indexDocument any, returnType any] interface {
|
||||
// this will prepare new indices with the given schema and the index IDs configured for the API
|
||||
NewRevision() (RevisionID, error)
|
||||
CommitRevision(revisionID RevisionID) error
|
||||
RevertRevision(revisionID RevisionID) error
|
||||
UpsertDocuments(revisionID RevisionID, indexID IndexID, documents []indexDocument) error
|
||||
CommitRevision(ctx context.Context, revisionID RevisionID) error
|
||||
RevertRevision(ctx context.Context, revisionID RevisionID) error
|
||||
UpsertDocuments(ctx context.Context, revisionID RevisionID, indexID IndexID, documents []*indexDocument) error
|
||||
|
||||
// this will check the typesense connection and initialize the indices
|
||||
// should be run directly in a main.go or similar to ensure the connection is working
|
||||
Initialize() (RevisionID, error)
|
||||
Initialize(ctx context.Context) (RevisionID, error)
|
||||
|
||||
// perform a search operation on the given index
|
||||
SimpleSearch(
|
||||
ctx context.Context,
|
||||
index IndexID,
|
||||
q string,
|
||||
filterBy map[string]string,
|
||||
page, perPage int,
|
||||
sortBy string,
|
||||
) ([]returnType, Scores, error)
|
||||
ExpertSearch(index IndexID, parameters *api.SearchCollectionParams) ([]returnType, Scores, error)
|
||||
ExpertSearch(ctx context.Context, index IndexID, parameters *api.SearchCollectionParams) ([]returnType, Scores, error)
|
||||
Healthz(ctx context.Context) error
|
||||
Indices() ([]IndexID, error)
|
||||
}
|
||||
@ -34,6 +35,6 @@ type IndexerInterface[indexDocument any, returnType any] interface {
|
||||
}
|
||||
|
||||
type DocumentProvider[indexDocument any] interface {
|
||||
Provide(ctx context.Context, index IndexID) ([]indexDocument, error)
|
||||
ProvidePaged(ctx context.Context, index IndexID, offset int) ([]indexDocument, int, error)
|
||||
Provide(ctx context.Context, index IndexID) ([]*indexDocument, error)
|
||||
ProvidePaged(ctx context.Context, index IndexID, offset int) ([]*indexDocument, int, error)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user