aboutsummaryrefslogtreecommitdiff
path: root/src/client.c
diff options
context:
space:
mode:
authorOskari Timperi <oskari.timperi@iki.fi>2017-02-15 21:38:31 +0200
committerOskari Timperi <oskari.timperi@iki.fi>2017-02-15 21:38:31 +0200
commite41108fe4f40113f51e7b873f5165ebaf2b085d1 (patch)
tree76fb4d7f994f32d3fda1cb8903fe2065f7b9fd86 /src/client.c
parenta2c431ec96f7a6ed7e5c41b7ae93dc086ae541f1 (diff)
downloadmqtt-e41108fe4f40113f51e7b873f5165ebaf2b085d1.tar.gz
mqtt-e41108fe4f40113f51e7b873f5165ebaf2b085d1.zip
Add SocketSelect() to wrap select()
This also makes all the necessary includes for sockets centralized to `sockets.c`.
Diffstat (limited to 'src/client.c')
-rw-r--r--src/client.c26
1 files changed, 7 insertions, 19 deletions
diff --git a/src/client.c b/src/client.c
index fa5773a..56dbe1a 100644
--- a/src/client.c
+++ b/src/client.c
@@ -24,8 +24,6 @@
#error define PRId64 for your platform
#endif
-#include <sys/select.h>
-
TAILQ_HEAD(MessageList, MqttPacket);
typedef struct MessageList MessageList;
@@ -287,9 +285,8 @@ int MqttClientDisconnect(MqttClient *client)
int MqttClientRunOnce(MqttClient *client)
{
- fd_set rfd, wfd;
- struct timeval tv;
int rv;
+ int events;
assert(client != NULL);
@@ -299,10 +296,7 @@ int MqttClientRunOnce(MqttClient *client)
return -1;
}
- FD_ZERO(&rfd);
- FD_ZERO(&wfd);
-
- FD_SET(client->stream.sock, &rfd);
+ events = EV_READ;
/* Handle outMessages and inMessages, moving queued messages to sendQueue
if there are less than maxInflight number of messages in flight */
@@ -314,18 +308,12 @@ int MqttClientRunOnce(MqttClient *client)
}
else
{
- FD_SET(client->stream.sock, &wfd);
+ events |= EV_WRITE;
}
- /* TODO: break select when queuing packets (need to protect queue with
- mutex to allow queuing packets from another thread) */
-
- memset(&tv, 0, sizeof(tv));
- tv.tv_sec = client->keepAlive;
- tv.tv_usec = 0;
-
LOG_DEBUG("selecting");
- rv = select(client->stream.sock+1, &rfd, &wfd, NULL, &tv);
+
+ rv = SocketSelect(client->stream.sock, &events, client->keepAlive);
if (rv == -1)
{
@@ -336,7 +324,7 @@ int MqttClientRunOnce(MqttClient *client)
{
LOG_DEBUG("select rv=%d", rv);
- if (FD_ISSET(client->stream.sock, &wfd))
+ if (events & EV_WRITE)
{
MqttPacket *packet;
@@ -356,7 +344,7 @@ int MqttClientRunOnce(MqttClient *client)
}
}
- if (FD_ISSET(client->stream.sock, &rfd))
+ if (events & EV_READ)
{
LOG_DEBUG("socket readable");