RocketMQ与SpringBoot整合、核心使用、多租户自动隔离、Java8时间支持

一、文章核心内容

  • RocketMQ与SpringBoot的整合以及基础的应用配置

  • RocketMQTemplate模板类的各种使用,如顺序消息、异步消息、响应消息、单向消息、指定消息key等等

  • RocketMQ多租户/环境自动隔离topic/group/tag,如只需要配置基础的topic、group、tag,部署到dev、test、prod环境自动隔离,只需要写一次配置;多租户同样适用

  • Java时间模块支持,默认如果RocketMQMessageListener用实体类接收消息时,字段不支持LocalDate/LocalDateTime类型,发送会报错,需要增加单独处理

  • 启动日志报RocketMQLog:WARN Please initialize the logger system properly错误解决

    文章预告

  • 下一篇文章分享在企业级项目中RocketMQ的二次封装,达到核心代码抽离,实现类只需要关注业务模块,日志记录,自动重试啥啥的全都扔给抽象层

  • 抽象层是解耦、复用的一大有效手段,需要结合业务、设计模式、实战经验,抽象出一个适合自身项目的抽象层

    特殊说明

  • 这篇文章分享的是高级部分,并非基础RocketMQ,所以RocketMQ基础使用、RocketMQ控制台等基础部分不会过多说明

二、核心使用

2.1 与SpringBoot整合

整体项目结构如下,文章未列出所有代码,未提及的请参考下面源代码

image.png

  1. 添加maven依赖

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    4. <modelVersion>4.0.0</modelVersion>
    5. <parent>
    6. <groupId>org.springframework.boot</groupId>
    7. <artifactId>spring-boot-starter-parent</artifactId>
    8. <version>2.2.0.RELEASE</version>
    9. <relativePath/> <!-- lookup parent from repository -->
    10. </parent>
    11. <groupId>com.codecoord</groupId>
    12. <artifactId>springboot-rocketmq</artifactId>
    13. <version>1.0</version>
    14. <name>springboot-rocketmq</name>
    15. <properties>
    16. <java.version>1.8</java.version>
    17. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    18. <maven.compiler.source>1.8</maven.compiler.source>
    19. <maven.compiler.target>1.8</maven.compiler.target>
    20. </properties>
    21. <dependencies>
    22. <dependency>
    23. <groupId>org.springframework.boot</groupId>
    24. <artifactId>spring-boot-starter-web</artifactId>
    25. </dependency>
    26. <dependency>
    27. <groupId>org.projectlombok</groupId>
    28. <artifactId>lombok</artifactId>
    29. </dependency>
    30. <dependency>
    31. <groupId>com.alibaba</groupId>
    32. <artifactId>fastjson</artifactId>
    33. <version>1.2.76</version>
    34. </dependency>
    35. <!-- RocketMQ核心依赖 -->
    36. <dependency>
    37. <groupId>org.apache.rocketmq</groupId>
    38. <artifactId>rocketmq-spring-boot-starter</artifactId>
    39. <version>2.2.1</version>
    40. </dependency>
    41. </dependencies>
    42. <build>
    43. <plugins>
    44. <plugin>
    45. <groupId>org.springframework.boot</groupId>
    46. <artifactId>spring-boot-maven-plugin</artifactId>
    47. </plugin>
    48. </plugins>
    49. </build>
    50. </project>
  2. 配置文件增加服务器配置

    • 核心的配置在application.yml配置注释中有说明
      1. server:
      2. port: 8888
      3. spring:
      4. application:
      5. name: springboot-rocketmq
      6. rocketmq:
      7. # 多个NameServer,host:port;host:port,RocketMQProperties
      8. name-server: 127.0.0.1:9876
      9. producer:
      10. # 发送同一类消息的设置为同一个group,保证唯一
      11. group: springboot_producer_group
      12. # 发送消息失败重试次数,默认2
      13. retryTimesWhenSendFailed: 2
      14. # 异步消息重试此处,默认2
      15. retryTimesWhenSendAsyncFailed: 2
      16. # 发送消息超时时间,默认3000
      17. sendMessageTimeout: 3000
      18. # 消息最大长度,默认1024 * 1024 * 4(默认4M)
      19. maxMessageSize: 4096
      20. # 压缩消息阈值,默认4k(1024 * 4)
      21. compressMessageBodyThreshold: 4096
      22. # 是否在内部发送失败时重试另一个broker,默认false
      23. retryNextServer: false
      24. # access-key
      25. #accessKey: xxx
      26. # secret-key
      27. #secretKey: xxx
      28. # 是否启用消息跟踪,默认false
      29. enableMsgTrace: false
      30. # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
      31. customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
      32. consumer:
      33. # 配置指定group是否启动监听器 group1.topic1 = false
      34. listeners:
      35. # key:group名称
      36. rocketmq_source_code_group:
      37. # value:{key: topic名称: value: true/false}
      38. rocketmq_source_code: true
      39. # 指定消费组
      40. group: springboot_consumer_group
      41. # 指定topic,启动时就会注册
      42. #topic: springboot_consumer_xxx
      43. # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
      44. pull-batch-size: 10
      45. # 其他配置参考属性类
      46. logging:
      47. level:
      48. io.netty: ERROR
      49. # 关闭RocketmqClient info日志,不然每隔几秒就会打印broker信息
      50. RocketmqClient: error
  3. 新建启动类

    • 解决启动RocketMQLog:WARN Please initialize the logger system properly.报错
      ```java
      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;

/**

  • RocketMQ启动类
    */
    @SpringBootApplication
    public class RocketMqApplication {
    public static void main(String[] args) {
    1. /*
    2. * 指定使用的日志框架,否则将会告警
    3. * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
    4. * RocketMQLog:WARN Please initialize the logger system properly.
    5. */
    6. System.setProperty("rocketmq.client.logUseSlf4j", "true");
    7. SpringApplication.run(RocketMqApplication.class, args);
    }
    }
    ```
    1. 以上完成,和SpringBoot整合已经完成,基本上大部分框架与SpringBoot整合都只需要以下三步
    • 添加pom
    • application.yml配置
    • 启动类(如果需要开启注解)

2.2 项目基础数据类

  1. 新建消息实体类,LocalDate和LocalDateTime类型默认不支持,需要单独处理,参考后面配置,因为LocalDate和LocalDateTime日常使用中用的较多
    ```java
    import com.codecoord.rocketmq.config.RocketMqConfig;
    import lombok.Data;

import java.time.LocalDate;
import java.time.LocalDateTime;

@Data
public class RocketMqMessage {
private Long id;
private String message;
private String version;
/**

  1. * LocalDateLocalDateTime默认不支持,需要单独处理
  2. * {@link RocketMqConfig}
  3. */
  4. private LocalDate currentDate;
  5. private LocalDateTime currentDateTime;

}

  1. 2. 新增常量类,包含主要topic等信息
  2. - 生产环境中一但上线90%情况下不会去更改原有的tag或者topic消息信息,所以可以将其定义为常量类,方便全局引用,避免出错
  3. ```java
  4. public interface RocketMqBizConstant {
  5. String SOURCE_TOPIC = "rocketmq_source_code_topic";
  6. String SOURCE_GROUP = "rocketmq_source_code_group";
  7. String SOURCE_TAG = "rocketmq_source_code_tag";
  8. }
  • 也可以在配置文件中定义,然后通过 ${} 表达式注入,这种方式适用于需要更改不同配置信息让队列生效情况
    1. # 配置文件中配置组信息
    2. service:
    3. topic: rocketmq_source_code_topic
    4. group: rocketmq_source_code_group
    5. tag: rocketmq_source_code_tag
  1. 新增延迟等级常量类,延迟等级和基础RocketMQ中的延迟等级一致

    1. public interface RocketMqDelayLevel {
    2. int ONE_SECOND = 1;
    3. int FIVE_SECOND = 2;
    4. int TEN_SECOND = 3;
    5. int THIRTY_SECOND = 4;
    6. int ONE_MINUTE = 5;
    7. int TWO_MINUTE = 6;
    8. int THREE_MINUTE = 7;
    9. int FOUR_MINUTE = 8;
    10. int FIVE_MINUTE = 9;
    11. int SIX_MINUTE = 10;
    12. int SEVEN_MINUTE = 11;
    13. int EIGHT_MINUTE = 12;
    14. int NINE_MINUTE = 13;
    15. int TEN_MINUTE = 14;
    16. int TWENTY_MINUTE = 15;
    17. int THIRTY_MINUTE = 16;
    18. int ONE_HOUR = 17;
    19. int TWO_HOUR = 18;
    20. }

    2.3 RocketMQ配置

  2. 配置跨域,此配置可选,是为了方便网页版apipost请求的时候处理浏览器跨域问题,如果不需要测试可以不配置
    ```java
    import org.springframework.stereotype.Component;
    import org.springframework.web.servlet.config.annotation.CorsRegistry;
    import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**

  • web拦截器配置,可以使用网页版本的apipost
    */
    @Component
    public class WebConfig implements WebMvcConfigurer {

    /**

    • 跨域配置方式一
      /
      @Override
      public void addCorsMappings(CorsRegistry registry) {
      registry.addMapping(“/*
      “)

      1. .allowedOrigins("*")
      2. .allowedHeaders("*")
      3. .allowedMethods("*")
      4. // .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
      5. .maxAge(3600L);

      }

      /**

    • 跨域配置方式二
      /
      /
      @Bean
      public CorsFilter corsFilter() {
      UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
      CorsConfiguration configuration = new CorsConfiguration();
      configuration.setAllowCredentials(true);
      configuration.setMaxAge(3600L);
      // 配置允许的原始ip、头、方法
      configuration.addAllowedOrigin(““);
      configuration.addAllowedHeader(“
      “);
      configuration.addAllowedMethod(“*”);

      source.registerCorsConfiguration(“/*”, configuration);
      return new CorsFilter(source);
      }
      /
      }
      ```

  1. 重要:RocketMQ序列化器处理,增加Java8时间类型处理,配置完成之后可以在实体类中使用Java8时间类型作为属性
    • 未配置出现错误参考第四节
      ```java
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
      import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.context.annotation.Primary;
      import org.springframework.messaging.converter.CompositeMessageConverter;
      import org.springframework.messaging.converter.MappingJackson2MessageConverter;
      import org.springframework.messaging.converter.MessageConverter;

import java.util.List;

/**

  • RocketMQ序列化器处理
    */
    @Configuration
    public class RocketMqConfig {

    /**

    • 解决RocketMQ Jackson不支持Java时间类型配置
    • 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
      */
      @Bean
      @Primary
      public RocketMQMessageConverter createRocketMQMessageConverter() {
      RocketMQMessageConverter converter = new RocketMQMessageConverter();
      CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
      List messageConverterList = compositeMessageConverter.getConverters();
      for (MessageConverter messageConverter : messageConverterList) {
      1. if (messageConverter instanceof MappingJackson2MessageConverter) {
      2. MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
      3. ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
      4. // 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTime
      5. objectMapper.registerModules(new JavaTimeModule());
      6. }
      }
      return converter;
      }
      }
      ```

      2.4 RocketMQTemplate发送消息

  1. SpringBoot中使用RocketMQTemplate模板类进行消息发送
    • org.apache.rocketmq.spring.core.RocketMQTemplate
  2. 多种发送消息方式示例
    • 注意:发送带key消息是通过设置请求头完成
    • org.apache.rocketmq.spring.support.RocketMQHeaders
    • org.apache.rocketmq.spring.support.RocketMQUtil#convertToSpringMessage(MessageExt)
      • 需要注意目的地发送格式为 topic:tag
    • 下面不同消息间用了注释,需要测试哪种发送消息方式打开注释即可
      ```java
      import com.alibaba.fastjson.JSONObject;
      import com.codecoord.rocketmq.constant.RocketMqBizConstant;
      import com.codecoord.rocketmq.constant.RocketMqDelayLevel;
      import com.codecoord.rocketmq.domain.RocketMqMessage;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.spring.core.RocketMQTemplate;
      import org.springframework.messaging.Message;
      import org.springframework.messaging.support.MessageBuilder;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;

@RestController
@RequestMapping(“/rocketmq”)
@Slf4j
public class RocketMqController {
@Resource(name = “rocketMQTemplate”)
private RocketMQTemplate rocketMqTemplate;

  1. /**
  2. * 通过实体类发送消息,发送注意事项请参考实体类
  3. * http://localhost:8888/rocketmq/entity/message
  4. */
  5. @RequestMapping("/entity/message")
  6. public Object sendMessage() {
  7. // 目的:topic:tag,如果不指定则发往配置的默认地址
  8. String destination = RocketMqBizConstant.SOURCE_TOPIC + ":" + RocketMqBizConstant.SOURCE_TAG;
  9. RocketMqMessage message = new RocketMqMessage();
  10. message.setId(System.currentTimeMillis());
  11. message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
  12. // Java时间字段需要单独处理,否则会序列化失败
  13. message.setCurrentDate(LocalDate.now());
  14. message.setCurrentDateTime(LocalDateTime.now());
  15. message.setVersion("1.0");
  16. /// 发送同步消息,消息成功发送到Broker时才返回,message可以入参批量消息
  17. // 通过SendResult来处理发送结果
  18. // SendResult sendResult = rocketMqTemplate.syncSend(destination, message);
  19. /// 发送时指定业务key
  20. /*Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
  21. // 设置keys
  22. .setHeader(RocketMQHeaders.KEYS, message.getId())
  23. .build();
  24. SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage);*/
  25. /// 发送延迟消息
  26. Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
  27. SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage, 3000, RocketMqDelayLevel.FIVE_SECOND);
  28. /// 发送同步有序消息,需要指定hashKey,可以用业务唯一键
  29. // rocketMqTemplate.syncSendOrderly(destination, message, message.getId().toString());
  30. /// 发送异步消息,消息发送后及时返回,然后通过回调方法通知
  31. // rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
  32. // @Override
  33. // public void onSuccess(SendResult sendResult) {
  34. // log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
  35. // }
  36. //
  37. // @Override
  38. // public void onException(Throwable e) {
  39. // log.error("消息发送失败【{}】", e.getMessage());
  40. // }
  41. // });
  42. /// 发送异步有序消息,需要指定hashKey,可以用业务唯一键
  43. // rocketMqTemplate.asyncSendOrderly(destination, message, message.getId().toString(), new SendCallback() {
  44. // @Override
  45. // public void onSuccess(SendResult sendResult) {
  46. // log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));
  47. // }
  48. //
  49. // @Override
  50. // public void onException(Throwable e) {
  51. // log.error("消息发送失败【{}】", e.getMessage());
  52. // }
  53. // });
  54. /// 发送单向消息
  55. // rocketMqTemplate.sendOneWay(destination, message);
  56. /// 发送单向有序消息,通过MessageBuilder构建
  57. // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();
  58. // rocketMqTemplate.sendOneWayOrderly(destination, buildMessage, message.getId().toString());
  59. /// 发送和接收回调消息,需要实现 RocketMQReplyListener 监听器类才可以,否则将会超时错误
  60. // rocketMqTemplate.sendAndReceive(destination, message, new RocketMQLocalRequestCallback<String>() {
  61. // @Override
  62. // public void onSuccess(String message) {
  63. // log.info("消息发送成功,消息类型【{}】", message);
  64. // }
  65. //
  66. // @Override
  67. // public void onException(Throwable e) {
  68. // log.error("消息发送失败", e);
  69. // }
  70. // });
  71. /// 调用抽象类方法发送,最终也是syncSend
  72. // rocketMqTemplate.convertAndSend(destination, "convertAndSend");
  73. // 转换消息和发送,底层使用的是syncSend(destination, message),将会被RocketEntityMessageListener消费
  74. // Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)
  75. // // 设置请求头
  76. // .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
  77. // .build();
  78. // 将会被RocketEntityMessageListener03消费
  79. // Message<Object> buildMessage = MessageBuilder.withPayload(new Object()).build();
  80. // rocketMqTemplate.send(destination, buildMessage);
  81. /// 发送批量消息,批量消息最终转为单挑进行发送
  82. // List<Message<String>> msgList = new ArrayList<>();
  83. // for (int i = 0; i < 10; i++) {
  84. // msgList.add(MessageBuilder.withPayload("消息:" + i).build());
  85. // }
  86. // rocketMqTemplate.syncSend(destination, msgList);
  87. return message;
  88. }
  89. /**
  90. * 直接将对象进行传输,也可以自己进行json转化后传输
  91. */
  92. @RequestMapping("/messageExt/message")
  93. public SendResult convertAndSend() {
  94. String destination = "rocketmq_source_code_topic:rocketmq_source_code_ext_tag";
  95. JSONObject jsonObject = new JSONObject();
  96. jsonObject.put("type", "messageExt");
  97. return rocketMqTemplate.syncSend(destination, jsonObject);
  98. }

}

  1. 2. 需要测试哪种发送方式,把其他方式注释打开需要的发送方式注释即可,上面发送延迟消息
  2. ## 2.5 RocketMQListener消费
  3. 1. 消息消费需要使用注解**RocketMQMessageListener**进行监听
  4. - electorExpression = "\*":指定tag,*表示监听所有tag
  5. - consumerGroup:指定消费组
  6. - 一般可以将consumerGroupselectorExpression设置为一样
  7. - topic:指定topictopic至关重要,通常表示某一类业务或者平台,例如订单topic、仓储topic
  8. 2. 监听器注解类需要实现RocketMQListener接口,**泛型为发送消息时的类型**
  9. - org.apache.rocketmq.spring.core.RocketMQListener
  10. 3. 监听器实例
  11. ```java
  12. import com.alibaba.fastjson.JSONObject;
  13. import com.codecoord.rocketmq.constant.RocketMqBizConstant;
  14. import com.codecoord.rocketmq.domain.RocketMqMessage;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  17. import org.apache.rocketmq.spring.core.RocketMQListener;
  18. import org.springframework.stereotype.Component;
  19. import java.util.concurrent.TimeUnit;
  20. /**
  21. * 实体类消费监听器
  22. */
  23. @Slf4j
  24. @Component
  25. @RocketMQMessageListener(
  26. topic = RocketMqBizConstant.SOURCE_TOPIC,
  27. consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
  28. selectorExpression = RocketMqBizConstant.SOURCE_TAG
  29. )
  30. public class RocketEntityMessageListener implements RocketMQListener<RocketMqMessage> {
  31. /**
  32. * 普通消息
  33. */
  34. @Override
  35. public void onMessage(RocketMqMessage message) {
  36. log.info("收到消息【{}】", JSONObject.toJSON(message));
  37. try {
  38. // 方法执行完成之后会自动进行进行ack,后续会分享源码解读
  39. TimeUnit.SECONDS.sleep(3);
  40. // 出现异常,将会自动进入重试队列
  41. // int ex = 10 / 0;
  42. } catch (InterruptedException e) {
  43. log.error(e.getMessage());
  44. }
  45. log.info("休眠了3s后消费完成");
  46. }
  47. }
  1. 通过表达式注入

    1. @RocketMQMessageListener(
    2. topic = "${service.topic}",
    3. consumerGroup = "${service.group}",
    4. selectorExpression = "${service.tag}"
    5. )

    2.6 发送消息测试

  2. RocketMqController已经提供了消息发送测试接口,通过接口发送后,即可自动发送到指定的topic:tag,特别注意是发送到 topic:tag,而不是 group:tag

  3. 发送之后通过RocketMQ控制台查看

三、消费配置

3.1 配置线程数

  1. RocketMQMessageListener默认是64个线程并发消息,一些时候需要单线程消费,所以可以配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够

    1. @RocketMQMessageListener(
    2. topic = RocketMqBizConstant.SOURCE_TOPIC,
    3. consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
    4. selectorExpression = RocketMqBizConstant.SOURCE_TAG,
    5. // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
    6. consumeThreadMax = 5
    7. )

    3.2 消息确认

  2. RocketMQ在SpringBoot中,当方法执行成功且未抛出异常时会自动进行确认,如果出现异常将会进行重试

    • 源码类:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
      1. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      2. for (MessageExt messageExt : msgs) {
      3. log.debug("received msg: {}", messageExt);
      4. try {
      5. long now = System.currentTimeMillis();
      6. handleMessage(messageExt);
      7. long costTime = System.currentTimeMillis() - now;
      8. log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
      9. } catch (Exception e) {
      10. // 只要出现Exception异常,将会进行重试
      11. log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
      12. context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
      13. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
      14. }
      15. }
      16. // 如果没有出现异常则消费成功
      17. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      18. }
  3. 其中delayLevelWhenNextConsume指定重试频率

    • -1:不重试,直接放入死信队列
    • 0:broker控制重试频率
    • >0:客户端控制重试频率

3.3 WARN No appenders could be found for logger

  1. 引入SpringBoot之后不在启动类中做日志配置情况下将会有以下警告

    1. RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
    2. RocketMQLog:WARN Please initialize the logger system properly.
  2. 需要在启动类中启动前设置环境变量 rocketmq.client.logUseSlf4j 为 true 明确指定RocketMQ的日志框架
    ```java
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketMqApplication {
public static void main(String[] args) {
/*

  1. * 指定使用的日志框架,否则将会报错
  2. * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
  3. * RocketMQLog:WARN Please initialize the logger system properly.
  4. */
  5. System.setProperty("rocketmq.client.logUseSlf4j", "true");
  6. SpringApplication.run(RocketMqApplication.class, args);
  7. }

}

  1. # 四、Java8时间类型支持
  2. > 2.3节有提到,在此处做详细说明
  3. 1. RocketMQ内置使用的转换器是**RocketMQMessageConverter**
  4. - org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration
  5. 2. 转换JSON时使用的是**MappingJackson2MessageConverter**,但是其不支持Java的时间类型,比如LocalDate
  6. 2. 当消息实体中存在上面的时间类型字段时将会报以下错误
  7. ```java
  8. java.lang.RuntimeException: cannot convert message to class com.codecoord.rocketmq.domain.RocketMqMessage
  9. at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:486) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  10. at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  11. at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  12. at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
  13. at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.1.jar:4.9.1]
  14. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_231]
  15. at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_231]
  16. at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_231]
  17. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
  18. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
  19. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
  20. Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Expected array or string.
  1. 所以需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,然后添加支持时间模块
    ```java
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.messaging.converter.CompositeMessageConverter;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.converter.MessageConverter;

import java.util.List;

/**

  • 序列化器处理
    */
    @Configuration
    public class RocketMqConfig {

    /**

    • 解决RocketMQ Jackson不支持Java时间类型配置
    • 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
      */
      @Bean
      @Primary
      public RocketMQMessageConverter createRocketMQMessageConverter() {
      RocketMQMessageConverter converter = new RocketMQMessageConverter();
      CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
      List messageConverterList = compositeMessageConverter.getConverters();
      for (MessageConverter messageConverter : messageConverterList) {
      1. if (messageConverter instanceof MappingJackson2MessageConverter) {
      2. MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
      3. ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
      4. objectMapper.registerModules(new JavaTimeModule());
      5. }
      }
      return converter;
      }
      }
      ```

五、多环境自动隔离

5.1 环境隔离说明

  1. 从上面代码中可以看到,topic等信息通过代码常量方式维护,此时如果不对环境进行隔离,将会导致消息被错误的环境消费
  2. 其中一种解决方案是通过$Value注入,但是这个需要每个环境都维护自己的topic等信息
  3. 我们可以进行环境区分自动隔离,比如dev、test、prod等不同环境只需要简单配置一个选项,所有的消息将被自动隔离,这样当协作人员需要单独测试的时候较为方便
  4. 环境隔离利用BeanPostProcessor的postProcessBeforeInitialization在监听器实例初始化前把对应topic修改

5.2 环境隔离配置

多环境自动隔离这部分实用性五颗星

  1. 配置文件中增加属性配置

    1. # 自定义属性
    2. system:
    3. environment:
    4. # 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串
    5. name: tianxin
    6. # 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果
    7. # 默认为true,配置类:EnvironmentIsolationConfig
    8. isolation: true
  2. 配置Bean修改配置,自动隔离核心原理

    • Spring生命周期中BeanPostProcessor在类初始化前在postProcessBeforeInitialization中搞事情
    • 监听器类脑袋上面有@Component注解,所以一定会被Spring收拾到容器中
    • 所以可以在初始化前,把监听器类的topic/group/tag神不知鬼不觉给改掉,然后实例化的时候用的就是改后的值
    • 可以选择隔离topic/group/tag或者全部隔离,如果是多租户那就拼接多租户信息即可
      ```java
      import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
      import org.springframework.beans.BeansException;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.beans.factory.config.BeanPostProcessor;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.lang.NonNull;
      import org.springframework.util.StringUtils;

/**

  • RocketMQ多环境隔离配置
  • 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉
    */
    @Configuration
    public class EnvironmentIsolationConfig implements BeanPostProcessor {
    @Value(“${system.environment.isolation:true}”)
    private boolean enabledIsolation;
    @Value(“${system.environment.name:’’}”)
    private String environmentName;

    @Override
    public Object postProcessBeforeInitialization(@NonNull Object bean,

    1. @NonNull String beanName) throws BeansException {
    2. // DefaultRocketMQListenerContainer是监听器实现类
    3. if (bean instanceof DefaultRocketMQListenerContainer) {
    4. DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
    5. // 开启消息隔离情况下获取隔离配置,此处隔离topic,根据自己的需求隔离group或者tag
    6. if (enabledIsolation && StringUtils.hasText(environmentName)) {
    7. container.setTopic(String.join("_", container.getTopic(), environmentName));
    8. }
    9. return container;
    10. }
    11. return bean;

    }
    }

```

  1. 启动项目,从控制台中可以看到当开启隔离的时候会自动在topic后面加上隔离名称
  2. 提示:如果按照topic隔离,请提前创建好topic或者开启broker的topic自动创建功能

文章标签:

原文连接:https://juejin.cn/post/7104445507912597517

相关推荐

这样优化Spring Boot,启动速度快到飞起!

通过webservice实现springboot项目间接口调用与对象传递

博大精深的JAVA虚拟机内存模型JMM

引入Elasticsearch7.x版本后,原本好好的服务无法启动了

SpringBoot-配置高级

SpringBoot+Vue实现简单用户管理平台第二篇(前端设计,接口对接)

Spring Boot+Vue3 动态菜单实现思路梳理

如何用SpringBoot整合Redis(详细讲解~)

springboot如何进行混淆加密(proguard+xjar)

聊聊写代码的20个反面教材

SpringBoot接口 - 如何生成接口文档之Swagger技术栈?

Spring Boot + WebSocket 实时监控异常

SpringBoot整合oceanbase,实现oracle无缝切换到oceanbase

SpringBoot自定义注解 + AOP 防止重复提交(建议收藏)

SpringBoot 三大开发工具,你都用过么?

从零开始把 SpringBoot 搬到 K8s 上运行,我用了这几步!

阿里出品---SpringBoot应用自动化部署神器,IDEA版Jenkins?

免费分享一个springboot+vue校园宿舍管理系统,挺漂亮的

SpringBoot开发 - 如何定制自己的Banner?还能用图片?

大公司为什么禁止SpringBoot项目使用Tomcat?