存档2020年1月13日

Spring Cloud Stream 分组

服务 介绍
stream-group-sender 消息发送者服务
stream-group-receiverA 消息接收者服务
stream-group-receiverB 消息接收者服务

创建stream-group-sender 服务

spring.application.name=stream-sender
//对应 MQ 是 exchange outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
1. 定义发送接口

public interface ISendeService {
    String OUTPUT="outputProduct";
    /**
     * 指定输出的交换器名称
     * @return
     */
    @Output(OUTPUT)
    MessageChannel send();
}
  1. 在启动类增加注解
    // 绑定我们刚刚创建的发送消息的接口类型
    @EnableBinding(value={ISendeService.class})
  2. 测试发送
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        Product p = new Product(666, "stream test ...");
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                                .withPayload(p)
                                .build();
        sendService.send().send(message );
    }
}

创建stream-group-receiverA服务

spring.application.name=stream-group-receiverA
// 对应 MQ 是 exchange 和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
// 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct
1. 定义接收口

public interface IReceiverService {
    String INPUT = "inputProduct";
    /**
     * 指定接收的交换器名称
     * @return
     */
    @Input(INPUT)
    SubscribableChannel receiver();
}
  1. 消息的具体处理
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
    @StreamListener(IReceiverService.INPUT)
    public void onReceiver(Product p){
        System.out.println("消费者A:"+p);
    }
}
  1. 在启动类添加注解
    @EnableBinding(value={IReceiverService.class})

创建stream-group-receiverB服务

 此服务和stream-group-receiverA一样,复制一份只需修改配制中的服务名称,端口,group设置不一样
在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下都收到了消息
改为同组的情况下只有其中一个受到消息。避免了消息重复消费

RabbitMq使用

用于异步处理,日志处理,流量削峰,应用解耦

引用依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

参数配制

默认配制
spring:
 rabbitmq:
   host: localhost
   port: 5672
   username: guest
   password: guest

发送消息

public class MqSend {
    @Autowired
    private AmqpTemplate amqpTemplate;

     /**
     * 发送Mq测试消息
     */
    public void send(){
        amqpTemplate.convertAndSend("myQueue","now "+new Date());
    }

    /**
     * 发送数据供应商分组Mq测试消息
     */
    public void sendOrder(){
        amqpTemplate.convertAndSend("myQueue","computer","now "+new Date());
    }

}

接收消息

@Slf4j
@Component
public class MqReceiver {

    //1. 接收手动创建的消息,需先手动创建myQueue队列名
    /*@RabbitListener(queues = "myQueue")
    public void processtest(String message){
        log.info("MQReceiver: {}",message);
    }*/

    //2. 自动创建队列
    // @RabbitListener(queuesToDeclare = @Queue("myQueue"))

    //3. 自动创建, Exchange和Queue绑定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    ))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }
     /**
     * 数码供应商服务 接收分组消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "computer",
            value = @Queue("computerOrder")
    ))
    public void processComputer(String message) {
        log.info("computer MqReceiver: {}", message);
    }

    /**
     * 水果供应商服务 接收分组消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "fruit",
            value = @Queue("fruitOrder")
    ))
    public void processFruit(String message) {
        log.info("fruit MqReceiver: {}", message);
    }
}