Commit bd741649 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Add more error checking to socket.c.

parent 38c77ceb
......@@ -31,9 +31,9 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&sock_ptr->closingMutex, &attr);
check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed.");
check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed.");
check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed.");
// Bind Data Socket
struct sockaddr_in address;
......@@ -57,8 +57,8 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
"Cannot bind feedback socket.");
pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed.");
check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed.");
sock_ptr->outQueue = List_create();
check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
......@@ -70,10 +70,10 @@ PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
} else {
sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
check_mem(sock_ptr->forwardPacketTable)
PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
check(PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable) == 0, "Creating table failed.");
pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL);
check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
sock_ptr->inQueue = List_create();
check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
......@@ -95,16 +95,17 @@ int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t po
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
check(sock_ptr->is_sender, "Cannot send on receiver socket.")
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
List_push(sock_ptr->outQueue, packet);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");
return 0;
error:
PERROR("There was a failure while sending from socket.%s", "");
return -1;
}
......@@ -113,16 +114,16 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_t filledMutex = sock_ptr->inQueueFilledMutex;
pthread_mutex_t closingMutex = sock_ptr->closingMutex;
while (1) {
pthread_mutex_lock(&filledMutex);
check(pthread_mutex_lock(&filledMutex) == 0, "Lock failed.");
while (List_count(sock_ptr->inQueue) == 0) {
pthread_mutex_lock(&closingMutex);
check(pthread_mutex_lock(&closingMutex) == 0, "Lock failed.");
if (sock_ptr->closing) {
pthread_mutex_unlock(&closingMutex);
pthread_mutex_unlock(&filledMutex);
check(pthread_mutex_unlock(&closingMutex) == 0, "Unlock failed.");
check(pthread_mutex_unlock(&filledMutex) == 0, "Unlock failed.");
return -1;
}
pthread_mutex_unlock(&closingMutex);
pthread_cond_wait(&sock_ptr->inQueueFilledCv, &filledMutex);
check(pthread_mutex_unlock(&closingMutex) == 0, "Unlock failed.");
check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &filledMutex) == 0, "Wait failed.");
}
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
......@@ -130,59 +131,61 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
PrrtPacket_destroy(packet);
pthread_mutex_unlock(&filledMutex);
check(pthread_mutex_unlock(&filledMutex) == 0, "Unlock failed.");
return len;
}
error:
PERROR("Wrong socket type.%s", "");
PERROR("There was a failure while receiving from socket.%s", "");
return -1;
}
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
sock_ptr->closing = true;
pthread_mutex_unlock(&sock_ptr->closingMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
void **res = NULL;
if (sock_ptr->sendThread != 0) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
pthread_cond_broadcast(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed.");
check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
pthread_join(sock_ptr->sendThread, res);
check(pthread_join(sock_ptr->sendThread, res) == 0, "Join failed.");
sock_ptr->sendThread = 0;
}
if (sock_ptr->receiveDataThread != 0) {
debug("Shutting down receive data thread.");
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
pthread_cond_broadcast(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed");
check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed");
pthread_cancel(sock_ptr->receiveDataThread);
pthread_join(sock_ptr->receiveDataThread, res);
check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed.");
check(pthread_join(sock_ptr->receiveDataThread, res) == 0, "Join failed.");
sock_ptr->receiveDataThread = 0;
}
if (sock_ptr->receiveFeedbackThread != 0) {
pthread_cancel(sock_ptr->receiveFeedbackThread);
pthread_join(sock_ptr->receiveFeedbackThread, res);
check(pthread_cancel(sock_ptr->receiveFeedbackThread) == 0, "Cancel failed.");
check(pthread_join(sock_ptr->receiveFeedbackThread, res) == 0, "Join failed.");
sock_ptr->receiveFeedbackThread = 0;
}
debug("Interrupted all threads.");
return EXIT_SUCCESS;
error:
PERROR("Interruping socket failed.%s","");
return EXIT_FAILURE;
}
int PrrtSocket_close(PrrtSocket *sock_ptr) {
debug("Closing socket.");
pthread_mutex_lock(&sock_ptr->closingMutex);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
if (!sock_ptr->closing) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
PrrtSocket_interrupt(sock_ptr);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed.");
} else {
pthread_mutex_unlock(&sock_ptr->closingMutex);
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
}
if (sock_ptr->dataStore != NULL) {
......@@ -221,15 +224,15 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
}
if (sock_ptr->outQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed.");
check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed.");
List_destroy(sock_ptr->outQueue);
sock_ptr->outQueue = NULL;
}
if (sock_ptr->inQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->inQueueFilledCv);
check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed.");
check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed.");
List_destroy(sock_ptr->inQueue);
sock_ptr->inQueue = NULL;
}
......@@ -239,12 +242,16 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->forwardPacketTable = NULL;
}
pthread_mutex_destroy(&sock_ptr->closingMutex);
check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
debug("Socket closed.");
return 0;
error:
PERROR("Closing socket failed.%s", "");
return -1;
}
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
......@@ -266,8 +273,8 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr);
//uint16_t remote_port = ntohs(remote.sin_port);
//char *remote_host = inet_ntoa(remote.sin_addr);
//debug("Received feedback %s:%d", remote_host, remote_port);
PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment