下面我们就接着来看openJms在发布/订阅模式上的表现,由于篇幅关系,在这里只讲述非持久订阅模式,持久订阅模式可以根据JMS的标准来试。
消息发布的代码如下:
package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
public class TopicPublish {
public static void main(String[] args) {
try {
//取得JNDI上下文和连接
Hashtable properties = new Hashtable();
properties.put(
Context.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
//openJms默认的端口是1099
properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
Context context = new InitialContext(properties);
//获得JMS Topic连接队列工厂
TopicConnectionFactory factory =
(TopicConnectionFactory) context.lookup(
"JmsTopicConnectionFactory");
//创建一个Topic连接,并启动
TopicConnection topicConnection = factory.createTopicConnection();
topicConnection.start();
//创建一个Topic会话,并设置自动应答
TopicSession topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
//lookup 得到 topic1
Topic topic = (Topic) context.lookup("topic1");
//用Topic会话生成Topic发布器
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
//发布消息到Topic
System.out.println("消息发布到Topic");
TextMessage message = topicSession.createTextMessage
("你好,欢迎定购Topic类消息");
topicPublisher.publish(message);
//资源清除,代码略 ... ...
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
而订阅消息的接收有同步的和异步2种,他们分别使用receive()和onMessage(Message message)方法来接收消息,具体代码:
同步接收:
package javayou.demo.openjms;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
public class TopicSubscribeSynchronous {
public static void main(String[] args) {
try {
System.out.println("定购消息接收启动:");
//取得JNDI上下文和连接
Hashtable properties = new Hashtable();
properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
Context context = new InitialContext(properties);
//获得Topic工厂和Connection
TopicConnectionFactory factory =
(TopicConnectionFactory) context.lookup(
"JmsTopicConnectionFactory");
TopicConnection topicConnection = factory.createTopicConnection();
topicConnection.start();