diff options
| author | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-15 21:38:31 +0200 |
|---|---|---|
| committer | Oskari Timperi <oskari.timperi@iki.fi> | 2017-02-15 21:38:31 +0200 |
| commit | e41108fe4f40113f51e7b873f5165ebaf2b085d1 (patch) | |
| tree | 76fb4d7f994f32d3fda1cb8903fe2065f7b9fd86 /src | |
| parent | a2c431ec96f7a6ed7e5c41b7ae93dc086ae541f1 (diff) | |
| download | mqtt-e41108fe4f40113f51e7b873f5165ebaf2b085d1.tar.gz mqtt-e41108fe4f40113f51e7b873f5165ebaf2b085d1.zip | |
Add SocketSelect() to wrap select()
This also makes all the necessary includes for sockets centralized to
`sockets.c`.
Diffstat (limited to 'src')
| -rw-r--r-- | src/client.c | 26 | ||||
| -rw-r--r-- | src/socket.c | 49 | ||||
| -rw-r--r-- | src/socket.h | 8 |
3 files changed, 64 insertions, 19 deletions
diff --git a/src/client.c b/src/client.c index fa5773a..56dbe1a 100644 --- a/src/client.c +++ b/src/client.c @@ -24,8 +24,6 @@ #error define PRId64 for your platform #endif -#include <sys/select.h> - TAILQ_HEAD(MessageList, MqttPacket); typedef struct MessageList MessageList; @@ -287,9 +285,8 @@ int MqttClientDisconnect(MqttClient *client) int MqttClientRunOnce(MqttClient *client) { - fd_set rfd, wfd; - struct timeval tv; int rv; + int events; assert(client != NULL); @@ -299,10 +296,7 @@ int MqttClientRunOnce(MqttClient *client) return -1; } - FD_ZERO(&rfd); - FD_ZERO(&wfd); - - FD_SET(client->stream.sock, &rfd); + events = EV_READ; /* Handle outMessages and inMessages, moving queued messages to sendQueue if there are less than maxInflight number of messages in flight */ @@ -314,18 +308,12 @@ int MqttClientRunOnce(MqttClient *client) } else { - FD_SET(client->stream.sock, &wfd); + events |= EV_WRITE; } - /* TODO: break select when queuing packets (need to protect queue with - mutex to allow queuing packets from another thread) */ - - memset(&tv, 0, sizeof(tv)); - tv.tv_sec = client->keepAlive; - tv.tv_usec = 0; - LOG_DEBUG("selecting"); - rv = select(client->stream.sock+1, &rfd, &wfd, NULL, &tv); + + rv = SocketSelect(client->stream.sock, &events, client->keepAlive); if (rv == -1) { @@ -336,7 +324,7 @@ int MqttClientRunOnce(MqttClient *client) { LOG_DEBUG("select rv=%d", rv); - if (FD_ISSET(client->stream.sock, &wfd)) + if (events & EV_WRITE) { MqttPacket *packet; @@ -356,7 +344,7 @@ int MqttClientRunOnce(MqttClient *client) } } - if (FD_ISSET(client->stream.sock, &rfd)) + if (events & EV_READ) { LOG_DEBUG("socket readable"); diff --git a/src/socket.c b/src/socket.c index 0b0a59d..38c968e 100644 --- a/src/socket.c +++ b/src/socket.c @@ -92,3 +92,52 @@ int SocketSendAll(int sock, const char *buf, size_t *len) return rv == -1 ? -1 : 0; } + +int SocketSelect(int sock, int *events, int timeout) +{ + fd_set rfd, wfd; + struct timeval tv; + int rv; + + assert(sock != -1); + assert(events != NULL); + assert(*events != 0); + + FD_ZERO(&rfd); + FD_ZERO(&wfd); + + if (*events & EV_READ) + { + FD_SET(sock, &rfd); + } + + if (*events & EV_WRITE) + { + FD_SET(sock, &wfd); + } + + memset(&tv, 0, sizeof(tv)); + tv.tv_sec = timeout; + tv.tv_usec = 0; + + *events = 0; + + rv = select(sock+1, &rfd, &wfd, NULL, &tv); + + if (rv < 0) + { + return rv; + } + + if (FD_ISSET(sock, &wfd)) + { + *events = EV_WRITE; + } + + if (FD_ISSET(sock, &rfd)) + { + *events = EV_READ; + } + + return rv; +} diff --git a/src/socket.h b/src/socket.h index ba90843..0b9b099 100644 --- a/src/socket.h +++ b/src/socket.h @@ -11,4 +11,12 @@ int SocketDisconnect(int sock); int SocketSendAll(int sock, const char *buf, size_t *len); +enum +{ + EV_READ = 1, + EV_WRITE = 2 +}; + +int SocketSelect(int sock, int *events, int timeout); + #endif |
