RocketMQ 相关知识
@(消息队列)[RocketMQ, 消息]
Rocket MQ消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。
Rocket MQ相关名词
- Producer 消息生产者,负责生产消息
- Consumer 消息消费者,负责消费消息
- NameServer 无状态节点,用来保存活跃的broker列表和topic列表
- Broker 消息中转角色,负责存储消息,转发消息
- Topic 消息的逻辑管理单位
- Message 消息
- body 消息体,用于携带消息具体内容
- key 消息的key,用于区别不同的消息
- tags 消息的Tag,用于不同的订阅者过滤消息
消息发送方式
- 同步方式
发送消息,接收到结果之后再发送下一条消息,速度最慢,耗时最长
- 异步方式
发送消息,不论是否收到结果,直接发送下一条消息,发送速度介于同步和单向方式之间
- 单向方式
发送消息,直接发送消息,不返回发送结果,发送速度最快
消息类型
- 定时消息
在指定的发送时间发送消息
- 延时消息
从当前时间开始,经过延时时间后再发送消息
- 顺序消息
立即发送消息
- 事务消息
MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致
实例代码
Producer
1 | public class ProducerDelayTest { |
消息类型代码
定时消息
1
2
3
4
5
6
7
8Message msg = new Message();
msg.setTag("TAG");
msg.setKey("KEY");
msg.setTopic("TOPIC");
msg.setBody("BODY".getBytes());
long timeStamp =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2017-09-03 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
延时消息
1
2
3
4
5
6
7Message msg = new Message();
msg.setTag("TAG");
msg.setKey("KEY");
msg.setTopic("TOPIC");
msg.setBody("BODY".getBytes());
long delayTime = 3000;//30秒后再发送
msg.setStartDeliverTime(System.currentTimes() + delayTime);顺序消息
1
2
3
4
5Message msg = new Message();
msg.setTag("TAG");
msg.setKey("KEY");
msg.setTopic("TOPIC".getBytes());
msg.setBody("BODY");
消息发送方式代码
同步方式发送
1
SendResult sendResult = producer.send(msg);
异步方式发送
1
2
3
4
5
6
7
8
9
10
11producer.sendAsync(message, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
logger.info("MQ send ASYNCHRONOUS message successed,response is " + JSON.toJSONString(sendResult));
}
@Override
public void onException(OnExceptionContext onExceptionContext) {
logger.info("MQ send ASYNCHRONOUS message failed, error is " + onExceptionContext.getException().getMessage());
}
});单向方式发送
1
producer.sendOneway(message);
Consumer
1 | public class ConsumerTest { |