一、下载与安裝
立即去官方网站(
http://activemq.apache.org/)下载最新版就可以,因为这也是免安装的,只必须缓解压力就可以了。安装完以后进入bin文件目录,双击鼠标 activemq.bat文件(linux下到bin文件目录下实行 activemq start)
二、浏览控制面板
在网页键入:http://ip:8161/admin/,发生如下所示页面表明运行取得成功,默认设置的用户名密码全是admin
三、修改端口号
61616为对外开放服务项目端口号
8161为控制板端口号
当端口号矛盾时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里边的61616端口号。修改jetty.xml,修改里边的8161端口号。
queue队列方式:
和rabbitmq简易队列方式一样,倘若有好几个顾客交易同一个队列中的信息得话,默认设置也是轮询体制的交易
实例编码:
public class Productor {
public static final String BORKER_URL = \"tcp://127.0.0.1:61616\";
public static final String QUEUE_NAME = \"queue1\";
public static void main(String[] args) throws JMSException {
//建立加工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
//建立tcp连接
Connection connection = factory.createConnection();
//创建联接
connection.start();
/**
* 建立对话,1.是不是打开事务管理,2.查收方式
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立队列(信息的终点)
Queue queue = session.createQueue(QUEUE_NAME);
//建立经营者
MessageProducer producer = session.createProducer(queue);
//信息非分布式锁
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//信息分布式锁 默认设置是分布式锁的
// producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//建立信息
TextMessage message = session.createTextMessage(\"你现在还好吗\");
//推送信息
producer.send(message);
producer.close();
session.close();
connection.close();
System.out.println(\"发送成功!\");
}
}
public class Consumer {
public static final String BORKER_URL = \"tcp://127.0.0.1:61616\";
public static final String QUEUE_NAME = \"queue1\";
public static void main(String[] args) throws JMSException {
//建立加工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
//建立tcp连接
Connection connection = factory.createConnection();
//创建联接
connection.start();
/**
* 建立对话,1.是不是打开事务管理,2.查收方式
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立/申明队列(信息的终点)
Queue queue = session.createQueue(QUEUE_NAME);
//建立顾客
MessageConsumer consumer = session.createConsumer(queue);
/*while (true) {
//receive会堵塞进程
TextMessage message = (TextMessage)consumer.receive();
System.out.println(\"接受到信息:\" message.getText());
}*/
//监视的方法交易
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(\"1号接受到信息:\" textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
topic队列方式:
称之为公布定阅方式,经营者把信息发给定阅给某一topic主题风格的顾客,是派发的方式,这类方式默认设置必须先运行顾客,要不然即使经营者公布了某一topic主题风格的信息,顾客也交易不了;除非是顾客提早定阅,而且进行了信息分布式锁的解决,那样后运行顾客才可以交易提早消息推送的信息。
编码:
public class Productor {
public static final String BORKER_URL = \"tcp://127.0.0.1:61616\";
public static final String TOPIC_NAME = \"topic1\";
public static void main(String[] args) throws JMSException {
//建立加工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
//多线程递送
factory.setUseAsyncSend(true);
//建立tcp连接
Connection connection = factory.createConnection();
/**
* 创建对话,1.是不是打开事务管理,2.查收方式
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立/申明topic(信息的终点)
Topic topic = session.createTopic(TOPIC_NAME);
//建立经营者
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);
//分布式锁
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//创建联接
connection.start();
//建立信息
TextMessage message = session.createTextMessage(\"你现在还好吗\");
//推送信息,多线程推送调用函数
producer.send(message, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(\"success\");
}
@Override
public void onException(JMSException e) {
System.out.println(\"fail\");
}
});
producer.close();
session.close();
connection.close();
System.out.println(\"发送成功!\");
}
}
public class Consumer1 {
public static final String BORKER_URL = \"tcp://127.0.0.1:61616\";
public static final String TOPIC_NAME = \"topic1\";
public static void main(String[] args) throws JMSException {
//创建工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
//创建tcp连接
Connection connection = factory.createConnection();
//制订clientId
connection.setClientID(\"my\");
/**
* 创建对话,1.是不是打开事务管理,2.查收方式
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建/申明topic(消息的终点)
Topic topic = session.createTopic(TOPIC_NAME);
//定阅主题
TopicSubscriber subscriber = session.createDurableSubscriber(topic, \"remark\");
//创建联接
connection.start();
while (true) {
//receive会阻塞进程
//接收定阅的消息
TextMessage message = (TextMessage) subscriber.receive();
System.out.println(\"接收到消息:\" message.getText());
}
/*//创建顾客
MessageConsumer consumer = session.createConsumer(topic);
//创建联接
connection.start();
*//*while (true) {
//receive会阻塞进程
TextMessage message = (TextMessage)consumer.receive();
System.out.println(\"接收到消息:\" message.getText());
}*//*
//监视的方法交易
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(\"1号接收到消息:\" textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});*/
}
}
怎样确保消息的稳定性
回应这个问题关键从分布式锁,事务管理,查收这一些层面下手
消息分布式锁的关键编码:
//queue方式的消息分布式锁 默认设置是分布式锁的
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
/**
* topic方式的分布式锁
*/
Topic topic = session.createTopic(TOPIC_NAME);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
事务管理的关键编码(偏经营者):
//基本参数成true
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//事务管理递交
session.commit();
查收的关键编码(偏顾客):
//基本参数成手动式递交
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//消息查收
message.acknowledge();
留意:倘若既打开事务管理,又打开手动式查收,以事务管理为标准,只需事务管理被递交了也默认设置消息被查收了
性能增加:
1.运用nio的协议书比tcp的特性高,
- 配备方法:在conf文件目录下activemq.xml对着下边配备
<broker>
...
<transportConnectors>
<transportConnector name=\"nio\" uri=\"nio://0.0.0.0:61616\"/>
</<transportConnectors>
...
</broker>
- 第二步是编码浏览方法由tcp改成nio
//创建工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(\"nio://127.0.0.1:61616\");
2.jdbc Journaling提升仅有jdbc分布式锁的特性,它在做分布式锁入数据库查询以前,会先将信息存放到Journaling文件中,以后才渐渐地同歩到数据库系统中,相当于正中间加了一层缓存层。
- 把数据库查询mysql的驱动包放进lib文件目录下
- 配备方法:在conf文件目录下activemq.xml对着下边配备,在其中有一个createTablesOnStartup特性,初始值是true,表明每一次运行后去数据库查询全自动建表
<persistenceAdapter>
<kahaDB directory=\"${activemq.data}/kahadb\"/>
</persistenceAdapter>
//上边是默认设置配备寻找改为下边的配备
<persistenceAdapter>
<journalPersistenceAdapterFactory journalLogFiles=\"5\" dataDirectory=\"${basedir}/activemq-data\" dataSource=\"#mysql-ds\"/>
</persistenceAdapter>
//下边的编码写在<beans>连接点中
<bean id=\"mysql-ds\" class=\"org.apache.commons.dbcp.BasicDataSource\" destroy-method=\"close\">
<property name=\"driverClassName\" value=\"com.mysql.jdbc.Driver\"/>
<property name=\"url\" value=\"jdbc:mysql://localhost/activemq?relaxAutoCommit=true\"/>
<property name=\"username\" value=\"activemq\"/>
<property name=\"password\" value=\"activemq\"/>
<property name=\"poolPreparedStatements\" value=\"true\"/>
</bean>
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。