基于腾讯云tdmq消息队列封装SpringBootStarter(二)

关于腾讯云tdmq的基本使用参见《基于腾讯云tdmq消息队列封装SpringBootStarter(一)》,这里我们基于之前的内容在次进行优化封装。

一、创建消费者注解(TdmqConsumer)和生产者注解(TdmqProducer)

1.1、基础工程回顾

首先我们回顾下上一章完成的基础功能。

工程目录

工程目录

上一章我们创建了配置目录config、生产者和消费者目录,以及META-INF目录和spring.factories配置文件。

在此基础上我们继续完善我们的工程。

1.2、创建注解

在该工程上新建annotation包,并在annotation包下创建TdmqProducerTdmqConsumer注解。并且在消费者注解TdmqConsumer注解中新增一下属性:topicclazzSubscriptionTypeconsumerNamesubscriptionName

最终消费者注解内容如下:

代码语言:javascript
复制
/**
 * 消费者注解
 *
 * @author wanghongjie
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface TdmqConsumer {
    /**
     * 订阅主题
     *
     * @return
     */
    String topic();
/**
 * 序列化类
 *
 * @return
 */
Class<?> clazz() default byte[].class;

/**
 * 消费者类型
 *
 * @return
 */
SubscriptionType[] subscriptionType() default {};

/**
 * 消费者名称
 *
 * @return
 */
String consumerName() default "";

/**
 * 订阅对象名称
 *
 * @return
 */
String subscriptionName() default "";

}

生产者注解:

代码语言:javascript
复制
/**

  • 生产者注解

  • @author wanghongjie
    /
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface TdmqProducer {
    }

1.3、创建收集器

我们在工程中创建生产者收集器(ProducerCollector)和消费者收集器(ConsumerCollector),创建收集器的目的是在springBoot项目启动中,扫描所有带有TdmqProducerTdmqConsumer注解的Bean对象,并将其统一管理。

在工程中创建collector包,并在该包下创建ProducerCollectorConsumerCollector

在创建消费者收集器前我们需要创建个对象类ConsumerHolderProducerCollector,用来绑定注解和实现类的绑定关系。

1.3.1、创建消费者绑定对象
代码语言:javascript
复制
/*

  • @Author julyWhj

  • @Description $
    **/
    public class ConsumerHolder {
    private final TdmqConsumer annotation;
    private final Method handler;
    private final Object bean;
    private final Class<?> type;

    ConsumerHolder(TdmqConsumer annotation, Method handler, Object bean, Class<?> type) {
    this.annotation = annotation;
    this.handler = handler;
    this.bean = bean;
    this.type = type;
    }

    public TdmqConsumer getAnnotation() {
    return annotation;
    }

    public Method getHandler() {
    return handler;
    }

    public Object getBean() {
    return bean;
    }

    public Class<?> getType() {
    return type;
    }

    public boolean isWrapped() {
    return type.isAssignableFrom(Object.class);
    }
    }

  • 1.3.2 生产者绑定对象
    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 生产者绑定关系$

  • @Date 2022/1/3 1:26 下午
    **/
    public class ProducerHolder {
    private final String topic;
    private final Class<?> clazz;
    private final String serialization;

    public ProducerHolder(String topic, Class<?> clazz, String serialization) {
    this.topic = topic;
    this.clazz = clazz;
    this.serialization = serialization;
    }

    public String getTopic() {
    return topic;
    }

    public Class<?> getClazz() {
    return clazz;
    }

    public String getSerialization() {
    return serialization;
    }

  • }

    1.3.3、创建消费者收集器ConsumerCollector.
    代码语言:javascript
    复制
    /**

    • @Author julyWhj

    • @Description 消费者收集器$

    • @Date 2022/1/3 10:34 上午
      /
      @Configuration
      public class ConsumerCollector implements BeanPostProcessor {
      /

      • 维护SpringBoot所有bean对象中包含TdmqConsumer注解的实例对象
        */
        private Map<String, ConsumerHolder> consumers = new ConcurrentHashMap<>();

      /**

      • SpringBoot 启动过程中,Bean实例化后加载postProcessBeforeInitialization方法

      • @param bean bean对象

      • @param beanName 注解所在的方法名称

      • @return
        */
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) {
        final Class<?> beanClass = bean.getClass();
        // 过滤所有的Bean对象,如果包含TdmqConsumer注解的加入到consumers中
        consumers.putAll(Arrays.stream(beanClass.getDeclaredMethods())
        .filter( -&gt; .isAnnotationPresent(TdmqConsumer.class))
        .collect(Collectors.toMap(
        method -> buildConsumerName(beanClass, method),
        method -> new ConsumerHolder(method.getAnnotation(TdmqConsumer.class), method, bean,
        getParameterType(method)))));

        return bean;
        }

      @Override
      public Object postProcessAfterInitialization(Object bean, String beanName) {
      return bean;
      }

      public Map<String, ConsumerHolder> getConsumers() {
      return consumers;
      }

      public Optional<ConsumerHolder> getConsumer(String methodDescriptor) {
      return Optional.ofNullable(consumers.get(methodDescriptor));
      }

      public static Class<?> getParameterType(Method method) {
      return method.getParameterTypes()[0];
      }

      /**

      • 构建消费者名称
      • @param clazz 对象
      • @param method 方法
      • @return 消费者名称
        /
        public String buildConsumerName(Class<?> clazz, Method method) {
        return clazz.getName() + method.getName() + Arrays
        .stream(method.getGenericParameterTypes())
        .map(Type::getTypeName)
        .collect(Collectors.joining());
        }
        }
    1.3.4、创建生产者收集器
    代码语言:javascript
    复制
    /*
    
    
    
  • @Author julyWhj

  • @Description 生产者收集器$

  • @Date 2022/1/3 1:25 下午
    **/
    @Component
    public class ProducerCollector implements BeanPostProcessor, EmbeddedValueResolverAware {

    private final PulsarClient pulsarClient;
    private final TdmqProperties tdmqProperties;

    private final Map<String, Producer> producers = new ConcurrentHashMap<>();

    private StringValueResolver stringValueResolver;

    public ProducerCollector(PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
    this.pulsarClient = pulsarClient;
    this.tdmqProperties = tdmqProperties;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
    final Class<?> beanClass = bean.getClass();
    if (beanClass.isAnnotationPresent(TdmqProducer.class) && bean instanceof IProducerFactory) {
    producers.putAll(((IProducerFactory) bean).getTopics().entrySet().stream()
    .map($ -> new ProducerHolder(
    stringValueResolver.resolveStringValue($.getKey()),
    $.getValue().left,
    $.getValue().right))
    .collect(Collectors.toMap(ProducerHolder::getTopic, this::buildProducer)));
    }

     return bean;
    

    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
    return bean;
    }

    private Producer<?> buildProducer(ProducerHolder holder) {
    try {
    return pulsarClient.newProducer(getSchema(holder))
    .topic(buildTopicUrl(holder.getTopic()))
    .create();
    } catch (PulsarClientException e) {
    throw new RuntimeException(e);
    }
    }

    private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
    return getGenericSchema(holder.getSerialization(), holder.getClazz());
    }

    public Producer getProducer(String topic) {
    return producers.get(stringValueResolver.resolveStringValue(topic));
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
    this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
    return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
    "/" + topic;
    }

    private static <T> Schema<?> getGenericSchema(String type, Class<T> clazz) throws RuntimeException {
    switch (type) {
    case "JSON": {
    return Schema.JSON(clazz);
    }
    case "AVRO": {
    return Schema.AVRO(clazz);
    }
    case "STRING": {
    return Schema.STRING;
    }
    default: {
    throw new RuntimeException("Unknown producer schema.");
    }
    }
    }
    }

  • 1.4、创建消费者消息处理聚合器

    我们通过postProcessBeforeInitialization方法以及将全部带有TdmqConsumer注解的对象收集起来,接下来我们定义个消费者消息处理器,来出来这些Bean对象,这里也是本篇文章的核心内容。

    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 消息处理聚合器$

  • @Date 2022/1/3 11:09 上午
    **/
    @Component
    @DependsOn({"pulsarClient"})
    public class ConsumerAggregator implements EmbeddedValueResolverAware {
    private final ConsumerCollector consumerCollector;
    private final PulsarClient pulsarClient;
    private final static SubscriptionType DEFAULT_SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
    private final TdmqProperties tdmqProperties;
    private StringValueResolver stringValueResolver;
    private List<Consumer> consumers;

    public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
    this.consumerCollector = consumerCollector;
    this.pulsarClient = pulsarClient;
    this.tdmqProperties = tdmqProperties;
    }

    /**

    • 待spring上下文启动完毕后,加载注解init()方法
      */
      @EventListener(ApplicationReadyEvent.class)
      public void init() {
      //获取收集器中所有的消费者对象
      consumers = consumerCollector.getConsumers().entrySet().stream()
      .map(holder -> subscribe(holder.getKey(), holder.getValue()))
      .collect(Collectors.toList());
      }

    /**

    • 消费者消息监听处理类
    • @param generatedConsumerName 消费者名称
    • @param holder 绑定关系
    • @return
      */
      private Consumer<?> subscribe(String generatedConsumerName, ConsumerHolder holder) {
      try {
      //从注解中获取消费名称
      final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
      //从注解中获取订阅名称
      final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
      //从注解中获取队列topic名称
      final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
      //获取消费者类型--参考官方文档类型说明
      final SubscriptionType subscriptionType = getSubscriptionType(holder);
      //通过pulsarClient构建consumerBuilder
      final ConsumerBuilder<?> consumerBuilder = pulsarClient
      .newConsumer()
      .consumerName(consumerName)
      .subscriptionName(subscriptionName)
      .topic(buildTopicUrl(topicName))
      .subscriptionType(subscriptionType)
      .messageListener((consumer, msg) -> {
      try {
      //从绑定关系中获取需执行的方法
      final Method method = holder.getHandler();
      method.setAccessible(true);
      //通过反射执行注解所在的方法,并将监听到的消息作为参数进行传递
      method.invoke(holder.getBean(), msg.getValue());
      //消息执行后手动ack消息
      consumer.acknowledge(msg);
      } catch (Exception e) {
      //消息处理执行异常,进行negativeAcknowledge操作
      consumer.negativeAcknowledge(msg);
      }
      });
      buildDeadLetterPolicy(holder, consumerBuilder);
      return consumerBuilder.subscribe();
      } catch (PulsarClientException e) {
      //应该自定义异常,这里暂时不做处理
      throw new RuntimeException(e);
      }
      }

    private SubscriptionType getSubscriptionType(ConsumerHolder holder) {
    SubscriptionType subscriptionType = Arrays.stream(holder.getAnnotation().subscriptionType())
    .findFirst().orElse(null);
    if (subscriptionType == null) {
    subscriptionType = DEFAULT_SUBSCRIPTION_TYPE;
    }
    return subscriptionType;
    }

    public void buildDeadLetterPolicy(ConsumerHolder holder, ConsumerBuilder<?> consumerBuilder) {
    DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder =
    DeadLetterPolicy.builder().maxRedeliverCount(-1);
    }

    public List<Consumer> getConsumers() {
    return consumers;
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
    this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
    return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
    "/" + topic;
    }
    }

  • 1.5、创建生产者工厂和模版处理类

    1.5.1、创建生产者工厂接口IProducerFactory
    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 生产者工程接口$

  • @Date 2021/12/30 7:55 下午
    /
    public interface IProducerFactory {
    Map<String, ImmutablePair<Class<?>, String>> getTopics();
    }

  • 1.5.2、创建工程对象
    代码语言:javascript
    复制
    /

  • @Author julyWhj

  • @Description $

  • @Date 2022/1/3 1:27 下午
    **/
    @TdmqProducer
    public class ProducerFactory implements IProducerFactory {

    private final Map<String, ImmutablePair<Class<?>, String>> topics = new HashMap<>();

    public ProducerFactory addProducer(String topic) {
    return addProducer(topic, byte[].class, "JSON");
    }

    public ProducerFactory addProducer(String topic, Class<?> clazz) {
    topics.put(topic, new ImmutablePair<>(clazz, "JSON"));
    return this;
    }

    public ProducerFactory addProducer(String topic, Class<?> clazz, String serialization) {
    topics.put(topic, new ImmutablePair<>(clazz, serialization));
    return this;
    }

    @Override
    public Map<String, ImmutablePair<Class<?>, String>> getTopics() {
    return topics;
    }
    }

  • 1.5.3、构建TdmqTemplate
    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 模版工具类$

  • @Date 2022/1/3 1:42 下午
    **/
    public class TdmqTemplate<T> {
    private final ProducerCollector producerCollector;

    public TdmqTemplate(ProducerCollector producerCollector) {
    this.producerCollector = producerCollector;
    }

    /**

    • 发送消息接口
    • @param topic 队列
    • @param msg 消息内容
    • @return
    • @throws PulsarClientException
      */
      public MessageId send(String topic, T msg) throws PulsarClientException {
      return producerCollector.getProducer(topic).send(msg);
      }

    /**

    • 异步发送消息接口
    • @param topic 队列
    • @param message 消息内容
    • @return
      */
      public CompletableFuture<MessageId> sendAsync(String topic, T message) {
      return producerCollector.getProducer(topic).sendAsync(message);
      }

    /**

    • 构建消息
    • @param topic 队列
    • @param message 消息内容
    • @return
      /
      public TypedMessageBuilder<T> createMessage(String topic, T message) {
      return producerCollector.getProducer(topic).newMessage().value(message);
      }
      }
  • 1.6、整合生产消息者配置

    将生产者和消费者配置到TdmqAutoConfiguration文件中,完整的TdmqAutoConfiguration内容如下:

    代码语言:javascript
    复制
    /*
    
    
    
  • @Author julyWhj

  • @Description Mq自动装配类$

  • @Date 2022/1/2 9:59 上午
    /
    @Slf4j
    @Data
    @EnableConfigurationProperties({TdmqProperties.class})
    public class TdmqAutoConfiguration {
    /

    • Pulsar 客户端
    • 推荐一个进程一个实例
    • @return {@link TdmqAutoConfiguration}
      */
      @Bean
      @ConditionalOnMissingBean(PulsarClient.class)
      @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
      public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException {
      log.info("-----------------");
      return PulsarClient.builder()
      .serviceUrl(mqProperties.getServiceUrl())
      .authentication(AuthenticationFactory.token(mqProperties.getToken()))
      .build();
      }

    /**

    • 配置消费者收集器
    • @return
      */
      @Bean
      @ConditionalOnMissingBean(ConsumerCollector.class)
      @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
      public ConsumerCollector consumerCollector() {
      return new ConsumerCollector();
      }

    /**

    • 配置消费者消费者消息处理聚合器
    • @param consumerCollector 配置消费者收集器
    • @param pulsarClient pulsarClient 客户端
    • @param tdmqProperties 配置信息
    • @return
      */
      @Bean
      @ConditionalOnMissingBean(ConsumerAggregator.class)
      @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
      public ConsumerAggregator consumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
      return new ConsumerAggregator(consumerCollector, pulsarClient, tdmqProperties);
      }

    /**

    • 配置生产者收集器
    • @param pulsarClient pulsarClient 客户端
    • @param tdmqProperties 配置信息
    • @return
      */
      @Bean
      @ConditionalOnMissingBean(ProducerCollector.class)
      @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
      public ProducerCollector producerCollector(PulsarClient pulsarClient,
      TdmqProperties tdmqProperties) {
      return new ProducerCollector(pulsarClient, tdmqProperties);
      }

    /**

    • 生产者消息模版
    • @param producerCollector 生产者收集器
    • @return
      */
      @Bean
      @ConditionalOnMissingBean(TdmqTemplate.class)
      @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
      public TdmqTemplate pulsarTemplate(ProducerCollector producerCollector) {
      return new TdmqTemplate(producerCollector);
      }
  • }

    二、使用案例

    我们这里使用自定义的TdmqConsumerTdmqTemplate来完成一个生产消费者的案例。

    2.1、创建生产者配置类

    创建生产者配置类ProducerConfiguration,该配置类,主要将消息队列队列名称绑定到ProducerFactory上下文中,我们可以通过TdmqTemplate去直接使用。

    代码语言:javascript
    复制
    /**

    • @Author julyWhj

    • @Description $

    • @Date 2022/1/3 1:58 下午
      /
      @Configuration
      public class ProducerConfiguration {
      /

      • 队列名称
        */
        public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

      @Bean
      public ProducerFactory producerFactory() {
      //将队列添加到ProducerFactory上下文中
      return new ProducerFactory()
      .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
      }
      }

    创建消费者监听

    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 消息队列消费者$

  • @Date 2022/1/3 2:01 下午
    **/
    @Slf4j
    @Service
    public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
    log.info("------------{}", msg);
    }
    }

  • 去除之前消费者TdmqConsumer.class.

    修改单元测试SpringBootStarterTdmqApplicationTests

    代码语言:javascript
    复制
    @Slf4j
    @SpringBootTest
    class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
    MessageId messageId = proucer.send("message_logging", "发送消息测试");
    log.info("send msg is success Id = {}", messageId);
    }

    }

    将之前的TdmqProucer改为TdmqTemplate;

    启动单元测试:

    查看测试结果:

    测试结果

    测试结果

    三、说明:

    1、配置生产者工厂

    代码语言:javascript
    复制
    /**

    • @Author julyWhj

    • @Description $

    • @Date 2022/1/3 1:58 下午
      /
      @Configuration
      public class ProducerConfiguration {
      /

      • 队列名称
        */
        public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

      @Bean
      public ProducerFactory producerFactory() {
      //将队列添加到ProducerFactory上下文中
      return new ProducerFactory()
      .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
      }
      }

    2、创建消费者实现类

    代码语言:javascript
    复制
    /**

  • @Author julyWhj

  • @Description 消息队列消费者$

  • @Date 2022/1/3 2:01 下午
    **/
    @Slf4j
    @Service
    public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
    log.info("------------{}", msg);
    }
    }

  • 主要在方法上增加TdmqConsumer注解。

    3、生产者TdmqTemplate模版使用

    代码语言:javascript
    复制
    @Slf4j
    @SpringBootTest
    class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
    MessageId messageId = proucer.send("message_logging", "发送消息测试");
    log.info("send msg is success Id = {}", messageId);
    }

    }

    4、使用配置文件

    代码语言:javascript
    复制
    tdmq:
    enable: true
    serviceUrl: serviceUrl
    token: token
    clusterId: clusterId
    environmentId: environmentId

    源码地址:

    hongjieWang/spring-boot-starter-tdmq: spring-boot-starter-tdmq (github.com)