Comments (14)
help? anyone?
from nats-queue-worker.
Hi @avielb, thanks for your interest in OpenFaaS.
OpenFaaS has 1,300 members in the Slack community and is operated for free on a voluntary basis by the community, which means you won't always get an answer immediately. This does not mean that you are being ignored.
Please show some of your workings / console output that shows the `max_inflight` variable is not being used by NATS Streaming.
In addition to expanding `max_inflight`, you'll also need to increase the max timeout window in `ack_wait`, because the effective per-message timeout becomes `ack_wait / max_inflight`. So if the max timeout is `1m` and you move from `1` to `2` in-flight, you'll need to increase `ack_wait` to `2m`.
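As an illustrative config fragment (values are examples, not a recommendation): pairing a `max_inflight` of `5` with tasks that take up to `1m` each would mean setting `ack_wait` to at least `5m`:

```yaml
environment:
  max_inflight: "5"
  ack_wait: "5m" # max task duration (1m) x max_inflight (5)
```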
Also, I am not sure why you've forked faas-netes. Can you confirm the contents of `values.yml`?
Alex
I've confirmed this is also the case with Swarm... I think it may be related to `handler/nats_queue.go`.
My settings:
```yaml
queue-worker:
  image: openfaas/queue-worker:0.7.1
  networks:
    - functions
  environment:
    max_inflight: "5"
    ack_wait: "5m5s" # Max duration of any async task / request
    basic_auth: "${BASIC_AUTH:-true}"
    secret_mount_path: "/run/secrets/"
```
To test, I created a node function as follows:
```js
module.exports = (context, callback) => {
    setTimeout(() => {
        callback(undefined, {status: JSON.stringify(context)});
    }, 60000)
}
```
and the `stack.yml` used:
```yaml
functions:
  test-inflight:
    lang: node
    handler: ./test-inflight
    image: test-inflight:latest
    environment:
      write_timeout: 1m30s
      read_timeout: 1m30s
```
Then, I executed the following (the query string was added so it was easier to see in the logs which test was executing):

```sh
curl -X POST -d "test 1" http://127.0.0.1:8080/async-function/test-inflight?test=test-1 \
curl -X POST -d "test 2" http://127.0.0.1:8080/async-function/test-inflight?test=test-2 \
curl -X POST -d "test 3" http://127.0.0.1:8080/async-function/test-inflight?test=test-3 \
curl -X POST -d "test 4" http://127.0.0.1:8080/async-function/test-inflight?test=test-4 \
curl -X POST -d "test 5" http://127.0.0.1:8080/async-function/test-inflight?test=test-5
```
Here are the `func_queue-worker` logs during that run (cleaned a little for clarity):
```
func_queue-worker.1.u1xmk5m1cmom@pop-os | [#60] Received on [faas-request]: 'sequence:38 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"cc057564-8b52-4cc6-8de1-6c1fbde2e263\"],\"X-Start-Time\":[\"1551758004322162308\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-1\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004323487744 '
func_queue-worker.1.u1xmk5m1cmom@pop-os | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os | [#61] Received on [faas-request]: 'sequence:39 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"791de82a-fe2c-47bf-aa8c-2db30e8fb446\"],\"X-Start-Time\":[\"1551758004452033618\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-2\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004452323780 '
func_queue-worker.1.u1xmk5m1cmom@pop-os | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os | [#62] Received on [faas-request]: 'sequence:40 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"9ae9eabe-70dd-48fe-8342-a50f6fcfc693\"],\"X-Start-Time\":[\"1551758004535878496\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-3\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004536455507 '
func_queue-worker.1.u1xmk5m1cmom@pop-os | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os | [#63] Received on [faas-request]: 'sequence:41 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"50fbc064-6c32-476b-a1cc-cc8340298526\"],\"X-Start-Time\":[\"1551758004619819782\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-4\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004620498231 '
func_queue-worker.1.u1xmk5m1cmom@pop-os | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os | [#64] Received on [faas-request]: 'sequence:42 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"be0ea050-58e4-4203-a161-5da9d56c8aba\"],\"X-Start-Time\":[\"1551758004703327458\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-5\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004703918210 '
func_queue-worker.1.u1xmk5m1cmom@pop-os | Request for test-inflight.
```
You can see that the timestamps are about a minute apart, indicating that the queue was being processed sequentially.
Scaling `func_queue-worker` to more replicas did result in multiple async functions executing in parallel; however, the `max_inflight` parameter had no effect on how many were being executed (as expected, since each replica would handle 1 request).
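For reference, on Swarm that scaling is a one-liner (the service name is taken from the logs above; the replica count of 5 is illustrative):

```sh
docker service scale func_queue-worker=5
```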
@burtonr thanks for taking a detailed look at this. Did you see any configuration issues?
Looking at `git blame`, these files were heavily modified by @bartsmykla for the NATS reconnecting piece. Could there have been a regression introduced?
Alex
I have spent a couple of hours on this and can reproduce the issue too.
@avielb can you try going back to an earlier queue worker and see if the issue still persists?
I'll ping @kozlovic
A few things I've tried:
- Setting `stan` connection options:

```go
nc, err := stan.Connect(
	q.clusterID,
	q.clientID,
	stan.NatsURL(q.natsURL),
	stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
		log.Printf("Disconnected from %s\n", q.natsURL)
		q.reconnect()
	}),
	stan.PubAckWait(q.ackWait),
	stan.MaxPubAcksInflight(q.maxInFlight),
)
```
- Setting the values directly in the `connect` method in `types.go`:

```go
subscription, err := q.conn.QueueSubscribe(
	q.subject,
	q.qgroup,
	q.messageHandler,
	stan.DurableName(q.durable),
	stan.AckWait(q.ackWait),
	q.startOption,
	stan.MaxInflight(q.maxInFlight),
)
```
Alex
These are the changes that I've tried, but this didn't seem to resolve the issue. https://github.com/openfaas/nats-queue-worker/pull/58/files
@alexellis To be clear, a single subscription has always been, and will always be, invoked with a single message at a time. `MaxInflight` simply means that the server can send the library up to that number of messages without receiving an ACK, but the callback will be invoked with 1 message, then, when the callback returns, be presented with the 2nd message (which will likely already be in the library), etc.
You can always process a message in a goroutine if you want (but then all your ordering goes out the window), and you need to be mindful of acks, because with auto-ack (the default behaviour) the ACK is sent by the library when the callback returns.
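To make the trade-off concrete, here is a stdlib-only sketch (not the queue-worker's actual code) of that pattern: the callback hands each message to its own goroutine, with a buffered channel as a semaphore so at most `maxInflight` messages run concurrently; releasing a slot stands in for the manual ACK. `runWorkers` and its message simulation are hypothetical names for illustration:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers simulates a subscription callback dispatching each of n
// messages to its own goroutine, bounded by a semaphore of maxInflight.
// It returns how many messages were processed and the peak concurrency
// observed (which never exceeds maxInflight).
func runWorkers(n, maxInflight int) (processed int, peak int) {
	sem := make(chan struct{}, maxInflight)
	var wg sync.WaitGroup
	var done, inFlight, maxSeen int64

	for i := 0; i < n; i++ {
		sem <- struct{}{} // block until a slot is free
		wg.Add(1)
		go func() {
			defer wg.Done()
			cur := atomic.AddInt64(&inFlight, 1)
			// record the peak concurrency seen so far
			for {
				old := atomic.LoadInt64(&maxSeen)
				if cur <= old || atomic.CompareAndSwapInt64(&maxSeen, old, cur) {
					break
				}
			}
			atomic.AddInt64(&done, 1)
			atomic.AddInt64(&inFlight, -1)
			<-sem // release the slot: stands in for the manual ACK
		}()
	}
	wg.Wait()
	return int(done), int(atomic.LoadInt64(&maxSeen))
}

func main() {
	processed, peak := runWorkers(5, 2)
	fmt.Printf("processed=%d peakConcurrency=%d\n", processed, peak)
}
```

Note that ordering is lost the moment the handler goes concurrent, which is exactly the caveat above.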
@alexellis What's the plan here? Should we update the queue-worker to use a goroutine to allow for processing multiple messages asynchronously (with the understanding that there is no guarantee of order)?
or do we update our documentation stating that if you want multiple long-running tasks to be performed at the same time, you must scale the queue-worker service to the number of functions to invoke at a time?
or do we add auto-scaling to the queue-worker (somehow) to automatically scale the service to the number of requests/messages if the current queue workers are waiting?
I'd be willing to take on whichever solution you think would be best, or at least give them a try.
This is surprising because I'm sure this is the behaviour explained to me by Peter or Brian when they were with the NATS team. I think I even saw it working this way with our code.
If whatever changes we've made have caused a regression in behaviour, or if it actually never worked that way, then we should just update the documentation to state that a parallelism of N requires N queue-worker replicas.
@alexellis I can tell you that in NATS (and streaming), a single subscription will always dispatch messages one at a time. It has always been the case, and will probably always be. Most users want to have messages coming on a given subscription to be processed in order. If the library was dispatching in different go routines for every message on a subscription, ordering would not be possible.
The OpenFaaS docs have been updated to explain that `queue-worker` needs to scale to match the expected parallel invocations. There is now a new Async page in the Reference section.
I'll close this issue, but feel free to continue to comment.
Derek close
@alexellis can you tell me which version exactly? I have tried it also with 0.5.4 and still see the same behaviour.
@burtonr does that mean that OpenFaaS queue workers won't be able to do more than 1 task at a time?
@avielb That is correct. NATS Streaming will deliver the requests in the order they are received. If you need tasks executed in parallel, you can scale the nats-queue-worker service up to match the number of tasks you want executed at the same time.
This is a feature of NATS Streaming so that the order of execution is maintained. The functions will still be invoked asynchronously, though.
The OpenFaaS docs have been updated to explain this here: https://docs.openfaas.com/reference/async/