Spring Cloud Stream 分组
服务 介绍
stream-group-sender 消息发送者服务
stream-group-receiverA 消息接收者服务
stream-group-receiverB 消息接收者服务
创建stream-group-sender 服务
spring.application.name=stream-sender
//对应 MQ 是 exchange outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
1. 定义发送接口
public interface ISendeService {
String OUTPUT="outputProduct";
/**
* 指定输出的交换器名称
* @return
*/
@Output(OUTPUT)
MessageChannel send();
}
- 在启动类增加注解
// 绑定我们刚刚创建的发送消息的接口类型
@EnableBinding(value={ISendeService.class}) - 测试发送
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
@Autowired
private ISendeService sendService;
@Test
public void testStream(){
Product p = new Product(666, "stream test ...");
// 将需要发送的消息封装为Message对象
Message message = MessageBuilder
.withPayload(p)
.build();
sendService.send().send(message );
}
}
创建stream-group-receiverA服务
spring.application.name=stream-group-receiverA
// 对应 MQ 是 exchange 和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
// 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct
1. 定义接收口
public interface IReceiverService {
String INPUT = "inputProduct";
/**
* 指定接收的交换器名称
* @return
*/
@Input(INPUT)
SubscribableChannel receiver();
}
- 消息的具体处理
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
@StreamListener(IReceiverService.INPUT)
public void onReceiver(Product p){
System.out.println("消费者A:"+p);
}
}
- 在启动类添加注解
@EnableBinding(value={IReceiverService.class})
创建stream-group-receiverB服务
此服务和stream-group-receiverA一样,复制一份只需修改配制中的服务名称,端口,group设置不一样
在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下都收到了消息
改为同组的情况下只有其中一个受到消息。避免了消息重复消费