发送普通消息(三种方式)

RocketMQ 发送普通消息有三种实现方式:可靠同步发送可靠异步发送单向(Oneway)发送

注意顺序消息只支持可靠同步发送

GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog

一、概念

1、可靠同步发送

原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

2、可靠异步发送

原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 消息队列 RocketMQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。

应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如批量发货等操作。

3、单向(Oneway)发送

原理:单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

4、三种对比

下表概括了三者的特点和主要区别。

发送方式 发送 TPS 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 可能丢失

二、代码示例

1、三种方式代码示例

@Slf4j
@RestController
public class Controller {
    /**
     * 生产者组
     */
    private static String PRODUCE_RGROUP = "test_producer";
    /**
     * 创建生产者对象
     */
    private static DefaultMQProducer producer = null;

    static {
        producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr("47.99.03.25:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    @GetMapping("/message")
    public  void  message() throws Exception {
        //1、同步
        sync();
        //2、异步
        async();
        //3、单项发送
        oneWay();
    }
    /**
     * 1、同步发送消息
     */
    private  void sync() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  同步发送  ").getBytes());
        //同步发送消息
        SendResult sendResult = producer.send(message);
        log.info("Product-同步发送-Product信息={}", sendResult);
    }
    /**
     * 2、异步发送消息
     */
    private  void async() throws Exception {
        //创建消息
        Message message = new Message("topic_family", ("  异步发送  ").getBytes());
        //异步发送消息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Product-异步发送-输出信息={}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //补偿机制,根据业务情况进行使用,看是否进行重试
            }
        });
    }
    /**
     * 3、单项发送消息
     */
    private  void oneWay() throws Exception {
        //创建消息
        Message message = new Message("topic_family", (" 单项发送 ").getBytes());
        //同步发送消息
        producer.sendOneway(message);
    }
}

2、测试结果

这里消费者代码就不贴出来了。

通过这个很明显可以看出三种方式都被 Consumer 消费了。只不过对于 Product 同步和异步发送是有返回信息的,单项发送是没有返回信息的。

参考

1、RocketMQ 阿里云官网文档

只要自己变优秀了,其他的事情才会跟着好起来(上将3)
版权声明:本文为qdhxhz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/qdhxhz/p/11121981.html