Spring Boot 中使用Spring Cloud Stream
Spring Cloud Stream 用于构建高可扩展的使用消息系统连接的事件驱动服务的框架
- Binder: 负责提供与外部消息系统集成的组件
- Binding: 提供外部消息系统与应用之间桥接的生产者和消费者
- Message: 生产者和消费者与外部消息系统之间通信的数据结构
构建应用
假设一个点咖啡场景:调用下单接口后,需要通知店员制作咖啡,待制作完成后,需要更新订单状态为完成,并通知客户;假设有两个服务,一个是订单服务(Order Service),一个是店员服务(Waiter Service)
使用 RabbitMQ 作为MQ 服务
Order Service
添加依赖
1 2 3 4 5 6 7 8 9
| dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support' }
|
添加配置
1 2 3 4 5 6
| spring.application.name=producer server.port=8082 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
|
Binding
1 2 3 4 5 6 7 8 9 10 11
| public interface OrderBinding {
String NEW_ORDERS = "newOrders"; String FINISHED_ORDERS = "finishedOrders";
@Input SubscribableChannel finishedOrders();
@Output MessageChannel newOrders(); }
|
@Input用于指定订阅消息Topic,当有该 Topic 的消息时会调用监听该消息的处理器
@Output用于指定发送消息Topic,可以通过该MessageChannel发送消息
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @RestController public class OrderController {
@Autowired private OrderBinding orderBinding;
@GetMapping("/newOrder") public Object sendMessage(Long orderId, Long userId) { OrderVO orderVO = OrderVO.builder() .orderId(orderId) .userId(userId) .build();
boolean result = orderBinding.newOrders().send(MessageBuilder.withPayload(orderVO).build());
return new HashMap<String, String>() {{ put("message", result ? "下单成功" : "下单失败"); }}; } }
|
发送消息可以通过注入 Binding 对象,或者注入 MessageChannel对象时指定 Bean 来实现
1 2 3 4 5 6 7 8
| @Autowired private OrderBinding orderBinding;
@Autowired @Qualifier(OrderBinding.FINISHED_ORDERS) private MessageChannel messageChannel;
|
- OrderServiceApplication.java
1 2 3 4 5 6 7 8
| @SpringBootApplication @EnableBinding({OrderBinding.class}) public class OrderServiceApplication {
public static void main(String[] args) { SpringApplication.run(OrderServiceApplication.class, args); } }
|
Waiter Service
添加依赖
1 2 3 4 5 6 7 8
| dependencies { implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support' }
|
添加配置
1 2 3 4
| spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
|
监听消息
1 2 3 4 5 6 7 8 9 10 11
| public interface OrderBinding {
String NEW_ORDERS = "newOrders"; String FINISHED_ORDERS = "finishedOrders";
@Input SubscribableChannel newOrders();
@Output MessageChannel finishedOrders(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component @Slf4j public class WaiterListener {
@StreamListener(OrderBinding.NEW_ORDERS) @SendTo(OrderBinding.FINISHED_ORDERS) public OrderVO newOrders(OrderVO orderVO) { log.info("收到用户:{} 的新订单:{}", orderVO.getUserId(), orderVO.getOrderId()); try { Thread.sleep(10000); } catch (InterruptedException e) { log.error("制作失败", e); }
return orderVO; } }
|
这里在收到消息后等待制作完成,发送订单完成的消息给 Order Service,执行后续动作
@StreamListener用于监听指定 Topic 并处理
@SendTo用于指定处理的结果要发送的下一个 Topic
- WaiterServiceApplication.java
1 2 3 4 5 6 7 8
| @SpringBootApplication @EnableBinding({OrderBinding.class}) public class WaiterServiceApplication {
public static void main(String[] args) { SpringApplication.run(WaiterServiceApplication.class, args); } }
|
Order Service 处理后续消息
订单完成消息
收到订单完成的消息后发送通知消息
1 2 3 4 5 6 7
| public interface NotifyBinding {
String SEND_NOTIFY = "sendNotify";
@Output MessageChannel sendNotify(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component @Slf4j public class OrderListener {
@StreamListener(OrderBinding.FINISHED_ORDERS) @SendTo(NotifyBinding.SEND_NOTIFY) public NotifyMessage finishedOrdersHandler(OrderVO orderVO) { log.info("用户:{} 的订单:{} 已完成", orderVO.getUserId(), orderVO.getOrderId()); return NotifyMessage.builder() .title("订单已完成") .userId(orderVO.getUserId()) .sendTime(LocalDateTime.now()) .build(); } }
|
处理通知消息
1 2 3 4 5 6 7 8 9
| @Component @Slf4j public class NotifyListener {
@StreamListener(NotifyBinding.SEND_NOTIFY) public void sendNotifyHandler(NotifyMessage notifyMessage) { log.info("发送标题为:{} 的消息给用户:{}", notifyMessage.getTitle(), notifyMessage.getUserId()); } }
|
测试
1
| docker run -d --name rabbit -p 4369:4369 -p 5672:5672 -p 15671:15671 -p 15672:15672 -p 25672:25672 --hostname localhost -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3-management
|
- 启动 WaiterService 和 OrderService
- 发送下单请求
1 2
| curl 'localhost:8082/newOrder?orderId=1&userId=2' {"message":"下单成功"}%
|
- 然后会看到 WaiterService 和 OrderService 的相应的监听器都接收到了消息并打印日志
其他类型消息
如果一个消息需要处理加工后继续发给下一个,可以直接使用@Transformer注解来代替@StreamListener和@SendTo
1 2 3 4 5 6 7 8
| @Component public class TransformerListener {
@Transformer(inputChannel = TransoformBinding.INPUT, outputChannel = TransoformBinding.OUTPUT) public Object transform(String message) { return message.toUpperCase(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class TimerAdapter {
@InboundChannelAdapter(value = OrderBinding.NEW_ORDERS, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) @Bean public MessageSource<OrderVO> timerMessageSource() {
OrderVO orderVO = OrderVO.builder() .userId(1L) .orderId(1L) .build();
return () -> new GenericMessage<>(orderVO); } }
|
项目地址
https://github.com/helloworlde/SpringBootCollection/tree/master/SpringBoot-Stream