Commit 07a10a1e authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Properly closing and freeing memory for sending socket.

parent 58120f8f
...@@ -13,12 +13,12 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k, ...@@ -13,12 +13,12 @@ void retrieve_data_blocks(PrrtSocket *sock_ptr, uint16_t base_seqno, uint8_t k,
LIST_FOREACH(res, first, next, cur) { LIST_FOREACH(res, first, next, cur) {
PrrtPacket *packetPtr = cur->value; PrrtPacket *packetPtr = cur->value;
PrrtBlock_insert_data_packet(block, packetPtr); PrrtBlock_insert_data_packet((PrrtBlock *) block, packetPtr);
sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno); sock_ptr->dataStore = BPTree_delete(sock_ptr->dataStore, packetPtr->seqno);
} }
} }
void decode_block(const PrrtSocket *sock_ptr, uint16_t base_seqno) { void decode_block(PrrtSocket *sock_ptr, uint16_t base_seqno) {
PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno); PrrtBlock *block = BPTree_get(sock_ptr->blockStore, base_seqno);
if (block != NULL && PrrtBlock_decode_ready(block)) { if (block != NULL && PrrtBlock_decode_ready(block)) {
...@@ -66,7 +66,7 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) { ...@@ -66,7 +66,7 @@ int send_feedback(const PrrtSocket *sock_ptr, struct sockaddr_in remote) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
void receive_data_loop(void *ptr) { void * receive_data_loop(void *ptr) {
ssize_t n; ssize_t n;
struct sockaddr_in remote; struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote); socklen_t addrlen = sizeof(remote);
......
#ifndef PRRT_DATA_RECEIVER_H #ifndef PRRT_DATA_RECEIVER_H
#define PRRT_DATA_RECEIVER_H #define PRRT_DATA_RECEIVER_H
void receive_data_loop(void *ptr); void * receive_data_loop(void *ptr);
#endif //PRRT_DATA_RECEIVER_H #endif //PRRT_DATA_RECEIVER_H
...@@ -44,7 +44,7 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) { ...@@ -44,7 +44,7 @@ int send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
void *send_data_loop(void *ptr) { void * send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL; PrrtBlock *block = NULL;
...@@ -54,6 +54,17 @@ void *send_data_loop(void *ptr) { ...@@ -54,6 +54,17 @@ void *send_data_loop(void *ptr) {
while (1) { while (1) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex); pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
while (List_count(sock_ptr->outQueue) == 0) { while (List_count(sock_ptr->outQueue) == 0) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(sock_ptr->closing) {
free(cpar);
PrrtBlock_free(&block);
block = NULL;
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
return NULL;
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex); pthread_cond_wait(&sock_ptr->outQueueFilledCv, &sock_ptr->outQueueFilledMutex);
} }
......
...@@ -13,6 +13,7 @@ void *receive_feedback_loop(void *ptr) { ...@@ -13,6 +13,7 @@ void *receive_feedback_loop(void *ptr) {
while(1) { while(1) {
memset(bufin, 0, MAX_PAYLOAD_LENGTH); memset(bufin, 0, MAX_PAYLOAD_LENGTH);
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH); PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, bufin, MAX_PAYLOAD_LENGTH);
printf("WOKE\n");
if(t != NULL) { if(t != NULL) {
PrrtPacket_destroy(t); PrrtPacket_destroy(t);
} }
......
...@@ -23,6 +23,8 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i ...@@ -23,6 +23,8 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.") check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket."); check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
pthread_mutex_init(&sock_ptr->closingMutex, NULL);
if (is_sender) { if (is_sender) {
// Bind Feedback Socket // Bind Feedback Socket
struct sockaddr_in address; struct sockaddr_in address;
...@@ -95,7 +97,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le ...@@ -95,7 +97,7 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
return -1; return -1;
} }
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) { uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_t t = sock_ptr->inQueueFilledMutex; pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
while (1) { while (1) {
pthread_mutex_lock(&t); pthread_mutex_lock(&t);
...@@ -116,6 +118,10 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) { ...@@ -116,6 +118,10 @@ uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
} }
int PrrtSocket_close(PrrtSocket *sock_ptr) { int PrrtSocket_close(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
sock_ptr->closing = TRUE;
pthread_mutex_unlock(&sock_ptr->closingMutex);
if(sock_ptr->dataStore != NULL) { if(sock_ptr->dataStore != NULL) {
sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore); sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
} }
...@@ -124,19 +130,29 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { ...@@ -124,19 +130,29 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore); sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
} }
//void **res = NULL;
//pthread_join(sock_ptr->sendThread, res);
// TODO: shut down threads;
if(sock_ptr->receivers != NULL) { if(sock_ptr->receivers != NULL) {
while(List_count(sock_ptr->receivers) > 0) { while (List_count(sock_ptr->receivers) > 0) {
free(List_shift(sock_ptr->receivers)); free(List_shift(sock_ptr->receivers));
} }
List_destroy(sock_ptr->receivers); List_destroy(sock_ptr->receivers);
} }
void **res = NULL;
if(sock_ptr->sendThread != 0) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
pthread_join(sock_ptr->sendThread, res);
}
// TODO: shut down receiver threads
if(sock_ptr->receiveFeedbackThread != 0) {
pthread_cancel(sock_ptr->receiveFeedbackThread);
pthread_join(sock_ptr->receiveFeedbackThread, res);
}
if(sock_ptr->outQueue != NULL) { if(sock_ptr->outQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex); pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->outQueueFilledCv); pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
...@@ -149,6 +165,8 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { ...@@ -149,6 +165,8 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
List_destroy(sock_ptr->inQueue); List_destroy(sock_ptr->inQueue);
} }
pthread_mutex_destroy(&sock_ptr->closingMutex);
close(sock_ptr->dataSocketFd); close(sock_ptr->dataSocketFd);
close(sock_ptr->feedbackSocketFd); close(sock_ptr->feedbackSocketFd);
return 0; return 0;
...@@ -158,8 +176,8 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co ...@@ -158,8 +176,8 @@ PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, co
ssize_t n; ssize_t n;
struct sockaddr_in remote; struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote); socklen_t addrlen = sizeof(remote);
n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen) >= 0, "Receiving feedback failed."); check(n >= 0, "Receiving feedback failed.");
uint16_t remote_port = ntohs(remote.sin_port); uint16_t remote_port = ntohs(remote.sin_port);
char *remote_host = inet_ntoa(remote.sin_addr); char *remote_host = inet_ntoa(remote.sin_addr);
......
...@@ -35,6 +35,9 @@ typedef struct { ...@@ -35,6 +35,9 @@ typedef struct {
List* receivers; List* receivers;
pthread_mutex_t closingMutex;
uint8_t closing;
uint16_t packetsCount; uint16_t packetsCount;
uint16_t sequenceNumberSource; uint16_t sequenceNumberSource;
uint16_t sequenceNumberRepetition; uint16_t sequenceNumberRepetition;
...@@ -47,7 +50,7 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i ...@@ -47,7 +50,7 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
int PrrtSocket_close(PrrtSocket *sock_ptr); int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len); int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len);
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr); uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length); PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length);
#endif // PRRT_SOCKET_H #endif // PRRT_SOCKET_H
...@@ -47,8 +47,8 @@ int main(int argc, char* const argv) { ...@@ -47,8 +47,8 @@ int main(int argc, char* const argv) {
if (line) if (line)
free(line); free(line);
usleep(1000 * 1000 * 6);
usleep(1000*1000*4);
PrrtSocket_close(&sock); PrrtSocket_close(&sock);
printf("COMPLETELY CLOSED\n"); printf("COMPLETELY CLOSED\n");
return 0; return 0;
......
...@@ -15,7 +15,7 @@ void List_destroy(const List *list) ...@@ -15,7 +15,7 @@ void List_destroy(const List *list)
} }
free(list->last); free(list->last);
free(list); free((void *) list);
} }
...@@ -38,7 +38,7 @@ void List_push(List *list, const void *value) ...@@ -38,7 +38,7 @@ void List_push(List *list, const void *value)
{ {
ListNode *node = calloc(1, sizeof(ListNode)); ListNode *node = calloc(1, sizeof(ListNode));
node->value = value; node->value = (void *) value;
if(list->last == NULL) { if(list->last == NULL) {
list->first = node; list->first = node;
...@@ -58,14 +58,14 @@ void List_push(List *list, const void *value) ...@@ -58,14 +58,14 @@ void List_push(List *list, const void *value)
void *List_pop(const List *list) void *List_pop(const List *list)
{ {
ListNode *node = list->last; ListNode *node = list->last;
return node != NULL ? List_remove(list, node) : NULL; return node != NULL ? List_remove((List *) list, node) : NULL;
} }
void List_unshift(List *list, const void *value) void List_unshift(List *list, const void *value)
{ {
ListNode *node = calloc(1, sizeof(ListNode)); ListNode *node = calloc(1, sizeof(ListNode));
node->value = value; node->value = (void *) value;
if(list->first == NULL) { if(list->first == NULL) {
list->first = node; list->first = node;
...@@ -85,7 +85,7 @@ void List_unshift(List *list, const void *value) ...@@ -85,7 +85,7 @@ void List_unshift(List *list, const void *value)
void *List_shift(const List *list) void *List_shift(const List *list)
{ {
ListNode *node = list->first; ListNode *node = list->first;
return node != NULL ? List_remove(list, node) : NULL; return node != NULL ? List_remove((List *) list, node) : NULL;
} }
void *List_remove(List *list, const ListNode *node) void *List_remove(List *list, const ListNode *node)
...@@ -110,7 +110,7 @@ void *List_remove(List *list, const ListNode *node) ...@@ -110,7 +110,7 @@ void *List_remove(List *list, const ListNode *node)
list->count--; list->count--;
result = node->value; result = node->value;
free(node); free((void *) node);
error: error:
return result; return result;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment