Messaging Request/Reply

Messaging Integration Style

Fire-and-Forget Pattern

When we think of integration in the messaging style, what usually comes to mind is one-way communication, also known as the fire-and-forget pattern. Being asynchronous and able to cope with large messages, this pattern brings advantages in some integration scenarios:

  • Loose coupling: consumer and provider do not need to be up and running at the same time; everything happens asynchronously;
  • Guaranteed delivery: messaging server features such as message persistence increase the chance that a message is actually delivered. Asynchronous communication helps here too, since the two sides do not need to be online at the same time to send and receive messages;
  • Robustness: messages exchanged in the messaging style are usually larger than those used in RPC styles. This has to do with the characteristics of the protocols usually adopted in each style: RPC (HTTP, REST, SOAP) versus Messaging (AMQP, MQTT, STOMP).
But everything comes at a price. Here are some cons:
  • Error handling: because of the nature of the communication (fire-and-forget), there is no response from the Consumer of the message, so the Sender will not be aware of any exception that happens while the Consumer is processing it.
  • Completely stateless: there is no conversation, only a single communication. The Sender sends the message to the recipient and, from its side, the exchange is finished. The Consumer has no chance to influence the content of the message before it is delivered.
Of course, some of these cons can be overcome by design. For instance, an Error Queue can hold the messages that the Consumer failed to process, along with some information about the cause.
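
The Error Queue idea can be sketched without a broker. This is only an illustration under assumptions: in-memory queues stand in for broker queues, the "processing" step is a hypothetical integer parse, and the names ErrorQueueSketch and FailedMessage are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for a broker; illustrates the error-queue idea only. */
final class ErrorQueueSketch {

    /** A failed message together with the reason it could not be processed. */
    static final class FailedMessage {
        final String body;
        final String cause;

        FailedMessage(String body, String cause) {
            this.body = body;
            this.cause = cause;
        }
    }

    final BlockingQueue<String> workQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<FailedMessage> errorQueue = new LinkedBlockingQueue<>();

    /** Consumes everything currently queued; failures are parked, not lost. */
    int drain() {
        int processed = 0;
        String body;
        while ((body = workQueue.poll()) != null) {
            try {
                Integer.parseInt(body.trim()); // hypothetical "processing" step
                processed++;
            } catch (NumberFormatException e) {
                // Keep the message and the cause so someone can inspect it later
                errorQueue.add(new FailedMessage(body, e.getMessage()));
            }
        }
        return processed;
    }
}
```

The Sender still gets no direct feedback, but the failed messages and their causes remain available for inspection or reprocessing.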

But let’s say that the Consumer, before getting the message, needs to send a request to the Sender defining what content it wants in the message to be delivered. That no longer sounds like a simple fire-and-forget pattern.
Although the fire-and-forget pattern cannot be used, we still need (or desire) some of the advantages of messaging style integration, like loose coupling (both parties do not need to be up and running at the same time), robustness (large, large, large messages) and guaranteed delivery (the message is persisted while nobody is home to receive it). In this scenario we could adopt a Request/Reply pattern over messaging style integration.

Request/Reply Pattern

As explained, we need the advantages of messaging style integration but in a two-way communication: a request has to be sent before any information is received, which means that the message received is actually a reply.
In this scenario the Service Provider plays the role of Replier: it waits for a request message and produces a reply message. The Service Consumer plays the role of Requestor: it sends a request message and waits for the reply message, which may arrive at any moment.

A side note on who is the Consumer and who is the Provider: this can become very confusing depending on the environment you work in, so it is often better to make these roles clear before starting to talk about integration or service design. Check this other small article regarding this topic. In our scenario, we are going to call them Requestor (the Service Consumer, who needs some information) and Replier (the Service Provider, who has the information).

Messaging Request/Reply Pattern – Some Sketches…

One possible architecture design for our solution would be:

Picture 1. A candidate design for a messaging request/reply solution.

The sequence of events in Picture 1:

  1. The Requestor sends a Message to the QUEUE REQUEST with the ReplyTo property set to the QUEUE name “REPLY”. After sending the message, the Requestor also saves the generated JMSMessageID property value of its Message. We will see why in the next step;
  2. The Requestor starts listening on the QUEUE REPLY, waiting for its answer. It sets a filter on this listener so that it receives only Messages whose JMSCorrelationID property equals the JMSMessageID value saved in the previous step.
  3. The Replier is listening on the QUEUE REQUEST, waiting for Messages to process and composing the Response Message to send back.
  4. Before sending the response, the Replier has to do one more thing: set the JMSCorrelationID property of the Response Message to the JMSMessageID value of the Request Message it processed.
  5. The Replier then sends the Response Message to the QUEUE REPLY.
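
The five steps above can be sketched with plain Java collections. This is not JMS: the class SharedReplyQueueSketch and its Msg type are hypothetical, in-memory stand-ins for the broker, and the reply body is a simple echo. The sketch only shows how a shared reply queue stays usable as long as every reader matches on the correlation ID.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

/** In-memory stand-in for the broker in Picture 1; not JMS. */
final class SharedReplyQueueSketch {

    /** Minimal message carrying the three headers the pattern relies on. */
    static final class Msg {
        final String messageId = UUID.randomUUID().toString();
        final String body;
        String replyTo;
        String correlationId;

        Msg(String body) {
            this.body = body;
        }
    }

    final Queue<Msg> requestQueue = new ConcurrentLinkedQueue<>();
    final Queue<Msg> replyQueue = new ConcurrentLinkedQueue<>();

    /** Step 1: send the request and return its message ID for later matching. */
    String sendRequest(String body) {
        Msg request = new Msg(body);
        request.replyTo = "REPLY";
        requestQueue.add(request);
        return request.messageId;
    }

    /** Steps 3 to 5: the Replier answers every pending request. */
    void replyToAll() {
        Msg request;
        while ((request = requestQueue.poll()) != null) {
            Msg reply = new Msg("echo: " + request.body);
            reply.correlationId = request.messageId; // step 4
            replyQueue.add(reply);                   // step 5
        }
    }

    /** Step 2: take only the reply whose correlation ID matches, if present. */
    String receiveReply(String messageId) {
        for (Iterator<Msg> it = replyQueue.iterator(); it.hasNext();) {
            Msg reply = it.next();
            if (messageId.equals(reply.correlationId)) {
                it.remove();
                return reply.body;
            }
        }
        return null; // nothing for this request yet
    }
}
```

Two callers can share the same replyQueue here and still receive only their own answers, because receiveReply skips any reply whose correlation ID belongs to someone else.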
This design has pros and cons. The pros:
  • We don’t have to create a new QUEUE REPLY for each new Requestor. This design is ready to accept new Requestors right away, as long as each one properly takes care of the CorrelationID/MessageID correspondence.
And here are the cons:
  • All the Requestors have access to the same QUEUE, sharing all its characteristics and resources.
  • Message correlation is up to each Requestor. Each one has to handle the correlation IDs properly, otherwise it can end up getting messages that do not belong to it. In some cases this can also be a security problem, when the messages must be delivered confidentially.
Another possible design is to use one reply QUEUE per Requestor, so that each one has its own. The following picture illustrates this:


Picture 2. A candidate design for a messaging request/reply solution.

This is how the flow works in the design of Picture 2:
  1. The Requestors Zeta and Omega send their Request Messages to the QUEUE REQUEST. The Zeta Requestor sets the message property ReplyTo to the value “Zeta”, and the Omega Requestor sets the same property to the value “Omega”.
  2. The Requestors Zeta and Omega start listening on their respective reply QUEUEs, waiting for the responses.
  3. The single Replier (the provider of this Service) is listening on the Request QUEUE, waiting for Request Messages to process.
  4. Even though each Requestor has its own “private” QUEUE to receive the responses, the Replier still sets the JMSCorrelationID property of the Response Message to the value of the JMSMessageID property of the Request Message received. This way the Requestor can correlate each reply with the corresponding request it sent.
  5. From the ReplyTo property of the Request Message received, the Replier reads the destination to which the response must be sent.
  6. The Replier sends the response to the QUEUE Zeta or Omega, according to the value read from the ReplyTo property.
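
The routing in steps 5 and 6 can be sketched the same way, again with in-memory collections instead of a broker. PrivateReplyQueueSketch, its Msg type and the "ID-n" identifiers are hypothetical names chosen for this illustration, and the reply body is a simple echo.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory stand-in for the broker in Picture 2; not JMS. */
final class PrivateReplyQueueSketch {

    static final class Msg {
        final String id;
        final String body;
        final String replyTo;       // set on requests only
        final String correlationId; // set on replies only

        Msg(String id, String body, String replyTo, String correlationId) {
            this.id = id;
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    final Queue<Msg> requestQueue = new ArrayDeque<>();
    final Map<String, Queue<Msg>> replyQueues = new HashMap<>();
    private int nextId = 1;

    /** Each new Requestor needs its own reply queue created up front. */
    void createReplyQueue(String name) {
        replyQueues.put(name, new ArrayDeque<>());
    }

    /** Step 1: the request names the private queue it wants the answer on. */
    String sendRequest(String replyTo, String body) {
        Msg request = new Msg("ID-" + nextId++, body, replyTo, null);
        requestQueue.add(request);
        return request.id;
    }

    /** Steps 3 to 6: read ReplyTo, set the correlation ID, route the reply. */
    void replyToAll() {
        Msg request;
        while ((request = requestQueue.poll()) != null) {
            Queue<Msg> destination = replyQueues.get(request.replyTo);
            if (destination == null) {
                continue; // unknown destination; a real system might park it
            }
            destination.add(new Msg("ID-" + nextId++, "echo: " + request.body,
                    null, request.id));
        }
    }
}
```

Calling createReplyQueue once per Requestor mirrors the operational cost of this design: nothing can be answered until the private queue exists.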
Again, as always, this other approach has its pros and cons. The advantages:
  • Each Requestor now has its own “private” QUEUE, so it only has to take care of the correlation ID if it wants to know which Response Message is correlated with which Request Message. If each subsequent Request is sent only after the Response to the previous one has arrived, correlating by ID is not necessary.
  • As the Requestors are completely separated from each other regarding the response QUEUE (each one has its own), the characteristics of these QUEUEs can be defined and managed individually.
And here are the disadvantages:
  • A new QUEUE has to be created in the environment for each new Requestor. At first this does not seem complex (technically), but in some workplaces it is hard to change anything in the production environment without going through a lot of bureaucracy.
  • Depending on the number of Requestors, and if the messages are small, we can end up spending more effort managing lots of QUEUEs with mini-messages without much benefit in exchange.

The Proof! A Sample Project Implementation

Now, after some drawings and architectural talk, let’s have some fun and implement the solution in Java (using the JMS API specification) to check whether everything we designed really works. We are going to implement the first design explained above (Picture 1). To build the second scenario instead, each Requestor just has to use its own QUEUE and send that information to the Replier in the JMS ReplyTo property.

In this sample project, we will have Requestors that send messages with simple math equations to be solved by the single Replier, the specialist in math. After performing the calculations, the Replier answers each Request Message with a Response Message properly correlated.

The tools used in this solution were:

  • Apache ActiveMQ v5.13.1;
  • Java SE 8;
  • JMS API;
  • Maven; and…
  • Your favorite IDE (Eclipse, Netbeans, Notepad, vi, vim, etc.)
Our project is composed of only six Java classes, as follows:
  • JMSRequestReplySample
    • This is our entry class, where we start running the sample JMS Request/Reply solution. It registers two Requestors and the single Replier (the provider of the requested service).
  • Requestor
    • This class represents the Requestor. It is implemented as a Thread that, as soon as it starts running, sends its request Messages and then waits for the respective replies.
  • Replier
    • This class represents the provider of the service, the one who owns the knowledge to answer the request Messages. It is also implemented as a Thread, which keeps listening for request Messages to process. It evaluates the math equation received, sets the CorrelationID of the Response Message to the MessageID of the Request Message, reads the ReplyTo property to get the reply QUEUE and then sends the answer.
  • MessagingProviderConnection
    • Class responsible for connecting to the JMS messaging server used in our solution. In our case, we use ActiveMQ.
  • Configuration
    • Provides the configuration parameters needed by our solution, for instance the ConnectionFactoryName and the messaging server URL.
  • Utils
    • General static utility operations, useful for all the other classes.
The link to download the complete source code of this solution is available at the end of this article. Before running it, it is necessary to download all the Maven dependencies and to have the Apache ActiveMQ open source messaging server installed and running, with our two default QUEUEs created under the names REQUEST and REPLY.

 

Let’s highlight some of the most relevant parts of our sample solution, beginning with the main method in the class JMSRequestReplySample. Before instantiating this class, we load all the configuration properties needed to connect to Apache ActiveMQ from a file named ActiveMQ.jms.properties. At lines 75 and 76 two Requestors are registered, passing as parameters: the name of the Requestor, the interval to wait between messages (it is possible to send more than one), the reply QUEUE name and, as the last parameter, the Message itself, that is, the math equation to be solved by the Replier.
Three seconds after the Requestors are registered, at line 89, we register the Replier, which starts listening on the Request QUEUE and answers all the messages that arrive.
public static void main(String[] args) {
		String filePropertiesJMS = "ActiveMQ.jms.properties";
		if ( args.length > 0 ) {
			filePropertiesJMS = args[0];
		}
		JMSRequestReplySample jmsSample = new JMSRequestReplySample(filePropertiesJMS);

		logger.info(Utils.separator());
		logger.info("Starting the requestors");
		jmsSample.registerRequestor("RequestorApp", 1000, Configuration.getReplyQueue() , "(4 + 4) * 2");
		jmsSample.registerRequestor("RequestorApp", 1000, Configuration.getReplyQueue(), "4 + 4 * 2");
		logger.info(Utils.separator());

		// Wait 3 seconds before starting the Replier
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			Utils.logAndThrow(e);
		}
		// Register the Replier after the Requestors have sent their requests
		logger.info(Utils.separator());
		logger.info("Start the Replier");
		logger.info(Utils.separator());
		jmsSample.registerReplier("ReplierApp");

		//quickTestJMSConnectionSendReceiveQueueHELLO();
}
When the Requestor thread starts to run, it looks up the Request QUEUE (line 48), wraps the message text in a TextMessage object and sends it to the destination. Before sending the message, it is important that the Requestor does not forget to tell the Replier the name of the QUEUE to which the reply must be sent (line 57).

 

After sending the message, the Requestor has to save the automatically generated MessageID of the Request Message, so that it can later correlate the CorrelationID of the Reply Message with it. The JMSMessageID is assigned by the provider during the send, which is why it is read only after the send call. For this purpose we use a Map that stores every MessageID created together with the text of the message sent (line 66).
Once all the Request Messages have been sent and recorded, the next step for the Requestor is to start its listener Thread and wait for the replies (line 73).
@Override
public void run() {
	try {
		logger.info("Connection to Messaging for Send a Request");
		// Get Queue Request
		Destination requestQueue = (Destination) mp.ctx.lookup(Configuration.getRequestQueue());
		logger.debug("Get the queue {} to SEND Requests", Configuration.getRequestQueue());

		// Sending the Messages to the Replier
		int index = 0;
		// Create Message
		TextMessage requestMessage;
		for (String msg : messagesText) {
			requestMessage = mp.session.createTextMessage(msg);
			requestMessage.setJMSReplyTo(replyQueue);
			// Sending Message
			MessageProducer producer = mp.session.createProducer(requestQueue);
			producer.send(requestMessage);
			mp.session.commit();
			logger.info("Sent {} of {} Message \"{}\"; to QUEUE {}, waiting response at the QUEUE {}...", ++index, messagesText.length, messagesText,
					Configuration.getRequestQueue(), replyQueueName);
			Utils.logQueueMessage(logger, " MESSAGE REQUEST ", requestMessage);

			mySentMessages.put(requestMessage.getJMSMessageID(), requestMessage.getText());

			Thread.sleep(interval);
		}
		logger.info("All messages sent.");

		// Register our thread listener to receive the replies
		listenerThread.start();

	} catch (NamingException | JMSException | InterruptedException e) {
		Utils.logAndThrow(e);
	} finally {
		mp.close();
	}
}
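
The bookkeeping around the Map of sent MessageIDs can also be written as a small helper that hands each reply back to whoever is waiting for it. This is a sketch under assumptions, not part of the sample project: PendingRequests is a hypothetical class, and it uses CompletableFuture where the sample simply logs the reply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks sent requests by message ID until the correlated reply arrives. */
final class PendingRequests {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call right after send(): remembers the request under its message ID. */
    CompletableFuture<String> register(String messageId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(messageId, future);
        return future;
    }

    /** Call from the reply listener; false means nobody was waiting for it. */
    boolean complete(String correlationId, String replyBody) {
        if (correlationId == null) {
            return false; // a reply without correlation ID cannot be matched
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(replyBody);
    }

    /** Number of requests still waiting for a reply. */
    int outstanding() {
        return pending.size();
    }
}
```

In a JMS client, register would take the value of getJMSMessageID() read after the send, and complete would be called from the MessageListener with getJMSCorrelationID() and the reply text.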

One thing to make clear here: even though our sample solution starts listening for the Reply Messages right after sending the Request Messages, it does not have to be this way. The Requestor may start waiting for its replies whenever suits it best. It could wait for hours or even a whole day; it all depends on how long the Replier takes to process the Request Messages and on the business context of a real scenario. For the sake of simplicity, in our sample solution we decided to fetch the replies right after the Request Messages were sent.

Before the Requestor creates its listener, it has to build a filter (a JMS message selector) that selects only the messages whose JMSCorrelationID property equals one of the JMSMessageIDs sent earlier (line 91). With this filter in place, the Requestor starts listening for the Reply Messages to get the answers to the math equations it sent.

// This is the Thread that will listen for the replies related to the
// requests we sent
Thread listenerThread = new Thread(() -> {
	final MessagingProviderConnection mpThreadListener = new MessagingProviderConnection();
	logger.info("Start listening to replies...");
	try {
		// Build a selector that matches only the replies to our own requests.
		// Joining with " OR " avoids having to trim a trailing separator.
		// Note: an empty map yields an empty selector, i.e. no filtering.
		String filter = mySentMessages.keySet().stream()
				.map(key -> "JMSCorrelationID = '" + key + "'")
				.collect(java.util.stream.Collectors.joining(" OR "));

		MessageConsumer messageConsumer = mpThreadListener.session.createConsumer(replyQueue, filter.toString());
		messageConsumer.setMessageListener(message -> {
			try {
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;
					logger.info("Reply Message Received from Replier");
					Utils.logQueueMessage(logger, " REPLY RECEIVED ", textMessage);
					mpThreadListener.session.commit();
				}
			} catch (JMSException e) {
				Utils.logAndThrow(e);
			}
		});
		logger.info("Listening...");
		while (true)
			Thread.sleep(1000);
	} catch (JMSException | InterruptedException e) {
		Utils.logAndThrow(e);
	} finally {
		mpThreadListener.close();
	}
} ,"RequestorListener");
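
The selector construction can be isolated in a small pure function, which makes it easy to test without a broker. This is a sketch: CorrelationSelector and forMessageIds are hypothetical names, and rejecting an empty ID list is a design choice made here because an empty or null selector means no filtering in JMS, which on a shared reply queue would deliver everybody's replies.

```java
import java.util.Collection;
import java.util.stream.Collectors;

/** Builds a JMS message selector matching replies to the given request IDs. */
final class CorrelationSelector {

    private CorrelationSelector() {
    }

    static String forMessageIds(Collection<String> messageIds) {
        if (messageIds.isEmpty()) {
            // An empty selector would match every message on the queue
            throw new IllegalArgumentException("At least one message ID is required");
        }
        return messageIds.stream()
                // Selector string literals escape a single quote by doubling it
                .map(id -> "JMSCorrelationID = '" + id.replace("'", "''") + "'")
                .collect(Collectors.joining(" OR "));
    }
}
```

For the IDs "ID:1" and "ID:2" this produces JMSCorrelationID = 'ID:1' OR JMSCorrelationID = 'ID:2', which can be passed as the second argument of createConsumer.
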
Meanwhile, in the Replier class, we have our consumer listener set up and ready to receive the Request Messages from the Request QUEUE (line 36). The messages it accepts must be of the TextMessage type and must also have the ReplyTo property set (line 39). The messages are then processed by evaluating the math equation (line 51) in order to compose the Reply Message to send back.

 

Before sending the message, the Replier must also set the JMSCorrelationID property of the Response TextMessage to the value of the JMSMessageID property of the corresponding Request TextMessage (line 58).
MessageConsumer consumer = mp.session.createConsumer(requestQueue);
consumer.setMessageListener(requestMessage -> {
	try {
		if ((requestMessage instanceof TextMessage) && requestMessage.getJMSReplyTo() != null) {
			/**
			 * The message is a TextMessage and carries the ReplyTo QUEUE
			 * to "reply to" 😉
			 */
			TextMessage requestTextMessage = (TextMessage) requestMessage;
			Utils.logQueueMessage(logger, " MESSAGE RECEIVED ", requestTextMessage);

			/**
			 * Sending the Response
			 */
			// Process the answer
			Long result = processMathEquation(requestTextMessage.getText());
			// Check and Log Queue for the Reply
			logger.debug("Answer to the queue: {}", requestMessage.getJMSReplyTo());
			MessageProducer producer = mp.session.createProducer(requestMessage.getJMSReplyTo());
			// Composing the Text reply message
			String response = "Ok! Here is the result: \"" + requestTextMessage.getText() + " = " + result + "\"";
			TextMessage replyMessage = mp.session.createTextMessage(response);
			replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
			producer.send(replyMessage);
			logger.info("Reply Message Sent to the queue: {}", requestMessage.getJMSReplyTo());
			mp.session.commit();
			logger.info(Utils.separator());
		} else {
			/**
			 * Oops... We received a message that is not a TextMessage or
			 * has no ReplyTo, so just put it aside in an error QUEUE
			 */
			if (requestMessage.getJMSReplyTo() == null)
				logger.error("Message received does not have the replyTo configured");
			Utils.logQueueMessage(logger, " MESSAGE RECEIVED ", requestMessage);
			// Put this Message on an ERROR queue, for instance.
			// To do 🙂
		}
	} catch (JMSException e) {
		Utils.logAndThrow(e);
	}
});
To compute the answers we use the JavaScript engine available through the Java scripting API to parse and evaluate the math equation (lines 107 and 112).
/**
 * Using the ScriptEngineManager, we look up an available JavaScript
 * engine implementation and use it to perform some simple math operations.
 * @param eq Equation to be solved
 * @return the result of the equation
 */
public Long processMathEquation(String eq) {
	long result = 0;

	ScriptEngineManager manager = new ScriptEngineManager();
	ScriptEngine engine = manager.getEngineByName( "JavaScript" );

	logger.debug("Using the Script Engine {}", engine.getClass().getName());
	logger.debug("Solving the equation:\"{}\"", eq);
	try {
		// eval may return an Integer or a Double; go through Number instead of parsing text
		result = ((Number) engine.eval( eq )).longValue();
	} catch (ScriptException e) {
		Utils.logAndThrow(e);
	}
	logger.debug("Result of the equation:\"{} = {}\"", eq,result);
	return result;
}
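
The script engine approach depends on a JavaScript engine being present in the JDK, which holds for Java SE 8 but not for JDK 15 and later, where Nashorn was removed. As an alternative, here is a minimal sketch of a hand-written evaluator. It is not part of the sample project: MathEquationParser is a hypothetical class that supports only integers, the four basic operators, unary minus and parentheses, and it uses integer division, unlike JavaScript.

```java
/** Tiny recursive-descent evaluator for +, -, *, / and parentheses on integers. */
final class MathEquationParser {

    private final String input;
    private int pos;

    private MathEquationParser(String input) {
        // Whitespace is dropped up front, so "1 2" would be read as 12
        this.input = input.replaceAll("\\s+", "");
    }

    static long evaluate(String equation) {
        MathEquationParser parser = new MathEquationParser(equation);
        long value = parser.expression();
        if (parser.pos != parser.input.length()) {
            throw new IllegalArgumentException("Unexpected input at position " + parser.pos);
        }
        return value;
    }

    // expression := term (('+' | '-') term)*
    private long expression() {
        long value = term();
        while (pos < input.length()) {
            char op = input.charAt(pos);
            if (op == '+') {
                pos++;
                value += term();
            } else if (op == '-') {
                pos++;
                value -= term();
            } else {
                break;
            }
        }
        return value;
    }

    // term := factor (('*' | '/') factor)*
    private long term() {
        long value = factor();
        while (pos < input.length()) {
            char op = input.charAt(pos);
            if (op == '*') {
                pos++;
                value *= factor();
            } else if (op == '/') {
                pos++;
                value /= factor(); // integer division
            } else {
                break;
            }
        }
        return value;
    }

    // factor := number | '(' expression ')' | '-' factor
    private long factor() {
        if (pos >= input.length()) {
            throw new IllegalArgumentException("Unexpected end of input");
        }
        char c = input.charAt(pos);
        if (c == '-') {
            pos++;
            return -factor();
        }
        if (c == '(') {
            pos++;
            long value = expression();
            if (pos >= input.length() || input.charAt(pos) != ')') {
                throw new IllegalArgumentException("Missing ')'");
            }
            pos++;
            return value;
        }
        int start = pos;
        while (pos < input.length() && Character.isDigit(input.charAt(pos))) {
            pos++;
        }
        if (start == pos) {
            throw new IllegalArgumentException("Number expected at position " + pos);
        }
        return Long.parseLong(input.substring(start, pos));
    }

    public static void main(String[] args) {
        System.out.println(evaluate("(4 + 4) * 2")); // prints 16
        System.out.println(evaluate("4 + 4 * 2"));   // prints 12
    }
}
```

A parser restricted to arithmetic also avoids running arbitrary script text taken from a message body, which is worth considering outside of a demo.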

Running The Proof

To get the sample project up and running we only have to run the class JMSRequestReplySample. That can be done directly from inside the IDE with the project configured (Run as…) or by using the packaged JAR and running it with the command:
uaza@mac:~$ java -jar jms-request-reply-1.0.jar
This command should be run inside Maven’s ${project.build.directory} (the target folder), which should contain, besides the packaged JAR itself, all the dependencies needed to run the sample project.
Below we can read the part of the log showing the two messages sent by the Requestors. Note the JMSMessageID, the ReplyTo QUEUE and the message content sent. The message contents are two slightly different math equations.
15:19:45.516 [RequestorApp] INFO - Connection to Messaging for Send a Request
15:19:45.550 [RequestorApp] INFO - Sent 1 of 1 Message "[(4 + 4) * 2]" to QUEUE REQUEST, waiting response at the QUEUE REPLY...
15:19:45.552 [RequestorApp] INFO - [ /**************************** MESSAGE REQUEST *****************************\ ]
15:19:45.553 [RequestorApp] INFO - [ | Message ID: ID:BVF-BFC-68408-56077-1459448385200-1:1:1:1:1               | ]
15:19:45.554 [RequestorApp] INFO - [ | Correl. ID: [NULL]                                                       | ]
15:19:45.555 [RequestorApp] INFO - [ | Reply To: queue://REPLY                                                  | ]
15:19:45.555 [RequestorApp] INFO - [ | Contents : (4 + 4) * 2                                                   | ]
15:19:45.555 [RequestorApp] INFO - [ \**************************************************************************/ ]
15:19:45.631 [RequestorApp] INFO - Sent 1 of 1 Message "[4 + 4 * 2]" to QUEUE REQUEST, waiting response at the QUEUE REPLY...
15:19:45.631 [RequestorApp] INFO - [ /**************************** MESSAGE REQUEST *****************************\ ]
15:19:45.631 [RequestorApp] INFO - [ | Message ID: ID:BVF-BFC-68408-56077-1459448385200-3:1:1:1:1               | ]
15:19:45.631 [RequestorApp] INFO - [ | Correl. ID: [NULL]                                                       | ]
15:19:45.631 [RequestorApp] INFO - [ | Reply To: queue://REPLY                                                  | ]
15:19:45.631 [RequestorApp] INFO - [ | Contents : 4 + 4 * 2                                                     | ]
15:19:45.632 [RequestorApp] INFO - [ \**************************************************************************/ ]
15:19:46.555 [RequestorApp] INFO - All messages sent.

Upon receiving both Request Messages, the Replier processes them and sends the answers properly correlated. In the log below we can check the Reply Messages received by the Requestor:

15:19:49.208 [ActiveMQ Session Task-1] INFO - Reply Message Received from Replier
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ /***************************** REPLY RECEIVED *****************************\ ]
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ | Message ID: ID:BVF-BFC-68408-56077-1459448385200-9:1:1:1:1               | ]
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ | Correl. ID: ID:BVF-BFC-68408-56077-1459448385200-1:1:1:1:1               | ]
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ | Reply To: [NULL]                                                         | ]
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ | Contents : Ok! Here is the result: "(4 + 4) * 2 = 16"                    | ]
15:19:49.209 [ActiveMQ Session Task-1] INFO - [ \**************************************************************************/ ]
15:19:49.225 [ActiveMQ Session Task-2] INFO - Reply Message Sent to the queue: queue://REPLY
15:19:49.289 [ActiveMQ Session Task-2] INFO - --------------------------------------------------
15:19:49.290 [ActiveMQ Session Task-1] INFO - Reply Message Received from Replier
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ /***************************** REPLY RECEIVED *****************************\ ]
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ | Message ID: ID:BVF-BFC-68408-56077-1459448385200-9:1:1:2:1               | ]
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ | Correl. ID: ID:BVF-BFC-68408-56077-1459448385200-3:1:1:1:1               | ]
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ | Reply To: [NULL]                                                         | ]
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ | Contents : Ok! Here is the result: "4 + 4 * 2 = 12"                      | ]
15:19:49.290 [ActiveMQ Session Task-1] INFO - [ \**************************************************************************/ ]
Observe the JMSCorrelationID value, which corresponds to the JMSMessageID of the Request Message, and the answer received for each one. This way, a Requestor that applies the filter gets only the messages correlated with the Request Messages it sent, whether or not the Reply QUEUE belongs exclusively to it.

 

Conclusion: As we could see, this integration pattern gives us some of the advantages of an asynchronous scenario and lets us deal with larger messages while still using request/reply style communication. We already know that every bonus comes with some onus, so there are disadvantages as well; for instance, error handling has to be more sophisticated to support these integration scenarios. As always, there is no such thing as “one size fits all”, so a solution should be chosen through logical and reasonable judgment based (mainly) on the needs of the business context.

 


Source code: