Skip to main content
Version: 4.x

顺序消息发送

顺序消息介绍#

顺序消息是一种对消息发送和消费顺序有严格要求的消息。

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

顺序消息发送

顺序消息的应用场景也非常广泛,比如在创建订单的例子中,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。

顺序消息示例代码#

顺序消息的代码如下所示:

public class Producer {    public static void main(String[] args) throws UnsupportedEncodingException {        try {            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");            producer.start();
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};            for (int i = 0; i < 100; i++) {                int orderId = i % 10;                Message msg =                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {                    @Override                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {                        Integer id = (Integer) arg;                        int index = id % mqs.size();                        return mqs.get(index);                    }                }, orderId);
                System.out.printf("%s%n", sendResult);            }
            producer.shutdown();        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {            e.printStackTrace();        }    }}

这里的区别主要是调用了SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。

tip

MessageQueueSelector的接口如下:

public interface MessageQueueSelector {    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}

其中 mqs 是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。

顺序消息的一致性#

如果一个Broker掉线,那么此时队列总数是否会发化?

如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息:

> sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876create topic to 127.0.0.1:10911 success.TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

其次要保证NameServer中的配置 orderMessageEnablereturnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。