aboutsummaryrefslogtreecommitdiff
path: root/src/client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.c')
-rw-r--r--src/client.c52
1 files changed, 41 insertions, 11 deletions
diff --git a/src/client.c b/src/client.c
index c4bd499..4c4ed7d 100644
--- a/src/client.c
+++ b/src/client.c
@@ -415,11 +415,20 @@ int MqttClientRun(MqttClient *client)
int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
int qos)
{
+ return MqttClientSubscribeMany(client, &topicFilter, &qos, 1);
+}
+
+int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters,
+ int *qos, size_t count)
+{
MqttPacketSubscribe *packet = NULL;
+ size_t i;
assert(client != NULL);
- assert(topicFilter != NULL);
- assert(qos >= 0 && qos <= 2);
+ assert(topicFilters != NULL);
+ assert(*topicFilters != NULL);
+ assert(qos != NULL);
+ assert(count > 0);
packet = (MqttPacketSubscribe *) MqttPacketWithIdNew(
MqttPacketTypeSubscribe, MqttClientNextPacketId(client));
@@ -427,8 +436,18 @@ int MqttClientSubscribe(MqttClient *client, const char *topicFilter,
if (!packet)
return -1;
- packet->topicFilter = bfromcstr(topicFilter);
- packet->qos = qos;
+ packet->topicFilters = bstrListCreate();
+ bstrListAllocMin(packet->topicFilters, count);
+
+ packet->qos = (int *) malloc(sizeof(int) * count);
+
+ for (i = 0; i < count; ++i)
+ {
+ packet->topicFilters->entry[i] = bfromcstr(topicFilters[i]);
+ ++packet->topicFilters->qty;
+ }
+
+ memcpy(packet->qos, qos, sizeof(int) * count);
MqttClientQueuePacket(client, (MqttPacket *) packet);
@@ -662,16 +681,27 @@ static void MqttClientHandleSubAck(MqttClient *client, MqttPacketSubAck *packet)
}
else
{
- TAILQ_REMOVE(&client->outMessages, sub, messages);
- MqttPacketFree(sub);
-
if (client->onSubscribe)
{
- LOG_DEBUG("calling onSubscribe id:%d rc:%d", MqttPacketId(packet),
- packet->returnCode);
- client->onSubscribe(client, MqttPacketId(packet),
- packet->returnCode);
+ MqttPacketSubscribe *sub2;
+ int i;
+
+ sub2 = (MqttPacketSubscribe *) sub;
+
+ for (i = 0; i < sub2->topicFilters->qty; ++i)
+ {
+ const char *filter = bdata(sub2->topicFilters->entry[i]);
+ int rc = packet->returnCode[i];
+
+ LOG_DEBUG("calling onSubscribe id:%d filter:'%s' rc:%d",
+ MqttPacketId(packet), filter, rc);
+
+ client->onSubscribe(client, MqttPacketId(packet), filter, rc);
+ }
}
+
+ TAILQ_REMOVE(&client->outMessages, sub, messages);
+ MqttPacketFree(sub);
}
}