Commit d1e7dcc8 authored by Andreas Schmidt's avatar Andreas Schmidt
Browse files

Properly terminating a receiver that has not yet received anything.

parent 07a10a1e
...@@ -79,6 +79,7 @@ void * receive_data_loop(void *ptr) { ...@@ -79,6 +79,7 @@ void * receive_data_loop(void *ptr) {
while (1) { while (1) {
memset(buffer, 0, MAX_PAYLOAD_LENGTH); memset(buffer, 0, MAX_PAYLOAD_LENGTH);
n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen); n = recvfrom(sock_ptr->dataSocketFd, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
printf("RECEIVED: %d\n", n);
check(send_feedback(sock_ptr, remote) == EXIT_SUCCESS, "Sending feedback failed."); check(send_feedback(sock_ptr, remote) == EXIT_SUCCESS, "Sending feedback failed.");
packet = calloc(1, sizeof(PrrtPacket)); packet = calloc(1, sizeof(PrrtPacket));
......
...@@ -23,7 +23,10 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i ...@@ -23,7 +23,10 @@ int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t i
check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.") check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket."); check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
pthread_mutex_init(&sock_ptr->closingMutex, NULL); pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&sock_ptr->closingMutex, &attr);
if (is_sender) { if (is_sender) {
// Bind Feedback Socket // Bind Feedback Socket
...@@ -97,11 +100,20 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le ...@@ -97,11 +100,20 @@ int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_le
return -1; return -1;
} }
uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) { int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
pthread_mutex_t t = sock_ptr->inQueueFilledMutex; pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
while (1) { while (1) {
pthread_mutex_lock(&t); pthread_mutex_lock(&t);
while (List_count(sock_ptr->inQueue) == 0) { while (List_count(sock_ptr->inQueue) == 0) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(sock_ptr->closing) {
// TODO: proper close
printf("CLOSE PROCESS");
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_mutex_unlock(&t);
return -1;
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t); pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
} }
...@@ -117,11 +129,47 @@ uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) { ...@@ -117,11 +129,47 @@ uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
} }
} }
int PrrtSocket_close(PrrtSocket *sock_ptr) { int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex); pthread_mutex_lock(&sock_ptr->closingMutex);
sock_ptr->closing = TRUE; sock_ptr->closing = TRUE;
pthread_mutex_unlock(&sock_ptr->closingMutex); pthread_mutex_unlock(&sock_ptr->closingMutex);
void **res = NULL;
if(sock_ptr->sendThread != 0) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
pthread_join(sock_ptr->sendThread, res);
sock_ptr->sendThread = 0;
}
if(sock_ptr->receiveDataThread != 0) {
pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
pthread_cancel(sock_ptr->receiveDataThread);
pthread_join(sock_ptr->receiveDataThread, res);
sock_ptr->receiveDataThread = 0;
}
if(sock_ptr->receiveFeedbackThread != 0) {
pthread_cancel(sock_ptr->receiveFeedbackThread);
pthread_join(sock_ptr->receiveFeedbackThread, res);
sock_ptr->receiveFeedbackThread = 0;
}
}
int PrrtSocket_close(PrrtSocket *sock_ptr) {
pthread_mutex_lock(&sock_ptr->closingMutex);
if(!sock_ptr->closing) {
PrrtSocket_interrupt(sock_ptr);
}
pthread_mutex_unlock(&sock_ptr->closingMutex);
if(sock_ptr->dataStore != NULL) { if(sock_ptr->dataStore != NULL) {
sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore); sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
} }
...@@ -135,34 +183,26 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { ...@@ -135,34 +183,26 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
free(List_shift(sock_ptr->receivers)); free(List_shift(sock_ptr->receivers));
} }
List_destroy(sock_ptr->receivers); List_destroy(sock_ptr->receivers);
} sock_ptr->receivers = NULL;
void **res = NULL;
if(sock_ptr->sendThread != 0) {
pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
pthread_cond_signal(&sock_ptr->outQueueFilledCv);
pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
pthread_join(sock_ptr->sendThread, res);
}
// TODO: shut down receiver threads
if(sock_ptr->receiveFeedbackThread != 0) {
pthread_cancel(sock_ptr->receiveFeedbackThread);
pthread_join(sock_ptr->receiveFeedbackThread, res);
} }
if(sock_ptr->outQueue != NULL) { if(sock_ptr->outQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex); pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->outQueueFilledCv); pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
List_destroy(sock_ptr->outQueue); List_destroy(sock_ptr->outQueue);
sock_ptr->outQueue = NULL;
} }
if(sock_ptr->inQueue != NULL) { if(sock_ptr->inQueue != NULL) {
pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex); pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv); pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
List_destroy(sock_ptr->inQueue); List_destroy(sock_ptr->inQueue);
sock_ptr->inQueue = NULL;
}
if(sock_ptr->forwardPacketTable != NULL ){
free(sock_ptr->forwardPacketTable);
sock_ptr->forwardPacketTable = NULL;
} }
pthread_mutex_destroy(&sock_ptr->closingMutex); pthread_mutex_destroy(&sock_ptr->closingMutex);
......
...@@ -47,10 +47,11 @@ typedef struct { ...@@ -47,10 +47,11 @@ typedef struct {
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender); int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender);
int PrrtSocket_interrupt(PrrtSocket *sock_ptr);
int PrrtSocket_close(PrrtSocket *sock_ptr); int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len); int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len);
uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr); int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length); PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length);
#endif // PRRT_SOCKET_H #endif // PRRT_SOCKET_H
...@@ -41,10 +41,6 @@ int PrrtForwardPacketTable_create(PrrtForwardPacketTable *fpt_prt) { ...@@ -41,10 +41,6 @@ int PrrtForwardPacketTable_create(PrrtForwardPacketTable *fpt_prt) {
memset(fpt_prt->data, 0, sizeof(fpt_prt->data)); memset(fpt_prt->data, 0, sizeof(fpt_prt->data));
} }
int PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt) {
}
int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) { int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *fpt_ptr, uint16_t seqno) {
int res = check_position(fpt_ptr, seqno); int res = check_position(fpt_ptr, seqno);
......
...@@ -17,6 +17,4 @@ int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *f ...@@ -17,6 +17,4 @@ int PrrtForwardPacketTable_test_set_is_number_relevant(PrrtForwardPacketTable *f
int PrrtForwardPacketTable_test_is_block_relevant(PrrtForwardPacketTable * forwardPacketTable, uint16_t start, uint16_t length); int PrrtForwardPacketTable_test_is_block_relevant(PrrtForwardPacketTable * forwardPacketTable, uint16_t start, uint16_t length);
int PrrtForwardPacketTable_destroy(PrrtForwardPacketTable* fpt_prt);
#endif //PRRT_FORWARD_PACKET_TABLE_H #endif //PRRT_FORWARD_PACKET_TABLE_H
#include <stdio.h> #include <stdio.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
#include <signal.h>
#include "prrt/socket.h" #include "prrt/socket.h"
PrrtSocket sock;
static volatile int keepRunning = 1;
void intHandler(int dummy) {
keepRunning = 0;
PrrtSocket_interrupt(&sock);
}
int main(int argc, char* const argv[]) { int main(int argc, char* const argv[]) {
signal(SIGINT, intHandler);
if(argc != 2) { if(argc != 2) {
printf("Too few arguments.\n"); printf("Too few arguments.\n");
return -1; return -1;
...@@ -11,7 +22,6 @@ int main(int argc, char* const argv[]) { ...@@ -11,7 +22,6 @@ int main(int argc, char* const argv[]) {
uint16_t port = (uint16_t) atoi(argv[1]); uint16_t port = (uint16_t) atoi(argv[1]);
PrrtSocket sock;
printf("PRRT - RECEIVER\n"); printf("PRRT - RECEIVER\n");
if(PrrtSocket_create(&sock, port, 0) < 0) { if(PrrtSocket_create(&sock, port, 0) < 0) {
...@@ -20,9 +30,12 @@ int main(int argc, char* const argv[]) { ...@@ -20,9 +30,12 @@ int main(int argc, char* const argv[]) {
} }
int i = 1; int i = 1;
while(1) { while(keepRunning) {
unsigned char buffer[MAX_PAYLOAD_LENGTH]; unsigned char buffer[MAX_PAYLOAD_LENGTH];
int n = PrrtSocket_recv(&sock, buffer); int n = PrrtSocket_recv(&sock, buffer);
if(n < 0 ) {
continue;
}
buffer[n] = '\0'; buffer[n] = '\0';
printf("[B (n: %d, i: %3d)] %s", n, i, buffer); printf("[B (n: %d, i: %3d)] %s", n, i, buffer);
i++; i++;
...@@ -30,8 +43,5 @@ int main(int argc, char* const argv[]) { ...@@ -30,8 +43,5 @@ int main(int argc, char* const argv[]) {
} }
PrrtSocket_close(&sock); PrrtSocket_close(&sock);
pthread_exit(NULL);
return 0; return 0;
} }
...@@ -16,8 +16,7 @@ protected: ...@@ -16,8 +16,7 @@ protected:
TEST_F(ForwardPacketTableTest, CreateDestroy) { TEST_F(ForwardPacketTableTest, CreateDestroy) {
ASSERT_EQ(forwardPacketTable->start, 1); ASSERT_EQ(forwardPacketTable->start, 1);
free(forwardPacketTable);
PrrtForwardPacketTable_destroy(forwardPacketTable);
} }
TEST_F(ForwardPacketTableTest, SimpleTestAndSet) { TEST_F(ForwardPacketTableTest, SimpleTestAndSet) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment