aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-02-19 16:03:56 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-02-19 16:03:56 +0200
commit8198f6d6beb3c8af3768236070089112c094b92e (patch)
tree7dcddfde2133850077ec8dc34bc9264fcb4e469a /src
parente6e625ed6c1300e382a36b9ebef48336a55550e9 (diff)
downloadmqtt-8198f6d6beb3c8af3768236070089112c094b92e.tar.gz
mqtt-8198f6d6beb3c8af3768236070089112c094b92e.zip
Add MqttClientSubscribeMany() and make necessary API changes
Diffstat (limited to 'src')
-rw-r--r--src/client.c52
-rw-r--r--src/deserialize.c17
-rw-r--r--src/mqtt.h4
-rw-r--r--src/packet.c2
-rw-r--r--src/packet.h6
-rw-r--r--src/serialize.c26
6 files changed, 81 insertions, 26 deletions
diff --git a/src/client.c b/src/client.c
index c4bd499..4c4ed7d 100644
--- a/src/client.c
+++ b/src/client.c
@@ -415,11 +415,20 @@ int MqttClientRun(MqttClient *client)
int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
int qos)
{
+ return MqttClientSubscribeMany(client, &topicFilter, &qos, 1);
+}
+
+int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters,
+ int *qos, size_t count)
+{
MqttPacketSubscribe *packet = NULL;
+ size_t i;
assert(client != NULL);
- assert(topicFilter != NULL);
- assert(qos >= 0 && qos <= 2);
+ assert(topicFilters != NULL);
+ assert(*topicFilters != NULL);
+ assert(qos != NULL);
+ assert(count > 0);
packet = (MqttPacketSubscribe *) MqttPacketWithIdNew(
MqttPacketTypeSubscribe, MqttClientNextPacketId(client));
@@ -427,8 +436,18 @@ int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
if (!packet)
return -1;
- packet->topicFilter = bfromcstr(topicFilter);
- packet->qos = qos;
+ packet->topicFilters = bstrListCreate();
+ bstrListAllocMin(packet->topicFilters, count);
+
+ packet->qos = (int *) malloc(sizeof(int) * count);
+
+ for (i = 0; i < count; ++i)
+ {
+ packet->topicFilters->entry[i] = bfromcstr(topicFilters[i]);
+ ++packet->topicFilters->qty;
+ }
+
+ memcpy(packet->qos, qos, sizeof(int) * count);
MqttClientQueuePacket(client, (MqttPacket *) packet);
@@ -662,16 +681,27 @@ static void MqttClientHandleSubAck(MqttClient *client, MqttPacketSubAck *packet)
}
else
{
- TAILQ_REMOVE(&client->outMessages, sub, messages);
- MqttPacketFree(sub);
-
if (client->onSubscribe)
{
- LOG_DEBUG("calling onSubscribe id:%d rc:%d", MqttPacketId(packet),
- packet->returnCode);
- client->onSubscribe(client, MqttPacketId(packet),
- packet->returnCode);
+ MqttPacketSubscribe *sub2;
+ int i;
+
+ sub2 = (MqttPacketSubscribe *) sub;
+
+ for (i = 0; i < sub2->topicFilters->qty; ++i)
+ {
+ const char *filter = bdata(sub2->topicFilters->entry[i]);
+ int rc = packet->returnCode[i];
+
+ LOG_DEBUG("calling onSubscribe id:%d filter:'%s' rc:%d",
+ MqttPacketId(packet), filter, rc);
+
+ client->onSubscribe(client, MqttPacketId(packet), filter, rc);
+ }
}
+
+ TAILQ_REMOVE(&client->outMessages, sub, messages);
+ MqttPacketFree(sub);
}
}
diff --git a/src/deserialize.c b/src/deserialize.c
index aaff490..96d7789 100644
--- a/src/deserialize.c
+++ b/src/deserialize.c
@@ -46,19 +46,24 @@ static int MqttPacketConnAckDeserialize(MqttPacketConnAck **packet, Stream *stre
static int MqttPacketSubAckDeserialize(MqttPacketSubAck **packet, Stream *stream)
{
size_t remainingLength = 0;
+ size_t i;
if (StreamReadRemainingLength(&remainingLength, stream) == -1)
return -1;
- /* 2 bytes for packet id and 1 byte for single return code */
- if (remainingLength != 3)
- return -1;
-
if (StreamReadUint16Be(&((*packet)->base.id), stream) == -1)
return -1;
- if (StreamRead(&((*packet)->returnCode), 1, stream) == -1)
- return -1;
+ remainingLength -= 2;
+
+ (*packet)->returnCode = (unsigned char *) malloc(
+ sizeof(*(*packet)->returnCode) * remainingLength);
+
+ for (i = 0; i < remainingLength; ++i)
+ {
+ if (StreamRead(&((*packet)->returnCode[i]), 1, stream) == -1)
+ return -1;
+ }
return 0;
}
diff --git a/src/mqtt.h b/src/mqtt.h
index f07ff3c..ad84aaf 100644
--- a/src/mqtt.h
+++ b/src/mqtt.h
@@ -33,6 +33,7 @@ typedef void (*MqttClientOnConnectCallback)(MqttClient *client,
typedef void (*MqttClientOnSubscribeCallback)(MqttClient *client,
int id,
+ const char *topicFilter,
MqttSubscriptionStatus status);
typedef void (*MqttClientOnUnsubscribeCallback)(MqttClient *client, int id);
@@ -82,6 +83,9 @@ int MqttClientRun(MqttClient *client);
int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
int qos);
+int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters,
+ int *qos, size_t count);
+
int MqttClientUnsubscribe(MqttClient *client, const char *topicFilter);
int MqttClientPublish(MqttClient *client, int qos, int retain,
diff --git a/src/packet.c b/src/packet.c
index 5de7d97..47aa689 100644
--- a/src/packet.c
+++ b/src/packet.c
@@ -96,7 +96,7 @@ void MqttPacketFree(MqttPacket *packet)
else if (MqttPacketType(packet) == MqttPacketTypeSubscribe)
{
MqttPacketSubscribe *p = (MqttPacketSubscribe *) packet;
- bdestroy(p->topicFilter);
+ bstrListDestroy(p->topicFilters);
}
else if (MqttPacketType(packet) == MqttPacketTypeUnsubscribe)
{
diff --git a/src/packet.h b/src/packet.h
index 4fe7b74..7ab4f73 100644
--- a/src/packet.h
+++ b/src/packet.h
@@ -92,8 +92,8 @@ typedef struct MqttPacketSubscribe MqttPacketSubscribe;
struct MqttPacketSubscribe
{
MqttPacket base;
- bstring topicFilter;
- char qos;
+ struct bstrList *topicFilters;
+ int *qos;
};
typedef struct MqttPacketSubAck MqttPacketSubAck;
@@ -101,7 +101,7 @@ typedef struct MqttPacketSubAck MqttPacketSubAck;
struct MqttPacketSubAck
{
MqttPacket base;
- unsigned char returnCode;
+ unsigned char *returnCode;
};
typedef struct MqttPacketUnsubscribe MqttPacketUnsubscribe;
diff --git a/src/serialize.c b/src/serialize.c
index b6c8cbc..3378b80 100644
--- a/src/serialize.c
+++ b/src/serialize.c
@@ -42,7 +42,16 @@ static size_t MqttPacketConnectGetRemainingLength(const MqttPacketConnect *packe
static size_t MqttPacketSubscribeGetRemainingLength(const MqttPacketSubscribe *packet)
{
- return 2 + MqttStringLengthSerialized(packet->topicFilter) + 1;
+ size_t remaining = 2;
+ int i;
+
+ for (i = 0; i < packet->topicFilters->qty; ++i)
+ {
+ remaining += MqttStringLengthSerialized(packet->topicFilters->entry[i]);
+ remaining += 1;
+ }
+
+ return remaining;
}
static size_t MqttPacketUnsubscribeGetRemainingLength(const MqttPacketUnsubscribe *packet)
@@ -197,14 +206,21 @@ static int MqttPacketConnectSerialize(const MqttPacketConnect *packet, Stream *s
static int MqttPacketSubscribeSerialize(const MqttPacketSubscribe *packet, Stream *stream)
{
+ int i;
+
if (MqttPacketWithIdSerialize((const MqttPacket *) packet, stream) == -1)
return -1;
- if (StreamWriteMqttString(packet->topicFilter, stream) == -1)
- return -1;
+ for (i = 0; i < packet->topicFilters->qty; ++i)
+ {
+ unsigned char qos = (unsigned char) packet->qos[i];
- if (StreamWrite(&packet->qos, 1, stream) == -1)
- return -1;
+ if (StreamWriteMqttString(packet->topicFilters->entry[i], stream) == -1)
+ return -1;
+
+ if (StreamWrite(&qos, 1, stream) == -1)
+ return -1;
+ }
return 0;
}