socket.c 11.2 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
7
#include <sys/poll.h>
8
9
10
11
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
13
14
#include "processes/feedbackReceiver.h"
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
15
#include "socket.h"
16
#include "block.h"
17
#include "receiver.h"
18
#include "clock.h"
19

Andreas Schmidt's avatar
Andreas Schmidt committed
20
21
PrrtSocket *PrrtSocket_create(const uint8_t is_sender)
{
22
    PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
23
    check_mem(sock_ptr);
24

25
    sock_ptr->is_sender = is_sender;
Andreas Schmidt's avatar
Andreas Schmidt committed
26
    sock_ptr->isBound = false;
27

28
29
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
30

31
    PrrtChannelStateInformation_init(&sock_ptr->csi);
32

33
34
    sock_ptr->dataStore = NULL;

35
36
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
37

38
    pthread_mutexattr_t attr;
39
40
41
    check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed.");
    check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed.");
42

43
    if (is_sender) {
44
45
        check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed.");
46
        sock_ptr->outQueue = List_create();
47

48
        sock_ptr->receivers = List_create();
49
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
50
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
51
        check_mem(sock_ptr->forwardPacketTable)
52
        check(PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable) == 0, "Creating table failed.");
53

54
55
        check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
56
        sock_ptr->inQueue = List_create();
57
58
    }

59
    return sock_ptr;
60
61

    error:
62
        PrrtSocket_close(sock_ptr);
63
        return NULL;
64
65
}

Andreas Schmidt's avatar
Andreas Schmidt committed
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
 * Bind the socket to a local IPv4 address and start its worker threads.
 *
 * The data socket is bound to `port` and the feedback socket to `port + 1`.
 * A sender then starts the feedback-receiving and sending threads, a
 * receiver starts the data-receiving thread.
 *
 * @param sock_ptr  socket created by PrrtSocket_create().
 * @param ipAddress IPv4 address in a notation accepted by inet_addr().
 * @param port      data port; must be <= 65534 because port + 1 is used too.
 * @return true on success. On any failure the socket is closed via
 *         PrrtSocket_close() and false is returned (a NULL sock_ptr simply
 *         returns false).
 */
bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t port) {
    const socklen_t size = sizeof(struct sockaddr_in);
    struct sockaddr_in *address = NULL;
    in_addr_t rawAddress;

    if (sock_ptr == NULL) {
        /* Nothing to close, so do not take the error path. */
        return false;
    }

    check(port <= 65534, "Port %d cannot be bound to.", port);
    check(ipAddress != NULL, "No address given to bind to.");

    /* inet_addr() signals a malformed address with (in_addr_t) -1, which is
     * also the encoding of the valid address 255.255.255.255. */
    rawAddress = inet_addr(ipAddress);
    check(rawAddress != (in_addr_t) -1 || strcmp(ipAddress, "255.255.255.255") == 0,
          "Invalid IPv4 address: %s", ipAddress);

    address = calloc(1, size);
    check_mem(address);

    address->sin_family = AF_INET;
    address->sin_addr.s_addr = rawAddress;
    address->sin_port = htons((uint16_t) (port + 1));

    /* Drop an address left over from an earlier bind attempt. */
    free(sock_ptr->address);
    sock_ptr->address = address;

    check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) address, size) == 0,
          "Cannot bind feedback socket.");

    /* The same structure is reused for the data socket, so the stored
     * address ends up carrying the data port. */
    address->sin_port = htons((uint16_t) (port));

    check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) address, size) == 0,
          "Cannot bind data socket.");

    if (sock_ptr->is_sender) {
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == 0,
              "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == 0,
              "Cannot create send thread.");
    } else {
        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == 0,
              "Cannot create data receiving thread.");
    }

    return true;

    error:
    PrrtSocket_close(sock_ptr);
    return false;
}

103
/**
 * Register a remote receiver on a socket.
 *
 * Creates a PrrtReceiver for host:port and appends it to the socket's
 * receiver list. That list is only created for sender sockets, so calling
 * this on a receiver socket fails.
 *
 * @param sock_ptr socket to add the receiver to.
 * @param host     receiver host, passed through to PrrtReceiver_create().
 * @param port     receiver port, passed through to PrrtReceiver_create().
 * @return 0 on success, -1 if the socket has no receiver list or the
 *         receiver could not be created.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    /* Not named `recv`, to avoid shadowing the socket API function. */
    PrrtReceiver *receiver = NULL;

    check(sock_ptr != NULL && sock_ptr->receivers != NULL, "Cannot connect on a socket without receiver list.");

    receiver = PrrtReceiver_create(host, port);
    check(receiver != NULL, "Cannot create receiver.");

    List_push(sock_ptr->receivers, receiver);
    return 0;

    error:
        return -1;
}

109
/**
 * Queue a data packet for transmission.
 *
 * Wraps `data` in a data packet, appends it to the out-queue and signals
 * outQueueFilledCv so a waiter on that condition can pick it up.
 *
 * @param sock_ptr sender socket.
 * @param data     payload bytes handed to PrrtPacket_create_data_packet().
 * @param data_len payload length in bytes (narrowed to uint32_t).
 * @return 0 on success, -1 on failure (receiver socket, packet creation
 *         failure, or a failing pthread call).
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
    PrrtPacket *packet = NULL;
    bool queueLocked = false;

    check(sock_ptr->is_sender, "Cannot send on receiver socket.");

    /* Build the packet before taking the lock: it needs no shared state, and
     * a failed allocation must never put a NULL entry into the queue. */
    packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len, 0);
    check_mem(packet);

    check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
    queueLocked = true;

    List_push(sock_ptr->outQueue, packet);
    /* From here on the queue owns the packet. */
    packet = NULL;

    check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");

    queueLocked = false;
    check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");

    return 0;

    error:
        /* Never leave the queue mutex held when bailing out. */
        if (queueLocked) {
            pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
        }
        PERROR("There was a failure while sending from socket.%s", "");
        return -1;
}

125
/**
 * Block until a packet is available in the in-queue and copy its payload
 * into the caller's buffer.
 *
 * While the queue is empty the function waits on inQueueFilledCv. Each time
 * it is about to wait it checks the socket's closing flag and gives up if
 * the socket is being closed.
 *
 * @param sock_ptr receiver socket.
 * @param buf_ptr  destination buffer; this function does not know its size,
 *                 the caller must provide room for a full payload.
 * @return number of payload bytes (payload_len minus the data header size),
 *         or -1 if the socket is closing, is a sender socket, or a pthread
 *         call failed.
 */
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
    bool queueLocked = false;
    bool closingLocked = false;
    bool isClosing = false;
    PrrtPacket *packet = NULL;
    uint32_t len = 0;

    check(sock_ptr->is_sender == false, "Cannot receive on sender socket.");

    check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
    queueLocked = true;

    while (List_count(sock_ptr->inQueue) == 0) {
        /* Read the closing flag under its own mutex. */
        check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
        closingLocked = true;
        isClosing = sock_ptr->closing;
        closingLocked = false;
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

        if (isClosing) {
            /* Orderly shutdown: not reported through PERROR. */
            queueLocked = false;
            check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
            return -1;
        }

        check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed.");
    }

    packet = List_shift(sock_ptr->inQueue);
    check(packet != NULL, "Received an empty queue entry.");

    len = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_destroy(packet);

    queueLocked = false;
    check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
    return (int32_t) len;

    error:
        /* Release whatever is still held so later callers cannot deadlock. */
        if (closingLocked) {
            pthread_mutex_unlock(&sock_ptr->closingMutex);
        }
        if (queueLocked) {
            pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
        }
        PERROR("There was a failure while receiving from socket.%s", "");
        return -1;
}

153
/**
 * Mark the socket as closing and stop its worker threads.
 *
 * Sets the closing flag under closingMutex, then for each thread handle
 * that is non-zero:
 *  - send thread: broadcast outQueueFilledCv and join;
 *  - data-receive thread: broadcast inQueueFilledCv, cancel and join;
 *  - feedback-receive thread: cancel and join.
 * Each handle is reset to 0 after a successful join, so a repeated call
 * skips threads that are already stopped.
 *
 * @param sock_ptr socket whose threads should be stopped.
 * @return EXIT_SUCCESS, or EXIT_FAILURE if any pthread call failed.
 */
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
    /* Mutex currently held by this function, if any; released on error. */
    pthread_mutex_t *held = NULL;

    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
    held = &sock_ptr->closingMutex;
    sock_ptr->closing = true;
    held = NULL;
    check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

    if (sock_ptr->sendThread != 0) {
        /* Wake anything blocked on the out-queue condition so it can
         * observe the closing flag. */
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
        held = &sock_ptr->outQueueFilledMutex;
        check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed.");
        held = NULL;
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");

        check(pthread_join(sock_ptr->sendThread, NULL) == 0, "Join failed.");
        sock_ptr->sendThread = 0;
    }

    if (sock_ptr->receiveDataThread != 0) {
        /* Wake any waiter on the in-queue condition (PrrtSocket_recv()
         * waits there and checks the closing flag). */
        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
        held = &sock_ptr->inQueueFilledMutex;
        check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed");
        held = NULL;
        check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed");

        check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveDataThread, NULL) == 0, "Join failed.");
        sock_ptr->receiveDataThread = 0;
    }

    if (sock_ptr->receiveFeedbackThread != 0) {
        check(pthread_cancel(sock_ptr->receiveFeedbackThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveFeedbackThread, NULL) == 0, "Join failed.");
        sock_ptr->receiveFeedbackThread = 0;
    }

    return EXIT_SUCCESS;

    error:
        if (held != NULL) {
            pthread_mutex_unlock(held);
        }
        PERROR("Interruping socket failed.%s","");
        return EXIT_FAILURE;
}

/**
 * Shut the socket down and release the resources it holds.
 *
 * Stops the worker threads (through PrrtSocket_interrupt(), unless the
 * closing flag is already set), destroys every packet and block still held
 * in the data/block stores, the receiver list, both queues with their
 * mutex/condition pairs, the forward packet table, the closing mutex and
 * the bound address, and finally closes both UDP descriptors.
 *
 * The PrrtSocket structure itself is NOT freed here.
 *
 * @param sock_ptr socket to close; NULL is rejected.
 * @return 0 on success, -1 if sock_ptr is NULL or a step failed (the
 *         remaining steps are then skipped).
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    bool alreadyClosing = false;

    if (sock_ptr == NULL) {
        /* Reachable from a failed allocation in PrrtSocket_create(). */
        return -1;
    }

    debug("Closing socket.");

    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
    alreadyClosing = sock_ptr->closing;
    check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

    if (!alreadyClosing) {
        check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (sock_ptr->dataStore != NULL) {
        /* Pull every stored packet out of the tree and destroy it before
         * the tree itself goes away. */
        List *dataList = List_create();
        BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1);
        while (List_count(dataList) > 0) {
            PrrtPacket *packet = List_shift(dataList);
            PrrtPacket_destroy(packet);
        }
        List_destroy(dataList);

        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if (sock_ptr->blockStore != NULL) {
        /* Same procedure for the stored blocks. */
        List *blockList = List_create();
        BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
        while (List_count(blockList) > 0) {
            PrrtBlock *block = List_shift(blockList);
            PrrtBlock_destroy(block);
        }
        List_destroy(blockList);

        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
    }

    if (sock_ptr->receivers != NULL) {
        while (List_count(sock_ptr->receivers) > 0) {
            PrrtReceiver *receiver = List_shift(sock_ptr->receivers);
            PrrtReceiver_destroy(receiver);
        }
        List_destroy(sock_ptr->receivers);
        sock_ptr->receivers = NULL;
    }

    /* The queue pointers double as "mutex/cond were initialised" markers:
     * PrrtSocket_create() sets each pair up together with its queue. */
    if (sock_ptr->outQueue != NULL) {
        check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed.");
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    if (sock_ptr->inQueue != NULL) {
        check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed.");
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    if (sock_ptr->forwardPacketTable != NULL) {
        /* NOTE(review): this treats a non-zero return as success, while
         * PrrtForwardPacketTable_create() is checked against 0 elsewhere in
         * this file; confirm the destroy function's return convention. */
        check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed.");
        sock_ptr->forwardPacketTable = NULL;
    }

    check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");

    /* Clear the pointer so a repeated close cannot free it twice. */
    free(sock_ptr->address);
    sock_ptr->address = NULL;

    /* Invalidate the descriptors after closing: the kernel may hand the same
     * numbers to someone else, and a second close must not hit those. */
    if (sock_ptr->dataSocketFd >= 0) {
        close(sock_ptr->dataSocketFd);
        sock_ptr->dataSocketFd = -1;
    }
    if (sock_ptr->feedbackSocketFd >= 0) {
        close(sock_ptr->feedbackSocketFd);
        sock_ptr->feedbackSocketFd = -1;
    }

    debug("Socket closed.");
    return 0;

    error:
        PERROR("Closing socket failed.%s", "");
        return -1;
}

272
/**
 * Wait up to one second for a feedback datagram, decode it and feed the
 * resulting round-trip time into the socket's channel state information.
 *
 * @param prrtSocket socket whose feedback descriptor is polled.
 * @param length     maximum number of bytes to read; capped at
 *                   MAX_PAYLOAD_LENGTH, the size of the receive buffer.
 * @return a newly allocated packet (ownership passes to the caller), or
 *         NULL on timeout or on any failure.
 */
PrrtPacket *PrrtSocket_recv_feedback(PrrtSocket *prrtSocket, const size_t length)
{
    char bufin[MAX_PAYLOAD_LENGTH];
    PrrtPacket *prrtPacket = NULL;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    struct pollfd fds;
    const int timeout_msecs = 1000;
    int ready;
    ssize_t n;

    /* Never let recvfrom() write past the end of the stack buffer, whatever
     * length the caller asked for. */
    const size_t receiveLength = length < sizeof(bufin) ? length : sizeof(bufin);

    fds.fd = prrtSocket->feedbackSocketFd;
    fds.events = POLLIN;
    fds.revents = 0;

    ready = poll(&fds, 1, timeout_msecs);
    check(ready >= 0, "Select failed.");
    if (ready == 0) {
        /* Timeout: nothing arrived, which is not an error. */
        return NULL;
    }

    /* Timestamp taken as soon as poll() reports data, before the read. */
    uint32_t receiveTime = PrrtClock_get_current_time();

    n = recvfrom(prrtSocket->feedbackSocketFd, bufin, receiveLength, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");

    prrtPacket = calloc(1, sizeof(PrrtPacket));
    check_mem(prrtPacket);
    PrrtPacket_decode(bufin, (uint16_t) n, prrtPacket);

    PrrtPacketFeedbackPayload *payload = prrtPacket->payload;
    /* The payload pointer is still NULL (from calloc) if decoding did not
     * set it. */
    check(payload != NULL, "Feedback packet has no payload.");

    /* NOTE(review): prints the reported receiver address to stdout on every
     * feedback packet; looks like leftover debugging output. */
    struct in_addr a;
    a.s_addr = payload->receiver_addr;
    printf("%s\n", inet_ntoa(a));

    uint32_t forwardTripTimestamp = payload->forward_trip_timestamp;
    PrrtChannelStateInformation_update_rtt(&prrtSocket->csi, receiveTime - forwardTripTimestamp);

    return prrtPacket;

    error:
    /* Only non-NULL here when the payload check failed. */
    free(prrtPacket);
    return NULL;
}