DSP

消息中间件及小案例

2019-07-13 19:44发布

一:什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来
进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进
程间的通信。对于消息中间件,常见的角 {MOD}大致也就有 Producer(生产者)、Consumer(消
费者)
常见的消息中间件产品:
( (1 )ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完
全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。我们在本次课程中介绍 ActiveMQ 的使
用。
(2)RabbitMQ
AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯,
OpenStack 开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache 下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到 10W/s
的吞吐速率;完全的分布式系统。适合处理海量数据。
1.2 JMS 简介
1.2.1 什么是 JMS
JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便
于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的
接口简化企业应用的开发。
JMS 本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系
统。它类似于 JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关
系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多
厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA 的 Weblogic JMS service 和 Progress 的
北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090
SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或
路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对
象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息
主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一
些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage–一个字符串对象 · MapMessage–一套名称-值对 · ObjectMessage–一个序列化的
Java 对象 · BytesMessage–一个字节的数据流 · StreamMessage – Java 原始值的数据流
1.2.2 JMS 消息传递类型
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进
行接收。 1.3ActiveMQ
1.3.1 简介
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。
1.3.2 安装与启动
官方网站下载:http://activemq.apache.org/ 资源包中也提供软件包
品优购资源配套软件activeMQapache-activemq-5.14.4-bin.zip
解压,运行 binwin64 下的 activemq.bat ,打开浏览器输入地址
http://localhost:8161/ 即可进入 ActiveMQ 管理页面
点击进入管理页面
输入用户名和密码 均为 admin 进入主界面
点对点消息列表:
列表各列信息含义如下:
Number Of Pending Messages : 等待消费的消息 这个是当前未出队列的数量。
Number Of Consumers : 消费者 这个是消费者端的消费者数量
Messages Enqueued : 进入队列的消息 进入队列的总数量,包括出队列的。
Messages Dequeued : 出了队列的消息 可以理解为是消费这消费掉的数量。
1.4 点对点模式
1.4.1 消息生产者
(1)创建工程 springjms_producer,在 POM 文件中引入 SpringJms 、activeMQ 以及单元测
试相关依赖
(2)在 src/main/resources 下创建 spring 配置文件 applicationContext-jms-producer.xml <context:component-scan base-package="cn.itcast.demo">context:component-scan> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.135:61616" /> bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> bean> <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue_text"/> bean> (3)在 cn.itcast.demo 包下创建消息生产者类 @Component public class QueueProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueTextDestination; public void sendTextMessage(String text){ jmsTemplate.convertAndSend( queueTextDestination,text); } } (4)单元测试
在 src/test/java 创建测试类 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-jms-producer.xml") public class TestJms { @Autowired 北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090 private QueueProducer queueProducer; @Test public void testSendQueue(){ queueProducer.sendTextMessage("SpringJms-点对点"); } } 1.4.2 消息消费者
(1)创建工程 springjms_consumer,在 POM 文件中引入依赖 (同上一个工程)
(2)创建配置文件 applicationContext-jms-consumer-queue.xml <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.135:61616"/> bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> bean> <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue_text"/> bean> <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener">bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueTextDestination" /> <property name="messageListener" ref="myMessageListener" /> bean> (3)编写监听类 public class MyMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } (4)创建测试类 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-jms-consumer-queue.xm l") public class TestQueue { @Test public void testQueue(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } } 1.5 发布/ 订阅模式
1.5.1 消息生产者
(1)在工程 springjms_producer 的 applicationContext-jms-producer.xml 增加配置 "topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> value="topic_text"/> 2)创建生产者类 @Component public class TopicProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination topicTextDestination; public void sendTextMessage(String text){ jmsTemplate.convertAndSend( topicTextDestination,text); } } (3)编写测试类 import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import cn.itcast.demo.TopicProducer; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-activemq-producer.xml ") public class TestTopic { @Autowired private TopicProducer topicProducer; @Test public void sendTextQueue(){ topicProducer.sendTextMessage(); } } 1.5.2 消息消费者
(1)在 springjms_consumer 工程中创建配置文件 applicationContext-jms-consumer-topic.xml <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.135:61616"/> bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> bean> <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic_text"/> bean> <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener">bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="topicTextDestination" /> <property name="messageListener" ref="myMessageListener" /> bean> (2)编写测试类 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations="classpath:applicationContext-jms-consumer-topic.xm l") public class TestTopic { @Test public void testTopic(){ try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } } 测试:同时运行三个消费者工程,在运行生产者工程,查看三个消费者工程的控制台输出。