diff options
Diffstat (limited to 'src/client.c')
| -rw-r--r-- | src/client.c | 52 |
1 files changed, 41 insertions, 11 deletions
diff --git a/src/client.c b/src/client.c index c4bd499..4c4ed7d 100644 --- a/src/client.c +++ b/src/client.c @@ -415,11 +415,20 @@ int MqttClientRun(MqttClient *client) int MqttClientSubscribe(MqttClient *client, const char *topicFilter, int qos) { + return MqttClientSubscribeMany(client, &topicFilter, &qos, 1); +} + +int MqttClientSubscribeMany(MqttClient *client, const char **topicFilters, + int *qos, size_t count) +{ MqttPacketSubscribe *packet = NULL; + size_t i; assert(client != NULL); - assert(topicFilter != NULL); - assert(qos >= 0 && qos <= 2); + assert(topicFilters != NULL); + assert(*topicFilters != NULL); + assert(qos != NULL); + assert(count > 0); packet = (MqttPacketSubscribe *) MqttPacketWithIdNew( MqttPacketTypeSubscribe, MqttClientNextPacketId(client)); @@ -427,8 +436,18 @@ int MqttClientSubscribe(MqttClient *client, const char *topicFilter, if (!packet) return -1; - packet->topicFilter = bfromcstr(topicFilter); - packet->qos = qos; + packet->topicFilters = bstrListCreate(); + bstrListAllocMin(packet->topicFilters, count); + + packet->qos = (int *) malloc(sizeof(int) * count); + + for (i = 0; i < count; ++i) + { + packet->topicFilters->entry[i] = bfromcstr(topicFilters[i]); + ++packet->topicFilters->qty; + } + + memcpy(packet->qos, qos, sizeof(int) * count); MqttClientQueuePacket(client, (MqttPacket *) packet); @@ -662,16 +681,27 @@ static void MqttClientHandleSubAck(MqttClient *client, MqttPacketSubAck *packet) } else { - TAILQ_REMOVE(&client->outMessages, sub, messages); - MqttPacketFree(sub); - if (client->onSubscribe) { - LOG_DEBUG("calling onSubscribe id:%d rc:%d", MqttPacketId(packet), - packet->returnCode); - client->onSubscribe(client, MqttPacketId(packet), - packet->returnCode); + MqttPacketSubscribe *sub2; + int i; + + sub2 = (MqttPacketSubscribe *) sub; + + for (i = 0; i < sub2->topicFilters->qty; ++i) + { + const char *filter = bdata(sub2->topicFilters->entry[i]); + int rc = packet->returnCode[i]; + + LOG_DEBUG("calling onSubscribe id:%d filter:'%s' rc:%d", + MqttPacketId(packet), filter, rc); + + client->onSubscribe(client, MqttPacketId(packet), filter, rc); + } } + + TAILQ_REMOVE(&client->outMessages, sub, messages); + MqttPacketFree(sub); } } |
