云原生中间件RocketMQ-快速入门

在这里插入图片描述

生产组:用于消息的发送。 消费组:用于消息的订阅处理。 生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。

生产者使用

  1. 创建生产者对象 DefaultMQProducer
  2. 设置NamesrvAddr
  3. 启动生产者服务
  4. 创建消息并发送

代码实现如下: 同步发送:

代码语言:javascript
复制
// 创建DefaultMQProducer消息生产者对象
		DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
		//设置NameServer节点地址,多个节点间用分号分割
		producer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
		//与NameServer建立长连接
		producer.start();
	for(int i = 0 ; i <5; i ++) {
		//	1.	创建消息
		Message message = new Message("test_quick_topic",	//	主题
				"TagA", //	标签
				"key" + i,	// 	用户自定义的key ,唯一的标识
				("Hello RocketMQ" + i).getBytes());	//	消息内容实体(byte[])
		//	2.1	同步发送消息

// if(i == 1) {
// message.setDelayTimeLevel(3);
// }
// 发送消息,获取发送结果
SendResult sr = producer.send(message, new MessageQueueSelector() {

			@Override
			public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
				Integer queueNumber = (Integer)arg;
				return mqs.get(queueNumber);
			}
		}, 2);
		System.err.println(sr);

// SendResult sr = producer.send(message);
// SendStatus status = sr.getSendStatus();
// System.err.println(status);

      //System.err.println("消息发出: " + sr);


	}
	// 消息发送完毕关闭连接
	producer.shutdown();</code></pre></div></div><p>异步发送:</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">			//  2.2 异步发送消息
		producer.send(message, new SendCallback() {
			//rabbitmq急速入门的实战: 可靠性消息投递
			@Override
			public void onSuccess(SendResult sendResult) {
				System.err.println(&#34;msgId: &#34; + sendResult.getMsgId() + &#34;, status: &#34; + sendResult.getSendStatus());
			}
			@Override
			public void onException(Throwable e) {
				e.printStackTrace();
				System.err.println(&#34;------发送失败&#34;);
			}
		});</code></pre></div></div><p>执行生产者发送消息,可以看到控制台输出如下:

在这里插入图片描述

在对应的控制台可以查看到对应的消息主题

在这里插入图片描述

在消息页签可以通过topic查询到消息,也可以通过message_key和message_id查询。

在这里插入图片描述

消费者使用

  1. 创建消费者对象 DefaultMQPushConsumer
  2. 设置NamesrvAddr及其消费位置ConsumeFromWhere
  3. 设置订阅主题subscribe
  4. 注册监听并消费registerMessageListener

具体代码实现如下:

代码语言:javascript
复制
        // 创建消费者对象
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
		// 设置NameServer节点
		consumer.setNamesrvAddr(NameServerAddrConst.NAMESRV_ADDR_SINGLE);
		// 设置消费位置,从哪个点开始
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
		/*订阅主题,
        consumer.subscribe包含两个参数:
        topic: 说明消费者从Broker订阅哪一个主题,这一项要与Provider保持一致。
        subExpression: 子表达式用于筛选tags。
            同一个主题下可以包含很多不同的tags,subExpression用于筛选符合条件的tags进行接收。
            例如:设置为*,则代表接收所有tags数据。
            例如:设置为2022S1,则Broker中只有tags=2022S1的消息会被接收,而2022S2就会被排除在外。
        */
		consumer.subscribe("test_quick_topic", "*");
		// 创建监听,当有新的消息监听程序会及时捕捉并加以处理。
		consumer.registerMessageListener(new MessageListenerConcurrently() {
		@Override
		public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs, ConsumeConcurrentlyContext context) {
			MessageExt me = msgs.get(0);
			try {
				String topic = me.getTopic();
				String tags = me.getTags();
				String keys = me.getKeys();

// if(keys.equals("key1")) {
// System.err.println("消息消费失败..");
// int a = 1/0;
// }

				String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
				System.err.println(&#34;topic: &#34; + topic + &#34;,tags: &#34; + tags + &#34;, keys: &#34; + keys + &#34;,body: &#34; + msgBody);
			} catch (Exception e) {
				e.printStackTrace();

// int recousumeTimes = me.getReconsumeTimes();
// System.err.println("recousumeTimes: " + recousumeTimes);
// if(recousumeTimes == 3) {
// // 记录日志....
// // 做补偿处理
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// }

				return ConsumeConcurrentlyStatus.RECONSUME_LATER;
			}
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
	});
	// 启动消费者,与Broker建立长连接,开始监听。
	consumer.start();
	System.err.println(&#34;consumer start...&#34;);</code></pre></div></div><p>上述注释代码模拟了消息出现异常的情况,如果连续三次消费失败,则记录日志做补偿处理,并返回成功。</p><blockquote><p> 本文内容到此结束了,

如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页:共饮一杯无的博客汇总👨‍💻
保持热爱,奔赴下一场山海。🏃🏃🏃

在这里插入图片描述