#include #include #include #include #include #include #include #include #include "../defines.h" #include "packet.h" #include "../util/dbg.h" #include "../util/common.h" #include "processes/feedbackReceiver.h" #include "processes/dataTransmitter.h" #include "processes/dataReceiver.h" #include "socket.h" #include "block.h" #include "receiver.h" PrrtSocket *PrrtSocket_create(const bool is_sender) { assert(sizeof(float) == 4); PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket)); check_mem(sock_ptr); sock_ptr->isSender = is_sender; sock_ptr->clock = PrrtClock_create(); sock_ptr->isBound = false; sock_ptr->sequenceNumberSource = 1; sock_ptr->sequenceNumberRedundancy = 1; sock_ptr->csi = PrrtChannelStateInformation_create(); sock_ptr->applicationConstraints = PrrtApplicationConstraints_create(); sock_ptr->dataStore = NULL; check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.") check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket."); pthread_mutexattr_t attr; check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed."); check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed."); check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed."); if (is_sender) { check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed."); check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed."); sock_ptr->outQueue = List_create(); sock_ptr->receivers = List_create(); } else { sock_ptr->forwardPacketTable = PrrtForwardPacketTable_create(); check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed."); check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed."); sock_ptr->inQueue = List_create(); } return sock_ptr; error: PrrtSocket_close(sock_ptr); return NULL; } bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t port) { check(port <= 65534, "Port %d cannot be bound to.", port); size_t size = sizeof(struct sockaddr_in); struct sockaddr_in* address = calloc(1, size); check_mem(address); address->sin_family = AF_INET; address->sin_addr.s_addr = inet_addr(ipAddress); address->sin_port = htons((uint16_t) (port + 1)); sock_ptr->address = address; check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS, "Cannot bind feedback socket."); address->sin_port = htons((uint16_t) (port)); check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) address, size) == EXIT_SUCCESS, "Cannot bind data socket."); if(sock_ptr->isSender) { check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread."); check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread."); } else { check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread."); } return true; error: PrrtSocket_close(sock_ptr); return false; } int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) { PrrtReceiver *recv = PrrtReceiver_create(host, port); List_push(sock_ptr->receivers, recv); return 0; } int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) { check(sock_ptr->isSender, "Cannot send on receiver socket.") check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed."); PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, 0, PrrtApplicationConstraints_get_target_delay(sock_ptr->applicationConstraints)); List_push(sock_ptr->outQueue, packet); check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed."); check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed"); return 0; error: PERROR("There was a failure while sending from socket.%s", ""); return -1; } int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) { check(sock_ptr->isSender == false, "Cannot receive on sender socket.") while (1) { check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed."); while (List_count(sock_ptr->inQueue) == 0) { check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); if (sock_ptr->closing) { check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed."); return -1; } check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed."); } PrrtPacket *packet = List_shift(sock_ptr->inQueue); prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE); PrrtPacket_destroy(packet); check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed."); return len; } error: PERROR("There was a failure while receiving from socket.%s", ""); return -1; } int PrrtSocket_interrupt(PrrtSocket *sock_ptr) { check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); sock_ptr->closing = true; check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); void **res = NULL; if (sock_ptr->sendThread != 0) { check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed."); check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed."); check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed."); check(pthread_join(sock_ptr->sendThread, res) == 0, "Join failed."); sock_ptr->sendThread = 0; } if (sock_ptr->receiveDataThread != 0) { check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed."); check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed"); check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed"); check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed."); check(pthread_join(sock_ptr->receiveDataThread, res) == 0, "Join failed."); sock_ptr->receiveDataThread = 0; } if (sock_ptr->receiveFeedbackThread != 0) { check(pthread_cancel(sock_ptr->receiveFeedbackThread) == 0, "Cancel failed."); check(pthread_join(sock_ptr->receiveFeedbackThread, res) == 0, "Join failed."); sock_ptr->receiveFeedbackThread = 0; } return EXIT_SUCCESS; error: return EXIT_FAILURE; } int PrrtSocket_close(PrrtSocket *sock_ptr) { debug("Closing socket."); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); if (!sock_ptr->closing) { check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed."); } else { check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); } if (sock_ptr->dataStore != NULL) { List *dataList = List_create(); BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1); while (List_count(dataList) > 0) { PrrtPacket *packet = List_shift(dataList); PrrtPacket_destroy(packet); } List_destroy(dataList); sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore); } if (sock_ptr->blockStore != NULL) { List *blockList = List_create(); BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1); while (List_count(blockList) > 0) { PrrtBlock *block = List_shift(blockList); PrrtBlock_destroy(block); } List_destroy(blockList); sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore); } if (sock_ptr->receivers != NULL) { while (List_count(sock_ptr->receivers) > 0) { PrrtReceiver *recv = List_shift(sock_ptr->receivers); PrrtReceiver_destroy(recv); } List_destroy(sock_ptr->receivers); sock_ptr->receivers = NULL; } if (sock_ptr->outQueue != NULL) { check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed."); check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed."); List_destroy(sock_ptr->outQueue); sock_ptr->outQueue = NULL; } if (sock_ptr->inQueue != NULL) { check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed."); check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed."); List_destroy(sock_ptr->inQueue); sock_ptr->inQueue = NULL; } if (sock_ptr->forwardPacketTable != NULL) { check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed."); sock_ptr->forwardPacketTable = NULL; } check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed."); if(sock_ptr->address != NULL) { free(sock_ptr->address); } if(sock_ptr->csi != NULL) { check(PrrtChannelStateInformation_destroy(sock_ptr->csi), "Could not destroy channel state information.") } if(sock_ptr->applicationConstraints) { check(PrrtApplicationConstraints_destroy(sock_ptr->applicationConstraints), "Could not destroy application constraints.") } if(sock_ptr->clock != NULL) { check(PrrtClock_destroy(sock_ptr->clock), "Destroy clock failed"); } close(sock_ptr->dataSocketFd); close(sock_ptr->feedbackSocketFd); debug("Socket closed."); return 0; error: PERROR("Closing socket failed.%s", ""); return -1; } PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length) { char bufin[MAX_PAYLOAD_LENGTH]; ssize_t n; struct sockaddr_in remote; socklen_t addrlen = sizeof(remote); struct pollfd fds; int timeout_msecs = 1000; fds.fd = prrtSocket->feedbackSocketFd; fds.events = POLLIN; n = poll(&fds, 1, timeout_msecs); check(n >= 0, "Select failed.") if (n == 0) { return NULL; } prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us(); n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen); check(n >= 0, "Receiving feedback failed."); PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket)); check_mem(prrtPacket); PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket); PrrtPacketFeedbackPayload* payload = prrtPacket->payload; struct in_addr a; a.s_addr = payload->receiverAddress; debug("Receiver Address: %s", inet_ntoa(a)); prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us; PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp)); PrrtChannelStateInformation_print(prrtSocket->csi); return prrtPacket; error: return NULL; } bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value) { if(strcmp(name, "targetdelay") == 0) { PrrtApplicationConstraints_set_target_delay(sock_ptr->applicationConstraints, value); } else { return false; } return true; }