Commit ab36bdd3 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Properly terminating feedback thread. Fixed issue with unbound data socket for senders.

parent f3e1484e
......@@ -28,12 +28,12 @@ void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock_decode(block);
while(List_count(block->dataPackets) > 0) {
while (List_count(block->dataPackets) > 0) {
PrrtPacket *pkt = List_shift(block->dataPackets);
if (PrrtForwardPacketTable_test_set_is_number_relevant(sock_ptr->forwardPacketTable, pkt->seqno)) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, pkt);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
} else {
PrrtPacket_destroy(pkt);
......@@ -59,6 +59,7 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
hp = gethostbyname(remote_host);
memcpy((void *) &targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);
PrrtPacket *feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
uint8_t buf[MAX_PAYLOAD_LENGTH];
memset(buf, 0, MAX_PAYLOAD_LENGTH);
......@@ -77,7 +78,7 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
return EXIT_FAILURE;
}
void * receive_data_loop(void *ptr) {
void *receive_data_loop(void *ptr) {
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
......@@ -115,7 +116,7 @@ void * receive_data_loop(void *ptr) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
List_push(sock_ptr->inQueue, packet);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
decode_block(sock_ptr, packet->seqno - packet->index);
......
......@@ -3,6 +3,7 @@
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/util/dbg.h>
#include "feedback_receiver.h"
void *receive_feedback_loop(void *ptr) {
......@@ -10,13 +11,16 @@ void *receive_feedback_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
while(1) {
pthread_mutex_lock(&sock_ptr->closingMutex);
while (sock_ptr->closing == FALSE) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
memset(bufin, 0, MAX_PAYLOAD_LENGTH);
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
printf("WOKE\n");
if(t != NULL) {
if (t != NULL) {
PrrtPacket_destroy(t);
}
usleep(1000);
usleep(1);
pthread_mutex_lock(&sock_ptr->closingMutex);
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
}
\ No newline at end of file
......@@ -11,6 +11,7 @@
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
#include <sys/poll.h>
#include "socket.h"
#include "block.h"
......@@ -29,6 +30,16 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&sock_ptr->closingMutex, &attr);
// Bind Data Socket
struct sockaddr_in address;
memset((char *) &address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(port);
check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
"Cannot bind data socket.");
if (is_sender) {
// Bind Feedback Socket
struct sockaddr_in address;
......@@ -37,46 +48,42 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");
check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
"Cannot bind feedback socket.");
pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
sock_ptr->outQueue = List_create();
check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
EXIT_SUCCESS, "Cannot create receive feedback thread.");
check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
"Cannot create send thread.");
sock_ptr->receivers = List_create();
} else {
// Bind Data Socket
struct sockaddr_in address;
memset((char *) &address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(port);
check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");
sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL);
sock_ptr->inQueue = List_create();
check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
"Cannot create data receiving thread.");
}
return EXIT_SUCCESS;
error:
// TODO: cancel threads
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
return EXIT_FAILURE;
// TODO: cancel threads
close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd);
return EXIT_FAILURE;
}
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
PrrtReceiver* recv = calloc(1, sizeof(PrrtReceiver));
PrrtReceiver *recv = calloc(1, sizeof(PrrtReceiver));
recv->host_name = host;
recv->port = port;
......@@ -98,7 +105,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
return 0;
error:
return -1;
return -1;
}
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
......@@ -107,14 +114,14 @@ int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_lock(&t);
while (List_count(sock_ptr->inQueue) == 0) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(sock_ptr->closing) {
if (sock_ptr->closing) {
// TODO: proper close
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&t);
return -1;
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
pthread_cond_wait(&sock_ptr->inQueueFilledCv, &t);
}
PrrtPacket *packet = List_shift(sock_ptr->inQueue);
......@@ -135,7 +142,7 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
void **res = NULL;
if(sock_ptr->sendThread != 0) {
if (sock_ptr->sendThread != 0) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
......@@ -144,18 +151,17 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
sock_ptr->sendThread = 0;
}
if(sock_ptr->receiveDataThread != 0) {
if (sock_ptr->receiveDataThread != 0) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_cond_signal(&sock_ptr->inQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
pthread_cancel(sock_ptr->receiveDataThread);
pthread_join(sock_ptr->receiveDataThread, res);
sock_ptr->receiveDataThread = 0;
}
if(sock_ptr->receiveFeedbackThread != 0) {
if (sock_ptr->receiveFeedbackThread != 0) {
pthread_cancel(sock_ptr->receiveFeedbackThread);
pthread_join(sock_ptr->receiveFeedbackThread, res);
......@@ -165,16 +171,18 @@ int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
int PrrtSocket_close(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(!sock_ptr->closing) {
if (!sock_ptr->closing) {
pthread_mutex_unlock(&sock_ptr->closingMutex);
PrrtSocket_interrupt(sock_ptr);
} else {
pthread_mutex_unlock(&sock_ptr->closingMutex);
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
if(sock_ptr->dataStore != NULL) {
List* dataList = List_create();
BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE-1);
while(List_count(dataList) > 0) {
PrrtPacket* packet = List_shift(dataList);
if (sock_ptr->dataStore != NULL) {
List *dataList = List_create();
BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1);
while (List_count(dataList) > 0) {
PrrtPacket *packet = List_shift(dataList);
PrrtPacket_destroy(packet);
}
......@@ -183,11 +191,11 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
}
if(sock_ptr->blockStore != NULL) {
List* blockList = List_create();
BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE-1);
while(List_count(blockList) > 0) {
PrrtBlock* block = List_shift(blockList);
if (sock_ptr->blockStore != NULL) {
List *blockList = List_create();
BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
while (List_count(blockList) > 0) {
PrrtBlock *block = List_shift(blockList);
PrrtBlock_free(block);
}
......@@ -196,7 +204,7 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
}
if(sock_ptr->receivers != NULL) {
if (sock_ptr->receivers != NULL) {
while (List_count(sock_ptr->receivers) > 0) {
free(List_shift(sock_ptr->receivers));
}
......@@ -204,21 +212,21 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->receivers = NULL;
}
if(sock_ptr->outQueue != NULL) {
if (sock_ptr->outQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
List_destroy(sock_ptr->outQueue);
sock_ptr->outQueue = NULL;
}
if(sock_ptr->inQueue != NULL) {
if (sock_ptr->inQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
pthread_cond_destroy(&sock_ptr->inQueueFilledCv);
List_destroy(sock_ptr->inQueue);
sock_ptr->inQueue = NULL;
}
if(sock_ptr->forwardPacketTable != NULL ){
if (sock_ptr->forwardPacketTable != NULL) {
free(sock_ptr->forwardPacketTable);
sock_ptr->forwardPacketTable = NULL;
}
......@@ -234,6 +242,18 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
struct pollfd fds;
int timeout_msecs = 1000;
fds.fd = sock_ptr->feedbackSocketFd;
fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs);
check(n >= 0, "Select failed.")
if (n == 0) {
return NULL;
}
n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
......@@ -247,5 +267,5 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
return packet_ptr;
error:
return NULL;
return NULL;
}
......@@ -24,7 +24,7 @@ typedef struct {
pthread_t receiveDataThread;
pthread_mutex_t inQueueFilledMutex;
pthread_cond_t inQueueFilledMutexCv;
pthread_cond_t inQueueFilledCv;
List *inQueue;
BPTreeNode* dataStore;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment