socket.c 11.8 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
7
#include <sys/poll.h>
8
9
10
11
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
13
14
#include "processes/feedbackReceiver.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
15
#include "socket.h"
16
#include "block.h"
17
#include "receiver.h"
18
#include "clock.h"
19

20
PrrtSocket *PrrtSocket_create(const bool is_sender)
Andreas Schmidt's avatar
Andreas Schmidt committed
21
{
22
    PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
23
    check_mem(sock_ptr);
24

25
    sock_ptr->isSender = is_sender;
Andreas Schmidt's avatar
Andreas Schmidt committed
26
    sock_ptr->isBound = false;
27

28
29
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
30

31
    sock_ptr->csi = PrrtChannelStateInformation_create();
32
    sock_ptr->applicationConstraints = PrrtApplicationConstraints_create();
33

34
35
    sock_ptr->dataStore = NULL;

36
37
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
38

39
    pthread_mutexattr_t attr;
40
41
42
    check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed.");
    check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed.");
43

44
    if (is_sender) {
45
46
        check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed.");
47
        sock_ptr->outQueue = List_create();
48

49
        sock_ptr->receivers = List_create();
50
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
51
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
52
        check_mem(sock_ptr->forwardPacketTable)
53
        check(PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable) == 0, "Creating table failed.");
54

55
56
        check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
57
        sock_ptr->inQueue = List_create();
58
59
    }

60
    return sock_ptr;
61
62

    error:
63
        PrrtSocket_close(sock_ptr);
64
        return NULL;
65
66
}

Andreas Schmidt's avatar
Andreas Schmidt committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
 * Bind the socket to a local IPv4 address and start its worker threads.
 *
 * The data socket is bound to `port` and the feedback socket to `port + 1`,
 * which is why port 65535 is rejected. A sender then starts the feedback
 * receiving and data sending threads; a receiver starts the data receiving
 * thread.
 *
 * @param sock_ptr  socket to bind.
 * @param ipAddress dotted-quad IPv4 address of the local interface.
 * @param port      data port; the feedback port is `port + 1`.
 * @return true on success. On any failure the socket is closed via
 *         PrrtSocket_close() and false is returned.
 */
bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char *ipAddress, const uint16_t port) {
    check(port <= 65534, "Port %d cannot be bound to.", port);
    check(ipAddress != NULL, "No address given to bind to.");

    struct sockaddr_in *address = calloc(1, sizeof(*address));
    check_mem(address);
    /* Hand the allocation to the socket immediately so PrrtSocket_close()
     * releases it on every failure path below. */
    sock_ptr->address = address;

    address->sin_family = AF_INET;
    /* inet_addr() reports errors as INADDR_NONE, which is also the valid
     * address 255.255.255.255; inet_pton() has an unambiguous result. */
    check(inet_pton(AF_INET, ipAddress, &address->sin_addr) == 1, "Invalid IPv4 address %s.", ipAddress);

    address->sin_port = htons((uint16_t) (port + 1));
    check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) address, (socklen_t) sizeof(*address)) == 0,
          "Cannot bind feedback socket.");

    address->sin_port = htons(port);
    check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) address, (socklen_t) sizeof(*address)) == 0,
          "Cannot bind data socket.");

    if (sock_ptr->isSender) {
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == 0,
              "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == 0,
              "Cannot create send thread.");
    } else {
        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == 0,
              "Cannot create data receiving thread.");
    }

    return true;

error:
    PrrtSocket_close(sock_ptr);
    return false;
}

104
/**
 * Register a remote receiver with a sending socket.
 *
 * @param sock_ptr socket whose receiver list is extended.
 * @param host     host of the remote receiver, passed to PrrtReceiver_create().
 * @param port     port of the remote receiver, passed to PrrtReceiver_create().
 * @return 0 on success, -1 if the socket has no receiver list or the receiver
 *         could not be created.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    PrrtReceiver *receiver = NULL;

    /* PrrtSocket_create() only allocates the receiver list for sender sockets. */
    check(sock_ptr->receivers != NULL, "Socket has no receiver list.");

    receiver = PrrtReceiver_create(host, port);
    check(receiver != NULL, "Could not create receiver.");

    List_push(sock_ptr->receivers, receiver);
    return 0;

error:
    return -1;
}

110
/**
 * Queue a payload for transmission and wake the send thread.
 *
 * @param sock_ptr sender socket.
 * @param data     payload bytes handed to PrrtPacket_create_data_packet().
 * @param data_len payload length in bytes; it is narrowed to
 *                 prrtPacketLength_t, so it must fit that type.
 * @return 0 on success, -1 on failure.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
    PrrtPacket *packet = NULL;
    bool queued = false;
    bool locked = false;

    check(sock_ptr->isSender, "Cannot send on receiver socket.");

    /* Build the packet before taking the lock so the queue mutex is held as
     * briefly as possible. NOTE(review): the meaning of the literal 5 is not
     * visible from here - confirm against PrrtPacket_create_data_packet(). */
    packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, 0);
    check(packet != NULL, "Could not create data packet.");

    check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
    locked = true;

    List_push(sock_ptr->outQueue, packet);
    queued = true;

    check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");

    /* Clear the flag first: a failed unlock must not be retried below. */
    locked = false;
    check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");

    return 0;

error:
    if (locked) {
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
    }
    /* A packet that never reached the queue is still owned by this function. */
    if (packet != NULL && !queued) {
        PrrtPacket_destroy(packet);
    }
    PERROR("There was a failure while sending from socket.%s", "");
    return -1;
}

126
/**
 * Block until a packet is available in the in-queue and copy its payload out.
 *
 * Waits on the in-queue condition variable while the queue is empty and gives
 * up as soon as the socket's closing flag is set.
 *
 * @param sock_ptr receiver socket.
 * @param buf_ptr  destination buffer; its size is not passed in, so the caller
 *                 must make it large enough for any payload.
 * @return the payload length (packet payload length minus
 *         PRRT_PACKET_DATA_HEADER_SIZE), or -1 if the socket is closing or an
 *         error occurred.
 */
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
    bool queue_locked = false;

    check(sock_ptr->isSender == false, "Cannot receive on sender socket.");

    check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
    queue_locked = true;

    while (List_count(sock_ptr->inQueue) == 0) {
        /* Sample the closing flag under its own mutex. */
        check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
        const bool is_closing = sock_ptr->closing;
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

        if (is_closing) {
            /* Clear the flag first: a failed unlock must not be retried. */
            queue_locked = false;
            check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
            return -1;
        }

        check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed.");
    }

    PrrtPacket *packet = List_shift(sock_ptr->inQueue);

    const prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_destroy(packet);

    queue_locked = false;
    check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
    return len;

error:
    /* Do not leave the queue mutex held, or the receive thread and every
     * later caller would block forever. */
    if (queue_locked) {
        pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
    }
    PERROR("There was a failure while receiving from socket.%s", "");
    return -1;
}

154
/**
 * Flag the socket as closing and stop every worker thread that was started.
 *
 * The send thread is woken through its condition variable and joined. The
 * data receiving thread is woken, cancelled and joined. The feedback
 * receiving thread is cancelled and joined. Each thread handle is reset to 0
 * once its thread has been joined.
 *
 * @param sock_ptr socket whose threads are stopped.
 * @return EXIT_SUCCESS on success, EXIT_FAILURE if any step failed.
 */
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
    pthread_mutex_t *const closingLock = &sock_ptr->closingMutex;
    pthread_mutex_t *const outLock = &sock_ptr->outQueueFilledMutex;
    pthread_mutex_t *const inLock = &sock_ptr->inQueueFilledMutex;

    /* Publish the closing flag under its mutex. */
    check(pthread_mutex_lock(closingLock) == 0, "Lock failed.");
    sock_ptr->closing = true;
    check(pthread_mutex_unlock(closingLock) == 0, "Unlock failed.");

    /* Sender side: wake the send thread, then wait for it to finish. */
    if (sock_ptr->sendThread != 0) {
        check(pthread_mutex_lock(outLock) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed.");
        check(pthread_mutex_unlock(outLock) == 0, "Unlock failed.");

        check(pthread_join(sock_ptr->sendThread, NULL) == 0, "Join failed.");
        sock_ptr->sendThread = 0;
    }

    /* Receiver side: wake waiters on the in-queue, then cancel and join. */
    if (sock_ptr->receiveDataThread != 0) {
        check(pthread_mutex_lock(inLock) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed");
        check(pthread_mutex_unlock(inLock) == 0, "Unlock failed");

        check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveDataThread, NULL) == 0, "Join failed.");
        sock_ptr->receiveDataThread = 0;
    }

    /* Feedback thread: cancel and join. */
    if (sock_ptr->receiveFeedbackThread != 0) {
        check(pthread_cancel(sock_ptr->receiveFeedbackThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveFeedbackThread, NULL) == 0, "Join failed.");
        sock_ptr->receiveFeedbackThread = 0;
    }

    return EXIT_SUCCESS;

error:
    PERROR("Interruping socket failed.%s","");
    return EXIT_FAILURE;
}

int PrrtSocket_close(PrrtSocket *sock_ptr) {
194
    debug("Closing socket.");
195
    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
196
    if (!sock_ptr->closing) {
197
198
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
        check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed.");
199
    } else {
200
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
201
202
    }

203
204
205
206
207
    if (sock_ptr->dataStore != NULL) {
        List *dataList = List_create();
        BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1);
        while (List_count(dataList) > 0) {
            PrrtPacket *packet = List_shift(dataList);
208
209
210
211
212
            PrrtPacket_destroy(packet);
        }

        List_destroy(dataList);

213
214
215
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

216
217
218
219
220
    if (sock_ptr->blockStore != NULL) {
        List *blockList = List_create();
        BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
        while (List_count(blockList) > 0) {
            PrrtBlock *block = List_shift(blockList);
Andreas Schmidt's avatar
Andreas Schmidt committed
221
            PrrtBlock_destroy(block);
222
223
224
225
        }

        List_destroy(blockList);

226
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
227
228
    }

229
    if (sock_ptr->receivers != NULL) {
230
        while (List_count(sock_ptr->receivers) > 0) {
Andreas Schmidt's avatar
Andreas Schmidt committed
231
            PrrtReceiver *recv = List_shift(sock_ptr->receivers);
232
            PrrtReceiver_destroy(recv);
233
234
        }
        List_destroy(sock_ptr->receivers);
235
        sock_ptr->receivers = NULL;
236
237
    }

238
    if (sock_ptr->outQueue != NULL) {
239
240
        check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed.");
241
        List_destroy(sock_ptr->outQueue);
242
        sock_ptr->outQueue = NULL;
243
244
    }

245
    if (sock_ptr->inQueue != NULL) {
246
247
        check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed.");
248
        List_destroy(sock_ptr->inQueue);
249
250
251
        sock_ptr->inQueue = NULL;
    }

252
    if (sock_ptr->forwardPacketTable != NULL) {
253
        check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed.");
254
        sock_ptr->forwardPacketTable = NULL;
255
    }
256

257
    check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");
258

Andreas Schmidt's avatar
Andreas Schmidt committed
259
260
261
262
    if(sock_ptr->address != NULL) {
        free(sock_ptr->address);
    }

263
    check(PrrtChannelStateInformation_destroy(sock_ptr->csi), "Could not destroy channel state information.")
264
    check(PrrtApplicationConstraints_destroy(sock_ptr->applicationConstraints), "Could not destroy application constraints.")
265

266
267
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
268
    debug("Socket closed.");
269
    return 0;
270
271
272
273

    error:
        PERROR("Closing socket failed.%s", "");
        return -1;
274
275
}

276
/**
 * Wait up to one second for a feedback datagram, decode it and update the
 * socket's round-trip-time estimate from the timestamp it carries.
 *
 * @param prrtSocket socket whose feedback descriptor is read.
 * @param length     maximum number of bytes to read; values larger than the
 *                   internal MAX_PAYLOAD_LENGTH buffer are capped to it.
 * @return the decoded packet (allocated here and returned to the caller), or
 *         NULL on timeout or error.
 */
PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length)
{
    char bufin[MAX_PAYLOAD_LENGTH];
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    const int timeout_msecs = 1000;

    /* Never let recvfrom() write past the stack buffer, whatever the caller asks for. */
    const size_t capacity = length < sizeof(bufin) ? length : sizeof(bufin);

    struct pollfd fds = { .fd = prrtSocket->feedbackSocketFd, .events = POLLIN };

    const int ready = poll(&fds, 1, timeout_msecs);
    check(ready >= 0, "Poll failed.");
    if (ready == 0) {
        /* Timed out without any feedback. */
        return NULL;
    }

    /* Take the timestamp before reading so the copy is not counted in the RTT. */
    const prrtTimestamp_t receiveTime = PrrtClock_get_current_time();

    const ssize_t n = recvfrom(prrtSocket->feedbackSocketFd, bufin, capacity, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");

    PrrtPacket *prrtPacket = calloc(1, sizeof(*prrtPacket));
    check_mem(prrtPacket);
    PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);

    PrrtPacketFeedbackPayload *payload = prrtPacket->payload;

    /* NOTE(review): this prints the receiver address to stdout on every
     * feedback packet; it looks like leftover debugging output - confirm
     * whether it should go through debug() instead. */
    struct in_addr a;
    a.s_addr = payload->receiverAddress;
    printf("%s\n", inet_ntoa(a));

    const prrtTimestamp_t forwardTripTimestamp = payload->forwardTripTimestamp_us;
    PrrtChannelStateInformation_update_rtt(prrtSocket->csi, receiveTime - forwardTripTimestamp);

    return prrtPacket;

error:
    return NULL;
}
315
316
317
318
319
320
321
322
323
324
325

/**
 * Set a named option on the socket.
 *
 * The only option handled here is "targetdelay", which is stored in the
 * socket's application constraints.
 *
 * @param sock_ptr socket to configure.
 * @param name     option name.
 * @param value    new value for the option.
 * @return true if the option was recognised and stored, false otherwise.
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value)
{
    const bool is_target_delay = (strcmp(name, "targetdelay") == 0);

    /* Unknown option names are rejected without touching the socket. */
    if (!is_target_delay) {
        return false;
    }

    sock_ptr->applicationConstraints->targetDelay = value;
    return true;
}