socket.c 11.3 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
7
#include <sys/poll.h>
8
#include <assert.h>
9
10
11
12
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
13
#include "processes/cleaner.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
14
15
#include "processes/dataTransmitter.h"
#include "processes/dataReceiver.h"
16
#include "processes/feedbackReceiver.h"
17
#include "socket.h"
18
#include "receiver.h"
19

20
PrrtSocket *PrrtSocket_create(const bool is_sender)
Andreas Schmidt's avatar
Andreas Schmidt committed
21
{
22
    assert(sizeof(float) == 4);
23
    PrrtSocket *sock_ptr = (PrrtSocket*) calloc(1, sizeof(PrrtSocket));
24
    check_mem(sock_ptr);
25

26

27
    sock_ptr->isSender = is_sender;
28

29
30
    sock_ptr->clock = PrrtClock_create();

Andreas Schmidt's avatar
Andreas Schmidt committed
31
    sock_ptr->isBound = false;
32

33
34
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
35

36
    sock_ptr->csi = PrrtChannelStateInformation_create();
37
    sock_ptr->applicationConstraints = PrrtApplicationConstraints_create();
38

39
40
    sock_ptr->packetTimeoutTable = PrrtPacketTimeoutTable_create();

41
    sock_ptr->dataPacketStore = PrrtDataPacketStore_create();
42

43
44
45
46
    int enabled = 1;

    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.");
    check(setsockopt(sock_ptr->dataSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)), "Socket option set failed.");
47
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
48
    check(setsockopt(sock_ptr->feedbackSocketFd, SOL_SOCKET, SO_BROADCAST, &enabled, sizeof(enabled)), "Socket option set failed.");
49

50
    pthread_mutexattr_t attr;
51
52
53
    check(pthread_mutexattr_init(&attr) == 0, "Mutex attr init failed.");
    check(pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) == 0, "Setting type failed.");
    check(pthread_mutex_init(&sock_ptr->closingMutex, &attr) == 0, "Mutex init failed.");
54

55
    if (is_sender) {
56
57
        check(pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL) == 0, "Cond init failed.");
58
        sock_ptr->outQueue = List_create();
59

60
        sock_ptr->receivers = List_create();
61
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
62
        sock_ptr->forwardPacketTable = PrrtForwardPacketTable_create();
63
        sock_ptr->repairBlockStore = PrrtRepairBlockStore_create();
64

65
66
        check(pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL) == 0, "Mutex init failed.");
        check(pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL) == 0, "Cond init failed.");
67
        sock_ptr->inQueue = List_create();
68
69
    }

70
    return sock_ptr;
71
72

    error:
73
        PrrtSocket_close(sock_ptr);
74
        return NULL;
75
76
}

Andreas Schmidt's avatar
Andreas Schmidt committed
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * Bind both UDP sockets and start the socket's worker threads.
 *
 * The data socket is bound to `port` and the feedback socket to `port + 1`
 * on `ipAddress`. A sender then starts the feedback-receiving and the
 * data-sending thread, a receiver starts the data-receiving thread; both
 * start the cleanup thread.
 *
 * @param sock_ptr  socket to bind.
 * @param ipAddress dotted IPv4 address, converted with inet_addr().
 * @param port      data port; must leave room for `port + 1`.
 * @return true on success; false on failure, in which case the socket has
 *         been passed to PrrtSocket_close().
 */
bool PrrtSocket_bind(PrrtSocket *sock_ptr, const char* ipAddress, const uint16_t port) {
    /* The feedback socket uses port + 1, so the highest port is unusable. */
    check(port <= 65534, "Port %d cannot be bound to.", port);

    const size_t addrLen = sizeof(struct sockaddr_in);
    struct sockaddr_in *addr = calloc(1, addrLen);
    check_mem(addr);

    /* Hand the allocation to the socket; PrrtSocket_close() frees it. */
    sock_ptr->address = addr;
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = inet_addr(ipAddress);

    /* Feedback channel first, on the neighbouring port. */
    addr->sin_port = htons((uint16_t) (port + 1));
    check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) addr, addrLen) == EXIT_SUCCESS,
          "Cannot bind feedback socket.");

    /* The same address structure is reused for the data channel. */
    addr->sin_port = htons((uint16_t) (port));
    check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) addr, addrLen) == EXIT_SUCCESS,
          "Cannot bind data socket.");

    if (!sock_ptr->isSender) {
        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create data receiving thread.");
    } else {
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendDataThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create send thread.");
    }

    check(pthread_create(&sock_ptr->cleanupThread, NULL, cleanup, (void *) sock_ptr) == EXIT_SUCCESS,
          "Cannot create cleanup thread.");

    return true;

    error:
    PrrtSocket_close(sock_ptr);
    return false;
}

115
/**
 * Register a receiver (host/port) that this socket should send to.
 *
 * The receiver list only exists on sockets created as senders, so the call
 * is rejected when the list is missing instead of pushing onto a NULL list.
 *
 * @param sock_ptr socket to add the receiver to.
 * @param host     receiver host, passed through to PrrtReceiver_create().
 * @param port     receiver port, passed through to PrrtReceiver_create().
 * @return 0 on success, -1 if the socket has no receiver list or the
 *         receiver could not be created.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    PrrtReceiver *recv = NULL;

    check(sock_ptr->receivers != NULL, "Cannot connect: socket has no receiver list.");

    recv = PrrtReceiver_create(host, port);
    /* NOTE(review): assumes PrrtReceiver_create() signals failure with NULL — confirm. */
    check_mem(recv);

    /* The list takes over the receiver; PrrtSocket_close() destroys it. */
    List_push(sock_ptr->receivers, recv);
    return 0;

    error:
        return -1;
}

121
/**
 * Queue a payload for transmission on a sender socket.
 *
 * The payload is wrapped in a data packet, appended to the outgoing queue
 * under outQueueFilledMutex, and the queue's condition variable is signalled.
 *
 * @param sock_ptr sender socket.
 * @param data     payload bytes handed to PrrtPacket_create_data_packet().
 * @param data_len payload length; it is narrowed to prrtPacketLength_t.
 * @return 0 on success, -1 on failure (receiver socket, packet creation
 *         failure, or a failing pthread call).
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const uint8_t *data, const size_t data_len) {
    PrrtPacket *packet = NULL;
    bool queueLocked = false;

    check(sock_ptr->isSender, "Cannot send on receiver socket.");

    /* NOTE(review): the literals 5 and 0 are forwarded unchanged; their
     * meaning is defined by PrrtPacket_create_data_packet() — confirm. */
    packet = PrrtPacket_create_data_packet(5, data, (prrtPacketLength_t) data_len, 0,
                                           PrrtApplicationConstraints_get_target_delay(sock_ptr->applicationConstraints));
    /* NOTE(review): assumes packet creation signals failure with NULL — confirm. */
    check_mem(packet);

    check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
    queueLocked = true;

    List_push(sock_ptr->outQueue, packet);
    packet = NULL; /* now owned by the queue; must not be destroyed below */

    check(pthread_cond_signal(&sock_ptr->outQueueFilledCv) == 0, "Signal failed.");

    queueLocked = false;
    check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed");

    return 0;

    error:
        /* Do not leave the queue locked or a never-queued packet behind. */
        if (queueLocked) {
            pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
        }
        if (packet != NULL) {
            PrrtPacket_destroy(packet);
        }
        PERROR("There was a failure while sending from socket.%s", "");
        return -1;
}

136
/**
 * Block until a packet is available on a receiver socket and copy its
 * payload into the caller's buffer.
 *
 * While the incoming queue is empty the function waits on inQueueFilledCv;
 * before each wait it checks the socket's closing flag and gives up if set.
 *
 * @param sock_ptr receiver socket.
 * @param buf_ptr  destination buffer. No length is passed in, so the caller
 *                 must provide enough room for the payload.
 * @return the packet's payloadLength minus PRRT_PACKET_DATA_HEADER_SIZE, or
 *         -1 if the socket is closing, is a sender, or a pthread call failed.
 */
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
    bool queueLocked = false;

    check(sock_ptr->isSender == false, "Cannot receive on sender socket.");

    check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
    queueLocked = true;

    while (List_count(sock_ptr->inQueue) == 0) {
        /* Sample the closing flag under its own mutex. */
        check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
        const bool closing = sock_ptr->closing;
        check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

        if (closing) {
            queueLocked = false;
            check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
            return -1;
        }

        check(pthread_cond_wait(&sock_ptr->inQueueFilledCv, &sock_ptr->inQueueFilledMutex) == 0, "Wait failed.");
    }

    PrrtPacket *packet = List_shift(sock_ptr->inQueue);

    /* The length has to be read before the packet is destroyed. */
    prrtPacketLength_t len = (prrtPacketLength_t) (packet->payloadLength - PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_destroy(packet);

    queueLocked = false;
    check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed.");
    return len;

    error:
        /* Best effort: a failure while holding the queue mutex must not
         * leave it locked for the receiving thread or later callers. */
        if (queueLocked) {
            pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
        }
        PERROR("There was a failure while receiving from socket.%s", "");
        return -1;
}

164
/**
 * Flag the socket as closing and stop every worker thread that was started.
 *
 * Threads are handled in a fixed order: data sender, data receiver, feedback
 * receiver, cleanup. A thread handle equal to 0 is treated as "not running";
 * each handle is reset to 0 once its thread has been joined.
 *
 * @param sock_ptr socket whose threads should be stopped.
 * @return EXIT_SUCCESS if all steps succeeded, EXIT_FAILURE as soon as one
 *         pthread call fails (later threads are then left untouched).
 */
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
    /* Thread return values are not needed; join with a NULL result slot. */
    void **joinResult = NULL;

    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
    sock_ptr->closing = true;
    check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");

    /* Data sender: wake it from its queue wait, then join. */
    if (sock_ptr->sendDataThread != 0) {
        check(pthread_mutex_lock(&sock_ptr->outQueueFilledMutex) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->outQueueFilledCv) == 0, "Broadcast failed.");
        check(pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex) == 0, "Unlock failed.");

        check(pthread_join(sock_ptr->sendDataThread, joinResult) == 0, "Join failed.");
        sock_ptr->sendDataThread = 0;
    }

    /* Data receiver: wake anyone waiting on the in-queue, then cancel and join. */
    if (sock_ptr->receiveDataThread != 0) {
        check(pthread_mutex_lock(&sock_ptr->inQueueFilledMutex) == 0, "Lock failed.");
        check(pthread_cond_broadcast(&sock_ptr->inQueueFilledCv) == 0, "Broadcast failed");
        check(pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex) == 0, "Unlock failed");

        check(pthread_cancel(sock_ptr->receiveDataThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->receiveDataThread, joinResult) == 0, "Join failed.");
        sock_ptr->receiveDataThread = 0;
    }

    /* Feedback receiver: only joined here, neither signalled nor cancelled. */
    if (sock_ptr->receiveFeedbackThread != 0) {
        check(pthread_join(sock_ptr->receiveFeedbackThread, joinResult) == 0, "Join failed.");
        sock_ptr->receiveFeedbackThread = 0;
    }

    /* Cleanup thread: cancel and join. */
    if (sock_ptr->cleanupThread != 0) {
        check(pthread_cancel(sock_ptr->cleanupThread) == 0, "Cancel failed.");
        check(pthread_join(sock_ptr->cleanupThread, joinResult) == 0, "Join failed.");
        sock_ptr->cleanupThread = 0;
    }

    return EXIT_SUCCESS;

    error:
        return EXIT_FAILURE;
}

/**
 * Stop the socket's threads (unless already closing) and release everything
 * the socket owns: stores, tables, queues with their mutexes/condition
 * variables, receivers, bound address, channel state information,
 * application constraints, clock and both UDP descriptors.
 *
 * Every released member is reset (pointers to NULL, descriptors to -1) so
 * that no stale handle is left in the structure. The PrrtSocket structure
 * itself is not freed here. closingMutex is destroyed as well, so the socket
 * must not be used again after a successful call.
 *
 * @param sock_ptr socket to close; NULL is rejected with -1.
 * @return 0 on success, -1 as soon as one step fails (remaining members are
 *         then left as they were).
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    debug("Closing socket.");
    check(sock_ptr != NULL, "Cannot close a NULL socket.");

    /* Only interrupt once: skip it if a previous call already set the flag. */
    check(pthread_mutex_lock(&sock_ptr->closingMutex) == 0, "Lock failed.");
    const bool alreadyClosing = sock_ptr->closing;
    check(pthread_mutex_unlock(&sock_ptr->closingMutex) == 0, "Unlock failed.");
    if (!alreadyClosing) {
        check(PrrtSocket_interrupt(sock_ptr) == EXIT_SUCCESS, "Interrupt failed.");
    }

    if (sock_ptr->dataPacketStore != NULL) {
        check(PrrtDataPacketStore_destroy(sock_ptr->dataPacketStore), "Destroy failed.");
        sock_ptr->dataPacketStore = NULL;
    }

    if (sock_ptr->repairBlockStore != NULL) {
        PrrtRepairBlockStore_destroy(sock_ptr->repairBlockStore);
        sock_ptr->repairBlockStore = NULL;
    }

    if (sock_ptr->receivers != NULL) {
        /* The list owns its receivers; destroy each before the list itself. */
        while (List_count(sock_ptr->receivers) > 0) {
            PrrtReceiver *recv = List_shift(sock_ptr->receivers);
            PrrtReceiver_destroy(recv);
        }
        List_destroy(sock_ptr->receivers);
        sock_ptr->receivers = NULL;
    }

    /* The queue pointers double as "mutex/cv were initialised" markers. */
    if (sock_ptr->outQueue != NULL) {
        check(pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->outQueueFilledCv) == 0, "Cond destroy failed.");
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    if (sock_ptr->inQueue != NULL) {
        check(pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex) == 0, "Mutex destroy failed.");
        check(pthread_cond_destroy(&sock_ptr->inQueueFilledCv) == 0, "Cond destroy failed.");
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    if (sock_ptr->forwardPacketTable != NULL) {
        check(PrrtForwardPacketTable_destroy(sock_ptr->forwardPacketTable), "Destroy failed.");
        sock_ptr->forwardPacketTable = NULL;
    }

    if (sock_ptr->packetTimeoutTable != NULL) {
        check(PrrtPacketTimeoutTable_destroy(sock_ptr->packetTimeoutTable), "Destroy failed.");
        sock_ptr->packetTimeoutTable = NULL;
    }

    check(pthread_mutex_destroy(&sock_ptr->closingMutex) == 0, "Mutex destroy failed.");

    /* free(NULL) is a no-op; resetting the pointer prevents a double free. */
    free(sock_ptr->address);
    sock_ptr->address = NULL;

    if (sock_ptr->csi != NULL) {
        check(PrrtChannelStateInformation_destroy(sock_ptr->csi), "Could not destroy channel state information.");
        sock_ptr->csi = NULL;
    }

    if (sock_ptr->applicationConstraints != NULL) {
        check(PrrtApplicationConstraints_destroy(sock_ptr->applicationConstraints), "Could not destroy application constraints.");
        sock_ptr->applicationConstraints = NULL;
    }

    if (sock_ptr->clock != NULL) {
        check(PrrtClock_destroy(sock_ptr->clock), "Destroy clock failed");
        sock_ptr->clock = NULL;
    }

    /* Negative descriptors mark sockets that are not open. */
    if (sock_ptr->dataSocketFd >= 0) {
        close(sock_ptr->dataSocketFd);
        sock_ptr->dataSocketFd = -1;
    }
    if (sock_ptr->feedbackSocketFd >= 0) {
        close(sock_ptr->feedbackSocketFd);
        sock_ptr->feedbackSocketFd = -1;
    }

    debug("Socket closed.");
    return 0;

    error:
        PERROR("Closing socket failed.%s", "");
        return -1;
}

290
291
292
/**
 * Set a named option on the socket.
 *
 * The only option handled is "targetdelay", whose value is stored in the
 * socket's application constraints.
 *
 * @param sock_ptr socket to configure.
 * @param name     option name, compared exactly with strcmp().
 * @param value    new value for the option.
 * @return true if the option was recognised and applied, false otherwise.
 */
bool PrrtSocket_set_sock_opt(PrrtSocket *sock_ptr, const char *name, const uint32_t value)
{
    /* Reject anything but the single known option up front. */
    if (strcmp(name, "targetdelay") != 0) {
        return false;
    }

    PrrtApplicationConstraints_set_target_delay(sock_ptr->applicationConstraints, value);
    return true;
}