
Wildcard用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。
ActiveMQ支持以下三种wildcard:
示例,假设有两个Destination:PRICESTOCKNASDAQIBM 和 PRICESTOCKNYSESUNW。那么如下配置通配符:
路径符号替换,将“/"替换"",添加如下插件:
组合队列Composite Destinations 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个 *** 作中同时向多个queue/topic发送消息。 有两种实现方式:
在创建客户端连接的Destination时,多个destination之间采用","分隔。如下:
如果希望不同的Destination的话,需要加前缀“queue://”或者“topic://”。如下
客户端发送示例:
当客户端向“comQueue”发送消息时,消息就会同时发送到"queueFOO"和“topicFOO”中。
通过配置selecter来过滤掉不需要的消息,如下:
在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemqxml中配置:
在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收相应资源。
参数说明:
schedulePeriodForDestinationPurge :设置多长时间检查一次,单位是毫秒。
inactiveTimoutBeforeGC : 设置当Destination为空后,多长时间被删除,单位是毫秒。
gcInactiveDestinations :设置删除掉不活动的队列,默认为false。
Destination Options是一种无需扩展JMS API即可向JMS消费者提供扩展配置选项的方法。这些选项使用URL查询语法在创建消费者的目标名称中进行编码。
消费者选项如下:
虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination。
ActiveMQ支持2种方式:
ActiveMQ中,topic只有在持久订阅下才会持久化消息。JMS持久化订阅者 MessageConsumer 在创建时会生成一个 唯一的 JMS clientID 和一个持久化订阅者的name。 为了符合JMS,对于一个JMSclientID,同一时间,只能有一个活跃的JMS连接,并且,对于一个clientID和订阅者name而言,只有一个消费者可以是活跃的。也就是说,对于给定的topic订阅者来说,只能由一个活跃的进程进行消费。这意味着我们不能实现:
如今,JMS中的队列语义提供了通过多个消费者来实现可靠的负载均衡——允许多线程,进程和机器来处理消息。然后我们就有了成熟而复杂的负载均衡技术,就像Messge Groups一样在维护订单时进行负载均衡和并行化处理。如下图所示,全量的消息先发送到队列,然后再分发给消费者,通过队列来解决负载均衡和故障转移问题。
virtual topic背后的思想是生产者用正常的JMS方式把消息发送到一个topic。消费者可以继续使用JMS标准中的Topic语义。然而,如果 topic 是虚拟的, 订阅一个逻辑topic的消费者可以从一个真实队列中消费,允许多个消费者分布在多个机器上且多线程的进行负载均衡。 例如,假定我们现在有一个叫做 VirtualTopicOrders 的topic。(前缀 VirtualTopic 表明这是一个 virtual topic)。
我们希望将orders 发送到系统A 和 系统B。于是,用通常的 durable topics ,我们需要为 clientID_A 和 "A" 创建一个JMS消费者,同时为 clientID_B 和 "B"创建一个JMS消费者。 有了virtual topics 我们可以直接从队列 ConsumerAVirtualTopicOrders 中为系统A 消费,或者从队列 ConsumerBVirtualTopicOrders 中为系统B 消费。
默认前缀是: VirtualTopic>
自定义消费虚拟地址默认格式: Consumer VirtualTopic>
下面的示例演示如何使所有主题成为虚拟主题。下面的示例使用名称“>“”表示“匹配所有主题”,“VirtualTopicConsumers”为消费队列的前缀。
其他的配置参数:
通常情况下,使用网络连接消费者队列。在这种情况下,不要在虚拟主题上桥接任何普通主题消费者,因为任何转发的消息都会再次分散到网络代理上的消费者队列,从而导致消息重复。下面示例展示了如何排除虚拟队列:
ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。 使用ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerService的“useMirroredQueues“属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是“VirtualTopicMirror”。如下订阅“qmirror”的主题就开启了mirrored queue。
>
activeMq有两种模式,说下个人理解
activemq 8186 管理端口 61616 默认服务端口
queue队列模式
topic 主题模式
队列模式: 生产者生成消息存入队列,消费者通过监听队列的queue消费者负责消费,且每个消息只能消费一次
应用场景: 登陆成功,记录日志/记录状态/记录ip 等的一些列 *** 作,可以异步执行,相对于来说不须要同步的 *** 作,可以保证 *** 作要求的请求不丢失
主题模式: 生产者生成消息,发布消息,订阅之后的消费者都可以读取到发布的消息,并且所有消息均可被多个消费者都消费一次
应用场景: 商城下单成功后,发送一条成功的消息,被日志系统/库存系统/物流系统 分别读取到,并做相应 *** 作
springboot+activemq整合
1 引入pom依赖
<groupId>orgspringframeworkboot
<artifactId>spring-boot-starter-activemq
<version>1520RELEASE
<groupId>orgmessaginghub
<artifactId>pooled-jms
<version>104
</dependency>
2 分别编写 生产者 消费者 配置类
@Component
public class ActiveSend {
@Autowired
private Queuequeue;
@Autowired
private Topictopic;
@Autowired
private JmsMessagingTemplatejmsMessagingTemplate;
public void sendQueue()throws JMSException {
ObjectMessage objectMessage =new ActiveMQObjectMessage();
objectMessagesetObject("nihao");
jmsMessagingTemplateconvertAndSend(queue,objectMessage);
}
public void sendTopic()throws JMSException {
ObjectMessage objectMessage =new ActiveMQObjectMessage();
objectMessagesetObject("nihao");
jmsMessagingTemplateconvertAndSend(topic,objectMessage);
}
}
@Component
public class ActiveListenner {
@JmsListener(destination ="active-queue")
public void reciveQueueMsg(ObjectMessage message)throws JMSException {
Systemoutprintln("收到的消息"+messagegetObject()toString());
}
@JmsListener(destination ="active-topic")
public void reciveTopicMsg(ObjectMessage message)throws JMSException {
Systemoutprintln("主题收到的消息"+messagegetObject()toString());
}
}
@Configuration
public class ActiveMqConfig {
@Bean
public Queuequeue (){
return new ActiveMQQueue("active-queue");
}
@Bean
public Topictopic (){
return new ActiveMQTopic("active-topic");
}
}
做一下说明,队列模式/主题模式,都是以配置类中的 关键字为判断对象,且两种模式不能共存
为了测试什么呢
测试队列模式,是否两个消费者都消费,且均分
测试主题模式,是否消息产生后,两个消费者都消费一次
yml 配置:
spring:
jms:
# 默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
pub-sub-domain:false
# activemq消息队列
activemq:
broker-url: tcp://172183180:61616
#是否是内存模式(内置MQ,true是 false否)
in-memory:false
# 等待消息发送响应的时间。设置为0等待永远
send-timeout: 0
user:'admin'
password:'admin'
packages:
#信任所有的包
trust-all:true
pool:
#是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
enabled:false
现在是队列模式,编写测试类测试
改变配置 将yml中的配置 pub-sub-domain:false 变为pub-sub-domain:true,开启主题模式
假如有10个监听者,那么主题模式下(topic),一个消息将被消费十次
一、消息中间件相关知识
1、概述
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
2、消息中间件的组成
21 Broker
消息服务器,作为server提供消息核心服务
22 Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
23 Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
24 Topic
25 Queue
26 Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
3 消息中间件模式分类
31 点对点
PTP点对点:使用queue作为通信载体
说明:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
说明:
queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。
4 消息中间件的优势
41 系统解耦
交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。
42 提高系统响应时间
例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。
43 为大数据处理架构提供服务
通过消息作为整合,大数据的背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。
44 Java消息服务——JMS
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
5 消息中间件应用场景
51 异步通信
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
52 解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
53 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
54 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。
55 过载保护
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
56 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
57 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
58 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
59 数据流处理
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。
6 消息中间件常用协议
61 AMQP协议
AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
优点:可靠、通用
62 MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统
63 STOMP协议
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互 *** 作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
优点:命令模式(非topic\queue模式)
64 XMPP协议
XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时 *** 作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其 *** 作系统和浏览器不同。
优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大
65 其他基于TCP/IP自定义的协议
有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
7 常见消息中间件MQ介绍
71 RocketMQ
阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,30版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。
具有以下特点:
官方提供了一些不同于kafka的对比差异:
>
以上就是关于ActiveMQ之Destination全部的内容,包括:ActiveMQ之Destination、消息中间件Producer通用化封装思路、2020-07-28 activeMq 两种模式的测试等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)