Spring Cloud Stream 分组

服务 介绍
stream-group-sender 消息发送者服务
stream-group-receiverA 消息接收者服务
stream-group-receiverB 消息接收者服务

创建stream-group-sender 服务

spring.application.name=stream-sender
//对应 MQ 是 exchange outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
1. 定义发送接口

public interface ISendeService {
    String OUTPUT="outputProduct";
    /**
     * 指定输出的交换器名称
     * @return
     */
    @Output(OUTPUT)
    MessageChannel send();
}
  1. 在启动类增加注解
    // 绑定我们刚刚创建的发送消息的接口类型
    @EnableBinding(value={ISendeService.class})
  2. 测试发送
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream(){
        Product p = new Product(666, "stream test ...");
        // 将需要发送的消息封装为Message对象
        Message message = MessageBuilder
                                .withPayload(p)
                                .build();
        sendService.send().send(message );
    }
}

创建stream-group-receiverA服务

spring.application.name=stream-group-receiverA
// 对应 MQ 是 exchange 和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
// 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct
1. 定义接收口

public interface IReceiverService {
    String INPUT = "inputProduct";
    /**
     * 指定接收的交换器名称
     * @return
     */
    @Input(INPUT)
    SubscribableChannel receiver();
}
  1. 消息的具体处理
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
    @StreamListener(IReceiverService.INPUT)
    public void onReceiver(Product p){
        System.out.println("消费者A:"+p);
    }
}
  1. 在启动类添加注解
    @EnableBinding(value={IReceiverService.class})

创建stream-group-receiverB服务

 此服务和stream-group-receiverA一样,复制一份只需修改配制中的服务名称,端口,group设置不一样
在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下都收到了消息
改为同组的情况下只有其中一个受到消息。避免了消息重复消费

RabbitMq使用

用于异步处理,日志处理,流量削峰,应用解耦

引用依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

参数配制

默认配制
spring:
 rabbitmq:
   host: localhost
   port: 5672
   username: guest
   password: guest

发送消息

public class MqSend {
    @Autowired
    private AmqpTemplate amqpTemplate;

     /**
     * 发送Mq测试消息
     */
    public void send(){
        amqpTemplate.convertAndSend("myQueue","now "+new Date());
    }

    /**
     * 发送数据供应商分组Mq测试消息
     */
    public void sendOrder(){
        amqpTemplate.convertAndSend("myQueue","computer","now "+new Date());
    }

}

接收消息

@Slf4j
@Component
public class MqReceiver {

    //1. 接收手动创建的消息,需先手动创建myQueue队列名
    /*@RabbitListener(queues = "myQueue")
    public void processtest(String message){
        log.info("MQReceiver: {}",message);
    }*/

    //2. 自动创建队列
    // @RabbitListener(queuesToDeclare = @Queue("myQueue"))

    //3. 自动创建, Exchange和Queue绑定
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    ))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }
     /**
     * 数码供应商服务 接收分组消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "computer",
            value = @Queue("computerOrder")
    ))
    public void processComputer(String message) {
        log.info("computer MqReceiver: {}", message);
    }

    /**
     * 水果供应商服务 接收分组消息
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myOrder"),
            key = "fruit",
            value = @Queue("fruitOrder")
    ))
    public void processFruit(String message) {
        log.info("fruit MqReceiver: {}", message);
    }
}

Spring Cloud Stream

支持RabbitMQ,Kaflka,简化消息中间件的使用

添加依赖

支持RabbitMQ
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
支持Kaflka
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kaflka</artifactId>
</dependency>

定义接口

/**
 * 功能描述: 定义Spring Cloud Stream接口
 */
public interface StreamClient {

    String INPUT = "myMessage";

    String INPUT2 = "myMessage2";

    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    @Output(StreamClient.INPUT)
    MessageChannel output();

    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();

    @Output(StreamClient.INPUT2)
    MessageChannel output2();
}

发送stream消息

@Api(tags = "发送Spring Cloud Stream信息到MQ")
@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

//    @GetMapping("/sendMessage")
//    public void process() {
//        String message = "now " + new Date();
//        streamClient.output().send(MessageBuilder.withPayload(message).build());
//    }

    /**
     * 发送 orderDTO对象
     */
    @ApiOperation("发送 orderDTO对象")
    @GetMapping("/sendMessage")
    public void process() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123456");
        streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());
    }
}

接收stream消息

 * 功能描述:接收Spring Cloud Stream消息
 */
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {

//    @StreamListener(value = StreamClient.INPUT)
//    public void process(Object message) {
//        log.info("StreamReceiver: {}", message);
//    }

    /**
     * myMessage 接收orderDTO对象 消息,myMessage2发送一条返回消息
     * @param message
     */
    @StreamListener(value = StreamClient.INPUT)
    @SendTo(StreamClient.INPUT2)
    public String process(OrderDTO message) {
        log.info("StreamReceiver: {}", message);
        return "received.";
    }

    /**
     * myMessage2接收返回消息
     * @param message
     */
    @StreamListener(value = StreamClient.INPUT2)
    public void process2(String message) {
        log.info("StreamReceiver2: {}", message);
    }
}

RabbitMQ

https://www.rabbitmq.com/download.html

docker run -itd --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

sudo docker run -d --hostname rabbitmq --name rabbitmq  -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password --restart always harbor.com/public/rabbitmq:3-management

1883 是mqtt tcp协议默认端口
4369 EPMD默认端口号
5672 是服务的端口
15672 是管理界面的端口
15674 是web_stomp ws协议默认端口
15675 是web_mqtt ws协议默认端口
25672 用于节点间和CLI工具通信(Erlang分发服务器端口)

docker run -p 1883:1883 -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 8883:8883 -p 15672:15672 -p 25672:25672 -p 15675:15675  -d cyrilix/rabbitmq-mqtt

sudo docker run -d --hostname rabbitmq --name rabbitmq  -p 5672:5672 -p 15672:15672  -p 1883:1883  -p 15674:15674  -p 15675:15675 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --restart always  rabbitmq:3-management

docker exec rabbitmq rabbitmq-plugins list
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_mqtt
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_mqtt
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp 
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp_examples

启用 HTTP 后台认证需要使用 rabbitmq_auth_backend_http 插件,同时该插件还推荐配合 rabbitmq_auth_backend_cache 通过缓存减轻授权认证服务器压力。
---
docker-compose.yml
version: '3'
      services:
        rabbitmq:
       image: rabbitmq:management
       container_name: rabbitmq
       restart: always
       ports:
       - 5672:5672
       - 15672:15672
       - 1883:1883
       - 15675:15675
       environment:
       RABBITMQ_DEFAULT_USER: admin
       RABBITMQ_DEFAULT_PASS: admin
       volumes:
       - ~/rabbitmq/lib:/var/lib/rabbitmq
       - ~/rabbitmq/log:/var/log

docker exec -it rabbitmq bin/bash
      rabbitmq-plugins enable rabbitmq_mqtt
      rabbitmq-plugins enable rabbitmq_web_mqtt
 docker-compose restart
      docker-compose -f /home/zhu/rabbitmq/docker-compose.yml up -d

通过后台管理插件我们可以动态监控mq的流量,创建用户,队列等。

创建目录
mkdir /etc/rabbitmq

启用插件
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
其会在/etc/rabbitmq目录下创建一个enabled_plugins文件,这是后台管理的配置文件。

开启远程访问
rabbitmq的网页管理的端口是15672,如果你是远程操作服务器,输入http://ip:15672,发现连接不上,因为服务器防火墙不允许这个端口远程访问;
###将mq的tcp监听端口和网页管理端口都设置成允许远程访问
firewall-cmd –permanent –add-port=15672/tcp
firewall-cmd –permanent –add-port=5672/tcp
systemctl restart firewalld.service

管理界面介绍
###输入用户名密码登录后进入主界面
Overview:用来显示流量,端口,节点等信息,以及修改配置文件;
Connections:显示所有的TCP连接;
channels:显示所有的信道连接;
exchanges:显示所有的交换机以及创建删除等;
queues:显示所有的队列以及创建删除等;
admins:显示所有的用户以及用户管理;

用户设置
rabbitmq有一个默认的用户名和密码,guest和guest,但为了安全考虑,该用户名和密码只允许本地访问,如果是远程操作的话,需要创建新的用户名和密码;

###root权限
rabbitmqctl add_user username passwd //添加用户,后面两个参数分别是用户名和密码
rabbitmqctl set_permissions -p / username “.” “.” “.*” //添加权限
rabbitmqctl set_user_tags username administrator //修改用户角色,将用户设为管理员

注意:创建的新用户默认角色为空。
用户的角色说明

management:用户可以访问管理插件
policymaker:用户可以访问管理插件,并管理他们有权访问的vhost的策略和参数。
monitoring:用户可以访问管理插件,查看所有连接和通道以及与节点相关的信息。
administrator:用户可以做任何监视可以做的事情,管理用户,vhost和权限,关闭其他用户的连接,并管理所有vhost的政策和参数。

使用添加的账户远程访问后台管理站点,将原来的账号guest删除;
用户管理命令汇总

新建用户:rabbitmqctl add_user username passwd
删除用户:rabbitmqctl delete_user username
改密码: rabbimqctl change_password {username} {newpassword}
设置用户角色:rabbitmqctl set_user_tags {username} {tag …}

rabbitmqctl set_permissions -p / username “.” “.” “.*” //添加权限

权限说明:

rabbitmqctl set_permissions [-pvhostpath] {user} {conf} {write} {read}
Vhostpath:虚拟主机,表示该用户可以访问那台虚拟主机;
user:用户名。
Conf:一个正则表达式match哪些配置资源能够被该用户访问。
Write:一个正则表达式match哪些配置资源能够被该用户设置。
Read:一个正则表达式match哪些配置资源能够被该用户访问。

虚拟主机
默认的用户和队列都是在/虚拟机下。
创建一个虚拟主机
rabbitmqctl add_vhost vhost_name
删除一个虚拟主机
rabbitmqctl delete_vhost vhost_name

常用文件路径

/usr/local/rabbitmq_server/var/log/rabbitmq/rabbit@tms.log:记录rabbitmq运行日常的日志
/usr/local/rabbitmq_server/var/log/rabbitmq/rabbit@tms-sasl.log:rabbitmq的崩溃报告
/usr/local/rabbitmq_server/etc/rabbitmq/rabbitmq.config:rabbitmq的配置文件
/usr/local/rabbitmq_server/var/lib/rabbitmq/mnesia/rabbit@tms:rabbit消息持久化文件

注意:
如果相应路径的文件不存在就自己创建,如果是使用apt安装或yum安装那么这些文件夹都被自动创建好了。


RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。
1. Direct Exchange
该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。
1. Topic Exchange
该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。
Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。
1. Fanout Exchange
该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。
1. Headers Exchange
该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。