JMS Inbound coordination is introduced with WSO2 ESB 4.9.0, which allows only one active Inbound Endpoint will poll messages from a JMS queue created in WSO2 Message Broker. The Inbound Endpoint should be created with protocol 'jms', then it will be started as polling inbound by default.
Following setup will demonstrate Inbound Coordination where only one active Inbound will poll Message Store.
To create WSO2 ESB cluster for this scenario, follow my previous blog post Creating WSO2 ESB Cluster with Nginx LB.
Since we have already configured ESB cluster, follow below steps to configure WSO2 MB as the message broker.
- In ESB Cluster, following configurations needs to be done to establish the connection to Message Broker.
- Update jndi.properties file with new queue details. This should be done in all nodes in cluster.
Open the <ESB_HOME>/repository/conf/JNDI.proerties file.
Define a new queue called ‘ordersQueue’.
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5682'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5682'
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.ordersQueue = ordersQueue
|
- Download WSO2 Message Broker binary file from WSO2 Web Site and follow instructions in Installation Guide to setup the server.
- Start WSO2 MB by running <MB_HOME>/bin/wso2server.sh (on Linux) or <MB_HOME>/bin/wso2server.bat (on Windows).
- Start WSO2 ESB Manager Node by running <ESB_HOME>/bin/wso2server.sh
and Worker nodes by <ESB_HOME>/bin/wso2server.sh -DworkerNode=true
- Next, run <ESB_HOME>/samples/axis2server/axis2Server.sh to start the Axis2 server.
- Point your browser to http://127.0.0.1:9000/services/SimpleStockQuoteService?wsdl and verify that the service is running.
- Next create below proxy in ESB Manager Node. It will be used to send messages to back-end which were polled by Inbound Endpoint from MB Queue. This sequence will be specified in Inbound Endpoint configuration.
<sequence xmlns="http://ws.apache.org/ns/synapse" name="request" onError="fault">
<log level="full"/>
<call>
<endpoint>
<address uri="http://localhost:9000/services/SimpleStockQuoteService"
format="soap12"/>
</endpoint>
</call>
<drop/>
</sequence>
- Now create Inbound Endpoint using this configuration in ESB Manager node, which will connect to ‘ordersQueue’ in WSO2 MB and start polling messages in every 4 seconds. Polled messages will be injected to the sequence named 'request' which we created above.
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
name="jms_inbound"
sequence="request"
onError="fault"
protocol="jms"
suspend="false">
<parameters>
<parameter name="interval">4000</parameter>
<parameter name="coordination">true</parameter>
<parameter name="transport.jms.Destination">ordersQueue</parameter>
<parameter name="transport.jms.CacheLevel">3</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</parameter>
<parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="sequential">true</parameter>
<parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
<parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
<parameter name="transport.jms.SessionTransacted">false</parameter>
<parameter name="transport.jms.ConnectionFactoryType">queue</parameter>
</parameters>
</inboundEndpoint>
- Go to admin console of WSO2 MB,https://esb.wso2.com:9453/carbon/ it will show the new queue ‘ordersQueue’ is created in Queues>Browse window.
- To add a message to the ‘ordersQueue’, click on ‘PublishMessage’ link of ‘ordersQueue’ and enter below message as the Message Body in new window. Set Number of Messages as 1. Then click on ‘Send Message’, then the message will be published in the queue.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing">
<soapenv:Body>
<m:placeOrder xmlns:m="http://services.samples">
<m:order>
<m:price>3.141593E0</m:price>
<m:quantity>4</m:quantity>
<m:symbol>IBM</m:symbol>
</m:order>
</m:placeOrder>
</soapenv:Body>
</soapenv:Envelope>
- One of Inbound Endpoints running in worker nodes will pick this message and send to the backend. In one of worker ESB consoles, below log entry can be observed which confirms the message polling.
[2015-05-28 15:00:36,753] DEBUG - Axis2SynapseEnvironment Creating Message Context
[2015-05-28 15:00:36,841] DEBUG - Axis2SynapseEnvironment Injecting MessageContext for inbound mediation using the : request Sequence
[2015-05-28 15:00:36,851] DEBUG - SequenceMediator Start : Sequence <request>
[2015-05-28 15:00:36,851] DEBUG - SequenceMediator Setting the onError handler : fault for the sequence : request
[2015-05-28 15:00:36,858] DEBUG - SequenceMediator Sequence <SequenceMediator> :: mediate()
[2015-05-28 15:00:36,858] DEBUG - SequenceMediator Mediation started from mediator position : 0
[2015-05-28 15:00:36,858] DEBUG - SequenceMediator Building message. Sequence <SequenceMediator> is content aware
[2015-05-28 15:00:36,898] DEBUG - LogMediator Start : Log mediator
[2015-05-28 15:00:36,912] DEBUG - MiscellaneousUtil Loading a file 'synapse.properties' from classpath
[2015-05-28 15:00:36,932] INFO - LogMediator To: , MessageID: urn:uuid:29D7910394F77252861432805436798, Direction: request, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"><soapenv:Body>
<m:placeOrder xmlns:m="http://services.samples">
<m:order>
<m:price>3.141593E0</m:price>
<m:quantity>4</m:quantity>
<m:symbol>IBM</m:symbol>
</m:order>
</m:placeOrder>
</soapenv:Body></soapenv:Envelope>
To test the coordination functionality (coordination is already enabled in Inbound Configuration as <parameter name="coordination">true</parameter>), shutdown the worker node which contained above log (node that polled last message) and re-do step 9. Now the message will be polled by the other node.