aboutsummaryrefslogtreecommitdiff
path: root/amalgamation
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-02-19 16:23:21 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-02-19 16:23:21 +0200
commit7f8f79eb9d3cab407410814b2db261d89f11af2e (patch)
treee363f87522001e063fedf8a7ad42740da3edce94 /amalgamation
parent8198f6d6beb3c8af3768236070089112c094b92e (diff)
downloadmqtt-7f8f79eb9d3cab407410814b2db261d89f11af2e.tar.gz
mqtt-7f8f79eb9d3cab407410814b2db261d89f11af2e.zip
Update amalgamationv0.4
Diffstat (limited to 'amalgamation')
-rw-r--r--amalgamation/mqtt.c177
-rw-r--r--amalgamation/mqtt.h4
2 files changed, 141 insertions, 40 deletions
diff --git a/amalgamation/mqtt.c b/amalgamation/mqtt.c
index 40f3a22..1c8f88a 100644
--- a/amalgamation/mqtt.c
+++ b/amalgamation/mqtt.c
@@ -5152,8 +5152,8 @@ typedef struct MqttPacketSubscribe MqttPacketSubscribe;
struct MqttPacketSubscribe
{
MqttPacket base;
- bstring topicFilter;
- char qos;
+ struct bstrList *topicFilters;
+ int *qos;
};
typedef struct MqttPacketSubAck MqttPacketSubAck;
@@ -5161,7 +5161,7 @@ typedef struct MqttPacketSubAck MqttPacketSubAck;
struct MqttPacketSubAck
{
MqttPacket base;
- unsigned char returnCode;
+ unsigned char *returnCode;
};
typedef struct MqttPacketUnsubscribe MqttPacketUnsubscribe;
@@ -5284,7 +5284,7 @@ void MqttPacketFree(MqttPacket *packet)
else if (MqttPacketType(packet) == MqttPacketTypeSubscribe)
{
MqttPacketSubscribe *p = (MqttPacketSubscribe *) packet;
- bdestroy(p->topicFilter);
+ bstrListDestroy(p->topicFilters);
}
else if (MqttPacketType(packet) == MqttPacketTypeUnsubscribe)
{
@@ -5374,7 +5374,16 @@ static size_t MqttPacketConnectGetRemainingLength(const MqttPacketConnect *packe
static size_t MqttPacketSubscribeGetRemainingLength(const MqttPacketSubscribe *packet)
{
- return 2 + MqttStringLengthSerialized(packet->topicFilter) + 1;
+ size_t remaining = 2;
+ int i;
+
+ for (i = 0; i < packet->topicFilters->qty; ++i)
+ {
+ remaining += MqttStringLengthSerialized(packet->topicFilters->entry[i]);
+ remaining += 1;
+ }
+
+ return remaining;
}
static size_t MqttPacketUnsubscribeGetRemainingLength(const MqttPacketUnsubscribe *packet)
@@ -5529,14 +5538,21 @@ static int MqttPacketConnectSerialize(const MqttPacketConnect *packet, Stream *s
static int MqttPacketSubscribeSerialize(const MqttPacketSubscribe *packet, Stream *stream)
{
+ int i;
+
if (MqttPacketWithIdSerialize((const MqttPacket *) packet, stream) == -1)
return -1;
- if (StreamWriteMqttString(packet->topicFilter, stream) == -1)
- return -1;
+ for (i = 0; i < packet->topicFilters->qty; ++i)
+ {
+ unsigned char qos = (unsigned char) packet->qos[i];
- if (StreamWrite(&packet->qos, 1, stream) == -1)
- return -1;
+ if (StreamWriteMqttString(packet->topicFilters->entry[i], stream) == -1)
+ return -1;
+
+ if (StreamWrite(&qos, 1, stream) == -1)
+ return -1;
+ }
return 0;
}
@@ -5704,19 +5720,24 @@ static int MqttPacketConnAckDeserialize(MqttPacketConnAck **packet, Stream *stre
static int MqttPacketSubAckDeserialize(MqttPacketSubAck **packet, Stream *stream)
{
size_t remainingLength = 0;
+ size_t i;
if (StreamReadRemainingLength(&remainingLength, stream) == -1)
return -1;
- /* 2 bytes for packet id and 1 byte for single return code */
- if (remainingLength != 3)
- return -1;
-
if (StreamReadUint16Be(&((*packet)->base.id), stream) == -1)
return -1;
- if (StreamRead(&((*packet)->returnCode), 1, stream) == -1)
- return -1;
+ remainingLength -= 2;
+
+ (*packet)->returnCode = (unsigned char *) malloc(
+ sizeof(*(*packet)->returnCode) * remainingLength);
+
+ for (i = 0; i < remainingLength; ++i)
+ {
+ if (StreamRead(&((*packet)->returnCode[i]), 1, stream) == -1)
+ return -1;
+ }
return 0;
}
@@ -6008,6 +6029,8 @@ struct MqttClient
bstring willMessage;
int willQos;
int willRetain;
+ /* 1 if client should ignore incoming PUBLISH messages, 0 handle them */
+ int paused;
};
enum MessageState
@@ -6023,6 +6046,7 @@ static int MqttClientSendPacket(MqttClient *client, MqttPacket *packet);
static int MqttClientRecvPacket(MqttClient *client);
static uint16_t MqttClientNextPacketId(MqttClient *client);
static void MqttClientProcessMessageQueue(MqttClient *client);
+static void MqttClientClearQueues(MqttClient *client);
static MQTT_INLINE int MqttClientInflightMessageCount(MqttClient *client)
{
@@ -6078,19 +6102,7 @@ MqttClient *MqttClientNew(const char *clientId)
void MqttClientFree(MqttClient *client)
{
- MqttPacket *packet, *next;
-
- TAILQ_FOREACH_SAFE(packet, &client->outMessages, messages, next)
- {
- TAILQ_REMOVE(&client->outMessages, packet, messages);
- MqttPacketFree(packet);
- }
-
- TAILQ_FOREACH_SAFE(packet, &client->inMessages, messages, next)
- {
- TAILQ_REMOVE(&client->inMessages, packet, messages);
- MqttPacketFree(packet);
- }
+ MqttClientClearQueues(client);
bdestroy(client->clientId);
bdestroy(client->willTopic);
@@ -6160,11 +6172,19 @@ int MqttClientConnect(MqttClient *client, const char *host, short port,
assert(client != NULL);
assert(host != NULL);
- client->host = bfromcstr(host);
+ if (client->host)
+ bassigncstr(client->host, host);
+ else
+ client->host = bfromcstr(host);
client->port = port;
client->keepAlive = keepAlive;
client->cleanSession = cleanSession;
+ /* In case we are reconnecting */
+ client->stopped = 0;
+ client->pingSent = 0;
+ MqttClientClearQueues(client);
+
if (keepAlive < 0)
{
LOG_ERROR("invalid keepAlive: %d", keepAlive);
@@ -6348,11 +6368,20 @@ int MqttClientRun(MqttClient *client)
int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
int qos)
{
+ return MqttClientSubscribeMany(client, &topicFilter, &qos, 1);
+}
+
+int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters,
+ int *qos, size_t count)
+{
MqttPacketSubscribe *packet = NULL;
+ size_t i;
assert(client != NULL);
- assert(topicFilter != NULL);
- assert(qos >= 0 && qos <= 2);
+ assert(topicFilters != NULL);
+ assert(*topicFilters != NULL);
+ assert(qos != NULL);
+ assert(count > 0);
packet = (MqttPacketSubscribe *) MqttPacketWithIdNew(
MqttPacketTypeSubscribe, MqttClientNextPacketId(client));
@@ -6360,8 +6389,18 @@ int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
if (!packet)
return -1;
- packet->topicFilter = bfromcstr(topicFilter);
- packet->qos = qos;
+ packet->topicFilters = bstrListCreate();
+ bstrListAllocMin(packet->topicFilters, count);
+
+ packet->qos = (int *) malloc(sizeof(int) * count);
+
+ for (i = 0; i < count; ++i)
+ {
+ packet->topicFilters->entry[i] = bfromcstr(topicFilters[i]);
+ ++packet->topicFilters->qty;
+ }
+
+ memcpy(packet->qos, qos, sizeof(int) * count);
MqttClientQueuePacket(client, (MqttPacket *) packet);
@@ -6595,21 +6634,35 @@ static void MqttClientHandleSubAck(MqttClient *client, MqttPacketSubAck *packet)
}
else
{
- TAILQ_REMOVE(&client->outMessages, sub, messages);
- MqttPacketFree(sub);
-
if (client->onSubscribe)
{
- LOG_DEBUG("calling onSubscribe id:%d rc:%d", MqttPacketId(packet),
- packet->returnCode);
- client->onSubscribe(client, MqttPacketId(packet),
- packet->returnCode);
+ MqttPacketSubscribe *sub2;
+ int i;
+
+ sub2 = (MqttPacketSubscribe *) sub;
+
+ for (i = 0; i < sub2->topicFilters->qty; ++i)
+ {
+ const char *filter = bdata(sub2->topicFilters->entry[i]);
+ int rc = packet->returnCode[i];
+
+ LOG_DEBUG("calling onSubscribe id:%d filter:'%s' rc:%d",
+ MqttPacketId(packet), filter, rc);
+
+ client->onSubscribe(client, MqttPacketId(packet), filter, rc);
+ }
}
+
+ TAILQ_REMOVE(&client->outMessages, sub, messages);
+ MqttPacketFree(sub);
}
}
static void MqttClientHandlePublish(MqttClient *client, MqttPacketPublish *packet)
{
+ if (client->paused)
+ return;
+
if (MqttPacketPublishQos(packet) == 2)
{
/* Check if we have sent a PUBREC previously with the same id. If we
@@ -7019,3 +7072,47 @@ static void MqttClientProcessMessageQueue(MqttClient *client)
MqttClientProcessInMessages(client);
MqttClientProcessOutMessages(client);
}
+
+static void MqttClientClearQueues(MqttClient *client)
+{
+ while (!SIMPLEQ_EMPTY(&client->sendQueue))
+ {
+ MqttPacket *packet = SIMPLEQ_FIRST(&client->sendQueue);
+
+ SIMPLEQ_REMOVE_HEAD(&client->sendQueue, sendQueue);
+
+ if (TAILQ_NEXT(packet, messages) == NULL &&
+ TAILQ_PREV(packet, MessageList, messages) == NULL &&
+ TAILQ_FIRST(&client->inMessages) != packet &&
+ TAILQ_FIRST(&client->outMessages) != packet)
+ {
+ MqttPacketFree(packet);
+ }
+ }
+
+ while (!TAILQ_EMPTY(&client->outMessages))
+ {
+ MqttPacket *packet = TAILQ_FIRST(&client->outMessages);
+ TAILQ_REMOVE(&client->outMessages, packet, messages);
+ MqttPacketFree(packet);
+ }
+
+ while (!TAILQ_EMPTY(&client->inMessages))
+ {
+ MqttPacket *packet = TAILQ_FIRST(&client->inMessages);
+ TAILQ_REMOVE(&client->inMessages, packet, messages);
+ MqttPacketFree(packet);
+ }
+}
+
+void MqttClientPause(MqttClient *client)
+{
+ assert(client != NULL);
+ client->paused = 1;
+}
+
+void MqttClientResume(MqttClient *client)
+{
+ assert(client != NULL);
+ client->paused = 0;
+}
diff --git a/amalgamation/mqtt.h b/amalgamation/mqtt.h
index f07ff3c..ad84aaf 100644
--- a/amalgamation/mqtt.h
+++ b/amalgamation/mqtt.h
@@ -33,6 +33,7 @@ typedef void (*MqttClientOnConnectCallback)(MqttClient *client,
typedef void (*MqttClientOnSubscribeCallback)(MqttClient *client,
int id,
+ const char *topicFilter,
MqttSubscriptionStatus status);
typedef void (*MqttClientOnUnsubscribeCallback)(MqttClient *client, int id);
@@ -82,6 +83,9 @@ int MqttClientRun(MqttClient *client);
int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
int qos);
+int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters,
+ int *qos, size_t count);
+
int MqttClientUnsubscribe(MqttClient *client, const char *topicFilter);
int MqttClientPublish(MqttClient *client, int qos, int retain,