diff options
Diffstat (limited to 'amalgamation/mqtt.c')
| -rw-r--r-- | amalgamation/mqtt.c | 40 |
1 files changed, 28 insertions, 12 deletions
diff --git a/amalgamation/mqtt.c b/amalgamation/mqtt.c index 3f4c002..40f3a22 100644 --- a/amalgamation/mqtt.c +++ b/amalgamation/mqtt.c @@ -5032,8 +5032,8 @@ int SocketSelect(int sock, int *events, int timeout) } memset(&tv, 0, sizeof(tv)); - tv.tv_sec = timeout; - tv.tv_usec = 0; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout - (tv.tv_sec * 1000)) * 1000; *events = 0; @@ -6049,7 +6049,7 @@ static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client) return inMessagesCount + outMessagesCount - queued; } -MqttClient *MqttClientNew(const char *clientId, int cleanSession) +MqttClient *MqttClientNew(const char *clientId) { MqttClient *client; @@ -6062,8 +6062,6 @@ MqttClient *MqttClientNew(const char *clientId, int cleanSession) client->clientId = bfromcstr(clientId); - client->cleanSession = cleanSession; - client->stream.sock = -1; client->retryTimeout = 20; @@ -6099,6 +6097,11 @@ void MqttClientFree(MqttClient *client) bdestroy(client->willMessage); bdestroy(client->host); + if (client->stream.sock != -1) + { + SocketDisconnect(client->stream.sock); + } + free(client); } @@ -6149,7 +6152,7 @@ void MqttClientSetOnPublish(MqttClient *client, } int MqttClientConnect(MqttClient *client, const char *host, short port, - int keepAlive) + int keepAlive, int cleanSession) { int sock; MqttPacketConnect *packet; @@ -6160,6 +6163,7 @@ int MqttClientConnect(MqttClient *client, const char *host, short port, client->host = bfromcstr(host); client->port = port; client->keepAlive = keepAlive; + client->cleanSession = cleanSession; if (keepAlive < 0) { @@ -6216,7 +6220,12 @@ int MqttClientDisconnect(MqttClient *client) return MqttClientQueueSimplePacket(client, MqttPacketTypeDisconnect); } -int MqttClientRunOnce(MqttClient *client) +int MqttClientIsConnected(MqttClient *client) +{ + return client->stream.sock != -1; +} + +int MqttClientRunOnce(MqttClient *client, int timeout) { int rv; int events; @@ -6246,7 +6255,12 @@ int MqttClientRunOnce(MqttClient *client) LOG_DEBUG("selecting"); - rv = SocketSelect(client->stream.sock, &events, client->keepAlive); + if (timeout < 0) + { + timeout = client->keepAlive * 1000; + } + + rv = SocketSelect(client->stream.sock, &events, timeout); if (rv == -1) { @@ -6301,7 +6315,7 @@ int MqttClientRunOnce(MqttClient *client) else if (SIMPLEQ_EMPTY(&client->sendQueue)) { int64_t elapsed = MqttGetCurrentTime() - client->lastPacketSentTime; - if (elapsed/1000 >= client->keepAlive) + if (elapsed/1000 >= client->keepAlive && client->keepAlive > 0) { MqttClientQueueSimplePacket(client, MqttPacketTypePingReq); client->pingSent = 1; @@ -6324,7 +6338,7 @@ int MqttClientRun(MqttClient *client) while (!client->stopped) { - if (MqttClientRunOnce(client) == -1) + if (MqttClientRunOnce(client, -1) == -1) return -1; } @@ -6627,7 +6641,9 @@ static void MqttClientHandlePublish(MqttClient *client, MqttPacketPublish *packe client->onMessage(client, bdata(packet->topicName), bdata(packet->message), - blength(packet->message)); + blength(packet->message), + packet->qos, + packet->retain); } if (MqttPacketPublishQos(packet) > 0) @@ -6722,7 +6738,7 @@ static void MqttClientHandlePubRel(MqttClient *client, MqttPacket *packet) TAILQ_FOREACH(pubRec, &client->inMessages, messages) { if (MqttPacketId(pubRec) == MqttPacketId(packet) && - MqttPacketType(pubRec) == MqttPacketTypePublish) + MqttPacketType(pubRec) == MqttPacketTypePubRec) { break; } |
