Git Product home page Git Product logo

camel-spring-amqp's Introduction

camel-spring-amqp

Introduction

An Apache Camel Component that will natively communicate with a RabbitMQ broker. This is implemented using Spring's AMQP project, so it should ultimately become vendor-agnostic.

Usage

The simplest Fanout exchange can be defined as:

spring-amqp:<Exchange Name>

spring-amqp:<Exchange Name>:<Queue Name>

Here a simple message producer will send a message to Exchange Name, and a simple consumer will bind the Exchange Name to the Queue Name.

If you wish to use a routing key, URIs have the structure:

spring-amqp:<Exchange Name>:<Queue Name>:<Routing Key>?type=<Exchange Type>

The routing key is optional, but Queue Name and Exchange Name are required for consumers. Just the Exchange Name is required for producers.

Producers can also defer the routing key to the message header, where the ROUTING_KEY header could be set to the appropriate routing key.

Producers can override the exchange name specified in the URI by setting the EXCHANGE_NAME Camel message header.

Options to the URI include the exchange type, which defaults to direct if none is specified.

For header based exchanges, the URI is similar but name/value pairs can be specified in place of the routing key. For example:

spring-amqp:myExchange:qName:cheese=gouda?type=headers

This example will fetch all messages where a header named "cheese" has the value of "gouda." You can also add additional name/value pairs:

spring-amqp:myExchange:qName:cheese=gouda&fromage=jack?type=headers

Which will create a binding for headers where "cheese" has the value of "gouda" AND "fromage" has the value of "jack." You can also choose to create an OR relationship:

spring-amqp:myExchange:qName:cheese=gouda|fromage=jack?type=headers

Additional Settings and Properties

Additional properties can be added to the endpoint as URI parameters. For example, to create a topic exchange that is non-exclusive and not durable:

spring-amqp:myExchange_10:writeQueue:write.*?type=topic&durable=false&autodelete=true&exclusive=false

Parameters available include:

prefetchCount How many messages a consumer should pre-fetch
concurrentConsumers The number of concurrent threads that will consume messages from a queue
transactional Mark messages coming to/from this endpoint as transactional
autodelete Allow this endpoint to be automagically deleted from the broker once it is gone
durable Make queues and exchanges created by this endpoint persistent
type One of the AMQP exchange types: direct, fanout, headers, or topic. Defaults to direct.
exclusive Mark this endpoint as an exclusive point for message exchanges
acknowledgeMode Sets the acknowledge mode (NONE, AUTO)
connection Configure a specific connection factory (for systems with multiple AMQP brokers)
autoReply Consumer sends back a response message when ReplyTo header is present in the consumed message. Defaults to true.

Spring Integration

The camel-spring-amqp component will attempt to fetch as much information from the application context it sits within. For example, if we are using Spring we could issue the following:

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="messageConverter"/>
<rabbit:admin connection-factory="connectionFactory"/>

The message converter amqp.spring.converter.XStreamConverter is provided by the camel-spring-amqp component; it provides JSON marshalling using the XStream libraries. If you would rather use the Jackson JSON marshalling (or another conversion method) provided by the Spring AMQP framework, you can swap out the appropriate message converter class in the above example.

Advanced Message Conversion

You may wish to use JSON marshalling for the majority of your inter-process communication, but may have the need to do XML marshalling for REST API calls or the like. If you wish you can specify multiple types of message conversion based on the content type of the message. For example, we may have messages marshalled to JSON by default but want content types of application/text to just be printed out as strings. Within the Spring XML DSL you may define:

<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
    <property name="converters">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter"/>
            <entry key="application/xml" value-ref="textMessageConverter"/>
        </map>
    </property>
    <property name="fallbackConverter" ref="jsonMessageConverter"/>
</bean>

This would allow messages with a "Content-Type" of application/json to be marshalled with the XStream converter, while messages with a content type of application/xml will be marshalled into a simple character string. If no content type is specified, the XStream JSON message converter will be used.

Downloads and Maven Repository

Release builds of the Camel Spring AMQP Component are hosted within the Sonatype repository. You can include this component within your Maven POM as:

<dependency>
    <groupId>com.bluelock</groupId>
    <artifactId>camel-spring-amqp</artifactId>
    <version>1.6.3</version>
</dependency>

Limitations

  • Transactions are currently not supported
  • Lifecycle events (e.g. stop, shutdown) need to be refined
  • Unit tests require a running AMQP broker. I may end up creating a VM local Qpid instance as an AMQP broker...

To-Do

  • Validate with other AMQP brokers (such as Qpid)

License

This package, the Camel Spring AMQP component is licensed under the Mozilla Public License v2.0. See LICENSE for details.

camel-spring-amqp's People

Contributors

alexgri avatar brbatwork avatar deckerego avatar drasil avatar edscriven avatar jakerobinson avatar klarna-marcus-nilsson avatar michael-alford avatar michaelpage avatar rob-snyder avatar verschdl avatar wat5 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

camel-spring-amqp's Issues

Consumer will shut down if no ID is found within the message

If no ID (i.e. message key) is found within the message, the consumer throws an exception and shuts down. For example:
org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'[B@4e8044a2(byte[1029])'; ID:null; Content:application/xml; Headers: Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:105)
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:142)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
at $Proxy61.invokeListener(Unknown Source)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)
at java.lang.Thread.run(Thread.java:636)

Route not started from console (ServiceMix)

My environment: ServiceMix 4.5.2 (latest stable) with Camel 2.10.6.

route-stop command working on route with spring-amqp consumer, but route-start not working on the same route, error: “Error executing command: java.lang.IllegalArgumentException: interface org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$ContainerDelegate is not visible from class loader”

osgi:restart for whole bundle works well (all routes starts).

Transaction Support

Hi,
is it possible to add transaction support to the component? I guess it needs more than just setting listener to be transacted and acknowledge mode to auto. What else needs to be done?

Thanks

Exchange-Queue binding not getting created

Thank you for the library.
I am trying to produce a message to RabbitMQ with Camel on Spring Boot. The destination endpoint I am trying to use is

spring-amqp:myExchange:myQueue:myRoutingKey

I get the following logs when I run the route:

[Camel (camel-1) thread #1 - JmsConsumer[abc]] INFO  amqp.spring.camel.component.SpringAMQPEndpoint - Creating endpoint for myExchange:myQueue:myRoutingKey
[Camel (camel-1) thread #1 - JmsConsumer[abc]] INFO  amqp.spring.camel.component.SpringAMQPProducer - Declaring exchange myExchange of type DirectExchange

But the binding to the queue is not getting created and thus the message is getting lost.
If I create the queue and the binding to the exchange manually and then fire a message, it works seamlessly.

Is there something wrong with the syntax of the endpoint that I have used?
Please provide a sample endpoint for my use case.

SpringAMQPProducer should implement ServicePoolAware

Hello,

recently I noticed that new SpringAMQPProducer instances are created quite often in my application. But creating a new producer instance is not a cheap operation as it includes (re)declaring an AMQP exchange and creating a threadpool. After some investigation I fould out that the creation is initiated by the org.apache.camel.impl.ProducerCache.doGetProducer() method. This method could reuse producer instances, but it is currently not allowed by SpringAMQPProducer because it does not implement the org.apache.camel.ServicePoolAware marker interface. I am not 100 % sure, but I can not see any reason for not implementing this interface. What is your position on this?

Maybe, implementing this interface should be considered for other classes as well.

Pavel

Update spring-amqp

This is using a quite old version of spring-amqp (1.1.1). The latest is 1.1.4.

Support for High Availability Parameter

Hi,

I have ran into the issue when configuring Camel endpoint for RabbitMQ queue that was configured with 'x-ha-policy=all' parameter.

When system starts up and queue is being defined based on the endpoint I get following exception.

channel error; reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-ha-policy' for queue 'test.queue' in vhost '/': received none but current is the value 'all' of type 'longstr', class-id=50, method-id=10),null,""}

Looks like RabbitMQ connection requires endpoint queue parameters to match those defined on the queue itself. This is fine, however, looking at the documentation and the code 'x-ha-policy' argument is not supported and is not being passed to connection code.

Please let me know the best way to address this issue. Am I overlooking something? Should I wait for a fix or fork the code, make a fix and submit it for review?

Thanks,
Yury

Memory (almost) leak issue

After upgrading from version 1.0 to 1.4, I saw that my app was going from 100mo heap to more than 1Go ! (and crashing because the limit I set was xmx 1024). The version upgrade was really the only change in my app.

I found out that the AcknowledgeMode.NONE was the root cause of the heap size grow.
I tracked down the issue and found it was introduced in version 1.2.
=>The ack mode was changed from AUTO to NONE.

It sounds like a spring-rabbit or spring-amqp issue but it impact a lot the camel-spring-amqp component. Maybe it would be safer to change the default ack mode to AUTO while the bug is not fixed on spring side.

If you want to reproduce, just checkout the 1.1.0 tag and change the ack mode from AUTO to NONE, then look at the heap size growing in visualvm when sending a million message.

Regards.
The pull request #25 provides a workaround.

camel-spring-amqp support for Apache Camel 2.2.1

Since new Apache Camel requires a default camel context passed to its default message class constructor .

image

The SpringAMQPMessage class is not compatible with latest apache camel .

image

Is there any plan to fix this in later releases ?

NullPointerException when a reply-timeout is triggered

Here is the stacktrace

java.lang.NullPointerException
at amqp.spring.camel.component.SpringAMQPProducer$AMQPProducerTask.run(SpringAMQPProducer.java:150)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

camel-spring-amqp failed to run with tomcat 8 + jdk 1.7

I have a tomcat 7 + jdk 1.7, but failed to use 1.7.0 version.

java.lang.UnsupportedClassVersionError: amqp/spring/converter/XStreamConverter : Unsupported major.minor version 52.0 (unable to load class amqp.spring.converter.XStreamConverter)

Looks like the latest version is compiled with java 8. Do you mind to make it compatible with jdk 1.7

Headers Exchange producer route does not work. Please provide an example.

Based on headers exchange consumer example I tried to create a route for headers exchange producer using different syntax for specifying headers, but it does not seem to work. Please provide an example for headers exchange producer.

I have already tried to use spring-amqp:myHeaderExchange:'':cheese=gouda and
spring-amqp:myHeaderExchange:qName:cheese=gouda but they do not work.

Defining the delivery mode for the message

Hello,

Thanks for all the work you've done!

Is it possible to define the delivery mode for a message using this component ?

Sorry for posting this here, but couldn't find anywhere else to.

Shall SpringAMQPHeader read receivedDeliveryMode instead of deliveryMode?

Hi

The method setBasicPropertiesToHeaders in class SpringAMQPHeader reads the deliveryMode property while consuming a message. Shall this instead read receivedDeliveryMode property?

Through wireshark and spring-amqp logging I can see if a producer sends different values for delivery-mode (PERSISTENT / NON_PERSISTENT), they get deserialised in receivedDeliveryMode property. Hence the above class fails in NPE while reading from a queue.

In addition, below snippet from Spring Rabbit jar (class DefaultMessagePropertiesConverter) gives me impression that while reading a message it would set the receivedDeliveryMode field only while setting deliveryMode to null always.

Integer deliveryMode = source.getDeliveryMode();
if (deliveryMode != null) {
	target.setReceivedDeliveryMode(MessageDeliveryMode.fromInt(deliveryMode));
}
target.setDeliveryMode(null);

Am I missing something here? My present code, when switched from 1.6.3 of this repo to 1.7.0, fails with NPE with below stacktrace

Caused by: java.lang.NullPointerException: null
	at org.springframework.amqp.core.MessageDeliveryMode.toInt(MessageDeliveryMode.java:34)
	at amqp.spring.camel.component.SpringAMQPHeader.setBasicPropertiesToHeaders(SpringAMQPHeader.java:69)
	at amqp.spring.camel.component.SpringAMQPMessage.fromAMQPMessage(SpringAMQPMessage.java:53)
	at amqp.spring.camel.component.SpringAMQPConsumer$RabbitMQMessageListener.onMessage(SpringAMQPConsumer.java:197)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)

Any suggestion please?

Cheers
Mainak

Bad camel context linking when component are instanciated from java code

When a SpringAMQPComponent is instanciated from java code (not by spring) it is correctly linked to the camel context when calling "add(key,component)", but the endpoint is not since CamelContextAware is not taken in account. This result in endpoints attached to "unknown" camel context when you watch jmx beans,

Setting BasicProperties

Hi,

Is there any way to specify BasicProperties without digging into the code?

I see on http://static.springsource.org/spring-amqp-net/docs/1.0.x/reference/html/amqp.html that it's supposed to be possible to specify 'messageId', 'timestamp', 'contentType' via the IMessageProperties interface, using SetHeader(string key, object val).

I'd ideally like to do the following:

sendMessage("spring-amqp:exch:queue:key", message, headers)

and uses the headers to set. So far that doesn't seem to do the job.

As an alternative, I've started looking at modifying toAMQPMessage to set the BasicProperties beyond setMessageId.

Cheers,

Rowlamd

comparation of connections by host port and vhost

procedure of finding of identical connections at amqp.spring.camel.component.SpringAMQPComponent#getAmqpAdministration and amqp.spring.camel.component.SpringAMQPComponent#getAmqpTemplate should use host port and virtualHost.

Upgrade Camel

Issue

There is a resource leak issue with the currently used version of camel (2.8.5). The producer and consumer caches in camel are both implemented as soft-reference LRU caches. There are a couple of possible problems with this.

  • SpringAMQPConsumer & SpringAMQPProducer both have managed thread-pools that need to be shutdown when they are evicted from the cache.
  • The soft-references allow the objects to be garbage-collected in certain instances without having the doShutdown() or doStop() methods invoked.

This can lead to thread-leaks, causing the number of threads to grow over time, until all the system resources are consumed.

Solution

Camel 2.8.x is no longer a supported version. The soft-reference caches have been removed in Camel 2.9.x, 2.10.x, and 2.11.x. They also implemented a cache eviction listener with an onEviction() callback so resources can be shutdown properly.

The plan is to upgrade to Camel 2.11.x.

Support for multiple bindings per queue

I've just completed a project migrating our applications from ActiveMQ to RabbitMQ. The most challenging part was replicating the functionality of jms selectors. Here are some of the selection rules that we had:
key IN (value1, value2...)
key1 IN (value1...) AND key2 IN (value1...) AND....

I've implemented this using Headers exchanges with multiple bindings per queue. Theoretically, any sql based selector can be decomposed into a combination of key/value pairs where each combination corresponds to a binding. For example the following routing key:
key1=value1|key1=value2&key2=value1|key2=value2 would result in 4 bindings:
key1=value1&key2=value1, key1=value2....

I added support for these use cases in SpringAMQPConsumer (see the code here: https://github.com/art11s/camel-spring-amqp). Are you interested in adding this functionality to the project?

Dead letter error handler not working with spring-amqp queue uri

Hi,
I have the following error handler:

    <bean id="deadLetterErrorHandler" class="org.apache.camel.builder.DeadLetterChannelBuilder">
         <property name="deadLetterUri" value="spring-amqp:MangaHigh:dead:dead?type=direct&amp;autodelete=false&amp;durable=false"/>
         <property name="redeliveryPolicy" ref="redeliveryPolicyConfig"/>
         <property name="useOriginalMessage" value="true"/>
    </bean>

     <bean id="redeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy">
         <property name="maximumRedeliveries" value="3"/>
         <property name="redeliveryDelay" value="500"/>
         <property name="useExponentialBackOff" value="true"/>
     </bean>

The dead letter message is published into the exchange, so an additional binding has to exist for the message to get into the right queue. Is this supposed to only publish into exchanges, or should it also support pushing messages directly into the queue?

prefetchCount parameter ignored while AcknowledgeMode is hardcoded to none

The prefetchCount parameter is ignored by the client if the acknowledgement mode is set to none, as it is presently on line 132 of SpringAMQPConsumer:

        //Transactions are currently not supported
        this.listenerContainer.setChannelTransacted(false);
        this.listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

Even if transactions aren't supported, the acknowledge mode should still be configurable or at least default to AcknowledgeMode.AUTO if a prefetchSize is a positive integer.

routing key not required for consumer

In some simple testing, it does not appear that the routing key is necessary when consuming from a queue. I can put anything into the routing key field in my URL and still receive messages from the queue I am consuming from. From what I understand in the AMQP spec, consumers receive from a queue and the routing key is only relevant to producers.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.