elastigo's Introduction

elastigo's People

Contributors

araddon, cclient, cerisier, cvanderschuere, danrex, dimroc, dpetek, gottwald, kytrinyx, lr-paul, mattbaird, mic92, mikosik, mschoch, nahap, nullbus, nwolff, olorin, philhofer, pjherring, sethcleveland, shawnps, snikch, stumpyfr, svipy9, travisjeffery, vrecan, weberr13, woodsaj, wuvist

elastigo's Issues

Remove Ok field from response types

The ok field was removed in ES 1.0: elastic/elasticsearch#4340. Since it was hardcoded to true in almost all cases, it was useless to begin with.

I can submit a PR, though this will be a breaking change and I'm not sure what the status of this project is in relation to ES 1.0.

How to do initial connection?

It would be great to have an example of making the initial connection. I'm new to Go and tried to read through the code to figure it out, but it isn't quite clicking, so a code example would be appreciated. Thanks!

Bulk indexer goroutines never exit

The bulk indexer starts maxConns goroutines on .Run(), and the goroutines have no facility for exiting. The loop in elastigo/core/bulk.go (lines 184 to 220 in 32fcfc3) is a for loop with no break or return:

func (b *BulkIndexor) startHttpSendor() {
    // this sends http requests to elasticsearch it uses maxConns to open up that
    // many goroutines, each of which will synchronously call ElasticSearch
    // in theory, the whole set will cause a backup all the way to IndexBulk if
    // we have consumed all maxConns
    for i := 0; i < b.maxConns; i++ {
        go func() {
            for {
                buf := <-b.sendBuf
                b.sendWg.Add(1)
                err := b.BulkSendor(buf)
                // Perhaps a b.FailureStrategy(err) ?? with different types of strategies
                // 1. Retry, then panic
                // 2. Retry then return error and let runner decide
                // 3. Retry, then log to disk? retry later?
                if err != nil {
                    if b.RetryForSeconds > 0 {
                        time.Sleep(time.Second * time.Duration(b.RetryForSeconds))
                        err = b.BulkSendor(buf)
                        if err == nil {
                            // Successfully re-sent with no error
                            b.sendWg.Done()
                            continue
                        }
                    }
                    if b.ErrorChannel != nil {
                        log.Println(err)
                        b.ErrorChannel <- &ErrorBuffer{err, buf}
                    }
                }
                b.sendWg.Done()
            }
        }()
    }
}

I'm seeing panic backtraces with 300,000+ of these goroutines in one of our applications. It would be great to make the bulk indexer able to exit when it's done being used, somehow.
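
One possible shape for exiting workers, as a minimal sketch: the workers range over the send channel and return once it is closed. The sender type, its fields and runDemo are hypothetical stand-ins for illustration, not elastigo's API.

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical stand-in for the bulk indexer's HTTP workers.
type sender struct {
	sendBuf chan string
	wg      sync.WaitGroup
	mu      sync.Mutex
	sent    []string
}

// start launches n workers. Each worker returns once sendBuf is closed,
// because ranging over a closed, drained channel ends the loop.
func (s *sender) start(n int) {
	for i := 0; i < n; i++ {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			for buf := range s.sendBuf {
				s.mu.Lock()
				s.sent = append(s.sent, buf) // placeholder for the real HTTP send
				s.mu.Unlock()
			}
		}()
	}
}

// stop closes the channel and waits until every worker has exited.
func (s *sender) stop() {
	close(s.sendBuf)
	s.wg.Wait()
}

// runDemo pushes n payloads through 4 workers and reports how many were handled.
func runDemo(n int) int {
	s := &sender{sendBuf: make(chan string)}
	s.start(4)
	for i := 0; i < n; i++ {
		s.sendBuf <- fmt.Sprintf("payload-%d", i)
	}
	s.stop()
	return len(s.sent)
}

func main() {
	fmt.Println(runDemo(10))
}
```

After stop returns, no worker goroutine is left running, so repeatedly creating and discarding indexers would not accumulate goroutines.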

Flush() in BulkIndexor is funky

This issue is really for @araddon

This works, but has some idiosyncrasies. For instance, in the bulk_test.go file for func TestBulkSmallBatch, if you do not "sleep" and only Flush(), the test fails. So it seems that in order to properly Flush, you have to wait for some unspecified duration of time so that the channel can receive and format the messages - at least I think this is what is happening.

I would not mind if the Flush method actually handled this wait time, but I don't really have any clue as to how to calculate a reasonable wait time.

Does this make sense?
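
A blocking Flush could avoid the sleep altogether. The following is only a sketch under assumptions: batcher, Add, Flush and Close are hypothetical names, and the send callback stands in for the real bulk request. Flush hands the buffering loop an acknowledgement channel and waits until the loop has sent the pending batch.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for the bulk indexer's buffering loop.
type batcher struct {
	in      chan string
	flushRq chan chan struct{}
	send    func(batch []string)
}

func newBatcher(send func(batch []string)) *batcher {
	b := &batcher{
		in:      make(chan string),
		flushRq: make(chan chan struct{}),
		send:    send,
	}
	go b.loop()
	return b
}

func (b *batcher) loop() {
	var pending []string
	for {
		select {
		case doc, ok := <-b.in:
			if !ok {
				return // Close was called; let the goroutine exit.
			}
			pending = append(pending, doc)
		case ack := <-b.flushRq:
			if len(pending) > 0 {
				b.send(pending)
				pending = nil
			}
			close(ack) // Signal the caller only after send has returned.
		}
	}
}

// Add hands one document to the loop.
func (b *batcher) Add(doc string) { b.in <- doc }

// Flush blocks until everything added so far has been passed to send.
func (b *batcher) Flush() {
	ack := make(chan struct{})
	b.flushRq <- ack
	<-ack
}

// Close stops the loop. Documents not yet flushed are dropped in this sketch.
func (b *batcher) Close() { close(b.in) }

// flushDemo adds three documents, flushes, and reports how many were sent.
func flushDemo() int {
	var sent []string
	b := newBatcher(func(batch []string) { sent = append(sent, batch...) })
	for _, doc := range []string{"a", "b", "c"} {
		b.Add(doc)
	}
	b.Flush() // No sleep needed: Flush returns after send has run.
	n := len(sent)
	b.Close()
	return n
}

func main() {
	fmt.Println(flushDemo())
}
```

Because the channels are unbuffered, every Add has been received by the loop before Flush can be accepted, so the wait time never has to be guessed.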

Planned merge on 27th Feb?

Did you postpone the breaking merge? We are about to start using the library and would like to keep away from the surprises... Any eta on that? Thanks.

No way to do an AND of multiple term filters in a query

To term filter multiple fields at the root level, you can do

Search("...").Filter(
    Filter().Terms(...),
    Filter().Terms(...),
)

There doesn't seem to be any way to do this on a filtered query though, which is probably the more useful version anyway. I think QueryDsl should contain a FilterWrap rather than a FilterOp.

Inconsistent Index() function signatures

I'm a bit confused about this:

Non-bulk indexer:

func Index(pretty bool, index string, _type string, id string, data interface{}) (api.BaseResponse, error)

Bulk indexer:

func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

Is there a reason that BulkIndexer.Index takes a date, but Index does not? (It's in IndexWithParameters, but not in the 'defaults' Index has.)

wrong field name exception isn't being reported in BulkIndexer

Let's say I have a mapping with a single field of type long

{"my-index": {
  "mappings": {
    "my-type": {
      "properties": {
        "field-1": {
          "type": "long"}}}}}}

If I try to index a document where field-1's value is a non-numeric string, e.g. "hello" (which can't be converted to a long), I will get an exception. This isn't reported to the error channel and should be.

Update in Bulk API

Hi @araddon

Today I was interested in adding an update to the bulk API. The entire API implicitly assumes that you are only using the "index" op. So let's chat - what would be a good way to add the Update? Perhaps something like this?

func (b *BulkIndexor) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}) error {
    by, err := UpdateBulkBytes(index, _type, id, ttl, date, data)
    if err != nil {
        u.Error(err)
        return err
    }
    b.bulkChannel <- by
    return nil
}
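
For reference, here is a rough sketch of what an UpdateBulkBytes-style helper might produce. The Elasticsearch bulk format pairs an "update" action line with a line containing a partial "doc". The helper name updateBulkBytes and the bulkMeta type are assumptions for illustration, and the ttl and date parameters from the proposed signature are left out for brevity.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bulkMeta holds the metadata of one bulk action line (hypothetical type).
type bulkMeta struct {
	Index string `json:"_index"`
	Type  string `json:"_type"`
	ID    string `json:"_id"`
}

// updateBulkBytes builds the two newline-terminated lines of a bulk "update"
// operation: the action line, then a partial document wrapped in "doc".
func updateBulkBytes(index, typ, id string, doc interface{}) ([]byte, error) {
	action, err := json.Marshal(map[string]bulkMeta{
		"update": {Index: index, Type: typ, ID: id},
	})
	if err != nil {
		return nil, err
	}
	body, err := json.Marshal(map[string]interface{}{"doc": doc})
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	buf.Write(action)
	buf.WriteByte('\n')
	buf.Write(body)
	buf.WriteByte('\n')
	return buf.Bytes(), nil
}

// updateDemo renders one update operation as a string.
func updateDemo() string {
	by, err := updateBulkBytes("twitter", "user", "1", map[string]string{"alias": "babbsan"})
	if err != nil {
		return "error: " + err.Error()
	}
	return string(by)
}

func main() {
	fmt.Print(updateDemo())
}
```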

What's the recommended git hash for v1?

Assuming v2 is still less stable/good than v1 (is it?), which git hash should we use for v1?
The README says to use godep, which is fine, but it doesn't say which git hash is recommended for v1.
I see there's a git tag for v1.0, but the git history shows that some fixes were merged after that tag. The README also says that v2.0 development started 2014-07-09, but it's not clear at which point that work broke master.
A separate v1 branch would be useful for this case.
Thanks.

No ability to specify parent

ElasticSearch allows for parent/child relationships. Even when defined in mapping, the relationship has to be specified at index time using the "parent" request variable, e.g

$ curl -XPUT localhost:9200/blogs/blog_tag/1122?parent=1111 -d '{
    "tag" : "something"
}'

http://www.elasticsearch.org/guide/reference/api/index_/

Unfortunately I can't seem to find the functionality in the client, have I missed it? I'm happy to contribute if this functionality is likely to be accepted.
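
To illustrate what the client would need to produce, here is a small sketch that builds the index URL with an optional parent query parameter using only the standard library. The function name indexURL and its parameters are hypothetical, not part of elastigo.

```go
package main

import (
	"fmt"
	"net/url"
)

// indexURL builds the URL for indexing one document. When parent is
// non-empty it is added as the "parent" query parameter.
func indexURL(host, index, typ, id, parent string) string {
	u := url.URL{
		Scheme: "http",
		Host:   host,
		Path:   "/" + index + "/" + typ + "/" + id,
	}
	if parent != "" {
		q := url.Values{}
		q.Set("parent", parent)
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	fmt.Println(indexURL("localhost:9200", "blogs", "blog_tag", "1122", "1111"))
}
```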

Support older versions of Go

It seems that older versions of Go (I have the stable Debian one, which is 1.0.2) forbid ending a function without a return statement:

go/src/github.com/mattbaird/elastigo/core/get.go:59: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/get.go:93: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/update.go:56: function ends without a return statement
go/src/github.com/mattbaird/elastigo/core/update.go:85: function ends without a return statement

Maybe you should add default return statements?

Synchronous bulk insert

Hi,
Is there a way to do a synchronous bulk insert directly? If not, what building blocks should I use to achieve that?

Search DSL

Running the following command search.Search("index").Type("type").Result() results in an error.
invalid character '<' looking for beginning of value

win7 go get github.com/mattbaird/elastigo undefined: syscall.SIGSTOP

win7

C:\Users\guohongjun>go get github.com/mattbaird/elastigo

github.com/araddon/gou

D:\gocodes\src\github.com\araddon\gou\events.go:101: undefined: syscall.SIGSTOP
D:\gocodes\src\github.com\araddon\gou\events.go:105: undefined: syscall.SIGUSR1
D:\gocodes\src\github.com\araddon\gou\events.go:109: undefined: syscall.SIGUSR2
D:\gocodes\src\github.com\araddon\gou\log.go:250: undefined: syscall.SYS_IOCTL
D:\gocodes\src\github.com\araddon\gou\log.go:253: not enough arguments in call to syscall.Syscall

drone.io doesn't run tests

Headline says it all....

drone.io tries to run the tests in the main package which has none.
The real tests of the sub packages aren't run.
But they would be the important ones.

Conn.ExistsIndex returns a RecordNotFound error when it shouldn't return an error at all

This stems from the fact that Request.DoResponse returns the RecordNotFound error for all HTTP 404 responses. However, in this particular case the 404 is not an error; it's the answer to the query.

I think fixing this properly requires making DoResponse return the response object on 404 (instead of nil as it does now) and then letting ExistsIndex check the status code before checking if the error is not nil.
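
A sketch of the status-code check, assuming a plain HEAD request against the index URL and using net/http directly rather than elastigo's Request type. existsIndex and existsDemo are hypothetical names, and the test server only simulates Elasticsearch's 200/404 answers.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// existsIndex reports whether an index exists. A 404 is treated as the
// answer "false", not as an error; only unexpected statuses become errors.
func existsIndex(client *http.Client, baseURL, index string) (bool, error) {
	resp, err := client.Head(baseURL + "/" + index)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK:
		return true, nil
	case http.StatusNotFound:
		return false, nil
	default:
		return false, fmt.Errorf("unexpected status %d checking index %q", resp.StatusCode, index)
	}
}

// existsDemo runs both lookups against a local test server that stands in
// for Elasticsearch.
func existsDemo() (first, second bool, err error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/existing_index" {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusNotFound)
	}))
	defer srv.Close()

	if first, err = existsIndex(srv.Client(), srv.URL, "existing_index"); err != nil {
		return
	}
	second, err = existsIndex(srv.Client(), srv.URL, "not_existing_index")
	return
}

func main() {
	fmt.Println(existsDemo())
}
```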

Bulk indexing small batches can fail

Thanks for all the work on bulk indexing. Last night I was running a process that indexes documents using the global bulk indexor and found that small batches, where the process finishes before BulkDelaySeconds has elapsed, never get flushed to Elasticsearch. I had assumed that calling Flush might "block" for long enough to flush and retrieve the result, but it does not. The only way I have been able to get around this is by sleeping until (I think) the request hits Elasticsearch with the data.

Any suggestions on how to deal with this situation would be great. I have the following func running in a goroutine that blocks until all the documents on the channel have been read and the channel is closed by the routine that is sending documents:

func StartElasticsearchIndexer(indexName string, queue chan Document, wg *sync.WaitGroup) {
    done := make(chan bool)
    core.BulkDelaySeconds = 1
    core.BulkIndexorGlobalRun(1, done)

    for document := range queue {
        json, _ := Transform(document)
        date := time.Now() // not sure where to set this
        err := core.IndexBulk(indexName, string(document.Type), strconv.Itoa(document.Id), &date, string(json))

        // This should likely explode if there is an error so we can try again?
        if err != nil {
            panic(err.Error())
        }
    }
    // This channel is closed when there are no more docs to process ...
    mx.Log.Info("Document channel closed.")
    mx.Log.Info(fmt.Sprintf("%s errors while indexing", strconv.FormatUint(core.BulkErrorCt, 10)))

    // The call to flush should be blocking (?)
    core.BulkIndexorGlobalFlush()
    time.Sleep(1500*time.Millisecond)
    // Tell the indexer to pack it up
    done <- true
    // not true!! we have to wait for request to finish!
    // This is a terrible solution
    time.Sleep(200*time.Millisecond)

    // Finally, we can stop waiting on main routine execution
    wg.Done()
}

I should add that BulkIndexorGlobalFlush was added so that I could get access to Flush on the global instance (which is not exported).

IndicesExists panics if index doesn't exist

If the index I check with IndicesExists exists, it works fine, but if it doesn't exist I get a panic. See sample code:

package main

import (
    "github.com/mattbaird/elastigo/api"
    "github.com/mattbaird/elastigo/indices"
    "fmt"
)

func main() {
    api.Domain = "localhost"
    fmt.Println(indices.IndicesExists("existing_index"))
    fmt.Println(indices.IndicesExists("not_existing_index"))
}

The output of this program is:

true <nil>
panic: interface conversion: error is *json.SyntaxError, not api.ESError

goroutine 16 [running]:
runtime.panic(0x643b40, 0xc2080401c0)
    /usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/mattbaird/elastigo/indices.IndicesExists(0xc208023ed8, 0x1, 0x1, 0xb, 0x0, 0x0)
    /home/meskio/dev/go/src/github.com/mattbaird/elastigo/indices/indicesExists.go:29 +0x1b0
main.main()
    /tmp/foo.go:12 +0x18c

goroutine 19 [finalizer wait]:
runtime.park(0x4130a0, 0x802570, 0x800809)
    /usr/local/go/src/pkg/runtime/proc.c:1369 +0x89
runtime.parkunlock(0x802570, 0x800809)
    /usr/local/go/src/pkg/runtime/proc.c:1385 +0x3b
runfinq()
    /usr/local/go/src/pkg/runtime/mgc0.c:2636 +0xcf
runtime.goexit()
    /usr/local/go/src/pkg/runtime/proc.c:1445

goroutine 23 [IO wait]:
net.runtime_pollWait(0x7f5856df46c0, 0x72, 0x0)
    /usr/local/go/src/pkg/runtime/netpoll.goc:146 +0x66
net.(*pollDesc).Wait(0xc20802c060, 0x72, 0x7f5856df3418, 0xb)
    /usr/local/go/src/pkg/net/fd_poll_runtime.go:84 +0x34
net.(*pollDesc).WaitRead(0xc20802c060, 0xb, 0x7f5856df3418)
    /usr/local/go/src/pkg/net/fd_poll_runtime.go:89 +0x30
net.(*netFD).Read(0xc20802c000, 0xc208022000, 0x1000, 0x1000, 0x0, 0x7f5856df3418, 0xb)
    /usr/local/go/src/pkg/net/fd_unix.go:232 +0x30e
net.(*conn).Read(0xc20803c020, 0xc208022000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/pkg/net/net.go:122 +0xd2
net/http.noteEOFReader.Read(0x7f5856df4798, 0xc20803c020, 0xc2080421b8, 0xc208022000, 0x1000, 0x1000, 0xc20804e170, 0x8, 0x404158)
    /usr/local/go/src/pkg/net/http/transport.go:1200 +0x50
net/http.(*noteEOFReader).Read(0xc20803e4e0, 0xc208022000, 0x1000, 0x1000, 0x10000000041bde8, 0xc20804e170, 0xc20804e130)
    <autogenerated>:115 +0xb5
bufio.(*Reader).fill(0xc2080043c0)
    /usr/local/go/src/pkg/bufio/bufio.go:97 +0x152
bufio.(*Reader).Peek(0xc2080043c0, 0x1, 0x0, 0x4, 0x1, 0x0, 0x0)
    /usr/local/go/src/pkg/bufio/bufio.go:132 +0xd1
net/http.(*persistConn).readLoop(0xc208042160)
    /usr/local/go/src/pkg/net/http/transport.go:779 +0x95
created by net/http.(*Transport).dialConn
    /usr/local/go/src/pkg/net/http/transport.go:597 +0x8dd

goroutine 17 [syscall]:
runtime.goexit()
    /usr/local/go/src/pkg/runtime/proc.c:1445

goroutine 24 [select]:
net/http.(*persistConn).writeLoop(0xc208042160)
    /usr/local/go/src/pkg/net/http/transport.go:882 +0x336
created by net/http.(*Transport).dialConn
    /usr/local/go/src/pkg/net/http/transport.go:598 +0x8f5

Handle multiple ES Hosts

Currently this Public Static var is used to configure ES which has a number of consequences: 1) can't do multiple hosts (in same cluster), 2) can't do multiple es "clusters".
https://github.com/linohh/elastigo/blob/master/api/request.go#L35

What about extracting it out and allowing a Connection Manager? Something that could someday wrap the Cluster Status/Health info? Potentially keep backwards compatibility in the API by supporting a single global ConnectionManager, or allow people to create and manage their own (to address the second point above).
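
As a rough illustration of the idea, a connection manager could own a list of hosts and hand them out round-robin, with one manager per cluster. ConnManager, NewConnManager, Host and the host names are all hypothetical; this is not elastigo's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ConnManager holds the hosts of one cluster and hands them out round-robin.
type ConnManager struct {
	mu    sync.Mutex
	hosts []string
	next  int
}

// NewConnManager copies the host list so later changes by the caller
// cannot affect the manager.
func NewConnManager(hosts ...string) (*ConnManager, error) {
	if len(hosts) == 0 {
		return nil, errors.New("at least one host is required")
	}
	return &ConnManager{hosts: append([]string(nil), hosts...)}, nil
}

// Host returns the next host in rotation and is safe for concurrent use.
func (m *ConnManager) Host() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	h := m.hosts[m.next]
	m.next = (m.next + 1) % len(m.hosts)
	return h
}

// hostsDemo uses one manager per cluster instead of a single global variable.
func hostsDemo() []string {
	logs, _ := NewConnManager("es-logs-1:9200", "es-logs-2:9200")
	search, _ := NewConnManager("es-search-1:9200")
	return []string{logs.Host(), logs.Host(), logs.Host(), search.Host()}
}

func main() {
	fmt.Println(hostsDemo())
}
```

A package-level default manager could preserve the existing global-style API, while callers that need several clusters create their own instances.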

Cluttered signatures

Hey, not really an issue, but is there any reason for passing the pretty bool? I find this confusing, as the json is parsed anyway. I'll refactor it out (into a single setting if needed), but it will break things for some users.

Correctly handling 404 responses

When requesting a document via func (c *Conn) GetSource, a 404 response (when the record does not exist) should return an error that conveys this information. Currently the client attempts to unmarshal any response with a status > 304; however, the _source endpoint returns an empty body on a 404, which then results in an error from json.Unmarshal.

Would it make sense to have a redigo.RecordNotFound error type that can be returned in this case?
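
One way this could look, sketched with a sentinel error that is checked before any attempt to unmarshal. ErrRecordNotFound and decodeSource are hypothetical names chosen for this example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
)

// ErrRecordNotFound is a sentinel error callers can test for with errors.Is.
var ErrRecordNotFound = errors.New("record not found")

// decodeSource inspects the status code first, so an empty 404 body is
// never handed to json.Unmarshal.
func decodeSource(status int, body []byte, out interface{}) error {
	switch {
	case status == http.StatusNotFound:
		return ErrRecordNotFound
	case status >= 400:
		return fmt.Errorf("unexpected status %d", status)
	}
	return json.Unmarshal(body, out)
}

// sourceDemo decodes one found document and one missing document.
func sourceDemo() (string, bool) {
	var doc struct {
		Alias string `json:"alias"`
	}
	okErr := decodeSource(http.StatusOK, []byte(`{"alias":"babbsan"}`), &doc)
	missErr := decodeSource(http.StatusNotFound, nil, &doc)
	return doc.Alias, okErr == nil && errors.Is(missErr, ErrRecordNotFound)
}

func main() {
	fmt.Println(sourceDemo())
}
```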

Broken BulkIndexer Flush

If you have something like this:

    indexer := core.NewBulkIndexerErrors(200, 60)
    indexer.BulkMaxBuffer = 10485760
    indexer.BulkMaxDocs = 6000
    done := make(chan bool)
    indexer.Run(done)
    // Clean up on exit as the indexer doesn't seem to do it by itself
    defer func() { indexer.ErrorChannel = nil }()
    defer close(indexer.ErrorChannel)

    go func() {
        for errBuf := range indexer.ErrorChannel {
            // just blissfully print errors forever
            log.Println(errBuf.Err)
        }
        log.Println("No more errors")
    }()

    for i := 0; i < 20; i++ {
        indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"alias":"babbsan"}`)
    }
    done <- true
    indexer.Flush()

If you don't call Flush at the end, those documents will not be saved. I'm guessing this is because Run() creates its own goroutine, which won't block on shutdown, whereas calling Flush will block.

Snapshot API

Is it possible to make the Snapshot() func correspond with the new snapshot API? new means >= ES 1.3

Re: Tests Failing

They fail for me in dev. Could it be that you're using the cached version of dependencies and you need to go get -u them to see if the tests fail against the master branch of the dependencies?

I had to replace some calls with coerced types to get tests to even compile.

request.go:84, typo which makes JSON requests fail!

80 if body != nil {
81 switch v := body.(type) {
82 case *strings.Reader:
83 r.ContentLength = int64(v.Len())
84 case *bytes.Buffer:
85 r.ContentLength = int64(v.Len())
86 }

In line 84, case *bytes.Buffer should be replaced by "case *bytes.Reader".
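
A standalone sketch of such a type switch follows. Whether *bytes.Buffer should be replaced or simply kept alongside *bytes.Reader depends on what callers pass in; this version handles all three reader types that expose Len(). The contentLength helper is hypothetical, and -1 follows the net/http convention for an unknown length.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// contentLength returns the number of unread bytes for reader types that
// expose Len(), and -1 when the length cannot be determined up front.
func contentLength(body io.Reader) int64 {
	switch v := body.(type) {
	case *bytes.Buffer:
		return int64(v.Len())
	case *bytes.Reader:
		return int64(v.Len())
	case *strings.Reader:
		return int64(v.Len())
	}
	return -1
}

// lengthsDemo checks the three known reader types plus one unknown reader.
func lengthsDemo() [4]int64 {
	return [4]int64{
		contentLength(bytes.NewBufferString("{}")),
		contentLength(bytes.NewReader([]byte("{}"))),
		contentLength(strings.NewReader("{}")),
		contentLength(io.LimitReader(strings.NewReader("{}"), 2)),
	}
}

func main() {
	fmt.Println(lengthsDemo())
}
```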

Update a record

I am trying to update a few fields in a record and noticed that there is a core.Update function. However, I don't see how to pass in the fields that I want to update and their new values.

Looking at the code it looks like this is unfinished? Is this correct or am I missing something?

NewConn seems to leak memory, needs related Close method given upstream changes.

This appears to be caused by upstream bug bitly/go-hostpool#6, via the elastigo code at https://github.com/mattbaird/elastigo/blob/master/lib/connection.go#L96.

Based on the pull request to go-hostpool, it sounds like the fix involves introducing a Close method to clean up the time.Ticker and, with it, its references to go-hostpool.epsilonGreedyHostPool. This affects elastigo: a Close method must also be implemented on elastigo.Conn to allow resources to be released.

Highlighting not handled

Currently elastigo does not use the information from the highlighting functionality es provides. It's basically a one-liner added to the Hit struct. I could make a PR or just post the one-liner here if there is no other reason for not including it.

Background errors on bulk indexing

10:33:18.265621 bulk.go:140: unexpected EOF
17:49:13.273746 bulk.go:140: write tcp 10.14.14.14:9200: broken pipe

Find a way to retry if possible, log/surface errors

BulkIndex failing to capture errors

In the BulkSend function in bulk.go, err is nil even when the bulk response contains errors.

code:

func BulkSend(buf *bytes.Buffer) error {
    _, err := api.DoCommand("POST", "/_bulk", nil, buf)
    if err != nil {
        BulkErrorCt += 1
        return err
    }
    return nil
}

Here is a sample response that comes back with errors. Since err is nil, none of the errors make it onto the ErrorChannel:

[{"took":4,"errors":true,"items":[{"index":{"_index":"myindex_v1","_type":"myfiles","_id":"aaaaac7683dddfee51e0725b0908e2b0","status":400,"error":....etc....
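
To surface these, the response body would have to be inspected even when the HTTP call itself succeeds. The following is a minimal sketch: the bulkResponse and bulkItem types and the failedIDs helper are assumptions, and the sample JSON is a made-up response of the same general shape, not the truncated one above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItem is the per-document result inside a bulk response. Error is kept
// raw because its JSON shape differs between Elasticsearch versions.
type bulkItem struct {
	ID     string          `json:"_id"`
	Status int             `json:"status"`
	Error  json.RawMessage `json:"error"`
}

// bulkResponse models only the fields needed to detect failures.
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

// failedIDs returns the IDs of all items that carry an error.
func failedIDs(body []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	if !resp.Errors {
		return nil, nil
	}
	var failed []string
	for _, item := range resp.Items {
		// Each item is keyed by its operation, e.g. "index" or "update".
		for _, result := range item {
			if len(result.Error) > 0 {
				failed = append(failed, result.ID)
			}
		}
	}
	return failed, nil
}

// sampleBody is a hypothetical bulk response with one success and one failure.
const sampleBody = `{"took":4,"errors":true,"items":[
 {"index":{"_index":"idx","_type":"doc","_id":"1","status":201}},
 {"index":{"_index":"idx","_type":"doc","_id":"2","status":400,"error":"MapperParsingException"}}]}`

func main() {
	fmt.Println(failedIDs([]byte(sampleBody)))
}
```

Each failed ID could then be wrapped in an error and pushed onto the error channel, which would also cover mapping failures reported per item.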
