mikespook / gearman-go
This package is a Gearman API for Golang. It implements the native protocol for both the worker and client APIs.
License: MIT License
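xingxing, I'm using the client here. Following the example, I call the same worker in a for loop; after about 1 minute it hangs, and setting ResponseTimeout to 5 doesn't help.
I then changed it to release the connection after 100 calls on the same connection. That lasts a bit longer, about 5 minutes, and then the same thing happens again...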
I'm running gearmand on windows.
When using example/py/worker.py, it works fine. But using example/worker/worker, it seems that gearman-go sends an invalid magic to the server. Below is the log output of gearmand.
2015/03/05 19:19:42 util.go:218: [debug] magic not match 0x32353533
2015/03/05 19:19:42 session.go:59: [debug] invalid magic sessionId 2
Since py/worker.py is working, I think this is a problem in gearman-go.
Are client.WorkComplate and client.WorkDate typos or intentional?
// job handler
func (client *Client) handleJob(job *Job) {
// job.UniqueId is always empty here
if h, ok := client.jobhandlers[job.UniqueId]; ok {
h(job)
delete(client.jobhandlers, job.UniqueId)
}
}
As a result, in Worker.go and Client.go from the examples, the Client never reaches this handler:
jobHandler := func(job *client.Job) {
log.Printf("%s", job.Data)
wg.Done()
}
The Usage section in README.md seems out of date.
I'm new to Go, so apologies if this is super-obvious.
I'm writing a Gearman worker using this library; I want to be able to test my functions, but I can't quite figure out how to create a test job to pass in to the method that actually does the processing.
Just as my super-basic example for now:
func DisplayMessage(job worker.Job) ([]byte, error) {
log.Printf("DisplayMessage: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
I can see that I need to pass a struct of type worker.Job, but I can't figure out how to create that struct.
I guess if I needed to, I could use the client to send a job directly to Gearman, but I'd rather minimise the number of moving parts here. I couldn't see a test in the library that created jobs (but again, I might have missed that).
Can anyone help?
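One way to sidestep constructing a worker.Job in tests is to keep the processing in a plain function that only sees the payload bytes. The sketch below assumes that approach; upperPayload is a hypothetical helper name, not part of gearman-go, and the real handler would just call it with job.Data().

```go
package main

import (
	"fmt"
	"strings"
)

// upperPayload holds the actual processing and depends only on the raw
// bytes, so it can be unit-tested without a Gearman server or a worker.Job.
// (Hypothetical helper, not part of gearman-go.)
func upperPayload(data []byte) []byte {
	return []byte(strings.ToUpper(string(data)))
}

func main() {
	// In the real worker, DisplayMessage would return
	// upperPayload(job.Data()), nil.
	fmt.Printf("%s\n", upperPayload([]byte("hello gearman")))
}
```

A test then only needs to call upperPayload with a byte slice and compare the result.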
panic: runtime error: slice bounds out of range
goroutine 5 [running]:
github.com/mikespook/gearman-go/worker.decodeInPack(0xc2000fe000, 0x400, 0x400, 0x0, 0x0, ...)
/opt/go/src/github.com/mikespook/gearman-go/worker/inpack.go:97 +0xa6f
github.com/mikespook/gearman-go/worker.(*agent).work(0xc200081410)
/opt/go/src/github.com/mikespook/gearman-go/worker/agent.go:75 +0x324
created by github.com/mikespook/gearman-go/worker.(*agent).Connect
/opt/go/src/github.com/mikespook/gearman-go/worker/agent.go:41 +0x192
This happens because decodeInPack tries to decode the packet as soon as more than 12 bytes have been received, but often only 1024 bytes have arrived while the packet may be bigger.
I have submitted a pull request with a fix.
35 SUBMIT_JOB_SCHED REQ Client
36 SUBMIT_JOB_EPOCH REQ Client
Support needed.
I tried the worker.go example, connecting to a gearmand 0.27 version server on a Linux machine!
The simplest "ToUpper" example is working, I got the expected result but the gearman.log is saying:
WARNING [ 2 ] lost connection to client recv(peer has closed connection) 172.16.1.8:32774 -> libgearman-server/io.cc:608
ERROR [ 2 ] Failed while in _connection_read() -> libgearman-server/io.cc:489
at every execution!
Should I do something else ?
Thanks,
Teo
If we try to check whether all the Gearman agents are ready by calling Ready(), this causes multiple registrations of the worker.
It would be better to have a separate Status method that reports the readiness of the Gearman servers, or the function registrations could be moved to the beginning of Run.
In order to gracefully remove a worker, there needs to be a way to stop asking for more work. Other language libraries have this functionality, e.g.
$monitor callback: https://github.com/brianlmoon/net_gearman/blob/master/Net/Gearman/Worker.php
stop_if callback: https://github.com/gearman/perl-Gearman-Client/blob/master/t/worker.pl
I don't have a specific test case because this is happening randomly and infrequently on our servers, but I do have a backtrace.
https://gist.github.com/justinruggles/cbd8d920976f9c58f6b6
The deadlock is in:
goroutine 23970 [semacquire, 841 minutes]:
sync.runtime_Semacquire(0xc209ae282c)
/usr/local/go/src/pkg/runtime/sema.goc:199 +0x30
sync.(*Mutex).Lock(0xc209ae2828)
/usr/local/go/src/pkg/sync/mutex.go:66 +0xd6
github.com/mikespook/gearman-go/client.(*Client).Status(0xc21360b730, 0xc20f6ebda0, 0x13, 0x0, 0x0, 0x0)
/home/vimeo/stack/lib/go/src/github.com/mikespook/gearman-go/client/client.go:261 +0x280
main.(*JobServer).sendGearmanJob(0xc20801a8c0, 0xc20d495440, 0x6f5d50, 0xa, 0x0, 0x0)
/home/vimeo/jobserver/client.go:165 +0x380
main.(*JobServer).handlePutJob(0xc20801a8c0, 0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00, 0xc20cef1b00)
/home/vimeo/jobserver/server.go:262 +0x707
main.*JobServer.(main.handlePutJob)·fm(0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00, 0x1)
/home/vimeo/jobserver/server.go:468 +0x44
vimeo/http.(*SeqHandlerFuncImpl).ServeHTTP(0xc208040080, 0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00, 0xc20804e901)
/home/vimeo/stack/lib/go/src/vimeo/http/handler.go:32 +0x4e
vimeo/http.(*RestfulHandler).ServeHTTP(0xc20800edb0, 0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00)
/home/vimeo/stack/lib/go/src/vimeo/http/handler.go:113 +0x2ac
vimeo/http.(*Server).ServeHTTP(0xc20801a910, 0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00)
/home/vimeo/stack/lib/go/src/vimeo/http/server.go:34 +0xfc
net/http.serverHandler.ServeHTTP(0xc208004360, 0x7fa8ae94ab78, 0xc20d4dfae0, 0xc20ebffa00)
/usr/local/go/src/pkg/net/http/server.go:1673 +0x19f
net/http.(*conn).serve(0xc20814f500)
/usr/local/go/src/pkg/net/http/server.go:1174 +0xa7e
created by net/http.(*Server).Serve
/usr/local/go/src/pkg/net/http/server.go:1721 +0x313
Here is a snippet of my client code:
func (this *JobServer) sendGearmanJob(job *Job, worker string) error {
logger := job.logger
c, err := client.New(client.Network, "localhost:4730")
if err != nil {
logger.Println(err)
return err
}
c.ErrorHandler = func(e error) {
logger.Println(e)
}
payload, err := json.Marshal(job)
if err != nil {
c.Close()
return err
}
jobHandler := func(resp *client.Response) {
// response handling code here
}
handle, err := c.Do(worker, payload, client.JobNormal, jobHandler)
if err != nil {
return err
}
st, err := c.Status(handle)
if err == nil {
logger.Printf("Status: %+v", st)
} else {
logger.Printf("Status: %v", err)
}
return nil
}
If you use WORK_DATA for multiple intermediate updates, it does not work properly, and only calls the handler once. Consequently, the handler never gets called on WORK_COMPLETE.
Is there currently an interface for adding multiple servers? For example, a worker may need to add several servers, similar to gearman_worker_add_servers and gearman_client_add_servers.
We are currently locking in client.do() to avoid a race condition when submitting jobs. We do not have anything to identify the submitted job with when receiving a JOB_CREATED response from the server so multiple simultaneous job submissions need to happen on separate connections if we don't want to lock.
Previous discussion on this can be found in #75
Protocol reference: http://gearman.org/protocol/
type IdGenerator interface {
Id() string
}
Use this interface instead of autoinc for generating unique IDs, and let users implement their own IdGenerator. The autoinc and bson/ObjectId generators should be supplied by default.
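As an illustration of the proposal, here is a minimal sketch of one possible implementation of that interface. The counterIdGen type is hypothetical and only stands in for an auto-increment generator; it uses an atomic counter so that several goroutines can share one instance.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// IdGenerator is the interface proposed above.
type IdGenerator interface {
	Id() string
}

// counterIdGen is a hypothetical auto-increment generator that is safe
// to share between goroutines.
type counterIdGen struct {
	n int64
}

// Id returns the next value of the counter as a decimal string.
func (g *counterIdGen) Id() string {
	return strconv.FormatInt(atomic.AddInt64(&g.n, 1), 10)
}

func main() {
	var gen IdGenerator = &counterIdGen{}
	fmt.Println(gen.Id(), gen.Id())
}
```

Sharing one generator across clients avoids the situation where every new client restarts its own counter from zero.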
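While learning the gearman-go API today, I found that the jobHandler in the example client.go is never called.
When multiple goroutines each instantiate a client concurrently and then submit jobs, there is a problem: the returned handles may all be identical, and the worker receives only one job request.
Specifically:
go func() { client.New() .... client.Do(...) }()
go func() { client.New() .... client.Do(...) }()
.... more goroutines
I found the cause is that the ai counter in client is re-created on every client.New. After changing it to reference a global counter, this usage works.
However, your original client architecture is presumably meant to instantiate the client once and then submit jobs concurrently across goroutines.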
var @ruan Chunping
If I use client.Do() and the job fails, I'm unable to queue any more jobs after that. I don't have time to diagnose this atm, but changing client.do() to this demonstrates the issue:
func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var mutex sync.Mutex
fmt.Println("1 locking")
mutex.Lock()
fmt.Println("1 ...done")
client.lastcall = "c"
client.innerHandler["c"] = func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
return
}
fmt.Println("2 unlocking")
handle = resp.Handle
mutex.Unlock()
fmt.Println("2 ...done")
}
id := IdGen.Id()
req := getJob(id, []byte(funcname), data)
req.DataType = flag
client.write(req)
fmt.Println("3 locking")
mutex.Lock()
fmt.Println("3 ...done")
return
}
It gets stuck on "3 locking".
It seems other gearman libraries have the capability to specify multiple job servers. It would be nice to maintain the same API gearman-go has, just in case we want to implement our own client connection strategies, but it would also be nice if gearman had one of its own.
The python gearman library seems to poll the connections and wait until it gets a status back from the job server confirming that the job has been accepted. Some other libraries just pick a random server from the list. The official ruby gearman library just does a round robin.
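For reference, the round-robin strategy mentioned above could look roughly like the following. This is only a sketch: the roundRobin type and pick method are hypothetical names, nothing here is part of gearman-go, and the address list is assumed to be non-empty.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin hands out server addresses in rotation.
// Hypothetical helper; addrs is assumed to be non-empty.
type roundRobin struct {
	mu    sync.Mutex
	addrs []string
	next  int
}

// pick returns the next address and advances the cursor, wrapping around.
func (r *roundRobin) pick() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	addr := r.addrs[r.next]
	r.next = (r.next + 1) % len(r.addrs)
	return addr
}

func main() {
	rr := &roundRobin{addrs: []string{"10.0.0.1:4730", "10.0.0.2:4730"}}
	for i := 0; i < 3; i++ {
		fmt.Println(rr.pick())
	}
}
```

The chosen address would then be passed to whatever connection call the caller already uses, so the existing API stays unchanged.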
I have written a small http server that is receiving requests for some processing.
IMPORTANT UPDATE: I am using THE SAME *client.Client for all http goroutines, and it seems that the go gearman client is not thread-safe (it is reading from the same buffer in the (client *Client) readData function)!
I have written a small workerProgram that is running on two servers.
Test it with two batches of curl commands issuing work in parallel.
I discovered that sometimes, I got the same handler string into the gearman client and at that moment the client is unable to retrieve the work from that handler and it fails with timeout. You will find below the H:teo:177 handler delivered twice:
teo@teo:~/go/bd2013/src$ go run demoClientGearman.go
2013/07/18 01:40:08 Opening connection with gearman-server
2013/07/18 01:40:33 HANDLE gearman received: H:teo:174
2013/07/18 01:40:33 HANDLE gearman received: H:teo:175
2013/07/18 01:40:33 HANDLE gearman received: H:teo:176
2013/07/18 01:40:34 HANDLE gearman received: H:teo:177
2013/07/18 01:40:34 HANDLE gearman received: H:teo:177
2013/07/18 01:40:34 HANDLE gearman received: H:teo:178
2013/07/18 01:40:34 HANDLE gearman received: H:teo:179
2013/07/18 01:40:35 HANDLE gearman received: H:teo:180
2013/07/18 01:40:35 HANDLE gearman received: H:teo:181
2013/07/18 01:40:36 HANDLE gearman received: H:teo:182
2013/07/18 01:40:36 HANDLE gearman received: H:teo:183
2013/07/18 01:40:36 HANDLE gearman received: H:teo:184
2013/07/18 01:40:37 ERROR - TIMEOUT on handler H:teo:177
2013/07/18 01:40:37 HANDLE gearman received: H:teo:185
2013/07/18 01:40:37 HANDLE gearman received: H:teo:186
2013/07/18 01:40:37 HANDLE gearman received: H:teo:187
I'm using now gearman-server 1.1.8 👍
gearmand -L 172.16.1.8 -u teo --verbose WARNING -l 'stderr' --http-port 8383 -r http
No error in the gearmand-server window!
Capture net errors.
When the TCP link is broken, reconnect to the server.
This should be handled transparently, with configurable options.
Example code
package main
import (
"fmt"
"os"
"github.com/mikespook/gearman-go/client"
)
func main() {
for j := 0; j < 100; j++ {
cl, _ := client.New("tcp", os.Args[1])
for i := 0; i < 1000; i++ {
_, e := cl.DoBg("title", []byte("normal"), client.JobNormal)
if e != nil {
fmt.Println(e)
}
}
}
}
go run file.go 127.0.0.1:4730
I'm trying to send N jobs and wait until all of them are completed. Is it correct to assume that JobHandler will always be called, even when an error occurs?
Currently I'm using a sync.WaitGroup, so if not all of the response handlers are invoked the client could hang forever, and I would like to make sure this can never happen.
Thanks in advance, and congrats for the job! :)
worker.New() is documented:
//
// If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly.
// If limit is greater than zero, the number of paralled executing
// jobs are limited under the number. If limit is assgined to
// OneByOne(=1), there will be only one job executed in a time.
It's not working that way for me...
w := worker.New(10)
defer w.Close()
w.ErrorHandler = func(e error) {
app.errorLog.Println(e)
if opErr, ok := e.(*net.OpError); ok {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
app.errorLog.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
app.errorLog.Println(err)
}
}
}
}
// Add the GM server
w.AddServer("tcp4", config.GearmanServer)
w.AddFunc("Photos", app.TintBlue, worker.Unlimited)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
signal.Wait()
If I add more than 10 things to my gearman queue, the first 10 are processed by the app.TintBlue func, but work halts there... the rest of the items in the queue are never processed.
If I use worker.Unlimited as the limit I run out of memory (working with 7-12 MB images being retrieved from S3 and manipulated).
What am I missing? Why can't I specify the number of parallel processes to run my worker in... but trust that that amount of workers will do all of the work?
Oddly, if I use worker.OneByOne as the limit, it processes 2 items then halts.
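Questions:
1. The worker calls Grab twice in a row:
a.Grab()
a.Grab()
After obtaining one job, it can never obtain another. Why? Is this a restriction of the Job Server?
2. Does a single agent inside a worker not support concurrency, so that it must grab one job before grabbing the next?
3. After registering several functions with AddFunc, I observed that the Job Server sends jobs in the reverse order of registration, i.e. the later-registered functions must be fully drained before jobs for the earlier-registered ones can be obtained?
Looking forward to your answer, thanks!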
When creating several new jobs in parallel the client.innerhandler["c"] will be overwritten in client/client.go:225 which down the line leads to jobs getting results of the wrong job.
Proposed solution is to lock the do function only allowing one job to be set up at a time. Pull request: #75
Example code
package main
import
(
"github.com/mikespook/gearman-go/worker"
"log"
)
func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
job.SendData(job.Data())
job.UpdateStatus(1, 1)
return nil, nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New( worker.Unlimited )
defer w.Close()
w.ErrorHandler = func(e error) {
log.Fatal(e)
return
}
w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
}
w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
w.Work()
}
console:
2014/10/01 10:17:09 Starting ...
2014/10/01 10:17:12 ToUpper: Data=[hello world]
2014/10/01 10:17:12 EOF
exit status 1
kentchentekiiMac-23868:gearman kentchen$ go run s1.go
./s1.go:73: undefined: "github.com/mikespook/golib/signal".Loop
I noticed with my application, if I send a large amount of data, then it takes a long time to process while gearman is spewing out "Not enough data" errors back at me. There are multiple solutions to this problem, but one I tested can be found here:
https://github.com/kdar/gearman-go/compare/big-data
It basically reads the entire data upfront before it ever gets to decodeInPack(), so decodeInPack() won't throw an error. Another solution is to have the caller of decodeInPack() notice when it's not enough data and wait until there is a sufficient amount to continue. You would also need to increase bufferSize as a size of 1024 is extremely small and would still make it take a long time to process.
Let me know what you think.
Callbacks aren't working because client.UniqueId is never populated, and that's what is used as the key for the jobHandlers callback map.
Might be helpful to have a functional test that uses a worker and a client to make sure they both interact correctly.
I often get these errors when running my worker. Everything still works fine but these sporadically show up. Any idea where they would come from?
2018/08/16 17:35:41 Unsolicited response received on idle HTTP channel starting with "HTTP/1.0 408 Request Timeout\nCache-Control: no-cache\nConnection: close\nContent-Type: text/html\n\n<!DOCTYPE html>\n<!--\n\nHello future GitHubber! I bet you're here to remove those nasty inline styles,\nDRY up these templates and make 'em nice and re-usable, right?\n\nPlease, don't. https://github.com/styleguide/templates/2.0\n\n-->\n<html>\n <head>\n <meta http-equiv=\"Content-type\" content=\"text/html; charset=utf-8\">\n <title>Unicorn! · GitHub</title>\n <style type=\"text/css\" media=\"screen\">\n body {\n background-color: #f1f1f1;\n margin: 0;\n font-family: \"Helvetica Neue\", Helvetica, Arial, sans-serif;\n }\n\n .container { margin: 50px auto 40px auto; width: 600px; text-align: center; }\n\n a { color: #4183c4; text-decoration: none; }\n a:hover { text-decoration: underline; }\n\n h1 { letter-spacing: -1px; line-height: 60px; font-size: 60px; font-weight: 100; margin: 0px; text-shadow: 0 1px 0 #fff; }\n p { color: rgba(0, 0, 0, 0.5); margin: 10px 0 10px; font-size: 18px; font-weight: 200; line-height: 1.6em;}\n\n ul { list-style: none; margin: 25px 0; padding: 0; }\n li { display: table-cell; font-weight: bold; width: 1%; }\n\n .logo { display: inline-block; margin-top: 35px; }\n .logo-img-2x { display: none; }\n @media\n only screen and (-webkit-min-device-pixel-ratio: 2),\n only screen and ( min--moz-de"; err=<nil>
Although this may tie into issue #3 and require some code rewrite, other gearman libraries (the ruby, python, and php ones) allow you to have a task set. You can define a task set and then run the job, and have attached to it when the job completes or fails.
As it stands right now, there is only one handler for the entire client, but it would be nice to have different handlers for different submitted jobs.
client.decodeJob() instead appends the handle and the data together. So when the callback for JobHandler is called, job.Handle is always an empty string, and job.Data contains both the handle and data.
Fix for this:
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %v", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %v", data)
}
data = data[12:]
var handle string
switch datatype {
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
i := bytes.IndexByte(data, '\x00')
if i != -1 {
handle = string(data[:i])
data = data[i+1:] // skip the NUL separator after the handle
}
}
return &Job{magicCode: common.RES,
DataType: datatype,
Data: data,
Handle: handle}, nil
}
I also didn't use newJob() here because it is used in multiple places in the code. If you want to use newJob() instead of &Job{..., then change it where applicable.
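The handle/data split at the heart of this fix can be exercised on its own. The sketch below is a standalone illustration, with splitHandle as a hypothetical name; it separates the payload at the first NUL byte and drops the separator itself. A payload without a NUL is returned unchanged as data, mirroring the behaviour of the snippet above.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitHandle separates "handle\x00data" into its two parts.
// Hypothetical helper; a payload with no NUL is returned as data only.
func splitHandle(payload []byte) (handle string, data []byte) {
	i := bytes.IndexByte(payload, 0)
	if i == -1 {
		return "", payload
	}
	return string(payload[:i]), payload[i+1:]
}

func main() {
	handle, data := splitHandle([]byte("H:teo:177\x00HELLO"))
	fmt.Printf("handle=%q data=%q\n", handle, data)
}
```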
read tcp4 127.0.0.1:4730: use of closed network connection
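Hello, I ran into the error above while using the github.com/mikespook/gearman-go/client package, and I don't know why the ErrorHandler reports this error message.
After removing defer c.Close() the error disappears, but I want to close the connection once the gearman task has been added.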
func AddTask(data []byte) {
	c, err := client.New("tcp4", AppConfig.String("gearman::host"))
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	c.ErrorHandler = func(e error) {
		log.Println(e) // read tcp4 127.0.0.1:4730: use of closed network connection
	}
	handle, err := c.Do(AppConfig.String("gearman::func_callback"), data, client.JobLow, nil)
	if err != nil {
		log.Fatalln(err)
	}
	status, err := c.Status(handle)
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("%t", status)
}
Hi all, I got a race condition when calling the worker's Close function. I tried the worker example code, and the race condition reproduced there as well.
zainul@zainul-bahar:~/Documents/GOPATH/src/github.com/mikespook/gearman-go/example/worker$ ./worker
2019/05/27 13:30:48 Starting ...
^C==================
WARNING: DATA RACE
Read at 0x00c0000aa210 by main goroutine:
github.com/mikespook/gearman-go/worker.(*Worker).Close()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/worker.go:223 +0x7f
main.main()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/example/worker/worker.go:74 +0x78f
Previous write at 0x00c0000aa210 by goroutine 8:
github.com/mikespook/gearman-go/worker.(*Worker).Work()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/worker.go:200 +0x5e
Goroutine 8 (running) created at:
main.main()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/example/worker/worker.go:71 +0x704
==================
2019/05/27 13:30:52 read tcp4 127.0.0.1:41870->127.0.0.1:4730: use of closed network connection
==================
WARNING: DATA RACE
Write at 0x00c0000ae0a8 by main goroutine:
github.com/mikespook/gearman-go/worker.(*agent).Close()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/agent.go:128 +0xd3
github.com/mikespook/gearman-go/worker.(*Worker).Close()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/worker.go:225 +0xdf
main.main()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/example/worker/worker.go:74 +0x78f
Previous read at 0x00c0000ae0a8 by goroutine 7:
github.com/mikespook/gearman-go/worker.(*agent).disconnect_error()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/agent.go:114 +0x42
github.com/mikespook/gearman-go/worker.(*agent).work()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/agent.go:63 +0x4b5
Goroutine 7 (running) created at:
github.com/mikespook/gearman-go/worker.(*agent).Connect()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/agent.go:42 +0x5f7
github.com/mikespook/gearman-go/worker.(*Worker).Ready()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/worker/worker.go:178 +0x139
main.main()
/home/zainul/Documents/GOPATH/src/github.com/mikespook/gearman-go/example/worker/worker.go:67 +0x64d
==================
2019/05/27 13:30:52 Shutdown complete!
Found 2 data race(s)
[warn] Epoll ADD(1) on fd 572 failed. Old events were 0; read change was 1 (add); write change was 0 (none): Bad file descriptor
Hello,
I do need to use sleep right after I send data back. I could just as well sleep in the worker's thread, but that would be ugly: the response would be delayed, and that's not (at all!) what I want.
Using php-gearman, I am able to do this:
while (1) {
$ret = $worker->work();
sleep(1);
}
Would that be possible with gearman-go, too ? :)
Will development continue?
Can it be made production-ready?
Is there a Windows version of gearman?
The repo seems to have been removed by you?
Hello.
I've noticed that the worker-connection sometimes freezes.
The reason for this is an unhandled io.ErrShortWrite error caused inside agent.write().
The writer throws this error when a worker-go-routine writes "dtWorkComplete" just after the main-thread receives a "dtNoop" but before sending "dtPreSleep".
In other words:
dtNoop
dtWorkComplete
dtPreSleep
This causes the issue because the go-routine is missing a synclock inside worker.exec().
I've solved this by changing:
inpack.a.write(outpack)
into
inpack.a.Lock()
err := inpack.a.write(outpack)
inpack.a.Unlock()
This is a problem if you want to limit the number of "functions" run concurrently, i.e. if you want to run one job at a time.
What happens is that the worker continuously accepts jobs from the server while only running n at a time (worker.New(n) where n is not equal to 0). The major problem with this is that those jobs cannot be run by other workers, thus losing a lot of the power of gearman. Also, pinging the server for job status will say all the locally queued jobs are running, when only n are (see example below).
Given this limitation, this code should not be used. I was unable to fix this in an elegant way, as the implementation has a lot going on concurrently and I could not put in a clean fix to limit things.
I recommend simplifying everything so one worker has one connection to each gearman server and can only do one job at a time. If you need to run multiple functions at a time then create multiple workers.
Run (echo status ; sleep 0.1) | nc 127.0.0.1 4730 on the console to see what is queued on the gearman server. You should see test 100 0 0.
This means 100 test jobs queued, 0 running, and 0 workers connected who can do this job.
Run (echo status ; sleep 0.1) | nc 127.0.0.1 4730 in another terminal. You should see something like test 97 19 1.
This means 97 jobs are still waiting to complete, 19 are running and there is 1 worker working.
When only one worker provides service, the worker is blocked. At this point, if the client continues to submit jobs, the number of running jobs exceeds the number of workers!
Server status is as follows:
gearadmin --status
ToUpper 7 7 1
Hello
I would like to ask about the following problems: