#include "helper.h" #include #include Helper::Helper(QObject *parent) : QObject(parent), m_counterTimer(new QTimer(this)) { m_counterTimer->setInterval(2000); m_counterTimer->start(); QObject::connect(m_counterTimer, &QTimer::timeout, this, &Helper::publishValue); } Helper::~Helper() { } void Helper::openConnection() { qWarning() << "Helper::openConnection()"; QMetaObject::invokeMethod(this, [this] { m_mqttClient = new QMqttClient(this); m_mqttClient->setClientId(m_clientId); m_mqttClient->setHostname(QStringLiteral("test.mosquitto.org")); m_mqttClient->setPort(1883); connect(m_mqttClient, &QMqttClient::connected, this, &Helper::mqttConnected, Qt::QueuedConnection); connect(m_mqttClient, &QMqttClient::disconnected, this, &Helper::mqttDisconnected, Qt::QueuedConnection); connect(m_mqttClient, &QMqttClient::errorChanged, this, &Helper::mqttErrorChanged, Qt::QueuedConnection); connect(m_mqttClient, &QMqttClient::stateChanged, this, &Helper::mqttStateChanged, Qt::QueuedConnection); continueConnect(); }, Qt::QueuedConnection); } void Helper::continueConnect() { qWarning() << "Helper::continueConnect()"; QMetaObject::invokeMethod(this, [this] { m_mqttClient->setCleanSession(true); m_mqttClient->connectToHost(); }, Qt::QueuedConnection); } void Helper::createMqttSubscription() { if (!m_mqttSubscription) { m_mqttSubscription = m_mqttClient->subscribe(m_topicPath); connect(m_mqttSubscription, &QMqttSubscription::stateChanged, this, &Helper::mqttSubscriptionStateChanged); connect(m_mqttSubscription, &QMqttSubscription::messageReceived, this, &Helper::mqttSubscriptionMessageReceived); } } void Helper::mqttConnected() { if (!m_mqttSubscription) { qWarning() << "Helper::mqttConnected() - first time connection, subscribing"; createMqttSubscription(); } else { qWarning() << "Helper::mqttConnected() - reconnection"; /* UNCOMMENT THE FOLLOWING TO WORK AROUND THE ISSUE disconnect(m_mqttSubscription, &QMqttSubscription::stateChanged, this, &Helper::mqttSubscriptionStateChanged); disconnect(m_mqttSubscription, &QMqttSubscription::messageReceived, this, &Helper::mqttSubscriptionMessageReceived); m_mqttSubscription->unsubscribe(); m_mqttSubscription = nullptr; QTimer::singleShot(1000, this, [this] { createMqttSubscription(); }); */ } } void Helper::mqttDisconnected() { qWarning() << "Helper::mqttDisconnected()"; QTimer::singleShot(20000, this, [this] { if (m_mqttClient->state() == QMqttClient::Disconnected) { qWarning() << "AUTOMATICALLY RECONNECTING..."; continueConnect(); } }); } void Helper::mqttErrorChanged() { qWarning() << "Helper::mqttErrorChanged():" << m_mqttClient->error(); } void Helper::mqttStateChanged() { qWarning() << "Helper::mqttStateChanged():" << m_mqttClient->state(); m_state = m_mqttClient->state(); Q_EMIT stateChanged(); } void Helper::mqttSubscriptionStateChanged() { qWarning() << "Helper::mqttSubscriptionStateChanged():" << m_mqttSubscription->state(); } void Helper::mqttSubscriptionMessageReceived(const QMqttMessage &message) { qWarning() << "Helper::mqttSubscriptionMessageReceived():" << message.topic().name() << "=" << message.payload(); m_receivedValue = QString::fromUtf8(message.payload()).toInt(); Q_EMIT receivedValueChanged(); } void Helper::publishValue() { if (m_mqttClient && m_mqttClient->state() == QMqttClient::Connected && m_mqttSubscription && m_mqttSubscription->state() == QMqttSubscription::Subscribed) { ++m_publishedValue; qWarning() << "Helper::publishValue():" << m_publishedValue; m_mqttClient->publish(m_topicPath, QStringLiteral("%1").arg(m_publishedValue).toUtf8()); Q_EMIT publishedValueChanged(); } }