mattbaird / elastigo
A Go (golang) based Elasticsearch client library.
License: Apache License 2.0
The ok field was removed in ES 1.0: elastic/elasticsearch#4340. Since it was hardcoded to true in almost all cases, it was useless to begin with.
I can submit a PR, though this will be a breaking change and I'm not sure what the status of this project is in relation to ES 1.0.
@mattbaird, would you mind looking at this? It is an experimental search DSL, and it would be great to get other input.
https://github.com/araddon/elastigo/blob/newsearch/core/search_test.go
It is based on new test data loaded here (requires a running Elasticsearch):
https://github.com/araddon/elastigo/blob/newsearch/core/test_test.go#L137
Usage:
go test -v -host eshost -loaddata
I looked into the code and couldn't find a way. Did I miss it?
It would be great to have an example of making the initial connection. I'm new to Go and tried to read through the code to figure it out, but it isn't quite clicking, so a code example would be appreciated. Thanks!
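Elasticsearch is spoken to over plain HTTP, so "connecting" mostly means choosing a base URL (for example http://localhost:9200) that requests are sent to. The sketch below is not elastigo's API; it is a hypothetical standard-library check that a node answers at that URL, using the root endpoint (GET /) and reading the version number from its reply.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// rootInfo holds the part of the "GET /" response used here.
type rootInfo struct {
	Version struct {
		Number string `json:"number"`
	} `json:"version"`
}

// parseVersion extracts version.number from the body of "GET /".
func parseVersion(body []byte) (string, error) {
	var info rootInfo
	if err := json.Unmarshal(body, &info); err != nil {
		return "", err
	}
	return info.Version.Number, nil
}

// ping sends "GET /" to baseURL (for example "http://localhost:9200")
// and returns the node's version if it answers with 200 OK.
func ping(baseURL string) (string, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(baseURL + "/")
	if err != nil {
		return "", err // host unreachable, wrong port, and so on
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %d from %s", resp.StatusCode, baseURL)
	}
	return parseVersion(body)
}
```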
The bulk indexer starts maxConns goroutines on .Run(), and the goroutines have no facility for exiting (there is no break or return in lines 184 to 220 at commit 32fcfc3).
I'm seeing panic backtraces with 300,000+ of these goroutines in one of our applications. It would be great if the bulk indexer could somehow exit once it is no longer being used.
The current design of elastigo requests hides the http.Response (and its HTTP status) and returns only the JSON. This makes error handling problematic: #21 (comment)
I propose adding a new DoResponse method:
https://github.com/mattbaird/elastigo/blob/master/api/request.go#L86
This issue is really for @araddon
This works, but it has some idiosyncrasies. For instance, in bulk_test.go, in func TestBulkSmallBatch, if you do not sleep and only call Flush(), the test fails. So it seems that to flush properly you have to wait some unspecified amount of time for the channel to receive and format the messages; at least, I think that is what is happening.
I would not mind if the Flush method handled this wait itself, but I have no idea how to calculate a reasonable wait time.
Does this make sense?
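Rather than sleeping, Flush can wait for an acknowledgement from the goroutine that owns the buffer. A minimal sketch, assuming a single buffering goroutine; batcher, Add, and the send callback are hypothetical names, not elastigo's BulkIndexor.

```go
package main

// batcher is a hypothetical stand-in for a bulk indexer: one goroutine owns
// the buffer and hands accumulated documents to send.
type batcher struct {
	docs    chan []byte
	flushes chan chan struct{}
	send    func([][]byte)
}

func newBatcher(send func([][]byte)) *batcher {
	b := &batcher{docs: make(chan []byte), flushes: make(chan chan struct{}), send: send}
	go b.loop() // runs for the life of the program in this sketch
	return b
}

// loop is the only goroutine touching buf, so no mutex is needed.
func (b *batcher) loop() {
	var buf [][]byte
	for {
		select {
		case d := <-b.docs:
			buf = append(buf, d)
		case ack := <-b.flushes:
			if len(buf) > 0 {
				b.send(buf)
				buf = nil
			}
			close(ack) // signal that everything buffered so far was sent
		}
	}
}

// Add queues one document; it returns once loop has taken it.
func (b *batcher) Add(doc []byte) { b.docs <- doc }

// Flush blocks until every document added before the call has been sent.
func (b *batcher) Flush() {
	ack := make(chan struct{})
	b.flushes <- ack
	<-ack
}
```

Because the acknowledgement travels back over a channel, Flush returns only after send has finished, with no guessed wait time.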
Did you postpone the breaking merge? We are about to start using the library and would like to avoid surprises. Any ETA on that? Thanks.
Currently the Bulk API only appears to support index + update (see https://github.com/mattbaird/elastigo/blob/master/lib/corebulk.go#L310).
'delete' is a valid bulk operation that could have support added (see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html).
To apply terms filters on multiple fields at the root level, you can do:
Search("...").Filter(
	Filter().Terms(...),
	Filter().Terms(...),
)
There doesn't seem to be any way to do this on a filtered query, though, which is probably the more useful version anyway. I think QueryDsl should contain a FilterWrap rather than a FilterOp.
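For reference, the ES 1.x filtered query this would need to produce puts the filters under query.filtered.filter, for example combined with a bool filter's must clause. The sketch builds that JSON with plain maps; termsFilter and filteredQuery are hypothetical helpers, not elastigo's QueryDsl.

```go
package main

import "encoding/json"

// termsFilter builds {"terms": {field: [values...]}}.
func termsFilter(field string, values ...string) map[string]interface{} {
	return map[string]interface{}{"terms": map[string][]string{field: values}}
}

// filteredQuery wraps query in an ES 1.x "filtered" query in which every
// filter must match. json.Marshal sorts map keys, which ES does not mind.
func filteredQuery(query interface{}, filters ...interface{}) ([]byte, error) {
	body := map[string]interface{}{
		"query": map[string]interface{}{
			"filtered": map[string]interface{}{
				"query":  query,
				"filter": map[string]interface{}{"bool": map[string]interface{}{"must": filters}},
			},
		},
	}
	return json.Marshal(body)
}
```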
Filters don't have scores, and the JSON being returned doesn't unmarshal into Go.
I'm a bit confused about this:
Non-bulk indexer:
func Index(pretty bool, index string, _type string, id string, data interface{}) (api.BaseResponse, error)
Bulk indexer:
func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error
Is there a reason that BulkIndexer.Index takes a date, but Index does not? (It's in IndexWithParameters, but not in the 'defaults' Index has.)
Let's say I have a mapping with a single field of type long:
{
  "my-index": {
    "mappings": {
      "my-type": {
        "properties": {
          "field-1": {"type": "long"}
        }
      }
    }
  }
}
If I try to index a document where field-1's value is a non-numeric string, e.g. "hello" (which can't be converted to a long), I get an exception. This isn't reported to the error channel, and it should be.
Hi @araddon
Today I was interested in adding an update to the bulk API. The entire API implicitly assumes that you are only using the "index" op. So let's chat: what would be a good way to add Update? Perhaps something like this?
func (b *BulkIndexor) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
	by, err := UpdateBulkBytes(index, _type, id, ttl, date, data)
	if err != nil {
		u.Error(err)
		return err
	}
	b.bulkChannel <- by
	return nil
}
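A bulk update differs from an index action in its second line, which wraps the partial document in "doc". A minimal sketch of the UpdateBulkBytes helper the proposal calls, assuming a partial-document update and leaving out the ttl and date parameters; the name updateBulkBytes and its signature here are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
)

type bulkMeta struct {
	Index string `json:"_index"`
	Type  string `json:"_type"`
	ID    string `json:"_id"`
}

// updateBulkBytes renders a partial-document update as the two lines the
// bulk API expects: the action/metadata line, then {"doc": ...}.
func updateBulkBytes(index, typ, id string, doc interface{}) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends '\n' after each value
	if err := enc.Encode(map[string]bulkMeta{"update": {Index: index, Type: typ, ID: id}}); err != nil {
		return nil, err
	}
	if err := enc.Encode(map[string]interface{}{"doc": doc}); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```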
Assuming v2 is still less stable than v1 (is it?), which git hash should we use for v1?
The README says to use godep, which is fine, but it doesn't say which git hash is recommended for v1.
I see there's a git tag for v1.0, but the git history shows that some fixes were merged after that tag. The README also says that v2.0 development started on 2014-07-09, but it's not clear at which point that work broke master.
It seems that a separate v1 branch would be useful for this case.
Thanks.
Elasticsearch allows for parent/child relationships. Even when defined in the mapping, the relationship has to be specified at index time using the "parent" request parameter, e.g.:
$ curl -XPUT localhost:9200/blogs/blog_tag/1122?parent=1111 -d '{
    "tag" : "something"
}'
http://www.elasticsearch.org/guide/reference/api/index_/
Unfortunately, I can't find this functionality in the client. Have I missed it? I'm happy to contribute if this functionality is likely to be accepted.
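Until the client supports it, the parameter can be added to the request URL directly. A minimal sketch with net/url; indexURL is a hypothetical helper, and it assumes the index, type, and id contain no "/" characters.

```go
package main

import "net/url"

// indexURL builds the URL for indexing a child document, adding the
// "parent" query parameter when one is given.
func indexURL(base, index, typ, id, parent string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/" + index + "/" + typ + "/" + id // unescaped form; String() escapes it
	if parent != "" {
		q := u.Query()
		q.Set("parent", parent)
		u.RawQuery = q.Encode()
	}
	return u.String(), nil
}
```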
It seems that older versions of Go (I have the stable Debian one, 1.0.2) forbid ending a function without a return statement:
go/src/github.com/mattbaird/elastigo/core/get.go:59: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/get.go:93: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/update.go:56: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/update.go:85: function ends without a return statement
Maybe you should add default return statements?
Hi,
Is there a way to do a synchronous bulk insert directly? If not, what building blocks should I use to achieve that?
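A synchronous bulk insert can be assembled from two pieces: render the newline-delimited body, then POST it to /_bulk and wait for the reply. A minimal sketch with the standard library; bulkDoc, buildBulkBody, and bulkSync are hypothetical, not elastigo functions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// bulkDoc is one document to index under an explicit ID.
type bulkDoc struct {
	ID  string
	Doc interface{}
}

// buildBulkBody renders docs as index actions, one metadata line and one
// document line each.
func buildBulkBody(index, typ string, docs []bulkDoc) (*bytes.Buffer, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // each Encode ends with '\n', as _bulk requires
	for _, d := range docs {
		meta := map[string]map[string]string{"index": {"_index": index, "_type": typ, "_id": d.ID}}
		if err := enc.Encode(meta); err != nil {
			return nil, err
		}
		if err := enc.Encode(d.Doc); err != nil {
			return nil, err
		}
	}
	return &buf, nil
}

// bulkSync posts the body and returns only once Elasticsearch has answered.
func bulkSync(client *http.Client, baseURL string, body *bytes.Buffer) ([]byte, error) {
	resp, err := client.Post(baseURL+"/_bulk", "application/json", body)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	out, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return out, fmt.Errorf("bulk request failed with status %d", resp.StatusCode)
	}
	return out, nil // the body may still report per-item errors ("errors": true)
}
```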
Running search.Search("index").Type("type").Result() results in an error:
invalid character '<' looking for beginning of value
There seem to be multiple ways to denote errors in elastigo now:
https://github.com/mattbaird/elastigo/blob/master/lib/baserequest.go#L72
https://github.com/mattbaird/elastigo/blob/master/lib/error.go#L10
Wouldn't it be nice to standardize this at some point?
On Windows 7:
C:\Users\guohongjun>go get github.com/mattbaird/elastigo
D:\gocodes\src\github.com\araddon\gou\events.go:101: undefined: syscall.SIGSTOP
D:\gocodes\src\github.com\araddon\gou\events.go:105: undefined: syscall.SIGUSR1
D:\gocodes\src\github.com\araddon\gou\events.go:109: undefined: syscall.SIGUSR2
D:\gocodes\src\github.com\araddon\gou\log.go:250: undefined: syscall.SYS_IOCTL
D:\gocodes\src\github.com\araddon\gou\log.go:253: not enough arguments in call to syscall.Syscall
This does not compile: imported and not used: "github.com/mattbaird/elastigo/api"
Getting too much noise in the logs while indexing multiple documents.
See line:
https://github.com/mattbaird/elastigo/blob/master/core/index.go#L59
The headline says it all.
drone.io tries to run the tests in the main package, which has none.
The real tests, in the subpackages, aren't run, and those are the important ones.
https://github.com/mattbaird/elastigo/blob/master/lib/corebulk.go#L186-L189
The timeout should probably be configurable, and there should be some indication that the timeout was hit.
This stems from the fact that Request.DoResponse returns the RecordNotFound error for all HTTP 404 responses. However, in this particular case the 404 is not an error; it's the answer to the query.
I think fixing this properly requires making DoResponse return the response object on 404 (instead of nil, as it does now) and then letting ExistsIndex check the status code before checking whether the error is non-nil.
As far as I can tell, there's no way to set the boost value of any query.
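In the query DSL, boost goes on the long form of a leaf query, for example {"term": {"user": {"value": "kimchy", "boost": 2.5}}}. A sketch that produces that JSON; termQuery and termValue are hypothetical, not part of elastigo.

```go
package main

import "encoding/json"

// termValue is the long form of a term query's field clause.
type termValue struct {
	Value string  `json:"value"`
	Boost float64 `json:"boost,omitempty"` // omitted when zero, leaving the default of 1.0
}

// termQuery renders a term query on field with the given boost.
func termQuery(field, value string, boost float64) ([]byte, error) {
	return json.Marshal(map[string]map[string]termValue{
		"term": {field: {Value: value, Boost: boost}},
	})
}
```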
Thanks for all the work on bulk indexing. Last night I was running a process that indexes documents using the global bulk indexor, and I found that small batches that finish before BulkDelaySeconds has elapsed never get flushed to Elasticsearch. I had assumed that calling Flush would block long enough to flush and retrieve the result, but it does not. The only workaround I have found is to sleep until (I think) the request reaches Elasticsearch with the data.
Any suggestions on how to deal with this situation would be great. I have the following func running in a goroutine; it blocks until all the documents on the channel have been read and the channel has been closed by the goroutine that sends documents:
func StartElasticsearchIndexer(indexName string, queue chan Document, wg *sync.WaitGroup) {
	done := make(chan bool)
	core.BulkDelaySeconds = 1
	core.BulkIndexorGlobalRun(1, done)
	for document := range queue {
		json, _ := Transform(document)
		date := time.Now() // not sure where to set this
		err := core.IndexBulk(indexName, string(document.Type), strconv.Itoa(document.Id), &date, string(json))
		// This should likely explode if there is an error so we can try again?
		if err != nil {
			panic(err.Error())
		}
	}
	// This channel is closed when there are no more docs to process ...
	mx.Log.Info("Document channel closed.")
	mx.Log.Info(fmt.Sprintf("%s errors while indexing", strconv.FormatUint(core.BulkErrorCt, 10)))
	// The call to flush should be blocking (?)
	core.BulkIndexorGlobalFlush()
	time.Sleep(1500*time.Millisecond)
	// Tell the indexer to pack it up
	done <- true
	// not true!! we have to wait for request to finish!
	// This is a terrible solution
	time.Sleep(200*time.Millisecond)
	// Finally, we can stop waiting on main routine execution
	wg.Done()
}
I should add that BulkIndexorGlobalFlush was added so that I could get access to Flush on the global instance (which is not exported).
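A pattern that avoids the sleeps is to make shutdown itself synchronous: close the input channel, let the sending goroutine flush the remainder, and wait on a sync.WaitGroup. A minimal sketch; indexer, Index, and Stop are hypothetical, and it flushes only by document count, with no BulkDelaySeconds-style timer.

```go
package main

import "sync"

// indexer batches documents and hands each full batch to send.
type indexer struct {
	docs    chan []byte
	wg      sync.WaitGroup
	maxDocs int
	send    func([][]byte)
}

func newIndexer(maxDocs int, send func([][]byte)) *indexer {
	ix := &indexer{docs: make(chan []byte, 100), maxDocs: maxDocs, send: send}
	ix.wg.Add(1)
	go ix.run()
	return ix
}

func (ix *indexer) run() {
	defer ix.wg.Done()
	var buf [][]byte
	for doc := range ix.docs { // the loop ends when Stop closes the channel
		buf = append(buf, doc)
		if len(buf) >= ix.maxDocs {
			ix.send(buf)
			buf = nil
		}
	}
	if len(buf) > 0 {
		ix.send(buf) // final partial batch
	}
}

func (ix *indexer) Index(doc []byte) { ix.docs <- doc }

// Stop must be called once, after the last Index call; it returns only
// after the final batch has been sent.
func (ix *indexer) Stop() {
	close(ix.docs)
	ix.wg.Wait()
}
```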
If the index I check with IndicesExists exists, it works fine, but if it doesn't exist I get a panic. See sample code:
package main

import (
	"github.com/mattbaird/elastigo/api"
	"github.com/mattbaird/elastigo/indices"
	"fmt"
)

func main() {
	api.Domain = "localhost"
	fmt.Println(indices.IndicesExists("existing_index"))
	fmt.Println(indices.IndicesExists("not_existing_index"))
}
The output of this program is:
true <nil>
panic: interface conversion: error is *json.SyntaxError, not api.ESError
goroutine 16 [running]:
runtime.panic(0x643b40, 0xc2080401c0)
/usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/mattbaird/elastigo/indices.IndicesExists(0xc208023ed8, 0x1, 0x1, 0xb, 0x0, 0x0)
/home/meskio/dev/go/src/github.com/mattbaird/elastigo/indices/indicesExists.go:29 +0x1b0
main.main()
/tmp/foo.go:12 +0x18c
goroutine 19 [finalizer wait]:
runtime.park(0x4130a0, 0x802570, 0x800809)
/usr/local/go/src/pkg/runtime/proc.c:1369 +0x89
runtime.parkunlock(0x802570, 0x800809)
/usr/local/go/src/pkg/runtime/proc.c:1385 +0x3b
runfinq()
/usr/local/go/src/pkg/runtime/mgc0.c:2636 +0xcf
runtime.goexit()
/usr/local/go/src/pkg/runtime/proc.c:1445
goroutine 23 [IO wait]:
net.runtime_pollWait(0x7f5856df46c0, 0x72, 0x0)
/usr/local/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20802c060, 0x72, 0x7f5856df3418, 0xb)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:84 +0x34
net.(*pollDesc).WaitRead(0xc20802c060, 0xb, 0x7f5856df3418)
/usr/local/go/src/pkg/net/fd_poll_runtime.go:89 +0x30
net.(*netFD).Read(0xc20802c000, 0xc208022000, 0x1000, 0x1000, 0x0, 0x7f5856df3418, 0xb)
/usr/local/go/src/pkg/net/fd_unix.go:232 +0x30e
net.(*conn).Read(0xc20803c020, 0xc208022000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/local/go/src/pkg/net/net.go:122 +0xd2
net/http.noteEOFReader.Read(0x7f5856df4798, 0xc20803c020, 0xc2080421b8, 0xc208022000, 0x1000, 0x1000, 0xc20804e170, 0x8, 0x404158)
/usr/local/go/src/pkg/net/http/transport.go:1200 +0x50
net/http.(*noteEOFReader).Read(0xc20803e4e0, 0xc208022000, 0x1000, 0x1000, 0x10000000041bde8, 0xc20804e170, 0xc20804e130)
<autogenerated>:115 +0xb5
bufio.(*Reader).fill(0xc2080043c0)
/usr/local/go/src/pkg/bufio/bufio.go:97 +0x152
bufio.(*Reader).Peek(0xc2080043c0, 0x1, 0x0, 0x4, 0x1, 0x0, 0x0)
/usr/local/go/src/pkg/bufio/bufio.go:132 +0xd1
net/http.(*persistConn).readLoop(0xc208042160)
/usr/local/go/src/pkg/net/http/transport.go:779 +0x95
created by net/http.(*Transport).dialConn
/usr/local/go/src/pkg/net/http/transport.go:597 +0x8dd
goroutine 17 [syscall]:
runtime.goexit()
/usr/local/go/src/pkg/runtime/proc.c:1445
goroutine 24 [select]:
net/http.(*persistConn).writeLoop(0xc208042160)
/usr/local/go/src/pkg/net/http/transport.go:882 +0x336
created by net/http.(*Transport).dialConn
/usr/local/go/src/pkg/net/http/transport.go:598 +0x8f5
Currently this exported package-level var is used to configure ES, which has a number of consequences: 1) you can't use multiple hosts (in the same cluster), and 2) you can't use multiple ES clusters.
https://github.com/linohh/elastigo/blob/master/api/request.go#L35
What about extracting it and introducing a Connection Manager, something that could someday wrap the cluster status/health info? We could keep backwards compatibility in the API by supporting a single global ConnectionManager, while also allowing people to create and manage their own (to address the second point above).
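A minimal sketch of the idea, assuming simple round-robin host selection; connectionManager and the host names are hypothetical. Because each manager owns its own host list, two managers can point at two different clusters in the same program.

```go
package main

import (
	"errors"
	"sync"
)

// connectionManager replaces a package-level Domain variable with an
// explicit, per-cluster list of hosts.
type connectionManager struct {
	mu    sync.Mutex
	hosts []string
	next  int
}

func newConnectionManager(hosts ...string) (*connectionManager, error) {
	if len(hosts) == 0 {
		return nil, errors.New("at least one host is required")
	}
	// Copy so later changes to the caller's slice don't affect the manager.
	return &connectionManager{hosts: append([]string(nil), hosts...)}, nil
}

// Host returns the next host in round-robin order; safe for concurrent use.
func (m *connectionManager) Host() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	h := m.hosts[m.next]
	m.next = (m.next + 1) % len(m.hosts)
	return h
}
```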
Hey, not really an issue, but is there any reason for passing the pretty bool? I find this confusing, as the json is parsed anyway. I'll refactor it out (into a single setting if needed), but it will break things for some users.
When requesting a document via func (c *Conn) GetSource, a 404 response (when the record does not exist) should return an error that conveys this information. Currently the code attempts to unmarshal any response with status > 304, but the _source endpoint returns an empty body on a 404, which then results in an error from json.Unmarshal.
Would it make sense to have a redigo.RecordNotFound error type that can be returned in this case?
I want to track how many items are pending to be flushed.
It should be doable with a simple len() on the channel, except that the channel is a private field, so it needs a wrapper function.
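A sketch of such a wrapper, assuming the queue is a buffered channel field named bulkChannel (as in the Update proposal in this thread); bulkIndexer and PendingDocs are hypothetical. len on a channel is only a snapshot, and it does not count documents the sender has already moved into its own buffer.

```go
package main

// bulkIndexer is a hypothetical indexer whose queue is unexported.
type bulkIndexer struct {
	bulkChannel chan []byte
}

// PendingDocs reports how many items are queued but not yet picked up by
// the sender. The value may be stale as soon as it is returned.
func (b *bulkIndexer) PendingDocs() int {
	return len(b.bulkChannel)
}
```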
Is there a reason that this library supports a 'terms' filter but not a 'term' filter via the QueryDSL. Forgive me if I'm missing something.
The use of "c := NewConn()" in the tests is causing them to fail when they are run with a remote elasticsearch
e.g.
go test -v -host <ip.address>
If you have something like this:
indexer := core.NewBulkIndexerErrors(200, 60)
indexer.BulkMaxBuffer = 10485760
indexer.BulkMaxDocs = 6000
done := make(chan bool)
indexer.Run(done)
// Clean up on exit as the indexer doesn't seem to do it by itself
defer func() { indexer.ErrorChannel = nil }()
defer close(indexer.ErrorChannel)
go func() {
	for errBuf := range indexer.ErrorChannel {
		// just blissfully print errors forever
		log.Println(errBuf.Err)
	}
	log.Println("No more errors")
}()
for i := 0; i < 20; i++ {
	indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"alias":"babbsan"}`)
}
done <- true
indexer.Flush()
and you don't call Flush at the end, those documents are not saved. I'm guessing this is because Run() creates its own goroutine, which won't block on shutdown, whereas calling Flush does block.
Any chance of adding Facet filtering as described here: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-facets.html
I tried but my knowledge of ElasticSearch isn't good enough to add it.
Thanks
The code looks for "score", not "_score".
Is it possible to make the Snapshot() func correspond to the new snapshot API? ("New" means ES >= 1.3.)
They fail for me in dev. Could it be that you're using cached versions of the dependencies, and you need to go get -u them to see whether the tests fail against the master branch of the dependencies?
I had to replace some calls with coerced types to get the tests to even compile.
80	if body != nil {
81		switch v := body.(type) {
82		case *strings.Reader:
83			r.ContentLength = int64(v.Len())
84		case *bytes.Buffer:
85			r.ContentLength = int64(v.Len())
86		}
On line 84, case *bytes.Buffer should be replaced by case *bytes.Reader.
Currently fails due to an unavailable version of elasticsearch.
I am trying to update a few fields in a record and noticed that there is a core.Update function. However, I don't see how to pass in the fields I want to update and their new values.
Looking at the code, it seems this is unfinished. Is that correct, or am I missing something?
This appears to be caused by upstream bug bitly/go-hostpool#6, triggered from the elastigo code at https://github.com/mattbaird/elastigo/blob/master/lib/connection.go#L96.
Based on the pull request to go-hostpool, the fix involves introducing a Close method that cleans up the time.Ticker and, with it, its references to go-hostpool.epsilonGreedyHostPool. This affects elastigo in that a Close method must also be implemented on elastigo.Conn so that resources can be released.
Currently elastigo does not use the highlighting information that Elasticsearch provides. Adding it is basically a one-liner in the Hit struct. I could make a PR or just post the one-liner here, unless there is a reason for not including it.
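A sketch of what that field could look like, assuming the usual response shape in which each hit's highlight maps a field name to a list of fragments; hit and parseHit are hypothetical, not elastigo's Hit. The sketch also reads the score from "_score", the key the search response uses.

```go
package main

import "encoding/json"

// hit models one entry of hits.hits in a search response.
type hit struct {
	Index     string              `json:"_index"`
	ID        string              `json:"_id"`
	Score     float64             `json:"_score"` // null when sorting by a field; then stays 0
	Source    json.RawMessage     `json:"_source"`
	Highlight map[string][]string `json:"highlight"` // field name -> highlighted fragments
}

func parseHit(b []byte) (hit, error) {
	var h hit
	err := json.Unmarshal(b, &h)
	return h, err
}
```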
10:33:18.265621 bulk.go:140: unexpected EOF
17:49:13.273746 bulk.go:140: write tcp 10.14.14.14:9200: broken pipe
Find a way to retry if possible, and log/surface errors.
Also returns:
dial tcp: too many colons in address localhost:9200http:
when called:
core.DeleteByQuery(true, []string{indexName},
	[]string{typeName},
	nil,
	query)
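An address like localhost:9200http: looks as if the host, port, and scheme were concatenated in the wrong order somewhere. A minimal sketch that builds the base URL with net.JoinHostPort and url.URL instead of string concatenation; baseURL is a hypothetical helper.

```go
package main

import (
	"net"
	"net/url"
	"strconv"
)

// baseURL assembles scheme, host and port with the standard library.
// JoinHostPort also brackets IPv6 literals such as "::1".
func baseURL(scheme, host string, port int) string {
	u := url.URL{Scheme: scheme, Host: net.JoinHostPort(host, strconv.Itoa(port))}
	return u.String()
}
```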
In the BulkSend function in bulk.go, err is nil when there are errors in the bulk response.
code:
func BulkSend(buf *bytes.Buffer) error {
	_, err := api.DoCommand("POST", "/_bulk", nil, buf)
	if err != nil {
		BulkErrorCt += 1
		return err
	}
	return nil
}
Here is a sample response that comes back with errors; since err is nil, none of the errors make it onto the ErrorChannel:
[{"took":4,"errors":true,"items":[{"index":{"_index":"myindex_v1","_type":"myfiles","_id":"aaaaac7683dddfee51e0725b0908e2b0","status":400,"error":....etc....
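The _bulk endpoint answers 200 even when individual actions fail, so the body itself has to be inspected. A minimal sketch that turns each failed item into an error, assuming the response shape of the sample above; bulkErrors and its types are hypothetical. A mapping failure like the field-1 example earlier in this thread would show up here as an item with status 400.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkResponse holds the parts of a _bulk reply needed to find failures.
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"` // one single-key object per action, e.g. {"index": {...}}
}

type bulkItem struct {
	ID     string          `json:"_id"`
	Status int             `json:"status"`
	Error  json.RawMessage `json:"error"` // a string in ES 1.x, an object in later versions
}

// bulkErrors returns one error per failed action, in response order.
func bulkErrors(body []byte) ([]error, error) {
	var resp bulkResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	if !resp.Errors {
		return nil, nil
	}
	var errs []error
	for _, item := range resp.Items {
		for op, res := range item {
			if res.Status >= 300 {
				errs = append(errs, fmt.Errorf("%s %s: status %d: %s", op, res.ID, res.Status, res.Error))
			}
		}
	}
	return errs, nil
}
```

Each returned error could then be pushed onto the ErrorChannel instead of being dropped.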
https://drone.io/github.com/mattbaird/elastigo/latest
This indicates that the build script is trying to cd into a now-defunct directory. I suspect that what you really want is go build ./...