Hi @zhfeng
We have a problem with the load balancing of consumers across a JBoss cluster: they do not stay connected to the same server over time.
Here is some context on our infrastructure:
We have a HornetQ JMS broker hosted on two JBoss EAP 6 servers. With the Artemis client, we connect to the cluster using this URL: (tcp://jbosshost01:5049,tcp://jbosshost01:5049)?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory
Each server hosts its own instance of the queue, so there are two queues in the cluster. For load balancing, additional queues exist between the servers so that messages can be moved from one host to the other, in either direction, when needed.
In the best case the cluster looks like the diagram below: two servers with two consumers on each. There will be more in production.
                        some producers
                               │
            ┌──────────────────┴──────────────────┐
            │                                     │
            │ business.queue.name                 │ business.queue.name
            │                                     │
            ▼                                     ▼
      ┌───────────┐  load balance queue 1   ┌───────────┐
      │           │ ──────────────────────► │           │
      │   host1   │                         │   host2   │
      │           │  load balance queue 2   │           │
      │           │ ◄────────────────────── │           │
      └─────┬─────┘                         └─────┬─────┘
            │                                     │
            │ business.queue.name                 │ business.queue.name
            │                                     │
            ▼                                     ▼
       consumer a                            consumer b
       consumer c                            consumer d
Consumer balancing is not stable over a long period of time: consumers move from one server to the other. For example, with 2 connections and 4 consumers, the number of consumers per host can take any of the following values over time:
host 1    host 2
  1         3
  0         4
  3         1
  4         0
The behavior is the same whether or not messages are being sent.
On the client side, this behavior is handled by the default class RoundRobinConnectionLoadBalancingPolicy in the Artemis client.
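To illustrate what we think is happening, here is a minimal standalone sketch. It does not use any Artemis class: the RoundRobin class below is a simplified stand-in that we assume behaves roughly like the real policy (it hands out the next host index each time a connection is created), and the reconnect orders are made up for the example.

```java
import java.util.Arrays;

final class RoundRobinDrift {

    // Simplified stand-in for a round-robin policy (an assumption, NOT the Artemis class).
    static final class RoundRobin {
        private int pos = -1;

        int select(int max) {
            pos = (pos + 1) % max;
            return pos;
        }
    }

    // Counts how many consumers are currently attached to each host.
    static int[] countPerHost(int[] hostOfConsumer, int hosts) {
        int[] counts = new int[hosts];
        for (int host : hostOfConsumer) {
            counts[host]++;
        }
        return counts;
    }

    // Connects every consumer once, then reconnects the listed consumers in order.
    // Each (re)connection takes the next host from the shared policy.
    static int[] simulate(int hosts, int consumers, int[] reconnectOrder) {
        RoundRobin policy = new RoundRobin();
        int[] hostOfConsumer = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            hostOfConsumer[c] = policy.select(hosts);
        }
        for (int c : reconnectOrder) {
            hostOfConsumer[c] = policy.select(hosts);
        }
        return hostOfConsumer;
    }

    public static void main(String[] args) {
        // Initial state: consumers alternate between the two hosts.
        System.out.println(Arrays.toString(countPerHost(simulate(2, 4, new int[] {}), 2)));        // [2, 2]
        // Consumer 1 reconnects alone and lands on host index 0.
        System.out.println(Arrays.toString(countPerHost(simulate(2, 4, new int[] {1}), 2)));       // [3, 1]
        // Consumer 3 then reconnects twice in a row and also ends up on host index 0.
        System.out.println(Arrays.toString(countPerHost(simulate(2, 4, new int[] {1, 3, 3}), 2))); // [4, 0]
    }
}
```

In this sketch the policy only knows which host comes next, not which host a closing connection is leaving, so as soon as the consumers do not reconnect in a regular order the distribution can drift, much like the numbers above.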
How can we keep a consumer connected to the same server, no matter how many times a connection and a session are created on the cluster?
Here is an example of the code we use in our consumer:
// The following consumeMessages() method is called by 4 threads in parallel, which are run by a Quarkus scheduler
@Transactional(Transactional.TxType.NEVER)
private void consumeMessages() throws JMSException {
    log.debug("start session consuming activity");
    try (Connection connection = factory.createConnection();
         Session session = connection.createSession(Session.SESSION_TRANSACTED);
         MessageConsumer consumer = session.createConsumer(queue)) {
        connection.start();
        var instant = LocalTime.now();
        while (isStillInActivity(instant)) {
            var message = consumer.receive(MESSAGE_RECEPTION_TIMEOUT.toMillis());
            if (message != null) {
                log.debug("session thread treat message");
                boolean ok = false;
                try {
                    listener.onMessage(message);
                    session.commit();
                    log.debug("### session commit");
                    ok = true;
                } finally {
                    if (!ok) {
                        session.rollback();
                        log.debug("### session rollback");
                    }
                }
            } else {
                log.debug("session thread remains idle");
            }
        }
        log.debug("end session activity successfully");
    }
}

private boolean isStillInActivity(LocalTime instant) {
    LocalTime timeout = instant.plus(sessionTTL);
    return LocalTime.now().isBefore(timeout) && !shutdown.get();
}
Thanks!