RocketMQ(03)——通过Tag对消息分类
通过Tag对消息分类
RocketMQ建议一个业务系统只使用一个Topic,不同类型的消息通过tag来区分。tag可以在构造Message的时候指定,下面代码就指定了发送的消息的tag都为tag0。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", "tag0", ("hello with tag---" + i).getBytes());
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
也可以通过Message的setTags()
进行指定。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", ("hello with tag---" + i).getBytes());
message.setTags("tag0");
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
虽然参数名叫tags,但是一条消息只能指定一个tag。消费者进行消费的时候也可以指定需要消费的消息对应的tag,比如下面就指定了需要消费的消息对应的Topic是topic1,Tag是tag0。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
同一消费者也可以同时订阅同一个Topic的多个Tag,多个Tag之间通过||
进行分隔。比如下面代码就同时订阅了tag0、tag1和tag2。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0||tag1||tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
(注:本文基于RocketMQ4.5.0所写)
正文到此结束
- 本文标签: Spring jQuery获取对象 jQuery获取对象常用方法 struts2 s:iterator标签
- 本文链接: http://www.it586.cn/article/127
- 版权声明: 本文由miger原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权