socket.c 9.16 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
7
#include <sys/poll.h>
8
9
10
11
12
13
14
#include "../defines.h"
#include "packet.h"
#include "../util/dbg.h"
#include "../util/common.h"
#include "processes/feedback_receiver.h"
#include "processes/data_transmitter.h"
#include "processes/data_receiver.h"
15
#include "socket.h"
16
#include "block.h"
17

18
19
20
21
PrrtSocket * PrrtSocket_create(uint16_t port, uint8_t is_sender) {
    PrrtSocket* sock_ptr = calloc(1, sizeof(PrrtSocket));
    check_mem(sock_ptr);
    sock_ptr->is_sender = is_sender;
22
23
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
24

25
26
    sock_ptr->dataStore = NULL;

27
28
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
29

30
31
32
33
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&sock_ptr->closingMutex, &attr);
34

35
36
37
38
39
40
41
42
43
44
    // Bind Data Socket
    struct sockaddr_in address;
    memset((char *) &address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
          "Cannot bind data socket.");

45
    if (is_sender) {
46
47
        // Bind Feedback Socket
        struct sockaddr_in address;
48
        memset((char *) &address, 0, sizeof(address));
49
50
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
51
52
        check(port <= 65534, "Port %d cannot be bound to.", port);
        address.sin_port = htons((uint16_t) (port + 1));
53

54
55
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS,
              "Cannot bind feedback socket.");
56

57
58
59
        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();
60

61
62
63
64
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) ==
              EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create send thread.");
65
66

        sock_ptr->receivers = List_create();
67
    } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
68
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
69
        check_mem(sock_ptr->forwardPacketTable)
70
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
71
72

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
73
        pthread_cond_init(&sock_ptr->inQueueFilledCv, NULL);
74
75
        sock_ptr->inQueue = List_create();

76
77
        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS,
              "Cannot create data receiving thread.");
78
79
    }

80
    return sock_ptr;
81
82

    error:
83
        PrrtSocket_close(sock_ptr);
84
        return NULL;
85
86
}

87
/**
 * Register a receiver (host and port) with a sending socket.
 *
 * The host string is stored by pointer and is NOT copied, so the caller must
 * keep it alive for as long as the socket may use it.
 *
 * @param sock_ptr  Sending socket; its receiver list only exists for sockets
 *                  created as senders.
 * @param host      Receiver host name.
 * @param port      Receiver port.
 * @return 0 on success, -1 if the socket has no receiver list or the
 *         allocation failed.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // A receiver socket never creates `receivers`; refuse instead of handing a
    // NULL list to List_push.
    if (sock_ptr == NULL || sock_ptr->receivers == NULL) {
        return -1;
    }

    PrrtReceiver *receiver = calloc(1, sizeof(PrrtReceiver));
    if (receiver == NULL) {
        return -1;
    }

    receiver->host_name = host;
    receiver->port = port;
    List_push(sock_ptr->receivers, receiver);

    return 0;
}

96
/**
 * Queue a payload for transmission by the send thread.
 *
 * A data packet is built from the payload, appended to the out queue and the
 * queue's condition variable is signalled. Everything that touches the queue
 * or the source sequence number happens under outQueueFilledMutex.
 *
 * @param sock_ptr  Sending socket.
 * @param data      Payload bytes.
 * @param data_len  Payload length in bytes; must fit into 32 bits.
 * @return 0 if the packet was queued, -1 otherwise.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    check(sock_ptr->is_sender, "Cannot send on receiver socket.");
    // The packet API takes a 32-bit length; reject rather than truncate.
    check(data_len <= UINT32_MAX, "Payload too large.");

    int queued = 0;
    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);

    // NOTE(review): the meaning of the literal 5 (first argument) is not
    // visible from this file - confirm and replace with a named constant.
    PrrtPacket *packet = PrrtPacket_create_data_packet(5, data, (uint32_t) data_len,
                                                       sock_ptr->sequenceNumberSource);
    // Presumably NULL signals a failed creation (TODO confirm). Never enqueue
    // NULL, and only consume a sequence number for a packet that exists.
    if (packet != NULL) {
        sock_ptr->sequenceNumberSource++;
        List_push(sock_ptr->outQueue, packet);
        pthread_cond_signal(&sock_ptr->outQueueFilledCv);
        queued = 1;
    }

    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

    check(queued, "Cannot create data packet.");
    return 0;

    error:
        return -1;
}

111
/**
 * Block until a data packet is available and copy its payload to the caller.
 *
 * Waits on inQueueFilledCv while the in queue is empty. Before each wait the
 * closing flag is read under closingMutex; if it is set the call gives up.
 *
 * @param sock_ptr  Receiving socket.
 * @param buf_ptr   Destination buffer; must be large enough for the payload
 *                  (no size is passed in, so it cannot be checked here).
 * @return Number of payload bytes (packet payload length minus
 *         PRRT_PACKET_DATA_HEADER_SIZE), or -1 if the socket is a sender or is
 *         closing.
 */
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
    check(sock_ptr->is_sender == FALSE, "Cannot receive on sender socket.");

    // Always operate on the mutex stored in the socket. A pthread_mutex_t must
    // not be copied: locking or waiting on a copy is undefined and would not
    // synchronise with the threads that use the original.
    pthread_mutex_t *queueMutex = &sock_ptr->inQueueFilledMutex;

    pthread_mutex_lock(queueMutex);
    while (List_count(sock_ptr->inQueue) == 0) {
        pthread_mutex_lock(&sock_ptr->closingMutex);
        if (sock_ptr->closing) {
            pthread_mutex_unlock(&sock_ptr->closingMutex);
            pthread_mutex_unlock(queueMutex);
            return -1;
        }
        pthread_mutex_unlock(&sock_ptr->closingMutex);

        // Releases queueMutex while sleeping and re-acquires it on wake-up.
        pthread_cond_wait(&sock_ptr->inQueueFilledCv, queueMutex);
    }

    PrrtPacket *packet = List_shift(sock_ptr->inQueue);

    uint32_t payloadLength = (uint32_t) (packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
    PrrtPacket_destroy(packet);

    pthread_mutex_unlock(queueMutex);
    return payloadLength;

    error:
        PERROR("Wrong socket type.%s", "");
        return -1;
}

141
/* Signal a condition variable once while holding its associated mutex. */
static void PrrtSocket_signal_locked(pthread_mutex_t *mutex, pthread_cond_t *cv) {
    pthread_mutex_lock(mutex);
    pthread_cond_signal(cv);
    pthread_mutex_unlock(mutex);
}

/**
 * Flag the socket as closing and stop every worker thread that is running.
 *
 * The send thread is signalled through the out queue condition variable and
 * then joined. The data-receiving thread is signalled, cancelled and joined.
 * The feedback-receiving thread is cancelled and joined. Each thread handle is
 * reset to 0 afterwards so a later call skips it.
 *
 * @param sock_ptr  Socket whose threads should be stopped.
 * @return EXIT_SUCCESS.
 */
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
    pthread_mutex_lock(&sock_ptr->closingMutex);
    sock_ptr->closing = TRUE;
    pthread_mutex_unlock(&sock_ptr->closingMutex);

    if (sock_ptr->sendThread != 0) {
        PrrtSocket_signal_locked(&sock_ptr->outQueueFilledMutex, &sock_ptr->outQueueFilledCv);

        // The thread's exit value is not needed.
        pthread_join(sock_ptr->sendThread, NULL);
        sock_ptr->sendThread = 0;
    }

    if (sock_ptr->receiveDataThread != 0) {
        PrrtSocket_signal_locked(&sock_ptr->inQueueFilledMutex, &sock_ptr->inQueueFilledCv);

        pthread_cancel(sock_ptr->receiveDataThread);
        pthread_join(sock_ptr->receiveDataThread, NULL);
        sock_ptr->receiveDataThread = 0;
    }

    if (sock_ptr->receiveFeedbackThread != 0) {
        pthread_cancel(sock_ptr->receiveFeedbackThread);
        pthread_join(sock_ptr->receiveFeedbackThread, NULL);
        sock_ptr->receiveFeedbackThread = 0;
    }

    debug("Interrupted all threads.");
    return EXIT_SUCCESS;
}

/**
 * Stop the worker threads (unless already interrupted) and release everything
 * the socket owns: stored packets and blocks, the receiver list, both queues
 * with their mutex/condition pairs, the forward packet table, the closing
 * mutex and both descriptors.
 *
 * The PrrtSocket structure itself is not freed here.
 *
 * @param sock_ptr  Socket to close; NULL is rejected.
 * @return 0 on success, -1 if sock_ptr is NULL.
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    // PrrtSocket_create() reaches its error path with a NULL socket when the
    // initial allocation fails, so this must not dereference blindly.
    if (sock_ptr == NULL) {
        return -1;
    }

    debug("Closing socket.");

    // Read the flag under the lock, but interrupt outside of it.
    pthread_mutex_lock(&sock_ptr->closingMutex);
    const int alreadyClosing = sock_ptr->closing ? 1 : 0;
    pthread_mutex_unlock(&sock_ptr->closingMutex);
    if (!alreadyClosing) {
        PrrtSocket_interrupt(sock_ptr);
    }

    if (sock_ptr->dataStore != NULL) {
        // Collect every stored packet over the whole sequence number space and
        // destroy it before the tree itself goes away.
        List *dataList = List_create();
        BPTree_get_range(sock_ptr->dataStore, dataList, 0, SEQNO_SPACE - 1);
        while (List_count(dataList) > 0) {
            PrrtPacket *packet = List_shift(dataList);
            PrrtPacket_destroy(packet);
        }
        List_destroy(dataList);

        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if (sock_ptr->blockStore != NULL) {
        List *blockList = List_create();
        BPTree_get_range(sock_ptr->blockStore, blockList, 0, SEQNO_SPACE - 1);
        while (List_count(blockList) > 0) {
            PrrtBlock *block = List_shift(blockList);
            PrrtBlock_destroy(block);
        }
        List_destroy(blockList);

        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
    }

    if (sock_ptr->receivers != NULL) {
        while (List_count(sock_ptr->receivers) > 0) {
            free(List_shift(sock_ptr->receivers));
        }
        List_destroy(sock_ptr->receivers);
        sock_ptr->receivers = NULL;
    }

    // NOTE(review): packets still sitting in outQueue / inQueue are not
    // destroyed here; whether List_destroy releases its elements is not
    // visible from this file - confirm, otherwise they leak on shutdown.
    if (sock_ptr->outQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    if (sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledCv);
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    if (sock_ptr->forwardPacketTable != NULL) {
        free(sock_ptr->forwardPacketTable);
        sock_ptr->forwardPacketTable = NULL;
    }

    pthread_mutex_destroy(&sock_ptr->closingMutex);

    // Skip descriptors that were never opened and invalidate them afterwards,
    // so a stale number is never handed to close() again.
    if (sock_ptr->dataSocketFd >= 0) {
        close(sock_ptr->dataSocketFd);
        sock_ptr->dataSocketFd = -1;
    }
    if (sock_ptr->feedbackSocketFd >= 0) {
        close(sock_ptr->feedbackSocketFd);
        sock_ptr->feedbackSocketFd = -1;
    }

    debug("Socket closed.");
    return 0;
}

248
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
249
250
251
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
252
253
254
255
256
257
258
259
260
261
262
263

    struct pollfd fds;
    int timeout_msecs = 1000;
    fds.fd = sock_ptr->feedbackSocketFd;
    fds.events = POLLIN;

    n = poll(&fds, 1, timeout_msecs);
    check(n >= 0, "Select failed.")
    if (n == 0) {
        return NULL;
    }

264
265
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");
266

267
268
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
269
    debug("Received feedback %s:%d", remote_host, remote_port);
270

Andreas Schmidt's avatar
Andreas Schmidt committed
271
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
272
    check_mem(packet_ptr);
273
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
274
    return packet_ptr;
275
276

    error:
277
    return NULL;
278
}