通八洲科技

c++怎么使用mqtt协议通信_c++ paho-mqtt库异步发布与订阅【方法】

日期:2026-01-02 00:00 / 作者:尼克
C++调用paho-mqtt-cpp异步发布需使用mqtt::async_client,连接成功后调用publish()立即返回,须预先设置connection_lost_callback和delivery_complete_callback,发布时传mqtt::message对象且主题为std::string。

如何用 C++ 调用 paho-mqtt-cpp 实现异步发布

直接使用 mqtt::async_client 类,它默认走异步路径;同步发布(publish())会阻塞,而异步必须配合 delivery_complete_callbackconnection_lost_callback 才能真正“不卡主线程”。

关键点:异步发布调用 publish() 后立即返回,消息实际发送由底层线程池处理;但你必须在连接成功后才调用,否则抛 std::runtime_error(错误信息含 "not connected")。

mqtt::async_client cli("tcp://localhost:1883", "cpp_client");
cli.set_connection_lost_callback([](const std::string& cause) {
    std::cout << "Connection lost: " << cause << std::endl;
});
cli.connect()->wait(); // wait() 是阻塞的,但 connect() 返回 future,可改为 async + then
auto tok = cli.publish("sensor/temp", "23.5", 1, false);
tok->wait(); // 这里 wait() 是等发布动作提交完成,不是等 broker 确认

为什么订阅后收不到消息?检查 callback 注册和事件循环

paho-mqtt-cpp 的异步订阅本身不自动触发回调——它只把消息推到内部队列,你必须显式调用 cli.start_consuming() 启动消费线程,或手动轮询 cli.consume_message()。没这一步,message_arrived_callback 根本不会被调用。

常见错误是:调用了 subscribe(),也注册了 set_message_arrived_callback(),但忘了启动消费机制,结果消息静默丢失。

cli.set_message_arrived_callback([](const mqtt::message& msg) {
    try {
        std::cout << "Received: " << msg.get_payload_str() << " on " << msg.get_topic() << std::endl;
    } catch (...) { /* 防止异常逃逸 */ }
});
cli.subscribe("sensor/#", 1)->wait();
cli.start_consuming(); // 必须有!

QoS 1/2 发布失败却没报错?看 delivery_complete_callback 是否被触发

异步发布下,publish() 成功只表示消息已入发送队列,不代表 broker 已接收。QoS 1 或 2 的消息只有收到 PUBACK/PUBREC 才算真正送达,这个状态通过 delivery_complete_callback 通知。

如果你没设这个回调,或者回调里没做日志/计数,就完全不知道消息是否落地——尤其在网络抖动时,broker 可能丢弃未确认消息,而客户端毫无感知。

cli.set_delivery_complete_callback([](mqtt::idelivery_token_ptr tok) {
    if (tok && tok->is_complete()) {
        std::cout << "Message delivered, ID=" << tok->get_message_id() << std::endl;
    }
});

链接断开后自动重连失效?别只依赖 set_automatic_reconnect()

set_automatic_reconnect() 只控制 TCP 层重连行为,它不会自动恢复订阅(subscribe)、也不会重新绑定 callback。断连重连后,你得手动补订主题,否则消息收不到。

更隐蔽的问题是:重连成功触发 connected_callback,但此时 client 内部状态可能还没完全就绪(比如 session 还没重建),立刻 subscribe() 可能失败并抛异常。

自动重连不是银弹——它解决不了会话丢失、消息重复、订阅漂移这些 MQTT 协议层问题。