...
 
Commits (6)
......@@ -5,6 +5,7 @@ set (PRRT_SOURCES ../defines.h
codingParams.c codingParams.h
receiver.c receiver.h
socket.c socket.h
timer.c timer.h
applicationConstraints.c applicationConstraints.h
processes/dataReceiver.c processes/dataReceiver.h
processes/dataTransmitter.c processes/dataTransmitter.h
......
......@@ -16,6 +16,7 @@ typedef struct prrtBlock {
bool isCoded;
pthread_mutex_t lock;
PrrtCoder *coder;
uint16_t inRound;
} PrrtBlock;
......
......@@ -3,6 +3,7 @@
#include <string.h>
#include "../../defines.h"
#include "../block.h"
#include "../timer.h"
#include "../receiver.h"
#include "../socket.h"
#include "../../util/dbg.h"
......@@ -124,6 +125,40 @@ static bool send_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
return false;
}
void block_timeout(void* arg);
typedef struct timer_arg {
PrrtSocket* socket;
PrrtBlock* block;
} RetransmissionTimerArgs;
void retransmission_round_handler(void *arg) {
int j;
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) arg;
uint32_t redundancyPackets = args->block->codingParams->n_cycle[args->block->inRound];
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(args->block);
send_packet(args->socket, red_pkt);
}
args->block->inRound++;
if (args->block->inRound >= args->block->codingParams->c) {
PrrtBlock_destroy(args->block);
free(arg);
} else {
PrrtTimerTask task = {
.arg = arg,
.fun = retransmission_round_handler
};
// TODO: Chose one RTT + a bit.
uint32_t waittime_us = PrrtSocket_get_rtprop_fwd(args->socket);
prrtTimerDate deadline = abstime_from_now(waittime_us);
PrrtTimer_submit(args->socket->timer, &deadline, &task);
}
}
void *send_data_loop(void *ptr) {
PrrtSocket *sock_ptr = ptr;
PrrtBlock *block = NULL;
......@@ -159,7 +194,6 @@ void *send_data_loop(void *ptr) {
PrrtBlock_insert_data_packet(block, packet);
// TODO: redundancy should only be sent when necessary
if (PrrtBlock_encode_ready(block)) {
uint32_t j = 0;
unsigned int redundancy_seqno = sock_ptr->sequenceNumberRedundancy;
......@@ -167,13 +201,11 @@ void *send_data_loop(void *ptr) {
PrrtBlock_encode(block, &sock_ptr->sequenceNumberRedundancy);
XlapTimeStampCycle(sock_ptr, ts_redundancy_packet, redundancy_seqno, PrrtEncodeEnd);
uint32_t redundancyPackets = List_count(block->redundancyPackets);
for (j = 0; j < redundancyPackets; j++) {
PrrtPacket *red_pkt = PrrtBlock_get_first_red_data(block);
send_packet(sock_ptr, red_pkt);
}
RetransmissionTimerArgs* args = (RetransmissionTimerArgs*) calloc(1, sizeof(RetransmissionTimerArgs));
args->block = block;
args->socket = sock_ptr;
retransmission_round_handler(args);
PrrtBlock_destroy(block);
block = NULL;
}
}
......
......@@ -38,18 +38,6 @@ static inline prrtPacketLength_t deliver_packet(const PrrtSocket *s, void *buffe
return len;
}
struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
struct timespec deadline;
prrtTimedelta_t diff_s = wait_time / 1000000;
prrtTimedelta_t diff_ns = (wait_time % 1000000) * 1000;
deadline.tv_sec = diff_s + now.tv_sec;
deadline.tv_nsec = diff_ns + now.tv_nsec;
return deadline;
}
PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_us) {
assert(sizeof(float) == 4);
PrrtSocket *s = (PrrtSocket *) calloc(1, sizeof(PrrtSocket));
......@@ -66,6 +54,8 @@ PrrtSocket *PrrtSocket_create(prrtByteCount_t mtu, prrtTimedelta_t target_delay_
s->isBound = false;
s->receiver = NULL;
s->timer = PrrtTimer_create(0);
uint8_t n_cycle[1] = { N_START - K_START };
s->codingParameters = PrrtCodingConfiguration_create(K_START, N_START, 1, n_cycle);
s->coder = PrrtCoder_create(s->codingParameters);
......
......@@ -17,6 +17,7 @@
#include "types/lossStatistics.h"
#include "types/packet.h"
#include "clock.h"
#include "timer.h"
#include "xlap.h"
#include "receiver.h"
......@@ -79,6 +80,8 @@ typedef struct prrtSocket {
atomic_bool isThreadPinning;
prrtByteCount_t mtu;
PrrtTimer* timer;
} PrrtSocket;
......
This diff is collapsed.
#ifndef PRRT_TIMER_H
#define PRRT_TIMER_H
#include <pthread.h>
#include <stdatomic.h>
#include "../util/futex.h"
#include "../util/list.h"
typedef void *prrtTimerTaskArg;
typedef void (*prrtTimerTaskFun)(prrtTimerTaskArg);
typedef struct timespec prrtTimerDate;
typedef unsigned long long TimerDateUDiff_t;
typedef struct prrtTimerTask {
prrtTimerTaskFun fun;
prrtTimerTaskArg arg;
} PrrtTimerTask;
typedef struct prrtTimer PrrtTimer;
PrrtTimer *PrrtTimer_create(unsigned int core);
int PrrtTimer_submit(PrrtTimer *timer, const prrtTimerDate *when, const PrrtTimerTask *what);
void PrrtTimer_end(PrrtTimer *timer);
void PrrtTimer_sleep_until(PrrtTimer *self, const prrtTimerDate *end);
void PrrtTimer_sleep_nanos(PrrtTimer *self, TimerDateUDiff_t nanos);
#endif // PRRT_TIMER_H
......@@ -35,3 +35,16 @@ void pin_thread_to_core(pthread_attr_t *ap, int core)
CPU_SET(core, &cpuset);
pthread_attr_setaffinity_np(ap, sizeof(cpu_set_t), &cpuset);
}
struct timespec abstime_from_now(prrtTimedelta_t wait_time) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
struct timespec deadline;
prrtTimedelta_t diff_s = wait_time / 1000000;
prrtTimedelta_t diff_ns = (wait_time % 1000000) * 1000;
deadline.tv_sec = diff_s + now.tv_sec;
deadline.tv_nsec = diff_ns + now.tv_nsec;
return deadline;
}
......@@ -3,11 +3,13 @@
#include <sched.h>
#include <pthread.h>
#include "../proto/types/packet.h"
#include "../proto/vdmcode/block_code.h"
int print_buffer(const char *buf, const int length);
void print_gf(const gf *start, const int len);
void pin_thread_to_core(pthread_attr_t *ap, int core);
struct timespec abstime_from_now(prrtTimedelta_t wait_time);
#define PERROR(fmt, args...) \
printf("PRRT ERROR: \n" fmt, ## args);
......
......@@ -120,4 +120,25 @@ futex__exposed int futex_wake_all(atomic_int *addr)
return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX);
}
/**
* \brief Wait for a variable to change, with timeout
*
* This function is equivalent to futex_wait, but it takes an additional
* timeout parameter that limits the waiting time.
*
* \param addr The address of the variable of interest
* \param exp The expected value of that variable
* \param time The waiting timeout
* \returns 0 on success
* \returns -1 on error
*
* \see [man 2 syscall](http://man7.org/linux/man-pages/man2/syscall.2.html)
* \see [man 2 futex](http://man7.org/linux/man-pages/man2/futex.2.html)
* \see [man 7 futex](http://man7.org/linux/man-pages/man7/futex.7.html)
*/
futex__exposed int futex_wait_until(atomic_int *addr, int exp, struct timespec *time)
{
return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, exp, time);
}
#endif /* __FUTEX_H_INCLUDED__ */