aboutsummaryrefslogtreecommitdiff
path: root/amalgamation
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-02-19 00:40:50 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-02-19 00:40:50 +0200
commitc4dbe5151ef76e46dc2adc9e373dd8447219bcaf (patch)
tree437c43f3bc0a43da3acd1a617a9747115a4ec72b /amalgamation
parent56b14eb55d3078d38e07bc22aae73e73017b1257 (diff)
downloadmqtt-c4dbe5151ef76e46dc2adc9e373dd8447219bcaf.tar.gz
mqtt-c4dbe5151ef76e46dc2adc9e373dd8447219bcaf.zip
Update amalgamationv0.3
Diffstat (limited to 'amalgamation')
-rw-r--r--amalgamation/mqtt.c40
-rw-r--r--amalgamation/mqtt.h13
2 files changed, 37 insertions, 16 deletions
diff --git a/amalgamation/mqtt.c b/amalgamation/mqtt.c
index 3f4c002..40f3a22 100644
--- a/amalgamation/mqtt.c
+++ b/amalgamation/mqtt.c
@@ -5032,8 +5032,8 @@ int SocketSelect(int sock, int *events, int timeout)
}
memset(&tv, 0, sizeof(tv));
- tv.tv_sec = timeout;
- tv.tv_usec = 0;
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout - (tv.tv_sec * 1000)) * 1000;
*events = 0;
@@ -6049,7 +6049,7 @@ static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client)
return inMessagesCount + outMessagesCount - queued;
}
-MqttClient *MqttClientNew(const char *clientId, int cleanSession)
+MqttClient *MqttClientNew(const char *clientId)
{
MqttClient *client;
@@ -6062,8 +6062,6 @@ MqttClient *MqttClientNew(const char *clientId, int cleanSession)
client->clientId = bfromcstr(clientId);
- client->cleanSession = cleanSession;
-
client->stream.sock = -1;
client->retryTimeout = 20;
@@ -6099,6 +6097,11 @@ void MqttClientFree(MqttClient *client)
bdestroy(client->willMessage);
bdestroy(client->host);
+ if (client->stream.sock != -1)
+ {
+ SocketDisconnect(client->stream.sock);
+ }
+
free(client);
}
@@ -6149,7 +6152,7 @@ void MqttClientSetOnPublish(MqttClient *client,
}
int MqttClientConnect(MqttClient *client, const char *host, short port,
- int keepAlive)
+ int keepAlive, int cleanSession)
{
int sock;
MqttPacketConnect *packet;
@@ -6160,6 +6163,7 @@ int MqttClientConnect(MqttClient *client, const char *host, short port,
client->host = bfromcstr(host);
client->port = port;
client->keepAlive = keepAlive;
+ client->cleanSession = cleanSession;
if (keepAlive < 0)
{
@@ -6216,7 +6220,12 @@ int MqttClientDisconnect(MqttClient *client)
return MqttClientQueueSimplePacket(client, MqttPacketTypeDisconnect);
}
-int MqttClientRunOnce(MqttClient *client)
+int MqttClientIsConnected(MqttClient *client)
+{
+ return client->stream.sock != -1;
+}
+
+int MqttClientRunOnce(MqttClient *client, int timeout)
{
int rv;
int events;
@@ -6246,7 +6255,12 @@ int MqttClientRunOnce(MqttClient *client)
LOG_DEBUG("selecting");
- rv = SocketSelect(client->stream.sock, &events, client->keepAlive);
+ if (timeout < 0)
+ {
+ timeout = client->keepAlive * 1000;
+ }
+
+ rv = SocketSelect(client->stream.sock, &events, timeout);
if (rv == -1)
{
@@ -6301,7 +6315,7 @@ int MqttClientRunOnce(MqttClient *client)
else if (SIMPLEQ_EMPTY(&client->sendQueue))
{
int64_t elapsed = MqttGetCurrentTime() - client->lastPacketSentTime;
- if (elapsed/1000 >= client->keepAlive)
+ if (elapsed/1000 >= client->keepAlive && client->keepAlive > 0)
{
MqttClientQueueSimplePacket(client, MqttPacketTypePingReq);
client->pingSent = 1;
@@ -6324,7 +6338,7 @@ int MqttClientRun(MqttClient *client)
while (!client->stopped)
{
- if (MqttClientRunOnce(client) == -1)
+ if (MqttClientRunOnce(client, -1) == -1)
return -1;
}
@@ -6627,7 +6641,9 @@ static void MqttClientHandlePublish(MqttClient *client, MqttPacketPublish *packe
client->onMessage(client,
bdata(packet->topicName),
bdata(packet->message),
- blength(packet->message));
+ blength(packet->message),
+ packet->qos,
+ packet->retain);
}
if (MqttPacketPublishQos(packet) > 0)
@@ -6722,7 +6738,7 @@ static void MqttClientHandlePubRel(MqttClient *client, MqttPacket *packet)
TAILQ_FOREACH(pubRec, &client->inMessages, messages)
{
if (MqttPacketId(pubRec) == MqttPacketId(packet) &&
- MqttPacketType(pubRec) == MqttPacketTypePublish)
+ MqttPacketType(pubRec) == MqttPacketTypePubRec)
{
break;
}
diff --git a/amalgamation/mqtt.h b/amalgamation/mqtt.h
index 6c16ea6..f07ff3c 100644
--- a/amalgamation/mqtt.h
+++ b/amalgamation/mqtt.h
@@ -39,11 +39,14 @@ typedef void (*MqttClientOnUnsubscribeCallback)(MqttClient *client, int id);
typedef void (*MqttClientOnMessageCallback)(MqttClient *client,
const char *topic,
- const void *data, size_t size);
+ const void *data,
+ size_t size,
+ int qos,
+ int retain);
typedef void (*MqttClientOnPublishCallback)(MqttClient *client, int id);
-MqttClient *MqttClientNew(const char *clientId, int cleanSession);
+MqttClient *MqttClientNew(const char *clientId);
void MqttClientFree(MqttClient *client);
@@ -66,11 +69,13 @@ void MqttClientSetOnPublish(MqttClient *client,
MqttClientOnPublishCallback cb);
int MqttClientConnect(MqttClient *client, const char *host, short port,
- int keepAlive);
+ int keepAlive, int cleanSession);
int MqttClientDisconnect(MqttClient *client);
-int MqttClientRunOnce(MqttClient *client);
+int MqttClientIsConnected(MqttClient *client);
+
+int MqttClientRunOnce(MqttClient *client, int timeout);
int MqttClientRun(MqttClient *client);