Spring Cloud Stream
支持RabbitMQ、Kafka,简化消息中间件的使用
添加依赖
支持RabbitMQ
<!-- Spring Cloud Stream starter for RabbitMQ -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
支持Kafka
<!-- Spring Cloud Stream starter for Kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
定义接口
/**
 * Declares the Spring Cloud Stream channels used by this example.
 *
 * <p>Two binding names are defined, and each one is exposed both as a
 * subscribable input channel and as an output channel.
 *
 * <p>NOTE(review): each binding name is shared by an {@code @Input} and an
 * {@code @Output} method; confirm that the Spring Cloud Stream version in use
 * accepts the same binding name twice within one interface.
 */
public interface StreamClient {

    /** Binding name used by both {@link #input()} and {@link #output()}. */
    String INPUT = "myMessage";

    /** Binding name used by both {@link #input2()} and {@link #output2()}. */
    String INPUT2 = "myMessage2";

    /**
     * @return the subscribable channel declared as input under {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * @return the message channel declared as output under {@link #INPUT}
     */
    @Output(INPUT)
    MessageChannel output();

    /**
     * @return the subscribable channel declared as input under {@link #INPUT2}
     */
    @Input(INPUT2)
    SubscribableChannel input2();

    /**
     * @return the message channel declared as output under {@link #INPUT2}
     */
    @Output(INPUT2)
    MessageChannel output2();
}
发送stream消息
@Api(tags = "发送Spring Cloud Stream信息到MQ")
@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

    // Earlier variant, kept for reference: sends a plain timestamp string
    // instead of an OrderDTO.
    //
    // @GetMapping("/sendMessage")
    // public void process() {
    //     String message = "now " + new Date();
    //     streamClient.output().send(MessageBuilder.withPayload(message).build());
    // }

    /**
     * Builds an {@code OrderDTO} whose order id is {@code "123456"}, wraps it
     * in a message and sends it through {@code streamClient.output()}.
     */
    @ApiOperation("发送 orderDTO对象")
    @GetMapping("/sendMessage")
    public void process() {
        final OrderDTO order = new OrderDTO();
        order.setOrderId("123456");

        streamClient.output()
                .send(MessageBuilder.withPayload(order).build());
    }
}
接收stream消息
* 功能描述:接收Spring Cloud Stream消息
*/
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
// @StreamListener(value = StreamClient.INPUT)
// public void process(Object message) {
// log.info("StreamReceiver: {}", message);
// }
/**
* myMessage 接收orderDTO对象 消息,myMessage2发送一条返回消息
* @param message
*/
@StreamListener(value = StreamClient.INPUT)
@SendTo(StreamClient.INPUT2)
public String process(OrderDTO message) {
log.info("StreamReceiver: {}", message);
return "received.";
}
/**
* myMessage2接收返回消息
* @param message
*/
@StreamListener(value = StreamClient.INPUT2)
public void process2(String message) {
log.info("StreamReceiver2: {}", message);
}
}