messenger-enqueue-transport's Introduction

Enqueue's transport for Symfony Messenger component

This Symfony Messenger transport lets you use Enqueue to send and receive messages through any of the brokers Enqueue supports.

Usage

  1. Install the transport
composer req sroze/messenger-enqueue-transport
  2. Configure the Enqueue bundle as you normally would (see Enqueue's Bundle documentation). If you are using the recipes, you should only need to set the environment variables that configure the default Enqueue transport:
# .env
# ...

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###
  3. Configure Messenger's transport (which we will name amqp) to use Enqueue's default transport:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default
  4. Route the messages that have to go through the message queue:
# config/packages/messenger.yaml
framework:
    messenger:
        # ...

        routing:
            'App\Message\MyMessage': amqp
  5. Consume!
bin/console messenger:consume amqp

Advanced usage

Configure the queue(s) and exchange(s)

In the transport DSN, you can add extra configuration. Here is the common reference DSN (note that the values are just for the example):

enqueue://default
    ?queue[name]=queue_name
    &topic[name]=topic_name
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1
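
To make the shape of these options concrete, here is a minimal sketch of how such a DSN breaks down into a context name and nested options. It assumes the query string follows PHP's standard bracket semantics; parseTransportDsn is a hypothetical helper written for illustration, not part of this package.

```php
<?php
// Hypothetical helper: split a transport DSN into the Enqueue context name
// (the host part) and the options carried by the query string.
function parseTransportDsn(string $dsn): array
{
    $parts = parse_url($dsn);

    $options = [];
    // parse_str() expands bracketed keys such as queue[name] into nested arrays.
    parse_str($parts['query'] ?? '', $options);

    return [
        'context' => $parts['host'] ?? null,
        'options' => $options,
    ];
}

$parsed = parseTransportDsn(
    'enqueue://default?queue[name]=queue_name&topic[name]=topic_name&deliveryDelay=1800&receiveTimeout=1000'
);

// Note: every value parsed from a query string is a string, e.g. '1800'.
var_export($parsed);
```

In this sketch queue and topic come out as arrays keyed by name, while deliveryDelay and receiveTimeout stay scalar.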

Setting Custom Configuration on your Message

Each Enqueue transport (e.g. amqp, redis, etc) has its own message object that can normally be configured by calling setter methods (e.g. $message->setDeliveryDelay(5000)). But in Messenger, you don't have access to these objects directly. Instead, you can set them indirectly via the TransportConfiguration stamp:

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Interop\Amqp\AmqpMessage;

// ...

// create your message like normal
$message = // ...

$transportConfig = (new TransportConfiguration())
    // common options have a convenience method
    ->setDeliveryDelay(5000)

    // other transport-specific options are set via metadata
    // example custom option for AmqpMessage
    // each "metadata" will map to a setter on your message
    // will result in setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT)
    // being called
    ->addMetadata('deliveryMode', AmqpMessage::DELIVERY_MODE_PERSISTENT)
;

$bus->dispatch((new Envelope($message))->with($transportConfig));
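
The comments in the example describe each metadata key being mapped to a setter on the broker message. The following is a minimal sketch of that idea, not the adapter's actual code: FakeBrokerMessage and applyMetadata are hypothetical names used only for illustration, and the exception type is an assumption.

```php
<?php
// Hypothetical stand-in for a broker-specific message class such as AmqpMessage.
final class FakeBrokerMessage
{
    public ?int $deliveryMode = null;

    public function setDeliveryMode(int $mode): void
    {
        $this->deliveryMode = $mode;
    }
}

// Apply each metadata entry by calling the matching "set<Key>" method,
// and fail loudly when the message has no such setter.
function applyMetadata(object $message, array $metadata): void
{
    foreach ($metadata as $key => $value) {
        $setter = 'set'.ucfirst($key);

        if (!method_exists($message, $setter)) {
            throw new LogicException(sprintf(
                'Missing "%s" setter for "%s" metadata key in "%s"',
                $setter,
                $key,
                get_class($message)
            ));
        }

        $message->{$setter}($value);
    }
}

$message = new FakeBrokerMessage();
applyMetadata($message, ['deliveryMode' => 2]); // calls setDeliveryMode(2)
```

A metadata key without a matching setter is treated as an error here, so a misspelled option is reported instead of being silently ignored.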

Send a message on a specific topic

You can send a message on a specific topic by attaching a TransportConfiguration envelope item to your message:

use Symfony\Component\Messenger\Envelope;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;

// ...

$transportConfig = (new TransportConfiguration())
    ->setTopic('specific-topic')
;

$bus->dispatch((new Envelope($message))->with($transportConfig));

Use AMQP topic exchange

See https://www.rabbitmq.com/tutorials/tutorial-five-php.html

You can use specific topic and queue options to configure your AMQP exchange in topic mode and bind it:

enqueue://default
    ?queue[name]=queue_name
    &queue[bindingKey]=foo.#
    &topic[name]=topic_name
    &topic[type]=topic
    &deliveryDelay=1800
    &delayStrategy=Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy
    &timeToLive=3600
    &receiveTimeout=1000
    &priority=1
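
The reference DSNs in this section are wrapped across several lines, presumably for readability. In messenger.yaml the DSN is one string, so a safer form is a single quoted line with no whitespace. A minimal sketch, assuming the transport is named amqp and reusing a subset of the example values:

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # One quoted line: no line breaks or spaces inside the DSN
            amqp: 'enqueue://default?queue[name]=queue_name&topic[name]=topic_name&topic[type]=topic'
```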

Here is the way to send a message with a routing key matching this consumer:

$bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'topic_name',
    'metadata' => [
        'routingKey' => 'foo.bar'
    ]
])));

Configure custom Kafka message

Here is the way to send a message with some custom options:

$this->bus->dispatch((new Envelope($message))->with(new TransportConfiguration([
    'topic' => 'test_topic_name',
    'metadata' => [
        'key' => 'foo.bar',
        'partition' => 0,
        'timestamp' => (new \DateTimeImmutable())->getTimestamp(),
        'messageId' => uniqid('kafka_', true),
    ]
])));

messenger-enqueue-transport's People

Contributors

andrewmy, andreybolonin, britaliope, devrck, dominikhajduk, drgomesp, dzianis-hrynko, ekkinox, fractalzombie, gnucki, hpatoio, keulinho, konstantincodes, lukascellar, makasim, nyholm, ogizanagi, samnela, scarbo87, shulard, soyuka, sroze, weaverryan, xavismeh

messenger-enqueue-transport's Issues

Use AMQP topic exchange

Hi,

For several days I've been trying to figure out how to configure Enqueue to work with an AMQP topic exchange.
According to the given documentation, here is the messenger.yml config file:

framework:
    messenger:
        transports:
            amqp_book_search: enqueue://default
                ?queue[name]=foo
                &topic[name]=bar
        routing:
            # Route your messages to the transports
            'App\Message\SearchForBookInformationMessage': baz

Quick look inside the controller file :

      public function search($isbn) {
            // Wrap the query inside a Message
            $message = new SearchForBookInformationMessage(
                array(
                    'isbn' => $isbn
                ));
            $query = new Envelope($message);
            
            $query->with(new TransportConfiguration(
                [
                    'topic' => 'bar',
                    'metadata' => [
                        'routingKey' => 'foo.#'
                    ]
                ]
            ));
            
            $this->messageBus->dispatch($message);
            // ....
      }

But when I try to do so, I get the following error :

In QueueInteropTransportFactory.php line 92:
  [RuntimeException]                                                                                           
  Can't find Enqueue's transport named "default ": Service "enqueue.transport.default .context" is not found.  

What am I doing wrong here ?
Does anyone have a working use case for this feature ?

Cheers,
Pxl.

Bugs in messenger-adapter

Hi, I tried to use messenger-adapter and found a few bugs:

  1. QueueInteropTransport::send doesn't match the interface it implements:
    Compile Error: Declaration of Enqueue\\MessengerAdapter\\QueueInteropTransport::send($message): void must be compatible with Symfony\\Component\\Messenger\\Transport\\SenderInterface::send(Symfony\\Component\\Messenger\\Envelope $envelope)
  2. If the deliveryDelay option is set, the producer's setDelayStrategy method must be called

Delay stamps not supported

I'm using Symfony's Messenger component, going through this interop layer and finally going through Enqueue.

It seems like envelopes are serialized as-is, without inspecting their stamps. The DelayStamp is not taken into account at all when creating the interop message.

Now, it would probably be easy to loop through the original envelope and try to call $interopMessage->setDelay() when a DelayStamp is encountered. However, message delay does not appear to be part of the interop Message interface.

So there's a need to either extend the interface or to do some method_exists() hacks on it.

Not supporting delay stamps is a complete dealbreaker for using this interop layer. If messages cannot be retried in a sensible manner (that is, with a delay), there's usually no point in retrying (10 retries within the same second usually leads to 10 failures).

"enqueue.locator" service does not exists

Hi there.
I followed this code example.
https://stefanoalletti.wordpress.com/2018/11/05/from-rabbitmq-to-phpenqueue-via-symfony-messenger/

When I try to run the "php bin/console messenger:consume mailer" command, I get the following error:

In CheckExceptionOnInvalidReferenceBehaviorPass.php line 86:
The service "enqueue.messenger_transport.factory" has a dependency on a non-existent service "enqueue.locator".

I don't know why enqueue locator is missing. Can anyone help me with this issue?

Thanks.

Symfony 4.3 - No transport supports the given Messenger DSN "enqueue://default"

I'm running this command:

bin/console messenger:consume amqp

But I'm getting this error:

In TransportFactory.php line 42:

No transport supports the given Messenger DSN "enqueue://default".

Everything worked perfectly in Symfony 4.2.

# messenger.yaml
framework:
    messenger:
        transports:
            amqp: enqueue://default

        routing:
            '*': amqp

# enqueue.yaml (with my database connection)
enqueue:
    default:
        transport: '%env(resolve:DATABASE_URL)%'
        client: ~

# composer.json
"enqueue/dbal": "^0.9.8",
"enqueue/enqueue-bundle": "^0.9.8",
"sroze/messenger-enqueue-transport": "^0.3.0",
"symfony/messenger": "4.3.*",

Topic and Queue names required for simple queues (like SQS)

Took a while to figure out what was going on as we're trying to use Symfony Messenger with SQS.

SQS doesn't really have the need for topics as we're putting things into a specific queue and reading them back out of it.

We had set up our config like so:

enqueue:
    transport:
        default: 'sqs'
        sqs:
            key: "%env(SQS_KEY)%"
            secret: "%env(SQS_SECRET)%"
            token: null
            region: 'ap-southeast-1'
            version: '2012-11-05'

framework:
    messenger:
        transports:
            sqs: "enqueue://default?queue[name]=email_queue"
        routing:
            'App\Component\Mail\Message': sqs

But when trying to send to the queue we were getting messages from SQS that the queue didn't exist.

Some debugging later we found the queue was being set to "messages". Looking into the code we found this https://github.com/php-enqueue/messenger-adapter/blob/master/QueueInteropTransport.php#L109

In the SQS library, createTopic and createQueue do the same thing: each creates an SqsDestination with the given topic/queue name as the queue to use.

In order to make our code work we had to update our config to this:

            sqs: "enqueue://default?topic[name]=email_queue&queue[name]=email_queue"

Both are needed because the send method uses the topic name, while the receive method uses the queue name.

This kind of setup makes sense from a RabbitMQ perspective, but seems a bit odd for something like SQS.

I was wondering if it would make more sense that if the topic name is not set then it uses the queue name instead (currently they both default to messages)? If not, could it be documented that this might be required as we couldn't find any reference to needing to do this.

Allow passing arbitrary metadata through the transport dsn

It would be interesting if we were able to add arbitrary metadata to a transport's messages through the configuration.

Something like:

transports:
            my_transport: 'enqueue://sqs?topic[name]=topic&metadata[messageGroupId]=my-group-id'

That metadata could then be merged with the metadata coming from TransportConfiguration stamps when sending the message.

This way it would be easier to make it work with more transports and different configurations, like with SQS FIFO queues (#40), for instance.

[RabbitMQ][Symfony]

I have this weird error with symfony/messenger v4.3.1 and sroze/messenger-enqueue-transport v0.3. It was working with symfony/messenger v4.2 and sroze/messenger-enqueue-transport v0.2. It looks like the problem arises when acking a message.

21:42:57 INFO      [messenger] Received message "App\Messenger\Message\BidMessage" ["message" => App\Messenger\Message\BidMessage^ { …},"class" => "App\Messenger\Message\BidMessage"]
21:42:57 INFO      [messenger] Sending message "App\Messenger\Message\Notification\ParticipantNotificationMessage" with "Enqueue\MessengerAdapter\QueueInteropTransport" ["message" => App\Messenger\Message\Notification\ParticipantNotificationMessage^ { …},"class" => "App\Messenger\Message\Notification\ParticipantNotificationMessage","sender" => "Enqueue\MessengerAdapter\QueueInteropTransport"]
21:42:57 INFO      [messenger] Message "App\Messenger\Message\BidMessage" handled by "App\Messenger\Handler\BidHandler::__invoke" ["message" => App\Messenger\Message\BidMessage^{ …},"class" => "App\Messenger\Message\BidMessage","handler" => "App\Messenger\Handler\BidHandler::__invoke"]
21:42:57 INFO      [messenger] App\Messenger\Message\BidMessage was handled successfully (acknowledging to transport). ["message" => App\Messenger\Message\BidMessage^ { …},"class" => "App\Messenger\Message\BidMessage"]

In ClientMethods.php line 998:

  [Bunny\Exception\ClientException (406)]
  PRECONDITION_FAILED - unknown delivery tag 0


Exception trace:
 () at /code/vendor/bunny/bunny/src/Bunny/ClientMethods.php:998
 Bunny\AbstractClient->awaitExchangeDeclareOk() at /code/vendor/bunny/bunny/src/Bunny/ClientMethods.php:958
 Bunny\AbstractClient->exchangeDeclare() at /code/vendor/bunny/bunny/src/Bunny/ChannelMethods.php:47
 Bunny\Channel->exchangeDeclare() at /code/vendor/enqueue/amqp-bunny/AmqpContext.php:161
 Enqueue\AmqpBunny\AmqpContext->declareTopic() at /code/vendor/sroze/messenger-enqueue-transport/AmqpContextManager.php:64
 Enqueue\MessengerAdapter\AmqpContextManager->ensureExists() at /code/vendor/sroze/messenger-enqueue-transport/QueueInteropTransport.php:65
 Enqueue\MessengerAdapter\QueueInteropTransport->get() at /code/vendor/symfony/messenger/Worker.php:90
 Symfony\Component\Messenger\Worker->run() at /code/vendor/symfony/messenger/Worker/StopWhenRestartSignalIsReceived.php:54
 Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived->run() at /code/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:228
 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /code/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /code/vendor/symfony/console/Application.php:939
 Symfony\Component\Console\Application->doRunCommand() at /code/vendor/symfony/framework-bundle/Console/Application.php:87
 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /code/vendor/symfony/console/Application.php:273
 Symfony\Component\Console\Application->doRun() at /code/vendor/symfony/framework-bundle/Console/Application.php:73
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /code/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /code/bin/console:38

composer info:

enqueue/amqp-bunny                  0.9.11             Message Queue Amqp Transport
enqueue/amqp-tools                  0.9.8              Message Queue Amqp Tools
enqueue/dsn                         0.9.2              Parse DSN
enqueue/enqueue                     0.9.11             Message Queue Library
enqueue/enqueue-bundle              0.9.10             Message Queue Bundle
enqueue/null                        0.9.2              Enqueue Null transport
sroze/messenger-enqueue-transport   0.3.0              Enqueue adapter for Symfony Messenger component
symfony/framework-bundle            v4.3.1             Symfony FrameworkBundle
symfony/messenger                   v4.3.1             Symfony Messenger Component
kunicmarko/jms-messenger-adapter    dev-master 3aa771b Use JMS Serializer with Symfony Messenger.

Is there a plan to release a new version before Symfony 4.2 release ?

Hello guys !

Since Symfony 4.2 has entered the RC phase and the current stable version of this package is not compatible with it, do you plan to release a new version before the official release of Symfony 4.2?

Thanks

PS: and if you need some hands to help here, I'll be glad to give some time 😄.

Default receive timeout not compatible with SQS

Just to let you know that the default receive timeout of 30 seconds introduced in version 0.2.0:

if (null === ($interopMessage = $this->getConsumer()->receive($this->options['receiveTimeout'] ?? 30000))) {

Is incompatible with SQS (max 20 seconds):

https://github.com/php-enqueue/sqs/blob/b203de327b4f5045717b9420de071a19c0f78afc/SqsConsumer.php#L88

Not really a big deal but wasn't sure if 30 seconds was picked for a specific or arbitrary reason.

We're fixing our setup by setting the receive timeout, which is probably the best solution anyway, but thought you might consider updating the 30 seconds to 20 just to avoid confusion.
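
A sketch of that workaround, assuming the receiveTimeout DSN option is expressed in milliseconds (as the 30000 default for 30 seconds suggests). The transport and queue names are illustrative only:

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # 20000 ms stays within the 20-second limit mentioned above
            sqs: "enqueue://default?topic[name]=email_queue&queue[name]=email_queue&receiveTimeout=20000"
```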

[question] How to consume messages from Cloud Pub Sub?

My message/command CreateStore is properly sent to Cloud Pub/Sub like this:

use App\Message\Command\Store\CreateStore;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Symfony\Component\Messenger\Envelope;

$command = new CreateStore();
$this->commandBus->dispatch((new Envelope($command))->with(new TransportConfiguration(
    ['topic' => 'enqueue.commands']
)));

All messages/commands are in my Cloud Pub/Sub queue. I can see them (gcloud pubsub subscriptions pull enqueue.commands) and acknowledge them manually via the gcloud command-line tool.

Now I'm trying to consume my messages by running bin/console messenger:consume-messages enqueue. The consumer runs, but nothing happens.

What am I missing to consume my messages?

Here are my config files :

service.yaml

framework:
    messenger:
        transports:
            default: 'amqp://guest:guest@localhost:5672/%2f/messages'
            enqueue: 'enqueue://gps'
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands: ~
            messenger.bus.events: ~
        routing:
            # Route your messages to the transports
            'App\Message\Command\Store\CreateStore': enqueue

enqueue.yaml

enqueue:
    transport:
        default: 'gps'
        gps:
            projectId: '%env(GOOGLE_PROJECT_ID)%'
            keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'

Enqueue\MessengerAdapter\QueueInteropTransport internal OptionsResolver failure since 'transport_name' introduction

Since this Symfony FrameworkBundle change was merged (4.3.3):
symfony/framework-bundle@5205108

The internal option resolver of https://github.com/sroze/messenger-enqueue-transport/blob/master/QueueInteropTransport.php makes the constructor fail because the key transport_name (introduced in the commit linked above) is not handled by the option resolver.

Error details:

In OptionsResolver.php line 796:

  [Symfony\Component\OptionsResolver\Exception\UndefinedOptionsException]
  The option "transport_name" does not exist. Defined options are: "delayStrategy", "deliveryDelay", "priority", "queue", "receiveTimeout", "timeToLive", "topic".


Exception trace:
 () at /var/www/html/vendor/symfony/options-resolver/OptionsResolver.php:796
 Symfony\Component\OptionsResolver\OptionsResolver->resolve() at /var/www/html/vendor/sroze/messenger-enqueue-transport/QueueInteropTransport.php:60
 Enqueue\MessengerAdapter\QueueInteropTransport->__construct() at /var/www/html/vendor/sroze/messenger-enqueue-transport/QueueInteropTransportFactory.php:60
 Enqueue\MessengerAdapter\QueueInteropTransportFactory->createTransport() at /var/www/html/vendor/symfony/messenger/Transport/TransportFactory.php:40
 Symfony\Component\Messenger\Transport\TransportFactory->createTransport() at /var/www/html/var/cache/dev/ContainerCW1VQ2l/getMessenger_Transport_PublicationsService.php:23
 require() at /var/www/html/var/cache/dev/ContainerCW1VQ2l/srcApp_KernelDevDebugContainer.php:441
 ContainerCW1VQ2l\srcApp_KernelDevDebugContainer->load() at /var/www/html/vendor/symfony/dependency-injection/Container.php:433
 Symfony\Component\DependencyInjection\Container->getService() at /var/www/html/vendor/symfony/dependency-injection/Argument/ServiceLocator.php:40
 Symfony\Component\DependencyInjection\Argument\ServiceLocator->get() at /var/www/html/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:184
 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /var/www/html/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /var/www/html/vendor/symfony/console/Application.php:939
 Symfony\Component\Console\Application->doRunCommand() at /var/www/html/vendor/symfony/framework-bundle/Console/Application.php:87
 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /var/www/html/vendor/symfony/console/Application.php:273
 Symfony\Component\Console\Application->doRun() at /var/www/html/vendor/symfony/framework-bundle/Console/Application.php:73
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /var/www/html/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /var/www/html/bin/console:42

messenger:consume [-l|--limit LIMIT] [-m|--memory-limit MEMORY-LIMIT] [-t|--time-limit TIME-LIMIT] [--sleep SLEEP] [-b|--bus BUS] [-h|--help] [-q|--quiet] [-v|vv|vvv|--verbose] [-V|--version] [--ansi] [--no-ansi] [-n|--no-interaction] [-e|--env ENV] [--no-debug] [--] <command> [<receivers>...]

What is the proper approach ?

  • make a PR to unset($options['transport_name']); as done here: symfony/messenger@326990f
  • or should we extend the configureOptions() method to add support of the transport_name key (null || string) ?
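
The first approach can be sketched as follows. This is a simplified stand-in, not the transport's actual code: filterTransportOptions is a hypothetical helper, the plain array check replaces the real OptionsResolver, and the option names are copied from the "Defined options are" list in the error above.

```php
<?php
// Option names taken from the "Defined options are" list in the error message.
const KNOWN_OPTIONS = ['delayStrategy', 'deliveryDelay', 'priority', 'queue', 'receiveTimeout', 'timeToLive', 'topic'];

// Hypothetical helper: drop the key injected by FrameworkBundle,
// then reject anything that is still unknown.
function filterTransportOptions(array $options): array
{
    unset($options['transport_name']);

    $unknown = array_diff(array_keys($options), KNOWN_OPTIONS);
    if ([] !== $unknown) {
        throw new InvalidArgumentException(
            sprintf('Unknown option(s): "%s".', implode('", "', $unknown))
        );
    }

    return $options;
}

// 'publications' is an illustrative transport name.
$options = filterTransportOptions(['transport_name' => 'publications', 'receiveTimeout' => 1000]);
```

The second approach would instead keep the key and declare it as an allowed option, at the cost of carrying a value the transport itself never uses.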

Depending on your preference, I can open a PR if needed.

Best regards.

TransportConfiguration::setDeliveryDelay is not working

Expected result: deliveryDelay from TransportConfiguration is used over deliveryDelay specified in transport's options.

Actual result:

Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException: Missing "setDeliveryDelay" setter for "deliveryDelay" metadata key in "Double\Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage\P1"

Sorry for not providing a PR; this test can be pasted directly into QueueInteropTransportTest:

    public function testDeliveryDetail()
    {
        $topicName = 'topic';
        $queueName = 'queue';
        $message = new \stdClass();
        $message->foo = 'bar';
        $envelope = (new Envelope($message))->with(new TransportConfiguration(array(
            'metadata' => array('routingKey' => 'foo.bar', 'deliveryDelay' => 1000),
        )));

        $psrMessageProphecy = $this->prophesize(DecoratedPsrMessage::class);
        $psrMessageProphecy->setRoutingKey('foo.bar')->shouldBeCalled();
        $psrMessage = $psrMessageProphecy->reveal();
        $topicProphecy = $this->prophesize(Topic::class);
        $topic = $topicProphecy->reveal();

        $producerProphecy = $this->prophesize(Producer::class);
        $producerProphecy->send($topic, $psrMessage)->shouldBeCalled();

        $contextProphecy = $this->prophesize(Context::class);
        $contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic);
        $contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal());
        $contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage);

        $contextManagerProphecy = $this->prophesize(ContextManager::class);
        $contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal());
        $contextManagerProphecy->ensureExists(array(
            'topic' => $topicName,
            'topicOptions' => array('name' => $topicName, 'foo' => 'bar'),
            'queue' => $queueName,
            'queueOptions' => array('name' => $queueName, 'bar' => 'foo'),
        ))->shouldBeCalled();

        $encoderProphecy = $this->prophesize(SerializerInterface::class);
        $encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo'));

        $transport = $this->getTransport(
            $encoderProphecy->reveal(),
            $contextManagerProphecy->reveal(),
            array(
                'topic' => array('name' => $topicName, 'foo' => 'bar'),
                'queue' => array('name' => $queueName, 'bar' => 'foo'),
            ),
            true
        );

        $transport->send($envelope);
    }

Help instruct to install serializer when it's missing

If you currently only install this adapter, but do not install the serializer, you'll get this error at container build time:

The service "enqueue.messenger_transport.factory" has a dependency on a non-existent service "messenger.transport.serializer".

Unless you're doing something really custom, the issue will always be the same: you haven't installed the serializer. I think we could add a compiler pass that checks for the messenger.transport.serializer service and throws an exception if it's missing (this is similar to what FrameworkExtension does):

The Messenger serializer transport is missing. Try enabling it or running "composer require symfony/serializer-pack".

Need persistent messages with RabbitMQ

I do not see a way to have persistent messages with RabbitMQ. This requires instantiating the following AMQPMessage:

new \PhpAmqpLib\Message\AMQPMessage($body, ['delivery_mode' => 2]);

I cannot see a way to make that happen with existing code.

If I were to implement this, it seems like adding another option to QueueInteropTransport would be the way to do this. Usually, if you are going to persist messages you are going to persist all the messages in a queue and not just selective ones.

Question about logging message consumption errors.

Hello !

I've just discovered the new symfony/messenger ecosystem which seems pretty exciting. I've an issue (more a question than a problem) about the QueueInteropTransport implementation.

I'm trying to log message decoding errors during the development phase, but I can't find a clean way to do this. It seems related to the try {} catch {} used inside the QueueInteropTransport object.

try {
    $handler($this->decoder->decode(array(
        'body' => $message->getBody(),
        'headers' => $message->getHeaders(),
        'properties' => $message->getProperties(),
    )));

    $consumer->acknowledge($message);
} catch (RejectMessageException $e) {
    $consumer->reject($message);
} catch (RequeueMessageException $e) {
    $consumer->reject($message, true);
} catch (\Throwable $e) {
    $consumer->reject($message);
}

Is there a way to log exception messages somewhere, using configuration or by extending / decorating an object? It seems that exception messages are never sent outside of that block of code. I've tried most of the solutions I know of.

Actually, I'm using Symfony 4.1-BETA3 with the messenger-adapter, and the enqueue/redis connector.

Send a message to a specific queue

Hi! I use RabbitMQ as the AMQP transport. My configuration:

enqueue:
    transport:
        default: "amqp://guest:guest@rabbitmq:5672/%2f"

framework:
    messenger:
        transports:
             amqp: enqueue://default

I don't understand how I can send different messages to different RabbitMQ queues through a single bus. Can I use TransportConfiguration, or is it only for topics? This is not clear in the documentation.
Thanks!

Add --time-limit doesn't work for enqueue sqs

Hi,

When I use Enqueue with SQS, --time-limit doesn't work, but it works when I use AMQP.

php bin/console messenger:consume-messages feed --time-limit=10

Here is my configuration:

        "enqueue/messenger-adapter": "^0.1.2",
        "enqueue/sqs": "^0.8.41",

framework:
    messenger:
        default_bus: app.messenger.command_bus
        transports:
            feed: "%env(ENQUEUE_DSN)%?topic[name]=feed&queue[name]=feed"

        routing:
            App\Message\FeedMessage: 'feed'
        buses:
            app.messenger.command_bus:
                middleware:
                    - messenger.middleware.validation
                    - App\Middleware\RetryQueueMiddleware
            app.messenger.retry_bus: ~

enqueue:
    client: ~
    transport:
        default: 'sqs'
        sqs:
            key: "%env(AWS_KEY)%"
            secret: "%env(AWS_SECRET)%"
            region: "%env(AWS_REGION)%"

###> enqueue/sqs ###
ENQUEUE_DSN=enqueue://default
###< enqueue/sqs ###

Thanks for your help.

Messages sent twice using Messenger component when using multiple routes

I initially thought this was an issue with Messenger itself.

I'm using enqueue with Kafka and messenger adapter to send messages to external services. Here's my messenger.yaml:

framework:
  messenger:
    transports:
    # Uncomment the following line to enable a transport named "amqp"
      amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
      view_events: '%env(MESSENGER_TRANSPORT_DSN)%?topic[name]=view_events&queue[name]=view_events'

    routing:
      # Route your messages to the transports
      'App\Message\DisplayNotification':
        senders: [ view_events ]
        send_and_handle: true
      '*':
        senders: [ amqp ]
        send_and_handle: true

Note: the send_and_handle settings are only there for debugging at the moment.

Related packages (and Symfony 4.2):

enqueue/amqp-tools                0.9.4    Message Queue Amqp Tools
enqueue/dsn                       0.9.2    Parse DSN
enqueue/enqueue                   0.9.6    Message Queue Library
enqueue/enqueue-bundle            0.9.3    Message Queue Bundle
enqueue/messenger-adapter         0.2.0    Enqueue adapter for Symfony Messenger component
enqueue/null                      0.9.2    Enqueue Null transport
enqueue/rdkafka                   0.9.2    Message Queue Kafka Transport

And enqueue.yaml:

enqueue:
    default:
        transport:
            dsn: "kafka:"
            global:
                metadata.broker.list: '%env(KAFKA_SERVERS)%'
            topic:
                auto.offset.reset: beginning
            commit_async: true
        client: ~

I'm emitting messages using:

public function addEvent(Request $request, string $hash, $selectedEntry): void
{
    $event = DisplayNotification::createFromRequest($request, $hash, $selectedEntry);
    $this->bus->dispatch($event);
}

This causes messages to be received twice: once for the amqp transport and once for the view_events transport. While looking for the reason, I added the send_and_handle settings and realized:

  1. Second message wraps the first one.
  2. I cannot use normal handlers, because Envelope returned from SendMessageMiddleware contains RdKafkaMessage instead of expected DisplayNotification.

Looking through SenderInterface, I noticed that its implementation QueueInteropTransport expects to receive and return an Envelope. However, the envelope returned is a new instance of Envelope wrapping the previous Envelope as its message. I believe it should be the previous instance instead?

Excerpt of the send method below:

public function send(Envelope $message): Envelope
{
   ...
   $encodedMessage = $this->serializer->encode($message);

   $originalMessage = $message;
   $message = $context->createMessage(
   ...

   return $message; // Should be $originalMessage?

When $originalMessage is returned, the Messenger component behaves as expected: it sends the message only once and handlers receive the expected objects.

I'll create pull request in a moment, but do you see any issues with this change? Will it break anything in enqueue itself?

New version

Hi @sroze ,
Do you think it would be possible to tag a new version that includes the latest commits?

SQS + Symfony 4.3 deletion fail

Hi,

The deletion of Amazon SQS messages is not working anymore with the new QueueInteropTransport inherited from Symfony 4.3.

Interop\Queue\Message\SqsMessage needs to have the receiptHandle attribute set in order to be deleted.

This worked before 4.3 because the same object was received and sent by the transport. Now that the messages are reconstructed with QueueInteropTransport::encodeMessage, they lose this attribute.

Do you have any idea how this could be fixed?

Thanks.

Amazon SQS FIFO queues

Hi,

When sending messages to FIFO queues, the MessageGroupId parameter is required.

How do I configure the transport for a .fifo queue?

enqueue.yaml

enqueue:
    default:
        transport:  "%env(ENQUEUE_DSN)%"
        client: ~

messenger.yaml

framework:
    messenger:
        transports:
            sqs: enqueue://default?&topic[name]=test.fifo&queue[name]=test.fifo&receiveTimeout=3

        routing:
            'App\Message\Message': sqs

Thanks.

This transport does not support envelope items

We should serialize/deserialize the message envelope items via the headers (Symfony's default message serializer does this already). All we should have to do is to set the headers properly from the encoded message :)

bin/console messenger:consume-messages amqp in PROD env using amqp_lib transport

The documentation states that to consume messages you need to run:

bin/console messenger:consume-messages amqp

My question is how to run this in the PROD environment, because the documentation does not explain how to create the queues in RabbitMQ.

Running it in prod like this:

messenger:consume-messages amqp --env=prod

results in the error: NOT_FOUND - no queue 'messages' in vhost '/'

In the dev environment, the above command runs as expected.

In dev the queues are created because of the debug flag:

if ($this->debug) {
            $this->contextManager->ensureExists($destination);
        }

In prod, presumably they should be created manually, but the enqueue:setup-broker command does not create the queues.

I also found that the queue is created here:

public function recoverException(\Exception $exception, array $destination): bool
    {
        if ($exception instanceof \AMQPQueueException) {
            if (404 === $exception->getCode()) {
                return $this->ensureExists($destination);
            }
        }

        return false;
    }

That only happens on an exception, though, and my exception is different from the one being caught. Mine is:

\PhpAmqpLib\Exception\AMQPProtocolChannelException

It comes from PhpAmqpLib, one of the supported Enqueue transports:

https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport


To sum up: either there is currently a bug in that the messenger adapter is hardcoded to work with only one of the supported transports, or the documentation lacks information on how to create queues for the prod environment.
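
One possible direction, sketched under assumptions rather than taken from the adapter: detect the "queue not found" case for both AMQP clients. The helper name isMissingQueueError is hypothetical, and the sketch assumes that both exception types report the AMQP reply code (404 for NOT_FOUND) through getCode().

```php
<?php

declare(strict_types=1);

/**
 * Returns true when the exception looks like a "queue not found" error raised
 * by either the AMQP extension or php-amqplib.
 *
 * instanceof does not trigger autoloading, so a client library that is not
 * installed simply never matches.
 */
function isMissingQueueError(\Throwable $exception): bool
{
    $isAmqpError = $exception instanceof \AMQPQueueException
        || $exception instanceof \PhpAmqpLib\Exception\AMQPProtocolChannelException;

    // Assumption: both clients expose the AMQP reply code via getCode().
    return $isAmqpError && 404 === $exception->getCode();
}
```

recoverException could then call ensureExists whenever such a helper returns true, instead of checking for \AMQPQueueException alone.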

[Symfony][RabbitMQ] Enqueue null not working with Envelope

I followed this documentation because I wanted to test my producer on an API Platform route.

In my case, I want to run my tests in a GitLab CI environment, which means there is no real need to test the connection to my broker (RabbitMQ here). Therefore, I followed this configuration:

# app/config/config_test.yml

enqueue:
    default:
        transport: 'null:'
        client: ~

I get an error because the envelope is not supported by the NullTransport. I need the envelope to set up a routing key for RabbitMQ.

Uncaught PHP Exception Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException: "Missing "setRoutingKey" setter for "routingKey" metadata key in "Enqueue\Null\NullMessage" class" at /srv/api/vendor/enqueue/messenger-adapter/QueueInteropTransport.php line 220

adapter ignores config

The documentation says that you can set some properties of the queue:

https://github.com/php-enqueue/messenger-adapter#configure-the-queues-and-exchanges

but when you look at the code where the queue is created:

    /**
     * {@inheritdoc}
     */
    public function ensureExists(array $destination): bool
    {
        if (!$this->psrContext instanceof AmqpContext) {
            return false;
        }

        $topic = $this->psrContext->createTopic($destination['topic']);
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $topic->addFlag(AmqpTopic::FLAG_DURABLE);
        $this->psrContext->declareTopic($topic);

        $queue = $this->psrContext->createQueue($destination['queue']);
        $queue->addFlag(AmqpQueue::FLAG_DURABLE);
        $this->psrContext->declareQueue($queue);

        $this->psrContext->bind(new AmqpBind($queue, $topic));

        return true;
    }

As you can see, not all queue attributes are set.

In particular, I have a problem with setting the priority.

In the code above, $destination contains only these values:

Array
(
    [topic] => command_bus
    [queue] => command_bus
)

No arguments regarding priority are passed,

so when you set this in the Symfony configuration:

command_bus: enqueue://default?queue[name]=command_bus&topic[name]=command_bus&priority=2

the &priority=2 part is ignored, and the queue will not support priorities.
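
For context, RabbitMQ only honours message priorities on queues declared with the x-max-priority argument. The sketch below shows how the DSN query could be parsed and turned into such declaration arguments. Both helper names and the queue[maxPriority] option are hypothetical; the adapter does not document such an option.

```php
<?php

declare(strict_types=1);

/** Parses the query part of a transport DSN into a nested options array. */
function parseDsnOptions(string $dsn): array
{
    $query = parse_url($dsn, PHP_URL_QUERY);
    $options = [];
    if (is_string($query)) {
        parse_str($query, $options);
    }

    return $options;
}

/**
 * Builds the arguments for the queue declaration.
 * "queue[maxPriority]" is a hypothetical option used for illustration only.
 */
function buildQueueArguments(array $options): array
{
    $arguments = [];
    if (isset($options['queue']['maxPriority'])) {
        // RabbitMQ's queue argument that enables priorities on a queue.
        $arguments['x-max-priority'] = (int) $options['queue']['maxPriority'];
    }

    return $arguments;
}

$options = parseDsnOptions(
    'enqueue://default?queue[name]=command_bus&queue[maxPriority]=10&topic[name]=command_bus&priority=2'
);
$arguments = buildQueueArguments($options);

var_dump($options['priority'], $arguments);
```

In ensureExists, arguments like these would presumably be applied with the queue's setArguments method before declareQueue is called, which requires passing the parsed options down rather than only the topic and queue names.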

Kafka: Message can not be ACKed because it gets lost in serialization.

The original Kafka Message gets lost.

Here it is still present: \Enqueue\RdKafka\RdKafkaConsumer::doReceive
It gets lost when the data is returned here: \Enqueue\MessengerAdapter\QueueInteropTransport::get

When it is time to ACK the message, we run into a LogicException:

The message could not be acknowledged because it does not have kafka message set.

in \Enqueue\RdKafka\RdKafkaConsumer::acknowledge

We might need an InteropMessageStamp, which adds the \Interop\Queue\Message object, for later use.

It might be even better to use \Symfony\Component\Messenger\Stamp\TransportMessageIdStamp and set the offset as the id.

Time limit not being checked if no messages are being received

When starting a consumer with a time limit, it will only be checked after messages have been received, so the consumer will not stop after the timeout when there are no messages.

I've started the worker like this:

$ bin/console messenger:consume-messages test --time-limit=1

Then there will be an infinite loop in https://github.com/php-enqueue/messenger-adapter/blob/master/QueueInteropTransport.php#L72:

      while (!$this->shouldStop) {
            try {
                if (null === ($message = $consumer->receive($this->options['receiveTimeout'] ?? 0))) {
                    continue;
                }
                // ...
      }

Judging by Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::receive, calling the handler with null should fix this, like:

      while (!$this->shouldStop) {
            try {
                if (null === ($message = $consumer->receive($this->options['receiveTimeout'] ?? 0))) {
                    $handler(null);
                    continue;
                }
                // ...
      }

Reference: https://github.com/symfony/messenger/blob/v4.1.0/Transport/AmqpExt/AmqpReceiver.php#L43

Stable Messenger 4.3 support

I see that the commit fbcaa00 aims to add support for Messenger 4.3, but there's no tag for it yet.

Are there any missing tasks for complete 4.3 support?

No transport supports the given DSN "enqueue://default".

Hi,

I tried to use your adapter with the Redis transport.
I'm using Symfony 4.1 with Messenger. I followed the steps, and at step 5 I got:

In TransportFactory.php line 37:
                                                            
  No transport supports the given DSN "enqueue://default".  
                                                            

I think the Messenger component isn't connected to Enqueue. I checked, and the Enqueue bundle is loaded in bundles.php.

Could it be due to a Messenger change made before this week's release?

Publish Kafka message to the right topic

I publish a Kafka message like this one:

$this->bus->dispatch(new \Enqueue\RdKafka\RdKafkaMessage('foo', ['topic' => 'test']));

I expect this to publish my message foo in topic test.

In fact, in the send method of Enqueue\MessengerAdapter\QueueInteropTransport, my message (wrapped in a Symfony\Component\Messenger\Envelope) is encoded by an instance of Symfony\Component\Messenger\Transport\Serialization\Serializer, which gives:

array(2) {
  ["body"]=>
  string(72) ""{\"body\":\"plop\",\"properties\":{\"topic\":\"test\"},\"headers\":[]}""
  ["headers"]=>
  array(1) {
    ["type"]=>
    string(30) "Enqueue\RdKafka\RdKafkaMessage"
  }
}

After creation of a new \Enqueue\RdKafka\RdKafkaMessage (in the same function), I finally have the following sent message:

object(Enqueue\RdKafka\RdKafkaMessage)#402 (7) {
  ["body":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  string(72) ""{\"body\":\"plop\",\"properties\":{\"topic\":\"test\"},\"headers\":[]}""
  ["properties":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  array(0) {
  }
  ["headers":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  array(1) {
    ["type"]=>
    string(30) "Enqueue\RdKafka\RdKafkaMessage"
  }
  ["redelivered":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  bool(false)
  ["partition":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  NULL
  ["key":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  NULL
  ["kafkaMessage":"Enqueue\RdKafka\RdKafkaMessage":private]=>
  NULL
}

My original message has been serialized in the body of the sent message and the target topic is the default one (i.e. messages) instead of test.

I see that the destination topic comes from $this->options, but I do not know how to set those options.

It is difficult for me to know the best way (implement a new encoder, define some options, upgrade code, ...) to achieve what I want to do. That's why I'm asking you for help.

"amqp" is not supported. Supported "null"

Hi, I tried to integrate messenger-adapter to work with my symfony/messenger component. I set up everything according to documentation. I have rabbitmq running on DSN : amqp://rabbit:rabbit@devel_rabbit:5672/%2f so I defined it as ENQUEUE_DSN in .env file.

The problem is that whenever I run bin/console messenger:consume-messages amqp, the following exception appears:

[LogicException]                                      
  The scheme "amqp" is not supported. Supported "null" 

Do you know what could be the cause?
Thanks

Issue with multiple senders - enveloping same message more than once

I have multiple senders configured and everything was working fine until I updated to the latest messenger version.

After some debugging, I realized that the message is being enveloped more than once here.

I also noticed that the $message variable gets redefined here - it was originally an Envelope instance containing my original message under the message parameter and it becomes an instance of \Interop\Amqp\Impl\AmqpMessage, which gets enveloped and returned in the end of this method.

It seems that the issue also started after this change on the MiddlewareInterface, as the $envelope variable now gets redefined after each iteration.
