aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-02-19 13:15:28 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-02-19 13:15:28 +0200
commit6748a038cf62cfb6e817f758ce8255e0e2debe60 (patch)
tree3ee0ef0cf671b35a67bd9a5985ca32e05dc6e296
parentc4dbe5151ef76e46dc2adc9e373dd8447219bcaf (diff)
downloadmqtt-6748a038cf62cfb6e817f758ce8255e0e2debe60.tar.gz
mqtt-6748a038cf62cfb6e817f758ce8255e0e2debe60.zip
Support reconnecting in MqttClientConnect()
-rw-r--r--src/client.c57
1 files changed, 43 insertions, 14 deletions
diff --git a/src/client.c b/src/client.c
index ac2eac7..bdfaf2c 100644
--- a/src/client.c
+++ b/src/client.c
@@ -90,6 +90,7 @@ static int MqttClientSendPacket(MqttClient *client, MqttPacket *packet);
static int MqttClientRecvPacket(MqttClient *client);
static uint16_t MqttClientNextPacketId(MqttClient *client);
static void MqttClientProcessMessageQueue(MqttClient *client);
+static void MqttClientClearQueues(MqttClient *client);
static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client)
{
@@ -145,19 +146,7 @@ MqttClient *MqttClientNew(const char *clientId)
void MqttClientFree(MqttClient *client)
{
- MqttPacket *packet, *next;
-
- TAILQ_FOREACH_SAFE(packet, &client->outMessages, messages, next)
- {
- TAILQ_REMOVE(&client->outMessages, packet, messages);
- MqttPacketFree(packet);
- }
-
- TAILQ_FOREACH_SAFE(packet, &client->inMessages, messages, next)
- {
- TAILQ_REMOVE(&client->inMessages, packet, messages);
- MqttPacketFree(packet);
- }
+ MqttClientClearQueues(client);
bdestroy(client->clientId);
bdestroy(client->willTopic);
@@ -227,11 +216,19 @@ int MqttClientConnect(MqttClient *client, const char *host, short port,
assert(client != NULL);
assert(host != NULL);
- client->host = bfromcstr(host);
+ if (client->host)
+ bassigncstr(client->host, host);
+ else
+ client->host = bfromcstr(host);
client->port = port;
client->keepAlive = keepAlive;
client->cleanSession = cleanSession;
+ /* In case we are reconnecting */
+ client->stopped = 0;
+ client->pingSent = 0;
+ MqttClientClearQueues(client);
+
if (keepAlive < 0)
{
LOG_ERROR("invalid keepAlive: %d", keepAlive);
@@ -1086,3 +1083,35 @@ static void MqttClientProcessMessageQueue(MqttClient *client)
MqttClientProcessInMessages(client);
MqttClientProcessOutMessages(client);
}
+
+static void MqttClientClearQueues(MqttClient *client)
+{
+ while (!SIMPLEQ_EMPTY(&client->sendQueue))
+ {
+ MqttPacket *packet = SIMPLEQ_FIRST(&client->sendQueue);
+
+ SIMPLEQ_REMOVE_HEAD(&client->sendQueue, sendQueue);
+
+ if (TAILQ_NEXT(packet, messages) == NULL &&
+ TAILQ_PREV(packet, MessageList, messages) == NULL &&
+ TAILQ_FIRST(&client->inMessages) != packet &&
+ TAILQ_FIRST(&client->outMessages) != packet)
+ {
+ MqttPacketFree(packet);
+ }
+ }
+
+ while (!TAILQ_EMPTY(&client->outMessages))
+ {
+ MqttPacket *packet = TAILQ_FIRST(&client->outMessages);
+ TAILQ_REMOVE(&client->outMessages, packet, messages);
+ MqttPacketFree(packet);
+ }
+
+ while (!TAILQ_EMPTY(&client->inMessages))
+ {
+ MqttPacket *packet = TAILQ_FIRST(&client->inMessages);
+ TAILQ_REMOVE(&client->inMessages, packet, messages);
+ MqttPacketFree(packet);
+ }
+}