analytics

Monday, March 14, 2011

Queue (Point to Point Messaging)

Point to Point Messaging Program


The Hello World application consists of a sender application that sends a "Hello" message to a queue. The message is received by one queue receiver connected to that queue. If no receivers are connected, the message is retained on the queue. If several queue receivers are connected, messages are distributed among them in a round-robin fashion, so each message goes to exactly one receiver.
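The retention and round-robin behavior described above can be pictured with a small in-memory model. This is only a sketch in plain Java and does not use the JMS API; the class RoundRobinQueueModel and its methods are made up for illustration, and a real JMS provider may schedule receivers differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of point-to-point delivery; this is NOT the JMS API.
class RoundRobinQueueModel {
    private final Deque<String> retained = new ArrayDeque<>();       // messages waiting for a receiver
    private final List<List<String>> receivers = new ArrayList<>();  // one inbox per connected receiver
    private int next = 0;                                            // counts deliveries to pick the next receiver

    // Connects a receiver and delivers anything retained while nobody was connected.
    List<String> connectReceiver() {
        List<String> inbox = new ArrayList<>();
        receivers.add(inbox);
        while (!retained.isEmpty()) {
            dispatch(retained.poll());
        }
        return inbox;
    }

    void send(String message) {
        if (receivers.isEmpty()) {
            retained.add(message);   // no receiver connected: keep the message on the queue
        } else {
            dispatch(message);
        }
    }

    // Each message goes to exactly one receiver, chosen in turn.
    private void dispatch(String message) {
        receivers.get(next % receivers.size()).add(message);
        next++;
    }

    int retainedCount() {
        return retained.size();
    }

    public static void main(String[] args) {
        RoundRobinQueueModel queue = new RoundRobinQueueModel();
        queue.send("Hello 1");                         // retained: no receivers yet
        List<String> first = queue.connectReceiver();  // picks up the retained message
        List<String> second = queue.connectReceiver();
        queue.send("Hello 2");
        queue.send("Hello 3");
        System.out.println("first:  " + first);
        System.out.println("second: " + second);
    }
}
```

In this model the first receiver ends up with "Hello 1" and "Hello 3" and the second with "Hello 2"; no message is ever copied to both.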

There are four sample programs for this section:

Queue Sender

Synchronous Queue Receiver

Asynchronous Queue Receiver

Queue Browser

Note that none of the examples in this section show code for handling exceptions. Although this improves the readability of the example code, application programmers should be aware that almost all methods in the JMS API may throw a JMSException if the JMS provider fails.
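Because the samples leave exception handling out, here is one possible shape for it. To keep the sketch compilable without a JMS provider, ToyConnection and ToyJmsException are hypothetical stand-ins for javax.jms.QueueConnection and javax.jms.JMSException; only the try/catch/finally structure is the point.

```java
// ToyConnection and ToyJmsException are stand-ins for javax.jms types,
// invented so the sketch compiles without a JMS provider.
class ExceptionHandlingSketch {
    // Returns true if the message was sent; the connection is closed either way.
    static boolean sendHello(ToyConnection conn) {
        try {
            conn.send("Hello");
            return true;
        } catch (ToyJmsException ex) {
            System.err.println("send failed: " + ex.getMessage());
            return false;
        } finally {
            conn.close();   // runs on success and on failure
        }
    }

    public static void main(String[] args) {
        System.out.println(sendHello(new ToyConnection(false)));
        System.out.println(sendHello(new ToyConnection(true)));
    }
}

class ToyJmsException extends Exception {
    ToyJmsException(String message) {
        super(message);
    }
}

class ToyConnection {
    boolean closed = false;
    private final boolean failOnSend;

    ToyConnection(boolean failOnSend) {
        this.failOnSend = failOnSend;
    }

    void send(String text) throws ToyJmsException {
        if (failOnSend) {
            throw new ToyJmsException("provider failure while sending: " + text);
        }
    }

    void close() {
        closed = true;
    }
}
```

With the real API, close() itself can throw a JMSException, so production code would typically guard that call as well.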

Queue Sender

The queue sender application performs the following steps:

Obtain an InitialContext object for the JMS server.

Use the context object to look up a specific queue, in this case, queue0.

Use the context object to look up the queue connection factory. You only need to specify queue/connectionFactory in the lookup because the batch file that you run this sample from has set the System properties to point to the appropriate root context for the System namespace. If you were using the JMS server with the Novell exteNd Application Server, you would have to specify the lookup as follows:

QueueConnectionFactory connFactory = (QueueConnectionFactory)
    ctx.lookup("iiop://localhost:53506/queue/connectionFactory");

Use the QueueConnectionFactory to create a QueueConnection. The QueueConnection represents a physical connection to the JMS server.

Create a queue session. The first parameter of the createQueueSession method decides whether or not the session is transacted. Here, we use a non-transacted session. The second parameter sets the acknowledgement mode, which only affects message consumption and is therefore not relevant to an application that only sends.

Create a queue sender for queue0 and create a message.

Send the "Hello" message to queue0.

Close the queue connection. This will in turn close both the session and the QueueSender.
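As an aside on the lookup step, the root context does not have to come from System properties set by a batch file. The standard JNDI property Context.PROVIDER_URL can also be passed to the InitialContext constructor in an environment table. The sketch below only builds that table, using the URL from the lookup example above; whether this particular provider needs further properties (for example an initial context factory) is an assumption left open, and JndiEnvironmentSketch is a made-up name.

```java
import java.util.Hashtable;
import javax.naming.Context;

class JndiEnvironmentSketch {
    // Builds an environment that could be passed to new InitialContext(env).
    static Hashtable<String, Object> environmentFor(String providerUrl) {
        Hashtable<String, Object> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, providerUrl);   // key is "java.naming.provider.url"
        return env;
    }

    public static void main(String[] args) {
        Hashtable<String, Object> env = environmentFor("iiop://localhost:53506");
        System.out.println(env);
        // With a JMS provider on the classpath, the lookup could then stay relative:
        //   InitialContext ctx = new InitialContext(env);
        //   ctx.lookup("queue/connectionFactory");
    }
}
```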

The full source code for the sender application is shown below:

package pointToPoint;

import javax.naming.InitialContext;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSender;
import javax.jms.DeliveryMode;
import javax.jms.QueueSession;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class Sender
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)
            ctx.lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.DUPS_OK_ACKNOWLEDGE);

        // create a queue sender
        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // create a simple message to say "Hello"
        TextMessage message = queueSession.createTextMessage("Hello");

        // send the message
        queueSender.send(message);

        // print what we did
        System.out.println("sent: " + message.getText());

        // close the queue connection
        queueConn.close();
    }
}

The Sender class sets the delivery mode to NON_PERSISTENT before sending the message. This means that the message may be lost if the JMS server crashes. Since NON_PERSISTENT messages give better performance than PERSISTENT messages, applications should set the delivery mode to NON_PERSISTENT whenever guaranteed delivery is not a requirement.
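The trade-off between the two delivery modes can be pictured with a toy broker. This is a simplified model in plain Java, not how any particular JMS server is implemented: the list stableStore stands in for disk storage, and all names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy broker, not a JMS provider: persistent messages are also written to a
// store that survives a crash, non-persistent ones live only in memory.
class DeliveryModeModel {
    private final List<String> stableStore = new ArrayList<>(); // stands in for disk
    private final List<String> memory = new ArrayList<>();      // lost on a crash

    void send(String text, boolean persistent) {
        if (persistent) {
            stableStore.add(text);   // the extra write is the price of the guarantee
        }
        memory.add(text);
    }

    // Simulates a crash followed by a restart: memory is wiped, the store survives.
    void crashAndRestart() {
        memory.clear();
        memory.addAll(stableStore);
    }

    List<String> pending() {
        return memory;
    }

    public static void main(String[] args) {
        DeliveryModeModel broker = new DeliveryModeModel();
        broker.send("Hello (non-persistent)", false);
        broker.send("Hello (persistent)", true);
        broker.crashAndRestart();
        System.out.println("after restart: " + broker.pending());
    }
}
```

After the simulated restart only the persistent message is still pending, which is the loss the paragraph above warns about.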

Synchronous Queue Receiver

The receiver application performs the same initial steps as the queue sender, because you always have to look up a queue object using the initial context, connect to the JMS server and create a session.

Instead of a QueueSender object, the receiver application creates a QueueReceiver from which messages can be received synchronously. Note that the receiver application must start the connection before any messages can be received.

The receiver application uses a non-transacted session with automatic message acknowledgement. This means that each message is automatically acknowledged by the session when the receive method returns it to the application.

Below is the source for the Receiver class:

package pointToPoint;

import javax.naming.InitialContext;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class Receiver
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)
            ctx.lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue receiver
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);

        // start the connection
        queueConn.start();

        // receive a message
        TextMessage message = (TextMessage) queueReceiver.receive();

        // print the message
        System.out.println("received: " + message.getText());

        // close the queue connection
        queueConn.close();
    }
}

If the connection is not started, the receive method will block forever (or until some other thread starts the connection). If a client wants to temporarily stop delivery of messages, the connection can be stopped and then restarted later.
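The effect of start and stop on delivery can be modeled as a gate in front of the queue. The following is a plain-Java sketch, not the JMS API; StartGateModel is a made-up class. It uses a timed receive so the demonstration cannot hang. The real API offers a comparable receive(long timeout) on the receiver, which returns null when the timeout expires.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of a connection that gates delivery; this is NOT the JMS API.
class StartGateModel {
    private final Queue<String> messages = new ArrayDeque<>();
    private boolean started = false;

    synchronized void send(String text) {
        messages.add(text);
        notifyAll();
    }

    synchronized void start() {
        started = true;
        notifyAll();   // wake up receivers blocked on the gate
    }

    synchronized void stop() {
        started = false;
    }

    // Waits up to timeoutMillis; returns null if nothing was delivered in time.
    synchronized String receive(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (!started || messages.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                wait(remaining);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
        return messages.poll();
    }

    public static void main(String[] args) {
        StartGateModel conn = new StartGateModel();
        conn.send("Hello");
        System.out.println("before start: " + conn.receive(100)); // gate closed
        conn.start();
        System.out.println("after start:  " + conn.receive(100));
    }
}
```

The message sits on the queue while the gate is closed and is handed over as soon as the connection is started.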

Asynchronous Queue Receiver

The AsyncReceiver class illustrates the use of message listeners. A message listener is a regular Java class that implements the MessageListener interface. This interface has a single onMessage method, which is called by JMS when messages arrive at a destination.

As with the synchronous receiver, the AsyncReceiver class performs the same initial steps to create a QueueReceiver. Then, the setMessageListener method is called to register an AsyncReceiver instance as the message listener. As with a synchronous receiver, messages will not be delivered until the start method is called on the connection.

Since the acknowledge mode is set to automatic, JMS will acknowledge each message right after the call to the onMessage method returns. Note that onMessage is not allowed to throw any exceptions. You must catch all exceptions and deal with them somehow in the onMessage method.

In the synchronous receiver, the receive method can throw an exception if the JMS provider fails. With message listeners this is not possible, because no application call is in progress when the failure occurs. Instead, you can register an exception listener with the connection, which will receive such exceptions.
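One way to honor the rule that nothing may escape onMessage is to wrap the handler logic in a guard that catches everything and reports it elsewhere. The sketch below shows the idea with plain java.util.function types standing in for MessageListener; guarded and SafeListenerSketch are hypothetical names, not part of JMS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SafeListenerSketch {
    // Wraps a handler so that no runtime exception escapes the callback;
    // failures are handed to onError instead.
    static Consumer<String> guarded(Consumer<String> handler, Consumer<Exception> onError) {
        return text -> {
            try {
                handler.accept(text);
            } catch (RuntimeException ex) {
                onError.accept(ex);
            }
        };
    }

    public static void main(String[] args) {
        List<Exception> errors = new ArrayList<>();
        Consumer<String> listener = guarded(text -> {
            if (text.isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
            System.out.println("received: " + text);
        }, errors::add);

        listener.accept("Hello");   // handled normally
        listener.accept("");        // fails inside the handler, but nothing propagates
        System.out.println("errors: " + errors.size());
    }
}
```

In a real listener the same try/catch would sit directly inside onMessage, with the catch block logging or otherwise recording the failure.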

Below is the full source for the AsyncReceiver class:

package pointToPoint;

import javax.naming.InitialContext;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import javax.jms.ExceptionListener;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class AsyncReceiver implements MessageListener, ExceptionListener
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)
            ctx.lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue receiver
        QueueReceiver queueReceiver = queueSession.createReceiver(queue);

        // set an asynchronous message listener
        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);

        // set an asynchronous exception listener on the connection
        queueConn.setExceptionListener(asyncReceiver);

        // start the connection
        queueConn.start();

        // wait for messages
        System.out.print("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            System.out.print(".");
        }
        System.out.println();

        // close the queue connection
        queueConn.close();
    }

    /**
       This method is called asynchronously by JMS when a message arrives
       at the queue. Client applications must not throw any exceptions in
       the onMessage method.
       @param message A JMS message.
     */
    public void onMessage(Message message)
    {
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("received: " + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    /**
       This method is called asynchronously by JMS when some error occurs.
       When using an asynchronous message listener it is recommended to use
       an exception listener also, since JMS has no other way to report
       errors.
       @param exception A JMS exception.
     */
    public void onException(JMSException exception)
    {
        System.err.println("an error occurred: " + exception);
    }
}

As documented in the source code, it is recommended to always set a connection exception listener when using asynchronous message listeners. This will allow you to detect any runtime problems, including a crash of the JMS server.

The JMSException class provides the getLinkedException method, which can be used to get the root cause of the exception (if any). As an example, if you throw a RuntimeException in the onMessage method, the linked exception will be this runtime exception when onException is called.
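The AsyncReceiver above keeps its main thread alive with a fixed ten-second sleep loop. One possible alternative, shown here as a plain-Java sketch, is to wait on a CountDownLatch that the listener counts down, so the program can continue as soon as the expected messages have arrived. The single-thread executor plays the role of the provider's delivery thread; LatchWaitSketch and awaitMessages are invented names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {
    // Returns true if `expected` callbacks ran within the timeout.
    static boolean awaitMessages(int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        // Stands in for the thread on which a provider would call onMessage.
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < expected; i++) {
                final String text = "Hello " + i;
                deliveryThread.submit(() -> {
                    System.out.println("received: " + text);  // body of a hypothetical onMessage
                    latch.countDown();
                });
            }
            // Blocks until all callbacks ran or the timeout expired.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            deliveryThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all received: " + awaitMessages(3, 1000));
    }
}
```

The timeout keeps the wait bounded in case fewer messages than expected ever arrive.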

Queue Browser

A queue browser can be used to look at a queue without consuming any messages. The queue browser must perform the same initial steps as any other JMS client application, i.e. get a session object, which is a factory for QueueBrowser objects.

The QueueBrowser provides an Enumeration, which can be used to step through the messages on a queue. The following example shows how to count the number of messages on a queue. Note that the acknowledge mode is not meaningful to a queue browser:

package pointToPoint;

import java.util.Enumeration;
import javax.naming.InitialContext;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;

public class Browser
{
    public static void main(String[] args) throws Exception
    {
        // get the initial context
        InitialContext ctx = new InitialContext();

        // lookup the queue object
        Queue queue = (Queue) ctx.lookup("queue/queue0");

        // lookup the queue connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)
            ctx.lookup("queue/connectionFactory");

        // create a queue connection
        QueueConnection queueConn = connFactory.createQueueConnection();

        // create a queue session
        QueueSession queueSession = queueConn.createQueueSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // create a queue browser
        QueueBrowser queueBrowser = queueSession.createBrowser(queue);

        // start the connection
        queueConn.start();

        // browse the messages
        Enumeration e = queueBrowser.getEnumeration();
        int numMsgs = 0;

        // count number of messages
        while (e.hasMoreElements()) {
            Message message = (Message) e.nextElement();
            numMsgs++;
        }

        System.out.println(queue + " has " + numMsgs + " messages");

        // close the queue connection
        queueConn.close();
    }
}

The order of messages returned by the enumeration reflects the order in which a regular message receiver would see them. Note that a queue browser represents a static snapshot of the queue. If more messages are added to the queue while browsing, they will not be visible to the queue browser.
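The snapshot behavior described above can be imitated in plain Java by enumerating a copy of the queue contents. This is a sketch of the idea only, not of how a JMS provider implements browsing; SnapshotBrowseSketch and its methods are made-up names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Queue;

class SnapshotBrowseSketch {
    // Copies the current contents, so later sends do not show up in the
    // enumeration and nothing is removed from the queue itself.
    static Enumeration<String> browse(Queue<String> queue) {
        return Collections.enumeration(new ArrayList<>(queue));
    }

    static int count(Enumeration<String> e) {
        int n = 0;
        while (e.hasMoreElements()) {
            e.nextElement();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("Hello 1");
        queue.add("Hello 2");
        Enumeration<String> snapshot = browse(queue);
        queue.add("Hello 3");   // arrives after the snapshot was taken
        System.out.println("browsed: " + count(snapshot) + ", on queue: " + queue.size());
    }
}
```

The enumeration sees the two messages present when browsing began, while all three remain on the queue for a regular receiver.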
