
Comments (14)

avielb commented on June 28, 2024

help? anyone?


alexellis commented on June 28, 2024

Hi @avielb, thanks for your interest in OpenFaaS.

OpenFaaS has 1300 members in the Slack community and is operated for free, on a voluntary basis, by the community, which means you won't always get an answer immediately. It does not mean that you are being ignored.

Please share some of your workings / console output showing that the max_inflight variable is not being used by NATS Streaming.

In addition to increasing max_inflight you'll also need to increase the maximum timeout window in ack_wait, so that ack_wait divided by max_inflight still covers the longest single task. So if the maximum task duration is 1m and you move from 1 to 2 in-flight, you'll need to increase ack_wait to 2m.
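A minimal worked example of that sizing rule in Go; the task duration and in-flight count below are illustrative values, not OpenFaaS defaults:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed values for illustration only.
	maxTaskDuration := 1 * time.Minute // longest expected async invocation
	maxInFlight := 2                   // value of max_inflight

	// With a single subscriber the in-flight messages are still worked through
	// one at a time, so ack_wait must cover maxInFlight tasks back to back.
	ackWait := maxTaskDuration * time.Duration(maxInFlight)
	fmt.Println("ack_wait should be at least", ackWait) // prints: 2m0s
}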

Also, I am not sure why you've forked faas-netes. Can you confirm the contents of values.yml?

Alex


burtonr commented on June 28, 2024

I've confirmed this is also the case with Swarm...
I think it may be related to the code in handler/nats_queue.go.

My settings:

queue-worker

    queue-worker:
        image: openfaas/queue-worker:0.7.1
        networks:
            - functions
        environment:
            max_inflight: "5"
            ack_wait: "5m5s" # Max duration of any async task / request
            basic_auth: "${BASIC_AUTH:-true}"
            secret_mount_path: "/run/secrets/"

To test, I created a node function as follows:

module.exports = (context, callback) => {
    setTimeout(() => {
        callback(undefined, {status: JSON.stringify(context)});
    }, 60000)
}

and the stack.yml used:

functions:
  test-inflight:
    lang: node
    handler: ./test-inflight
    image: test-inflight:latest
    environment:
      write_timeout: 1m30s
      read_timeout: 1m30s

Then I executed the following (the query string was added so it would be easier to see in the logs which test was executing):

curl -X POST -d "test 1" http://127.0.0.1:8080/async-function/test-inflight?test=test-1 \
  curl -X POST -d "test 2" http://127.0.0.1:8080/async-function/test-inflight?test=test-2 \
  curl -X POST -d "test 3" http://127.0.0.1:8080/async-function/test-inflight?test=test-3 \
  curl -X POST -d "test 4" http://127.0.0.1:8080/async-function/test-inflight?test=test-4 \
  curl -X POST -d "test 5" http://127.0.0.1:8080/async-function/test-inflight?test=test-5

Here is the func_queue-worker logs during that run (cleaned a little for clarity):

func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#60] Received on [faas-request]: 'sequence:38 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"cc057564-8b52-4cc6-8de1-6c1fbde2e263\"],\"X-Start-Time\":[\"1551758004322162308\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-1\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004323487744 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#61] Received on [faas-request]: 'sequence:39 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"791de82a-fe2c-47bf-aa8c-2db30e8fb446\"],\"X-Start-Time\":[\"1551758004452033618\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-2\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004452323780 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#62] Received on [faas-request]: 'sequence:40 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"9ae9eabe-70dd-48fe-8342-a50f6fcfc693\"],\"X-Start-Time\":[\"1551758004535878496\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-3\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004536455507 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#63] Received on [faas-request]: 'sequence:41 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"50fbc064-6c32-476b-a1cc-cc8340298526\"],\"X-Start-Time\":[\"1551758004619819782\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-4\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004620498231 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.
...
func_queue-worker.1.u1xmk5m1cmom@pop-os    | [#64] Received on [faas-request]: 'sequence:42 subject:"faas-request" data:"{\"Header\":{\"Accept\":[\"*/*\"],\"Content-Length\":[\"34\"],\"Content-Type\":[\"application/x-www-form-urlencoded\"],\"User-Agent\":[\"curl/7.58.0\"],\"X-Call-Id\":[\"be0ea050-58e4-4203-a161-5da9d56c8aba\"],\"X-Start-Time\":[\"1551758004703327458\"]},\"Host\":\"127.0.0.1:8080\",\"Body\":\"dGVzdCAxJnRlc3QgMiZ0ZXN0IDMmdGVzdCA0JnRlc3QgNQ==\",\"Method\":\"POST\",\"Path\":\"\",\"QueryString\":\"test=test-5\",\"Function\":\"test-inflight\",\"CallbackUrl\":null}" timestamp:1551758004703918210 '
func_queue-worker.1.u1xmk5m1cmom@pop-os    | Request for test-inflight.

You can see that the timestamps are about a minute apart, indicating that the queue was being processed sequentially.

Scaling the func_queue-worker to more replicas did result in multiple async functions executing in parallel; however, the max_inflight parameter had no effect on how many were being executed (as expected, each replica would handle one request).


alexellis commented on June 28, 2024

@burtonr thanks for taking a detailed look at this. Did you see any configuration issues?

Looking at git blame, these files were heavily modified by @bartsmykla for the NATS reconnecting piece. Could a regression have been introduced?

Alex


alexellis commented on June 28, 2024

I have spent a couple of hours on this and can reproduce the issue too.

@avielb can you try going back to an earlier queue worker and see if the issue still persists?

I'll ping @kozlovic

A few things I've tried:

  • Setting stan values
	nc, err := stan.Connect(
		q.clusterID,
		q.clientID,
		stan.NatsURL(q.natsURL),
		stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
			log.Printf("Disconnected from %s\n", q.natsURL)

			q.reconnect()
		}),

		// Connection-level options: these govern publish ACKs, not delivery to subscribers
		stan.PubAckWait(q.ackWait),
		stan.MaxPubAcksInflight(q.maxInFlight),
	)
  • Setting the values directly in the connect method in types.go
	subscription, err := q.conn.QueueSubscribe(
		q.subject,
		q.qgroup,
		q.messageHandler,
		stan.DurableName(q.durable),
		stan.AckWait(q.ackWait),
		q.startOption,
		// Subscription-level option: max unacknowledged messages the server will deliver
		stan.MaxInflight(q.maxInFlight),
	)

Alex


alexellis commented on June 28, 2024

These are the changes that I've tried, but this didn't seem to resolve the issue. https://github.com/openfaas/nats-queue-worker/pull/58/files


kozlovic commented on June 28, 2024

@alexellis To be clear, a single subscription has always been, and will always be, invoked with a single message at a time. MaxInflight simply means that the server can send the library up to that number of messages without receiving an ACK, but the callback will be invoked with 1 message, then, when the callback returns, be presented with the 2nd message (which will likely already be in the library), etc.
You can always process a message in a go routine if you want (but then all your ordering goes out the window), and you need to be mindful of the ACK, because with auto-ack (the default behavior) the ACK is sent by the library when the callback returns.
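A minimal sketch of that goroutine-plus-manual-ACK pattern with the stan.go client; the cluster ID, client ID, subject, and queue group below are illustrative, and this is not the queue-worker's actual implementation:

package main

import (
	"log"
	"time"

	stan "github.com/nats-io/stan.go"
)

func main() {
	sc, err := stan.Connect("test-cluster", "example-client")
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	_, err = sc.QueueSubscribe("faas-request", "faas", func(msg *stan.Msg) {
		// Hand the message off to a goroutine; ordering is no longer guaranteed.
		go func(m *stan.Msg) {
			log.Printf("processing sequence %d", m.Sequence)
			// ... invoke the function here ...
			if err := m.Ack(); err != nil { // ACK only once the work is done
				log.Printf("failed to ACK %d: %v", m.Sequence, err)
			}
		}(msg)
	},
		stan.SetManualAckMode(),     // disable auto-ACK on callback return
		stan.AckWait(5*time.Minute), // redelivery window per message
		stan.MaxInflight(5),         // up to 5 unacknowledged messages at once
	)
	if err != nil {
		log.Fatal(err)
	}

	select {} // block so the subscription stays active
}

With manual ACKs and MaxInflight set, the server stops delivering once that many messages are unacknowledged, so the number of concurrent goroutines is naturally bounded by max_inflight.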


burtonr commented on June 28, 2024

@alexellis What's the plan here? Should we update the queue-worker to use a go routine to allow for processing multiple messages asynchronously (with the understanding that there is no guarantee of order)?

or do we update our documentation stating that if you want multiple long-running tasks to be performed at the same time, you must scale the queue-worker service to the number of functions to invoke at a time?

or do we add auto-scaling to the queue-worker (somehow) to automatically scale the service to the number of requests/messages if the current queue workers are waiting?

I'd be willing to take on whichever solution you think would be best, or at least give them a try.


alexellis commented on June 28, 2024

This is surprising because I'm sure this is the behaviour explained to me by Peter or Brian when they were with the NATS team. I think I even saw it working this way with our code.

If whatever changes we've made have caused a regression in behaviour, or if it actually never worked that way, then we should just update the documentation to state that parallelism of N requires N queue-worker replicas.


kozlovic commented on June 28, 2024

@alexellis I can tell you that in NATS (and streaming), a single subscription will always dispatch messages one at a time. It has always been the case, and will probably always be. Most users want to have messages coming on a given subscription to be processed in order. If the library was dispatching in different go routines for every message on a subscription, ordering would not be possible.


burtonr commented on June 28, 2024

The OpenFaaS docs have been updated to explain that the queue-worker needs to scale to match the expected number of parallel invocations. There is now a new Async page in the Reference section.

I'll close this issue, but feel free to continue to comment


burtonr commented on June 28, 2024

Derek close


avielb commented on June 28, 2024

@alexellis can you tell me which version exactly? I have also tried it with 0.5.4 and still see the same behaviour.
@burtonr does that mean that OpenFaaS queue workers won't be able to do more than one task at a time?


burtonr commented on June 28, 2024

@avielb That is correct. nats-streaming will process the requests in the order they are received. If you need tasks executed in parallel, you can scale the nats-queue-worker service up to match the number of tasks you want executed in parallel.

This is a feature of nats-streaming so that the order of execution will be maintained. The functions will still be invoked asynchronously though.

The OpenFaaS docs have been updated to explain this here: https://docs.openfaas.com/reference/async/

