博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ使用示例之Topic
阅读量:6583 次
发布时间:2019-06-24

本文共 9623 字,大约阅读时间需要 32 分钟。

 非持久的Topic消息示例

对于非持久的Topic消息的发送

基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创建队列替换成创建Topic,例如:

Destination destination = session.createTopic("MyTopic");

对于非持久的Topic消息的接收

1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
2:同样把创建Destination的地方,由创建队列替换成创建Topic,例如:

Destination destination = session.createTopic("MyTopic");

3:由于不知道客户端发送多少信息,因此改成while循环的方式了,例如:

Message message = consumer.receive();while(message!=null) {  TextMessage txtMsg = (TextMessage)message;  System.out.println("收到消 息:" + txtMsg.getText());  message = consumer.receive(1000L);}

消息的生产者:

public class NoPersistenceSender {    //默认连接用户名    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;    //默认连接密码    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;    //默认连接地址    private static final String BROKEURL = "tcp://192.168.0.129:61616";    //发送的消息数量    private static final int SENDNUM = 10;    public static void main(String[] args) {        //连接工厂        ConnectionFactory connectionFactory;        //连接        Connection connection = null;        //会话 接受或者发送消息的线程        Session session;        //消息的目的地        Destination destination;        //消息生产者        MessageProducer messageProducer;        //实例化连接工厂(连接到ActiveMQ服务器)        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME,                NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);        try {            //通过连接工厂获取连接            connection = connectionFactory.createConnection();            //启动连接            connection.start();            //创建session            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)            destination = session.createTopic("MyTopic");            //创建消息生产者            messageProducer = session.createProducer(destination);            //发送消息            sendMessage(session, messageProducer);            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            if (connection != null) {                try {                    connection.close();                } catch (JMSException e) {                    e.printStackTrace();                }            }        }    }    /**     * 发送消息     *     * @param session     * @param messageProducer 消息生产者     * @throws Exception     */    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {        for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {            //创建一条文本消息            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);            System.out.println("发送消息:Activemq 发送消息" + i);            //通过消息生产者发出消息            messageProducer.send(message);        }    }}

消息的消费者:

public class NoPersistenceReceiver {    //默认连接用户名    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;    //默认连接密码    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;    //默认连接地址    private static final String BROKEURL = "tcp://192.168.0.129:61616";    public static void main(String[] args) {        ConnectionFactory connectionFactory;//连接工厂        Connection connection = null;//连接        Session session;//会话 接受或者发送消息的线程        Destination destination;//消息的目的地        MessageConsumer messageConsumer;//消息的消费者        //实例化连接工厂(连接到ActiveMQ服务器)        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME,                NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);        try {            //通过连接工厂获取连接            connection = connectionFactory.createConnection();            //启动连接            connection.start();            //创建session            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取            destination = session.createTopic("MyTopic");            //创建消息消费者            messageConsumer = session.createConsumer(destination);            //Message message = messageConsumer.receive();            //while (message != null) {                //TextMessage txtMsg = (TextMessage) message;                //System.out.println("收到消息:" + txtMsg.getText());                //message = messageConsumer.receive(1000L);                //session.commit();            //}            Message message = messageConsumer.receive();            while (message != null) {                TextMessage txtMsg = (TextMessage) message;                System.out.println("收到消 息:" + txtMsg.getText());                //没这句有错                message = messageConsumer.receive(1000L);            }            session.commit();            session.close();            connection.close();        } catch (JMSException e) {            e.printStackTrace();        }    }}

首先运行运行生产者(消费者处于费运行状态),然后运行消费者:

 此时再次运行一下生产者(消费者处于开启状态)

 结论:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息

 持久的Topic消息示例

 对于持久的Topic消息的发送

1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定

2:一定要设置完成后,再start 这个 connection 

对于持久的Topic消息的接收

1:需要在连接上设置消费者id,用来识别消费者

2:需要创建TopicSubscriber来订阅
3:要设置好了过后再start 这个 connection
4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,
无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来

生产者:

public class PersistenceSender {    //默认连接用户名    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;    //默认连接密码    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;    //默认连接地址    private static final String BROKEURL = "tcp://192.168.0.129:61616";    //发送的消息数量    private static final int SENDNUM = 10;    public static void main(String[] args) {        //连接工厂        ConnectionFactory connectionFactory;        //连接        Connection connection = null;        //会话 接受或者发送消息的线程        Session session;        //消息的目的地        Destination destination;        //消息生产者        MessageProducer messageProducer;        //实例化连接工厂(连接到ActiveMQ服务器)        connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME,                PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);        try {            //通过连接工厂获取连接            connection = connectionFactory.createConnection();            //创建session            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)            destination = session.createTopic("MyTopic1");            //创建消息生产者            messageProducer = session.createProducer(destination);            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);            //启动连接            connection.start();            //发送消息            sendMessage(session, messageProducer);            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            if (connection != null) {                try {                    connection.close();                } catch (JMSException e) {                    e.printStackTrace();                }            }        }    }    /**     * 发送消息     *     * @param session     * @param messageProducer 消息生产者     * @throws Exception     */    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {        for (int i = 0; i < PersistenceSender.SENDNUM; i++) {            //创建一条文本消息            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);            System.out.println("发送消息:Activemq 发送消息" + i);            //通过消息生产者发出消息            messageProducer.send(message);        }    }}

消费者:

public class PersistenceReceiver {    //默认连接用户名    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;    //默认连接密码    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;    //默认连接地址    private static final String BROKEURL = "tcp://192.168.0.129:61616";    public static void main(String[] args) {        ConnectionFactory connectionFactory;//连接工厂        Connection connection = null;//连接        Session session;//会话 接受或者发送消息的线程        Topic topic;//消息的目的地        //实例化连接工厂(连接到ActiveMQ服务器)        connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME,                PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);        try {            //通过连接工厂获取连接            connection = connectionFactory.createConnection();            connection.setClientID("winner_0715");            //创建session            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);            //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取            topic = session.createTopic("MyTopic1");            //创建消息消费者            TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");            //启动连接            connection.start();            Message message = consumer.receive();            while (message != null) {                TextMessage txtMsg = (TextMessage) message;                System.out.println("收到消 息:" + txtMsg.getText());                //没这句有错                message = consumer.receive(1000L);            }            session.commit();            session.close();            connection.close();        } catch (JMSException e) {            e.printStackTrace();        }    }}

消费者需要先运行一次,注册~

 

因为是持久消息,所以还会有别的订阅者,所以是0

关于持久化和非持久化消息

持久化消息

这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消
息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

非持久化消息

保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式

转载地址:http://wixno.baihongyu.com/

你可能感兴趣的文章
20050616:今天跟老外第一次讲E文,我说:
查看>>
实验三—单臂路由
查看>>
Python之定义函数
查看>>
Linux下编译安装PHP7.2
查看>>
使用vue如何默认选中单选框
查看>>
pageContext.request.contextPath} JSP取得绝对路径
查看>>
毕业设计进度日志02
查看>>
linux配置java环境变量
查看>>
cocos2d-x遍历zip某个目录(可用于android中apk包的文件访问)
查看>>
IO中同步与异步,阻塞与非阻塞区别(转)
查看>>
数据库升级层
查看>>
css之postion定位
查看>>
补第一阶段冲刺站立会议2(应发表日期5月14日)
查看>>
jquery常见问题
查看>>
自定义事件
查看>>
Spring中四种实例化bean的方式
查看>>
Beautiful Soup学习
查看>>
MySQL函数不能创建的解决方法
查看>>
字王·百字工程·正式启动
查看>>
POJ2348 Euclid's Game
查看>>