socket.c 11.2 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
7
#include <sys/poll.h>
8
9
10
11
12
13
14
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "processes/feedback_receiver.h"
#include "processes/data_transmitter.h"
#include "processes/data_receiver.h"
15
#include "socket.h"
16
#include "block.h"
17
#include "receiver.h"
18
#include "clock.h"
19

20
21
PrrtSocket* PrrtSocket_create(const uint16_t port, const uint8_t is_sender) {
    PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
22
    check_mem(sock_ptr);
23

24
    sock_ptr->is_sender = is_sender;
25

26
27
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
28

29
30
31
    sock_ptr->rttMean = 0;
    sock_ptr->rttDev = 0;

32
33
    sock_ptr->dataStore = NULL;

34
35
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
36

37
    pthread_mutexattr_t attr;
38
39
40
    check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed.");
    check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed.");
41

42
43
44
45
46
47
48
49
50
51
    // Bind Data Socket
    struct sockaddr_in address;
    memset((char *) &address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
          "Cannot bind data socket.");

52
    if (is_sender) {
53
54
        // Bind Feedback Socket
        struct sockaddr_in address;
55
        memset((char *) &address, 0, sizeof(address));
56
57
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
58
59
        check(port <= 65534, "Port %d cannot be bound to.", port);
        address.sin_port = htons((uint16_t) (port + 1));
60

61
62
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
              "Cannot bind feedback socket.");
63

64
65
        check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed.");
66
        sock_ptr->outQueue = List_create();
67

68
69
70
71
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create send thread.");
72
73

        sock_ptr->receivers = List_create();
74
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
75
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
76
        check_mem(sock_ptr->forwardPacketTable)
77
        check(PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable) == 0, "Creating table failed.");
78

79
80
        check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
81
82
        sock_ptr->inQueue = List_create();

83
84
        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create data receiving thread.");
85
86
    }

87
    return sock_ptr;
88
89

    error:
90
        PrrtSocket_close(sock_ptr);
91
        return NULL;
92
93
}

94
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
95
    PrrtReceiver *recv = PrrtReceiver_create(host, port);
96
    List_push(sock_ptr->receivers, recv);
97
98
99
    return 0;
}

100
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
101
    check(sock_ptr->is_sender, "Cannot send on receiver socket.")
102
    check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
103

104
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
105

106
    List_push(sock_ptr->outQueue, packet);
107
108
    check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");
    check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");
109

110
    return 0;
111
    error:
112
        PERROR("There was a failure while sending from socket.%s", "");
113
        return -1;
114
115
}

116
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
117
    check(sock_ptr->is_sender == false, "Cannot receive on sender socket.")
118
    while (1) {
119
        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
120
        while (List_count(sock_ptr->inQueue) == 0) {
121
            check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
122
            if (sock_ptr->closing) {
123
124
                check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
                check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
125
126
                return -1;
            }
127
128
            check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
            check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed.");
129
        }
130
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
131

132
        uint32_t len = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
133
        PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
134
135
        PrrtPacket_destroy(packet);

136
        check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
137
        return len;
138
    }
139
    error:
140
        PERROR("There was a failure while receiving from socket.%s", "");
141
        return -1;
142
143
}

144
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
145
    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
146
    sock_ptr->closing = true;
147
    check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
148

149
    void **res = NULL;
150
    if (sock_ptr->sendThread != 0) {
151
152
153
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed.");
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");
154

155
        check(pthread_join(sock_ptr->sendThread, res) == 0, "Join failed.");
156
157
158
        sock_ptr->sendThread = 0;
    }

159
    if (sock_ptr->receiveDataThread != 0) {
160
161
162
        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed");
        check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed");
163

164
165
        check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveDataThread, res) == 0, "Join failed.");
166
167
168
169

        sock_ptr->receiveDataThread = 0;
    }

170
    if (sock_ptr->receiveFeedbackThread != 0) {
171
172
        check(pthread_cancel(sock_ptr->receiveFeedbackThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveFeedbackThread, res) == 0, "Join failed.");
173
174
175

        sock_ptr->receiveFeedbackThread = 0;
    }
Andreas Schmidt's avatar
Andreas Schmidt committed
176
    return EXIT_SUCCESS;
177
178
179
180

    error:
        PERROR("Interruping socket failed.%s","");
        return EXIT_FAILURE;
181
182
183
}

int PrrtSocket_close(PrrtSocket *sock_ptr) {
184
    debug("Closing socket.");
185
    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
186
    if (!sock_ptr->closing) {
187
188
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
        check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed.");
189
    } else {
190
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
191
192
    }

193
194
195
196
197
    if (sock_ptr->dataStore != NULL) {
        List *dataList = List_create();
        BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1);
        while (List_count(dataList) > 0) {
            PrrtPacket *packet = List_shift(dataList);
198
199
200
201
202
            PrrtPacket_destroy(packet);
        }

        List_destroy(dataList);

203
204
205
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

206
207
208
209
210
    if (sock_ptr->blockStore != NULL) {
        List *blockList = List_create();
        BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
        while (List_count(blockList) > 0) {
            PrrtBlock *block = List_shift(blockList);
Andreas Schmidt's avatar
Andreas Schmidt committed
211
            PrrtBlock_destroy(block);
212
213
214
215
        }

        List_destroy(blockList);

216
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
217
218
    }

219
    if (sock_ptr->receivers != NULL) {
220
        while (List_count(sock_ptr->receivers) > 0) {
Andreas Schmidt's avatar
Andreas Schmidt committed
221
            PrrtReceiver *recv = List_shift(sock_ptr->receivers);
222
            PrrtReceiver_destroy(recv);
223
224
        }
        List_destroy(sock_ptr->receivers);
225
        sock_ptr->receivers = NULL;
226
227
    }

228
    if (sock_ptr->outQueue != NULL) {
229
230
        check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed.");
231
        List_destroy(sock_ptr->outQueue);
232
        sock_ptr->outQueue = NULL;
233
234
    }

235
    if (sock_ptr->inQueue != NULL) {
236
237
        check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed.");
238
        List_destroy(sock_ptr->inQueue);
239
240
241
        sock_ptr->inQueue = NULL;
    }

242
    if (sock_ptr->forwardPacketTable != NULL) {
243
        check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed.");
244
        sock_ptr->forwardPacketTable = NULL;
245
    }
246

247
    check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");
248

249
250
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
251
    debug("Socket closed.");
252
    return 0;
253
254
255
256

    error:
        PERROR("Closing socket failed.%s", "");
        return -1;
257
258
}

259
PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *sock_ptr, const size_t length)
260
261
{
    char bufin[MAX_PAYLOAD_LENGTH];
262
263
264
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
265
266
267
268
269
270
271
272
273
274
275

    struct pollfd fds;
    int timeout_msecs = 1000;
    fds.fd = sock_ptr->feedbackSocketFd;
    fds.events = POLLIN;

    n = poll(&fds, 1, timeout_msecs);
    check(n >= 0, "Select failed.")
    if (n == 0) {
        return NULL;
    }
276
    uint32_t receive_time = PrrtClock_get_current_time();
277

278
279
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");
280

Andreas Schmidt's avatar
Andreas Schmidt committed
281
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
282
    check_mem(packet_ptr);
283
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
284
285

    PrrtPacketFeedbackPayload* feedbackPayload = packet_ptr->payload;
286
287
288
289
    int32_t delta = (receive_time - feedbackPayload->forward_trip_timestamp) - sock_ptr->rttMean;
    // TODO: ensure that there are no arithemtic problems via rounding etc.
    sock_ptr->rttMean = (uint32_t) (sock_ptr->rttMean + RRT_ALPHA * delta);
    sock_ptr->rttDev = (uint32_t) (sock_ptr->rttDev + RRT_ALPHA * (labs(delta) - sock_ptr->rttDev));
290

291
    return packet_ptr;
292
293

    error:
294
    return NULL;
295
}