Chat Application using Topic
To illustrate how JMS works, we will use the JMS pub/sub API to build a simple chat application. The requirements of Internet chat map neatly onto the publish-and-subscribe messaging model. In this model, a producer can send a message to many consumers by delivering the message to a single topic. A message producer is also called a publisher and a message consumer is also called a subscriber. In reality, using JMS for a chat application would be overkill, since chat systems don't require enterprise quality service.
The following source code is a JMS-based chat client. Every participant in a chat session uses this Chat program to join a specific chat room (topic), and deliver and receive messages to and from that room:
package chap2.chat;

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.Properties;

public class Chat implements javax.jms.MessageListener {
    private TopicSession pubSession;
    private TopicSession subSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String username;

    /* Constructor. Establish JMS publisher and subscriber */
    public Chat(String topicName, String username, String password)
    throws Exception {
        // Obtain a JNDI connection
        Properties env = new Properties( );
        // ... specify the JNDI properties specific to the vendor
        InitialContext jndi = new InitialContext(env);

        // Look up a JMS connection factory
        TopicConnectionFactory conFactory =
            (TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");

        // Create a JMS connection
        TopicConnection connection =
            conFactory.createTopicConnection(username,password);

        // Create two JMS session objects
        TopicSession pubSession =
            connection.createTopicSession(false,
                                          Session.AUTO_ACKNOWLEDGE);
        TopicSession subSession =
            connection.createTopicSession(false,
                                          Session.AUTO_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic chatTopic = (Topic)jndi.lookup(topicName);

        // Create a JMS publisher and subscriber
        TopicPublisher publisher =
            pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber =
            subSession.createSubscriber(chatTopic);

        // Set a JMS message listener
        subscriber.setMessageListener(this);

        // Initialize the Chat application
        set(connection, pubSession, subSession, publisher, username);

        // Start the JMS connection; allows messages to be delivered
        connection.start( );
    }

    /* Initialize the instance variables */
    public void set(TopicConnection con, TopicSession pubSess,
                    TopicSession subSess, TopicPublisher pub,
                    String username) {
        this.connection = con;
        this.pubSession = pubSess;
        this.subSession = subSess;
        this.publisher = pub;
        this.username = username;
    }

    /* Receive message from topic subscriber */
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText( );
            System.out.println(text);
        } catch (JMSException jmse){ jmse.printStackTrace( ); }
    }

    /* Create and send message using topic publisher */
    protected void writeMessage(String text) throws JMSException {
        TextMessage message = pubSession.createTextMessage( );
        message.setText(username+" : "+text);
        publisher.publish(message);
    }

    /* Close the JMS connection */
    public void close( ) throws JMSException {
        connection.close( );
    }

    /* Run the Chat client */
    public static void main(String [] args){
        try{
            // Stop here if an argument is missing; otherwise args[2] would fail
            if (args.length!=3) {
                System.out.println("Topic, username, or password missing");
                System.out.println("Usage: java chap2.chat.Chat topic username password");
                return;
            }

            // args[0]=topicName; args[1]=username; args[2]=password
            Chat chat = new Chat(args[0],args[1],args[2]);

            // Read from command line
            BufferedReader commandLine =
                new BufferedReader(new InputStreamReader(System.in));

            // Loop until the word "exit" is typed or the input ends
            while(true){
                String s = commandLine.readLine( );
                // readLine( ) returns null at end of input
                if (s == null || s.equalsIgnoreCase("exit")){
                    chat.close( ); // close down connection
                    System.exit(0);// exit program
                } else
                    chat.writeMessage(s);
            }
        } catch (Exception e){ e.printStackTrace( ); }
    }
}
Getting Started with the Chat Example
To put this client to use, compile it like any other Java program. Then start your JMS server, setting up whatever topics, usernames, and passwords you want. Configuration of a JMS server is vendor-dependent, and won't be discussed here.
The Chat class includes a main( ) method so that it can be run as a standalone Java application. It's executed from the command line as follows:
java chap2.chat.Chat topic username password
The topic is the destination that we want to publish and subscribe to, given as the name the client uses to look it up in JNDI; username and password make up the authentication information for the client. Run at least two chat clients in separate command windows and try typing into one; you should see the text you type displayed by the other client.
Before examining the source code in detail, a quick explanation will be helpful. The chat client creates a JMS publisher and subscriber for a specific topic. The topic represents the chat room. The JMS server registers all the JMS clients that want to publish or subscribe to a specific topic. When text is entered at the command line of one of the chat clients, it is published to the messaging server. The messaging server identifies the topic associated with the publisher and delivers the message to all the JMS clients that have subscribed to that topic. As Figure 2-1 illustrates, messages published by any one of the JMS clients are delivered to all the JMS subscribers for that topic.
Figure 2-1. The Chat application.
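The fan-out described above can be modeled in a few lines of plain Java. This is only an in-process sketch, not JMS code: the class TopicFanOutSketch and its subscribe( ) and publish( ) methods are hypothetical names invented for illustration, and a real message server does the same job across processes and machines.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process stand-in for a message server; not part of JMS.
class TopicFanOutSketch {
    // topic name -> subscribers registered for that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a subscriber callback for one topic (one "chat room").
    void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(subscriber);
    }

    // Hand the text to every subscriber of the topic; returns how many received it.
    int publish(String topic, String text) {
        List<Consumer<String>> targets =
            subscribers.getOrDefault(topic, new ArrayList<>());
        for (Consumer<String> subscriber : targets) {
            subscriber.accept(text);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        TopicFanOutSketch server = new TopicFanOutSketch();
        server.subscribe("room1", text -> System.out.println("alice sees: " + text));
        server.subscribe("room1", text -> System.out.println("bob sees: " + text));
        server.subscribe("room2", text -> System.out.println("carol sees: " + text));
        // Only the two room1 subscribers receive this message.
        server.publish("room1", "bob : hello");
    }
}
```

Note that the publisher names only the topic, never the individual receivers; that decoupling is what lets any number of chat clients join a room.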
Examining the Source Code
Running the Chat example in a couple of command windows demonstrates what the Chat application does. The rest of this chapter examines the source code for the Chat application so that you can see how the Chat application works.
Bootstrapping the JMS client
The main( ) method bootstraps the chat client and provides a command-line interface. Once an instance of the Chat class is created, the main( ) method spends the rest of its time reading text typed at the command line and passing it to the Chat instance using the instance's writeMessage( ) method.
The Chat instance connects to the topic and receives and delivers messages. The Chat instance starts its life in the constructor, which does all the work to connect to the topic and set up the TopicPublisher and TopicSubscriber for delivering and receiving messages.
The constructor of the Chat class starts by obtaining a connection to the JNDI naming service used by the JMS server:
// Obtain a JNDI connection
Properties env = new Properties( );
// ... specify the JNDI properties specific to the vendor
InitialContext jndi = new InitialContext(env);
Creating a connection to a JNDI naming service requires that a javax.naming.InitialContext object be created. An InitialContext is the starting point for any JNDI lookup -- it's similar in concept to the root of a filesystem. The InitialContext provides a network connection to the directory service that acts as a root for accessing JMS administered objects. The properties used to create an InitialContext depend on which JMS directory service you are using. The code used to create a JNDI InitialContext in BEA's WebLogic naming service, for example, would look something like this:
Properties env = new Properties( );
env.put(Context.SECURITY_PRINCIPAL, "guest");
env.put(Context.SECURITY_CREDENTIALS, "guest");
env.put(Context.INITIAL_CONTEXT_FACTORY,
"weblogic.jndi.WLInitialContextFactory");
env.put(Context.PROVIDER_URL,
"t3://localhost:7001");
InitialContext jndi = new InitialContext(env);
When SonicMQ is used in combination with a third-party LDAP directory service, the connection properties would be very different. For example, the following shows how a SonicMQ JMS client would use JNDI to access JMS administered objects stored in an LDAP directory server:
Properties env = new Properties( );
env.put(Context.SECURITY_PRINCIPAL, "guest");
env.put(Context.SECURITY_CREDENTIALS, "guest");
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.ldap.LdapCtxFactory");
env.put(Context.PROVIDER_URL,
"ldap://localhost:389/o=acme.com");
InitialContext jndi = new InitialContext(env);
NOTE: Alternatively, the InitialContext( ) can be created without properties (no-arg constructor). In this case JNDI will read the vendor-specific JNDI properties from a special file in the classpath named jndi.properties. This eliminates provider-specific code in JMS clients, making them more portable.
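As a rough illustration of what such a jndi.properties file might contain, the sketch below parses sample file text and reads it back through the javax.naming.Context constants, which are simply the standard property key names. The values repeat the LDAP example above and are placeholders; the class name JndiPropertiesSketch and its parse( ) helper are hypothetical. The sketch deliberately does not open a context, since that would require a running directory server.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper class; it only shows the file format, it does not connect.
class JndiPropertiesSketch {
    // Sample file contents; the values are placeholders taken from the LDAP example.
    static final String SAMPLE_FILE = String.join("\n",
        "java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory",
        "java.naming.provider.url=ldap://localhost:389/o=acme.com",
        "java.naming.security.principal=guest",
        "java.naming.security.credentials=guest");

    // Parse properties-file text the same way java.util.Properties reads a file.
    static Properties parse(String fileText) {
        Properties env = new Properties();
        try {
            env.load(new StringReader(fileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return env;
    }

    public static void main(String[] args) {
        Properties env = parse(SAMPLE_FILE);
        // Each Context constant is the key name used in the file.
        System.out.println(Context.INITIAL_CONTEXT_FACTORY + " -> "
            + env.getProperty(Context.INITIAL_CONTEXT_FACTORY));
        System.out.println(Context.PROVIDER_URL + " -> "
            + env.getProperty(Context.PROVIDER_URL));
        // With such a file on the classpath, the client code shrinks to:
        //     InitialContext jndi = new InitialContext();
    }
}
```

Keeping these four settings in a file rather than in env.put( ) calls means that switching directory services becomes a deployment change instead of a code change.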
The TopicConnectionFactory
Once a JNDI InitialContext object is instantiated, it can be used to look up the TopicConnectionFactory in the messaging server's naming service:
TopicConnectionFactory conFactory = (TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");
The javax.jms.TopicConnectionFactory is used to manufacture connections to a message server. A TopicConnectionFactory is a type of administered object, which means that its attributes and behavior are configured by the system administrator responsible for the messaging server. The TopicConnectionFactory is implemented differently by each vendor, so configuration options available to system administrators vary from product to product. A connection factory might, for example, be configured to manufacture connections that use a particular protocol, security scheme, clustering strategy, etc. A system administrator might choose to deploy several different TopicConnectionFactory objects, each configured with its own JNDI lookup name.
The TopicConnectionFactory provides two overloaded versions of the createTopicConnection( ) method:
package javax.jms;

public interface TopicConnectionFactory extends ConnectionFactory {
    public TopicConnection createTopicConnection( )
        throws JMSException, JMSSecurityException;
    public TopicConnection createTopicConnection(String username,
        String password) throws JMSException, JMSSecurityException;
}
These methods are used to create TopicConnection objects. The behavior of the no-arg method depends on the JMS provider. Some JMS providers will assume that the JMS client is connecting under an anonymous security context, while other providers may assume that the credentials can be obtained from JNDI or the current thread. The second method provides the client with a username-password authentication credential, which can be used to authenticate the connection. In our code, we choose to authenticate the connection explicitly with a username and password.
The TopicConnection
The TopicConnection is created by the TopicConnectionFactory:
// Look up a JMS connection factory
TopicConnectionFactory conFactory = (TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");
// Create a JMS connection
TopicConnection connection = conFactory.createTopicConnection(username, password);
The TopicConnection represents a connection to the message server. Each TopicConnection that is created from a TopicConnectionFactory is a unique connection to the server. A JMS client might choose to create multiple connections from the same connection factory, but this is rare as connections are relatively expensive (each connection requires a network socket, I/O streams, memory, etc.). Creating multiple Session objects (discussed later in this chapter) from the same connection is considered more efficient, because sessions share access to the same connection. The TopicConnection is an interface that extends the javax.jms.Connection interface. It defines several general-purpose methods used by clients of the TopicConnection. Among these methods are the start( ), stop( ), and close( ) methods:
// javax.jms.Connection the super interface
public interface Connection {
    public void start( ) throws JMSException;
    public void stop( ) throws JMSException;
    public void close( ) throws JMSException;
    ...
}

// javax.jms.TopicConnection extends javax.jms.Connection
public interface TopicConnection extends Connection {
    public TopicSession createTopicSession(boolean transacted,
        int acknowledgeMode) throws JMSException;
    ...
}
The start( ), stop( ), and close( ) methods allow a client to manage the connection directly. The start( ) method turns the inbound flow of messages "on," allowing messages to be received by the client. This method is used at the end of the constructor in the Chat class:
    ...
    // Initialize the Chat application
    set(connection, pubSession, subSession, publisher, username);
    connection.start( );
}
It is a good idea to start the connection after the subscribers have been set up, because the messages start to flow in from the topic as soon as start( ) is invoked.
The stop( ) method blocks the flow of inbound messages until the start( ) method is invoked again. The close( ) method is used to close the TopicConnection to the message server. This should be done when a client is finished using the TopicConnection; closing the connection conserves resources on the client and server. In the Chat class, the main( ) method calls Chat.close( ) when "exit" is typed at the command line. The Chat.close( ) method in turn calls the TopicConnection.close( ) method:
public void close( ) throws JMSException {
    connection.close( );
}
Closing a TopicConnection closes all the objects associated with the connection including the TopicSession, TopicPublisher, and TopicSubscriber.
The TopicSession
After the TopicConnection is obtained, it's used to create TopicSession objects:
// Create a JMS connection
TopicConnection connection = conFactory.createTopicConnection(username,password);
// Create two JMS session objects
TopicSession pubSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
TopicSession subSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
A TopicSession object is a factory for creating Message, TopicPublisher, and TopicSubscriber objects. A client can create multiple TopicSession objects to provide more granular control over publishers, subscribers, and their associated transactions. In this case we create two TopicSession objects, pubSession and subSession. We need two objects because of threading restrictions in JMS, which are discussed in the "Sessions and Threading" section later in the chapter.
The boolean parameter in the createTopicSession( ) method indicates whether the Session object will be transacted. A transacted Session automatically manages outgoing and incoming messages within a transaction. Transactions are important but not critical to our discussion at this time, so the parameter is set to false, which means the TopicSession will not be transacted.
The second parameter indicates the acknowledgment mode used by the JMS client. An acknowledgment is a notification to the message server that the client has received the message. In this case we chose AUTO_ACKNOWLEDGE, which means that the message is automatically acknowledged after it is received by the client.
The TopicSession objects are used to create the TopicPublisher and TopicSubscriber. The TopicPublisher and TopicSubscriber objects are created with a Topic identifier and are dedicated to the TopicSession that created them; they operate under the control of a specific TopicSession:
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
The TopicSession is also used to create the Message objects that are delivered to the topic. The pubSession is used to create Message objects in the writeMessage( ) method. When you type text at the command line, the main( ) method reads the text and passes it to the Chat instance by invoking writeMessage( ). The writeMessage( ) method (shown in the following example) uses the pubSession object to generate a TextMessage object that can be used to deliver the text to the topic:
protected void writeMessage(String text) throws JMSException{
    TextMessage message = pubSession.createTextMessage( );
    message.setText(username+" : "+text);
    publisher.publish(message);
}
Several Message types can be created by a TopicSession. The most commonly used type is the TextMessage.
The Topic
JNDI is used to locate a Topic object, which is an administered object like the TopicConnectionFactory:
InitialContext jndi = new InitialContext(env);
// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);
A Topic object is a handle or identifier for an actual topic, called a physical topic, on the messaging server. A physical topic is an electronic channel to which many clients can subscribe and publish. A topic is analogous to a news group or list server: when a message is sent to a news group or list server, it is delivered to all the subscribers. Similarly, when a JMS client delivers a Message object to a topic, all the clients subscribed to that topic receive the Message.
The Topic object encapsulates a vendor-specific name for identifying a physical topic in the messaging server. The Topic object has a getTopicName( ) method, which returns the name identifier for the physical topic it represents. The name encapsulated by a Topic object is vendor-specific and varies from product to product. For example, one vendor might use dot (".") separated topic names, like "oreilly.jms.chat", while another vendor might use a completely different naming system, similar to LDAP naming, "o=oreilly,cn=chat". Using topic names directly will result in client applications that are not portable across brands of JMS servers. The Topic object hides the topic name from the client, making the client more portable.
As a convention, we'll refer to a physical topic as a topic and only use the term "physical topic" when it's important to stress its difference from a Topic object.
The TopicPublisher
A TopicPublisher is created using the pubSession and the chatTopic:
// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
A TopicPublisher is used to deliver messages to a specific topic on a message server. The Topic object used in the createPublisher( ) method identifies the topic that will receive messages from the TopicPublisher. In the Chat example, any text typed on the command line is passed to the Chat class's writeMessage( ) method. This method uses the TopicPublisher to deliver a message to the topic:
protected void writeMessage(String text) throws JMSException{
    TextMessage message = pubSession.createTextMessage( );
    message.setText(username+" : "+text);
    publisher.publish(message);
}
The TopicPublisher objects deliver messages to the topic asynchronously. Asynchronous delivery and consumption of messages is a key characteristic of Message-Oriented Middleware; the TopicPublisher doesn't block or wait until all the subscribers receive the message. Instead, it returns from the publish( ) method as soon as the message server receives the message. It's up to the message server to deliver the message to all the subscribers for that topic.
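To make the "publish and return" behavior concrete, here is a small in-process sketch in plain Java. It is not the JMS API: AsyncPublishSketch, its publish( ) method, and the helper names are hypothetical, and a background thread stands in for the message server. The subscriber is held back on purpose so that we can observe publish( ) returning before delivery has happened.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for a message server; not the JMS API.
class AsyncPublishSketch {
    private final ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
    private final Consumer<String> subscriber;

    AsyncPublishSketch(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    // Accept the message and return at once; delivery runs later on another thread.
    void publish(String text) {
        deliveryThread.execute(() -> subscriber.accept(text));
    }

    // Stop accepting messages and wait up to five seconds for pending deliveries.
    boolean shutdownAndWait() {
        deliveryThread.shutdown();
        try {
            return deliveryThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Block on a latch without forcing callers to handle InterruptedException.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CountDownLatch subscriberMayProceed = new CountDownLatch(1);
        AtomicBoolean delivered = new AtomicBoolean(false);
        AsyncPublishSketch server = new AsyncPublishSketch(text -> {
            awaitQuietly(subscriberMayProceed); // simulate a slow subscriber
            delivered.set(true);
        });

        server.publish("alice : hello");
        System.out.println("publish returned; delivered yet? " + delivered.get());

        subscriberMayProceed.countDown();       // let the subscriber finish
        server.shutdownAndWait();
        System.out.println("after waiting; delivered? " + delivered.get());
    }
}
```

The publisher's thread is free to read the next line from the keyboard while delivery is still in progress, which is the property the Chat client relies on.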
The TopicSubscriber
The TopicSubscriber is created using the subSession and the chatTopic:
// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
A TopicSubscriber receives messages from a specific topic. The Topic object argument used in the createSubscriber( ) method identifies the topic from which the TopicSubscriber will receive messages.
The TopicSubscriber receives messages from the message server one at a time (serially). These messages are pushed from the message server to the TopicSubscriber asynchronously, which means that the TopicSubscriber does not have to poll the message server for messages. In our example, each chat client will receive any message published by any of the other chat clients. When a user enters text at the command line, the text message is delivered to all other chat clients that subscribe to the same topic.
The pub/sub messaging model in JMS includes an in-process Java event model for handling incoming messages. This is similar to the event-driven model used by JavaBeans. An object simply implements the listener interface, in this case the MessageListener, and then is registered with the TopicSubscriber. A TopicSubscriber may have only one MessageListener object. Here is the definition of the MessageListener interface used in JMS:
package javax.jms;

public interface MessageListener {
    public void onMessage(Message message);
}
When the TopicSubscriber receives a message from its topic, it invokes the onMessage( ) method of its MessageListener object. The Chat class itself implements the MessageListener interface and implements the onMessage( ) method:
public class Chat implements javax.jms.MessageListener {
    ...
    public void onMessage(Message message){
        try{
            TextMessage textMessage = (TextMessage)message;
            String text = textMessage.getText( );
            System.out.println(text);
        } catch (JMSException jmse){jmse.printStackTrace( );}
    }
    ...
}
The Chat class is a MessageListener type, and therefore registers itself with the TopicSubscriber in its constructor:
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
subscriber.setMessageListener(this);
When the message server pushes a message to the TopicSubscriber, the TopicSubscriber invokes the Chat object's onMessage( ) method.
It's fairly easy to confuse the Java Message Service with its use of a Java event model. JMS is an API for asynchronous distributed enterprise messaging that spans processes and machines across a network. The Java event model is used to synchronously deliver events by invoking methods on one or more objects in the same process that have registered as listeners. The JMS pub/sub model uses the Java event model so that a TopicSubscriber can notify its MessageListener object in the same process that a message has arrived from the message server.
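The in-process half of that picture, the Java event model, is small enough to sketch on its own. The names below (ListenerModelSketch, TextListener, Source) are hypothetical and only play the roles of MessageListener and TopicSubscriber; nothing here crosses a process boundary. The sketch assumes that registering a second listener replaces the first, in keeping with the one-listener rule described earlier.

```java
// Hypothetical names; a sketch of the in-process listener pattern, not the JMS API.
class ListenerModelSketch {
    // Plays the role of MessageListener: a single callback method.
    interface TextListener {
        void onText(String text);
    }

    // Plays the role of the subscriber: it holds at most one listener.
    static class Source {
        private TextListener listener;

        // Registering a new listener replaces the previous one.
        void setListener(TextListener listener) {
            this.listener = listener;
        }

        // Notify the listener with an ordinary method call on the caller's thread.
        void fire(String text) {
            if (listener != null) {
                listener.onText(text);
            }
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        Thread caller = Thread.currentThread();
        source.setListener(text -> System.out.println(
            text + " (handled on the firing thread: "
                 + (Thread.currentThread() == caller) + ")"));
        source.fire("bob : hi");
    }
}
```

The listener runs as a direct method call, so the code that fires the event waits until the handler returns. That synchronous, same-process notification is the part JMS borrows; the asynchronous, cross-network part is supplied by the message server.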
The Message
In the chat example, the TextMessage type is used to encapsulate the messages we send and receive. A TextMessage contains a java.lang.String as its body and is the most commonly used message type. The onMessage( ) method receives TextMessage objects from the TopicSubscriber. Likewise, the writeMessage( ) method creates and publishes TextMessage objects using the TopicPublisher:
public void onMessage(Message message){
    try{
        TextMessage textMessage = (TextMessage)message;
        String text = textMessage.getText( );
        System.out.println(text);
    } catch (JMSException jmse){jmse.printStackTrace( );}
}

protected void writeMessage(String text) throws JMSException{
    TextMessage message = pubSession.createTextMessage( );
    message.setText(username+" : "+text);
    publisher.publish(message);
}
A message basically has two parts: a header and a payload. The header is composed of special fields that are used to identify the message, declare attributes of the message, and provide information for routing. The difference between message types is determined largely by their payload, i.e., the type of application data the message contains. The Message interface, which is the supertype of all message objects, has no payload. It is a lightweight message that delivers no payload but can serve as a simple event notification. The other message types have special payloads that determine their type and use:
Message
This type has no payload. It is useful for simple event notification.
TextMessage
This type carries a java.lang.String as its payload. It is useful for exchanging simple text messages and also for more complex character data, such as XML documents.
ObjectMessage
This type carries a serializable Java object as its payload. It's useful for exchanging Java objects.
BytesMessage
This type carries an array of primitive bytes as its payload. It's useful for exchanging data in an application's native format, which may not be compatible with other existing Message types. It is also useful where JMS is used purely as a transport between two systems, and the message payload is opaque to the JMS client.
StreamMessage
This type carries a stream of primitive Java types (int, double, char, etc.) as its payload. It provides a set of convenience methods for mapping a formatted stream of bytes to Java primitives. It's an easy programming model when exchanging primitive application data in a fixed order.
MapMessage
This type carries a set of name-value pairs as its payload. The payload is similar to a java.util.Properties object, except that the values must be Java primitives (or their wrappers), Strings, or byte arrays. The MapMessage is useful for delivering keyed data.
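The "fixed order" contract mentioned for StreamMessage is easiest to see with an analogy from the standard java.io package. The sketch below does not use JMS at all; FixedOrderPayloadSketch and the field names (quantity, price, symbol) are hypothetical. It writes three values into a byte array and reads them back in the same order, which is the discipline the sender and receiver of a stream-style payload have to agree on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical java.io analogy for a stream-style payload; not the JMS API.
class FixedOrderPayloadSketch {
    // Write an int, a double, and a string, in that order, into a byte array.
    static byte[] encode(int quantity, double price, String symbol) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(quantity);
            out.writeDouble(price);
            out.writeUTF(symbol);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read the fields back in exactly the order they were written.
    static String decode(byte[] payload) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(payload))) {
            int quantity = in.readInt();
            double price = in.readDouble();
            String symbol = in.readUTF();
            return symbol + " x" + quantity + " @ " + price;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = encode(100, 12.5, "ACME");
        System.out.println(decode(payload)); // prints "ACME x100 @ 12.5"
    }
}
```

If the receiver read the double before the int, it would get garbage or an exception, which is why keyed data is usually a better fit for a MapMessage than for a stream of primitives.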
Sessions and Threading
The Chat application uses a separate session for the publisher and subscriber, pubSession and subSession, respectively. This is due to a threading restriction imposed by JMS. According to the JMS specification, a session may not be operated on by more than one thread at a time. In our example, two threads of control are active: the default main thread of the Chat application and the thread that invokes the onMessage( ) handler. The thread that invokes the onMessage( ) handler is owned by the JMS provider. Since the invocation of the onMessage( ) handler is asynchronous, it could be called while the main thread is publishing a message in the writeMessage( ) method. If both the publisher and subscriber had been created by the same session, the two threads could operate on these methods at the same time; in effect, they could operate on the same TopicSession concurrently -- a condition that is prohibited.
A goal of the JMS specification was to avoid imposing an internal architecture on the JMS provider. Requiring a JMS provider's implementation of a Session object to be capable of safely handling multiple threads was specifically avoided. This is mostly due to one of the intended uses of JMS--that the JMS API be a wrapper around an existing messaging system, which may not have multithreaded delivery capabilities on the client.
The requirement imposed on the JMS provider is that the sending of messages and the asynchronous receiving of messages be processed serially. It is possible to publish and subscribe using the same session, but only if the application is publishing from within the onMessage( ) handler.
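A plain-Java model shows why publishing from inside the handler is safe. The sketch below is not JMS: SerialSessionSketch and its deliver( ) and publish( ) methods are hypothetical, and a single named thread stands in for the provider's delivery thread. Because the handler publishes its reply from within the callback, every operation on the "session" happens on that one thread, one after another.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of one session with a single delivery thread; not the JMS API.
class SerialSessionSketch {
    // One thread, named "delivery", stands in for the provider's delivery thread.
    private final ExecutorService deliveryThread =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "delivery"));

    // Records each operation and the thread that performed it, in order.
    final List<String> log = new CopyOnWriteArrayList<>();

    // "Publish" by recording the call; a real session would send to the server here.
    void publish(String text) {
        log.add("publish[" + text + "] on " + Thread.currentThread().getName());
    }

    // Deliver a message on the delivery thread. The handler echoes it back by
    // publishing from inside the callback, so no second thread touches the session.
    void deliver(String text) {
        deliveryThread.execute(() -> {
            log.add("onMessage[" + text + "] on " + Thread.currentThread().getName());
            publish("echo " + text);
        });
    }

    // Stop accepting deliveries and wait up to five seconds for pending ones.
    boolean shutdownAndWait() {
        deliveryThread.shutdown();
        try {
            return deliveryThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SerialSessionSketch session = new SerialSessionSketch();
        session.deliver("one");
        session.deliver("two");
        session.shutdownAndWait();
        session.log.forEach(System.out::println);
    }
}
```

In the Chat client the situation is different: text is published from the main thread while the provider's thread delivers messages, so the two activities get separate sessions instead.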