Make indexer code more reusable (#2590)

This commit is contained in:
Ethan Koenig 2017-09-24 17:08:48 -07:00 committed by Lauris BH
parent 0b0d85c90d
commit fa28de820e
3 changed files with 83 additions and 35 deletions

View File

@ -25,6 +25,7 @@ func InitIssueIndexer() {
// populateIssueIndexer populate the issue indexer with issue data // populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer() error { func populateIssueIndexer() error {
batch := indexer.IssueIndexerBatch()
for page := 1; ; page++ { for page := 1; ; page++ {
repos, _, err := Repositories(&SearchRepoOptions{ repos, _, err := Repositories(&SearchRepoOptions{
Page: page, Page: page,
@ -34,7 +35,7 @@ func populateIssueIndexer() error {
return fmt.Errorf("Repositories: %v", err) return fmt.Errorf("Repositories: %v", err)
} }
if len(repos) == 0 { if len(repos) == 0 {
return nil return batch.Flush()
} }
for _, repo := range repos { for _, repo := range repos {
issues, err := Issues(&IssuesOptions{ issues, err := Issues(&IssuesOptions{
@ -42,29 +43,37 @@ func populateIssueIndexer() error {
IsClosed: util.OptionalBoolNone, IsClosed: util.OptionalBoolNone,
IsPull: util.OptionalBoolNone, IsPull: util.OptionalBoolNone,
}) })
updates := make([]indexer.IssueIndexerUpdate, len(issues)) if err != nil {
for i, issue := range issues { return err
updates[i] = issue.update() }
for _, issue := range issues {
if err := batch.Add(issue.update()); err != nil {
return err
} }
if err = indexer.BatchUpdateIssues(updates...); err != nil {
return fmt.Errorf("BatchUpdate: %v", err)
} }
} }
} }
} }
func processIssueIndexerUpdateQueue() { func processIssueIndexerUpdateQueue() {
batch := indexer.IssueIndexerBatch()
for { for {
var issueID int64
select { select {
case issueID := <-issueIndexerUpdateQueue: case issueID = <-issueIndexerUpdateQueue:
default:
// flush whatever updates we currently have, since we
// might have to wait a while
if err := batch.Flush(); err != nil {
log.Error(4, "IssueIndexer: %v", err)
}
issueID = <-issueIndexerUpdateQueue
}
issue, err := GetIssueByID(issueID) issue, err := GetIssueByID(issueID)
if err != nil { if err != nil {
log.Error(4, "issuesIndexer.Index: %v", err) log.Error(4, "GetIssueByID: %v", err)
continue } else if err = batch.Add(issue.update()); err != nil {
} log.Error(4, "IssueIndexer: %v", err)
if err = indexer.UpdateIssue(issue.update()); err != nil {
log.Error(4, "issuesIndexer.Index: %v", err)
}
} }
} }
} }

View File

@ -9,6 +9,8 @@ import (
"strconv" "strconv"
"github.com/blevesearch/bleve" "github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/mapping"
"github.com/blevesearch/bleve/search/query" "github.com/blevesearch/bleve/search/query"
) )
@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase
q.Analyzer = analyzer q.Analyzer = analyzer
return q return q
} }
const unicodeNormalizeName = "unicodeNormalize"
func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
})
}
// Update represents an update to an indexer
type Update interface {
addToBatch(batch *bleve.Batch) error
}
const maxBatchSize = 16
// Batch batch of indexer updates that automatically flushes once it
// reaches a certain size
type Batch struct {
batch *bleve.Batch
index bleve.Index
}
// Add add update to batch, possibly flushing
func (batch *Batch) Add(update Update) error {
if err := update.addToBatch(batch.batch); err != nil {
return err
}
return batch.flushIfFull()
}
func (batch *Batch) flushIfFull() error {
if batch.batch.Size() >= maxBatchSize {
return batch.Flush()
}
return nil
}
// Flush manually flush the batch, regardless of its size
func (batch *Batch) Flush() error {
if err := batch.index.Batch(batch.batch); err != nil {
return err
}
batch.batch.Reset()
return nil
}

View File

@ -13,7 +13,6 @@ import (
"github.com/blevesearch/bleve" "github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/custom" "github.com/blevesearch/bleve/analysis/analyzer/custom"
"github.com/blevesearch/bleve/analysis/token/lowercase" "github.com/blevesearch/bleve/analysis/token/lowercase"
"github.com/blevesearch/bleve/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/analysis/tokenizer/unicode" "github.com/blevesearch/bleve/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/index/upsidedown" "github.com/blevesearch/bleve/index/upsidedown"
) )
@ -35,6 +34,10 @@ type IssueIndexerUpdate struct {
Data *IssueIndexerData Data *IssueIndexerData
} }
func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
return batch.Index(indexerID(update.IssueID), update.Data)
}
const issueIndexerAnalyzer = "issueIndexer" const issueIndexerAnalyzer = "issueIndexer"
// InitIssueIndexer initialize issue indexer // InitIssueIndexer initialize issue indexer
@ -74,17 +77,13 @@ func createIssueIndexer() error {
docMapping.AddFieldMappingsAt("Content", textFieldMapping) docMapping.AddFieldMappingsAt("Content", textFieldMapping)
docMapping.AddFieldMappingsAt("Comments", textFieldMapping) docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
const unicodeNormNFC = "unicodeNormNFC" if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
}); err != nil {
return err return err
} else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{ } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{
"type": custom.Name, "type": custom.Name,
"char_filters": []string{}, "char_filters": []string{},
"tokenizer": unicode.Name, "tokenizer": unicode.Name,
"token_filters": []string{unicodeNormNFC, lowercase.Name}, "token_filters": []string{unicodeNormalizeName, lowercase.Name},
}); err != nil { }); err != nil {
return err return err
} }
@ -97,21 +96,12 @@ func createIssueIndexer() error {
return err return err
} }
// UpdateIssue update the issue indexer // IssueIndexerBatch batch to add updates to
func UpdateIssue(update IssueIndexerUpdate) error { func IssueIndexerBatch() *Batch {
return issueIndexer.Index(indexerID(update.IssueID), update.Data) return &Batch{
} batch: issueIndexer.NewBatch(),
index: issueIndexer,
// BatchUpdateIssues perform a batch update of the issue indexer
func BatchUpdateIssues(updates ...IssueIndexerUpdate) error {
batch := issueIndexer.NewBatch()
for _, update := range updates {
err := batch.Index(indexerID(update.IssueID), update.Data)
if err != nil {
return err
} }
}
return issueIndexer.Batch(batch)
} }
// SearchIssuesByKeyword searches for issues by given conditions. // SearchIssuesByKeyword searches for issues by given conditions.