文章14 | 阅读 5007 | 点赞0
1、将连接Mysql数据库驱动包,放到ActiveMQ的lib目录下
2,修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式
2.1 修改原来的kshadb的持久化数据的方式
2.2 连接Mysql的配置
3、将数据持久化Mysql的运行截图
3.1 重新启动ActiveMQ,并运行程序,放入持久化数据,查看Mysql的active数据库
4,数据持久化代码
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) throws Exception {
/*ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");*/
/**
* activemq.xml 配置密码
*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"bhz",
"bhz",
"tcp://localhost:61616");
//连接
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建session
// Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//开启事物
// Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//开启事物 并且使用Client的方式
/**
* 3、通过Connection对象创建Session会话(上下文环境对象),
参数一,表示是否开启事务
参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收
第一个参数设置为true,表示开启事务
开启事务后,记得要手动提交事务
*/
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
// 4、通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象。
// 在PTP模式中,Destination指的是Queue
// 在发布订阅模式中,Destination指的是Topic
//消息的目的地
Destination destination = session.createQueue("queue1");
//创建消息生产者
// 5、使用Session来创建消息对象的生产者或者消费者
MessageProducer messageProducer = session.createProducer(destination);
//PERSISTENT 用来指定JMS Provider对消息进行持久化操作,以免Provider fail的时候,丢失Message
//NON_Persistent 方式下的JMS Provider不会对消进行持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//发送消息
// sendMessage(session, messageProducer);
for (int j = 0; j < 10; j++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我的消息内容,id为"+j);
System.out.println("生产者: "+textMessage.getText());
messageProducer.send(destination,textMessage,DeliveryMode.PERSISTENT,j,60*1000);
// System.out.println("生产者: "+textMessage.getText());
}
//使用事物 Boolean.TRUE
// session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer 消息生产者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < Sender.SENDNUM; i++) {
//创建一条文本消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我的消息内容,id为"+i);
messageProducer.send(textMessage);
System.out.println("生产者: "+textMessage.getText());
// TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
// System.out.println("生产者发送消息:Activemq 发送消息" + i);
//通过消息生产者发出消息
// messageProducer.send(message);
}
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/ywl470812087/article/details/84703979
内容来源于网络,如有侵权,请联系作者删除!