Commit e787bc50 authored by Andreas Schmidt's avatar Andreas Schmidt

Remove feedback handling from API.

parent 09152fb3
Pipeline #108 failed with stage
...@@ -117,7 +117,6 @@ cdef extern from "prrt/socket.h": ...@@ -117,7 +117,6 @@ cdef extern from "prrt/socket.h":
int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port) int PrrtSocket_connect(PrrtSocket *sock_ptr, const_char *host, const uint16_t port)
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len)
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) nogil
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length)
cdef extern from "util/bptree.h": cdef extern from "util/bptree.h":
ctypedef struct BPTreeNode: ctypedef struct BPTreeNode:
......
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <sys/socket.h>
#include <bits/poll.h>
#include <netinet/in.h>
#include <sys/poll.h>
#include "../../defines.h" #include "../../defines.h"
#include "../../util/common.h" #include "../../util/common.h"
#include "../../util/dbg.h" #include "../../util/dbg.h"
...@@ -8,16 +12,54 @@ ...@@ -8,16 +12,54 @@
#include "../socket.h" #include "../socket.h"
#include "feedbackReceiver.h" #include "feedbackReceiver.h"
void * receive_feedback_loop(void *ptr) { void handle_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
struct pollfd fds;
int timeout_msecs = 1000;
fds.fd = prrtSocket->feedbackSocketFd;
fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs);
check(n >= 0, "Select failed.")
if(n == 0) {
return;
}
prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket));
check_mem(prrtPacket);
PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);
//PrrtPacketFeedbackPayload* payload = prrtPacket->payload;
//struct in_addr a;
//a.s_addr = payload->receiverAddress;
//debug("Receiver Address: %s", inet_ntoa(a));
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload *) prrtPacket->payload)->forwardTripTimestamp_us;
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_print(prrtSocket->csi);
error:
if(prrtPacket != NULL) { PrrtPacket_destroy(prrtPacket); }
}
void *receive_feedback_loop(void *ptr)
{
PrrtSocket *sock_ptr = ptr; PrrtSocket *sock_ptr = ptr;
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
while (sock_ptr->closing == false) { while(sock_ptr->closing == false) {
check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed."); check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
PrrtPacket *t = PrrtSocket_recv_feedback(sock_ptr, MAX_PAYLOAD_LENGTH); handle_feedback(sock_ptr, MAX_PAYLOAD_LENGTH);
if (t != NULL) {
PrrtPacket_destroy(t);
}
usleep(1); usleep(1);
check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed."); check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
} }
...@@ -26,6 +68,6 @@ void * receive_feedback_loop(void *ptr) { ...@@ -26,6 +68,6 @@ void * receive_feedback_loop(void *ptr) {
return NULL; return NULL;
error: error:
PERROR("Feedback reception failed."); PERROR("Feedback reception failed.");
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -286,48 +286,6 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) { ...@@ -286,48 +286,6 @@ int PrrtSocket_close(PrrtSocket *sock_ptr) {
return -1; return -1;
} }
PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length)
{
char bufin[MAX_PAYLOAD_LENGTH];
ssize_t n;
struct sockaddr_in remote;
socklen_t addrlen = sizeof(remote);
struct pollfd fds;
int timeout_msecs = 1000;
fds.fd = prrtSocket->feedbackSocketFd;
fds.events = POLLIN;
n = poll(&fds, 1, timeout_msecs);
check(n >= 0, "Select failed.")
if (n == 0) {
return NULL;
}
prrtTimestamp_t receiveTime = PrrtClock_get_current_time_us();
n = recvfrom(prrtSocket->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
check(n >= 0, "Receiving feedback failed.");
PrrtPacket *prrtPacket = calloc(1, sizeof(PrrtPacket));
check_mem(prrtPacket);
PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);
//PrrtPacketFeedbackPayload* payload = prrtPacket->payload;
//struct in_addr a;
//a.s_addr = payload->receiverAddress;
//debug("Receiver Address: %s", inet_ntoa(a));
prrtTimestamp_t forwardTripTimestamp = ((PrrtPacketFeedbackPayload*) prrtPacket->payload)->forwardTripTimestamp_us;
PrrtChannelStateInformation_update_rtt(prrtSocket->csi, (prrtTimedelta_t) (receiveTime - forwardTripTimestamp));
PrrtChannelStateInformation_print(prrtSocket->csi);
return prrtPacket;
error:
return NULL;
}
bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value) bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value)
{ {
if(strcmp(name, "targetdelay") == 0) { if(strcmp(name, "targetdelay") == 0) {
......
...@@ -71,6 +71,5 @@ int PrrtSocket_close(PrrtSocket *sock_ptr); ...@@ -71,6 +71,5 @@ int PrrtSocket_close(PrrtSocket *sock_ptr);
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port); int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port);
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len); int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len);
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr); int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr);
PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length);
#endif // PRRT_SOCKET_H #endif // PRRT_SOCKET_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment