diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 00:40:50 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-19 00:40:50 +0200 |
| commit | c4dbe5151ef76e46dc2adc9e373dd8447219bcaf (patch) | |
| tree | 437c43f3bc0a43da3acd1a617a9747115a4ec72b /amalgamation | |
| parent | 56b14eb55d3078d38e07bc22aae73e73017b1257 (diff) | |
| download | mqtt-c4dbe5151ef76e46dc2adc9e373dd8447219bcaf.tar.gz mqtt-c4dbe5151ef76e46dc2adc9e373dd8447219bcaf.zip | |
Update amalgamationv0.3
Diffstat (limited to 'amalgamation')
| -rw-r--r-- | amalgamation/mqtt.c | 40 | ||||
| -rw-r--r-- | amalgamation/mqtt.h | 13 |
2 files changed, 37 insertions, 16 deletions
diff --git a/amalgamation/mqtt.c b/amalgamation/mqtt.c index 3f4c002..40f3a22 100644 --- a/amalgamation/mqtt.c +++ b/amalgamation/mqtt.c @@ -5032,8 +5032,8 @@ int SocketSelect(int sock, int *events, int timeout) } memset(&tv, 0, sizeof(tv)); - tv.tv_sec = timeout; - tv.tv_usec = 0; + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout - (tv.tv_sec * 1000)) * 1000; *events = 0; @@ -6049,7 +6049,7 @@ static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client) return inMessagesCount + outMessagesCount - queued; } -MqttClient *MqttClientNew(const char *clientId, int cleanSession) +MqttClient *MqttClientNew(const char *clientId) { MqttClient *client; @@ -6062,8 +6062,6 @@ MqttClient *MqttClientNew(const char *clientId, int cleanSession) client->clientId = bfromcstr(clientId); - client->cleanSession = cleanSession; - client->stream.sock = -1; client->retryTimeout = 20; @@ -6099,6 +6097,11 @@ void MqttClientFree(MqttClient *client) bdestroy(client->willMessage); bdestroy(client->host); + if (client->stream.sock != -1) + { + SocketDisconnect(client->stream.sock); + } + free(client); } @@ -6149,7 +6152,7 @@ void MqttClientSetOnPublish(MqttClient *client, } int MqttClientConnect(MqttClient *client, const char *host, short port, - int keepAlive) + int keepAlive, int cleanSession) { int sock; MqttPacketConnect *packet; @@ -6160,6 +6163,7 @@ int MqttClientConnect(MqttClient *client, const char *host, short port, client->host = bfromcstr(host); client->port = port; client->keepAlive = keepAlive; + client->cleanSession = cleanSession; if (keepAlive < 0) { @@ -6216,7 +6220,12 @@ int MqttClientDisconnect(MqttClient *client) return MqttClientQueueSimplePacket(client, MqttPacketTypeDisconnect); } -int MqttClientRunOnce(MqttClient *client) +int MqttClientIsConnected(MqttClient *client) +{ + return client->stream.sock != -1; +} + +int MqttClientRunOnce(MqttClient *client, int timeout) { int rv; int events; @@ -6246,7 +6255,12 @@ int MqttClientRunOnce(MqttClient *client) LOG_DEBUG("selecting"); - rv = SocketSelect(client->stream.sock, &events, client->keepAlive); + if (timeout < 0) + { + timeout = client->keepAlive * 1000; + } + + rv = SocketSelect(client->stream.sock, &events, timeout); if (rv == -1) { @@ -6301,7 +6315,7 @@ int MqttClientRunOnce(MqttClient *client) else if (SIMPLEQ_EMPTY(&client->sendQueue)) { int64_t elapsed = MqttGetCurrentTime() - client->lastPacketSentTime; - if (elapsed/1000 >= client->keepAlive) + if (elapsed/1000 >= client->keepAlive && client->keepAlive > 0) { MqttClientQueueSimplePacket(client, MqttPacketTypePingReq); client->pingSent = 1; @@ -6324,7 +6338,7 @@ int MqttClientRun(MqttClient *client) while (!client->stopped) { - if (MqttClientRunOnce(client) == -1) + if (MqttClientRunOnce(client, -1) == -1) return -1; } @@ -6627,7 +6641,9 @@ static void MqttClientHandlePublish(MqttClient *client, MqttPacketPublish *packe client->onMessage(client, bdata(packet->topicName), bdata(packet->message), - blength(packet->message)); + blength(packet->message), + packet->qos, + packet->retain); } if (MqttPacketPublishQos(packet) > 0) @@ -6722,7 +6738,7 @@ static void MqttClientHandlePubRel(MqttClient *client, MqttPacket *packet) TAILQ_FOREACH(pubRec, &client->inMessages, messages) { if (MqttPacketId(pubRec) == MqttPacketId(packet) && - MqttPacketType(pubRec) == MqttPacketTypePublish) + MqttPacketType(pubRec) == MqttPacketTypePubRec) { break; } diff --git a/amalgamation/mqtt.h b/amalgamation/mqtt.h index 6c16ea6..f07ff3c 100644 --- a/amalgamation/mqtt.h +++ b/amalgamation/mqtt.h @@ -39,11 +39,14 @@ typedef void (*MqttClientOnUnsubscribeCallback)(MqttClient *client, int id); typedef void (*MqttClientOnMessageCallback)(MqttClient *client, const char *topic, - const void *data, size_t size); + const void *data, + size_t size, + int qos, + int retain); typedef void (*MqttClientOnPublishCallback)(MqttClient *client, int id); -MqttClient *MqttClientNew(const char *clientId, int cleanSession); +MqttClient *MqttClientNew(const char *clientId); void MqttClientFree(MqttClient *client); @@ -66,11 +69,13 @@ void MqttClientSetOnPublish(MqttClient *client, MqttClientOnPublishCallback cb); int MqttClientConnect(MqttClient *client, const char *host, short port, - int keepAlive); + int keepAlive, int cleanSession); int MqttClientDisconnect(MqttClient *client); -int MqttClientRunOnce(MqttClient *client); +int MqttClientIsConnected(MqttClient *client); + +int MqttClientRunOnce(MqttClient *client, int timeout); int MqttClientRun(MqttClient *client); |
