socket.c 8.51 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
7
8
9
10
11
12
13
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
14
#include "socket.h"
15
#include "block.h"
16

17

18
/**
 * Initialise a PRRT socket in sender or receiver role.
 *
 * Both roles get a UDP data socket, a UDP feedback socket and the recursive
 * closing mutex. A sender additionally binds the feedback socket to
 * port + 1, sets up the outgoing queue and receiver list, and starts the
 * feedback-receiving and data-sending threads. A receiver binds the data
 * socket to port, sets up the forward packet table and incoming queue, and
 * starts the data-receiving thread.
 *
 * @param sock_ptr  socket structure to initialise (caller-allocated)
 * @param port      data port; a sender uses port + 1 for feedback, so
 *                  65535 is rejected in sender role
 * @param is_sender non-zero for the sender role, zero for the receiver role
 * @return EXIT_SUCCESS on success, EXIT_FAILURE otherwise (both socket
 *         descriptors are closed again on failure)
 */
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    pthread_mutexattr_t attr;
    struct sockaddr_in address;

    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;

    // PrrtSocket_close() and PrrtSocket_interrupt() inspect all of these
    // regardless of role, so give every one of them a defined value.
    sock_ptr->dataStore = NULL;
    sock_ptr->blockStore = NULL;
    sock_ptr->receivers = NULL;
    sock_ptr->outQueue = NULL;
    sock_ptr->inQueue = NULL;
    sock_ptr->forwardPacketTable = NULL;
    sock_ptr->closing = 0;
    sock_ptr->sendThread = 0;
    sock_ptr->receiveDataThread = 0;
    sock_ptr->receiveFeedbackThread = 0;

    // Mark both descriptors invalid first so the error path never closes garbage.
    sock_ptr->dataSocketFd = -1;
    sock_ptr->feedbackSocketFd = -1;

    // socket() reports failure as -1 (which is "true") and may legitimately
    // return 0, so the result has to be compared explicitly.
    sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0);
    check(sock_ptr->dataSocketFd >= 0, "Cannot create data socket.");
    sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0);
    check(sock_ptr->feedbackSocketFd >= 0, "Cannot create feedback socket.");

    // Recursive, because PrrtSocket_close() holds closingMutex while calling
    // PrrtSocket_interrupt(), which locks it again.
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&sock_ptr->closingMutex, &attr);
    pthread_mutexattr_destroy(&attr);

    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);

    if (is_sender) {
        // Bind Feedback Socket. The feedback port is port + 1; a result of 0
        // means the addition wrapped around (port was 65535).
        const uint16_t feedback_port = (uint16_t) (port + 1);
        check(feedback_port != 0, "Cannot use port 65535: feedback port would overflow.");
        address.sin_port = htons(feedback_port);

        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind feedback socket.");

        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();

        // Everything the worker threads may touch has to exist before they start.
        sock_ptr->receivers = List_create();

        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == 0, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == 0, "Cannot create send thread.");
    } else {
        // Bind Data Socket
        address.sin_port = htons(port);

        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind data socket.");

        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
        check_mem(sock_ptr->forwardPacketTable);
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == 0, "Cannot create data receiving thread.");
    }

    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        if (sock_ptr->dataSocketFd >= 0) {
            close(sock_ptr->dataSocketFd);
            sock_ptr->dataSocketFd = -1;
        }
        if (sock_ptr->feedbackSocketFd >= 0) {
            close(sock_ptr->feedbackSocketFd);
            sock_ptr->feedbackSocketFd = -1;
        }
        return EXIT_FAILURE;
}

78
/**
 * Register a remote receiver with a sending socket.
 *
 * Allocates a PrrtReceiver record for host/port and appends it to the
 * socket's receiver list. No network traffic is generated here.
 *
 * @param sock_ptr socket whose receiver list is extended
 * @param host     receiver host name; only the pointer is stored, not a copy
 *                 (NOTE(review): the string presumably has to outlive the
 *                 socket - confirm with callers)
 * @param port     receiver port
 * @return 0 on success, -1 if the receiver record could not be allocated
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // Named "receiver" rather than "recv" to avoid shadowing recv(2).
    PrrtReceiver *receiver = calloc(1, sizeof(*receiver));
    check_mem(receiver);

    receiver->host_name = host;
    receiver->port = port;

    List_push(sock_ptr->receivers, receiver);

    return 0;

    error:
        return -1;
}

87
/**
 * Queue a payload for transmission.
 *
 * Wraps the payload in a newly allocated data packet carrying the next
 * source sequence number, appends it to the outgoing queue and signals the
 * queue's condition variable.
 *
 * @param sock_ptr sending socket
 * @param data     payload bytes
 * @param data_len payload length in bytes (narrowed to uint32_t)
 * @return 0 on success, -1 if the packet could not be allocated
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate before taking the lock: check_mem() jumps to the error label,
    // which must never be reached while outQueueFilledMutex is held.
    PrrtPacket *packet = calloc(1, sizeof(*packet));
    check_mem(packet);

    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);

    // The sequence counter is advanced under the queue lock, as before.
    // NOTE(review): the literal 5 is passed through unchanged; its meaning is
    // defined by PrrtPacket_create_data_packet.
    PrrtPacket_create_data_packet(packet, 5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);

    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

    return 0;

    error:
        return -1;
}

104
/**
 * Block until a data packet is available and copy its payload to buf_ptr.
 *
 * Waits on the incoming queue's condition variable while the queue is
 * empty. Each time the queue is found empty the closing flag is checked;
 * if it is set the call gives up.
 *
 * @param sock_ptr receiving socket
 * @param buf_ptr  destination buffer for the payload; no size is passed in,
 *                 so the caller has to provide enough room
 * @return payload_len minus PRRT_PACKET_DATA_HEADER_SIZE of the delivered
 *         packet, or -1 if the socket is closing and the queue is empty
 */
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
    // Always operate on the socket's own mutex. A pthread_mutex_t must not be
    // copied: locking a copy is undefined and excludes nobody who locks the
    // original.
    pthread_mutex_t *queue_mutex = &sock_ptr->inQueueFilledMutex;

    pthread_mutex_lock(queue_mutex);
    while (List_count(sock_ptr->inQueue) == 0) {
        pthread_mutex_lock(&sock_ptr->closingMutex);
        if (sock_ptr->closing) {
            // TODO: proper close
            pthread_mutex_unlock(&sock_ptr->closingMutex);
            pthread_mutex_unlock(queue_mutex);
            return -1;
        }
        pthread_mutex_unlock(&sock_ptr->closingMutex);

        pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, queue_mutex);
    }

    PrrtPacket *packet = List_shift(sock_ptr->inQueue);

    uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
    PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);

    PrrtPacket_destroy(packet);

    pthread_mutex_unlock(queue_mutex);
    return (int32_t) len;
}

132
/**
 * Flag the socket as closing and stop its worker threads.
 *
 * Sets the closing flag under closingMutex, then for each thread handle
 * that is non-zero:
 *  - send thread: signals the outgoing-queue condition variable and joins;
 *  - data-receiving thread: signals the incoming-queue condition variable,
 *    cancels the thread and joins;
 *  - feedback-receiving thread: cancels the thread and joins.
 * Every handled thread handle is reset to 0 afterwards.
 *
 * @param sock_ptr socket to interrupt
 * @return always 0
 */
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
    pthread_mutex_lock(&sock_ptr->closingMutex);
    sock_ptr->closing = TRUE;
    pthread_mutex_unlock(&sock_ptr->closingMutex);

    if (sock_ptr->sendThread != 0) {
        // Wake a waiter on the outgoing queue; the send thread is joined
        // without being cancelled.
        pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
        pthread_cond_signal(&sock_ptr->outQueueFilledCv);
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

        // The thread's exit value is not needed.
        pthread_join(sock_ptr->sendThread, NULL);
        sock_ptr->sendThread = 0;
    }

    if (sock_ptr->receiveDataThread != 0) {
        // Wake a waiter on the incoming queue (PrrtSocket_recv waits on this
        // condition variable) so it can observe the closing flag.
        pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
        pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
        pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);

        pthread_cancel(sock_ptr->receiveDataThread);
        pthread_join(sock_ptr->receiveDataThread, NULL);

        sock_ptr->receiveDataThread = 0;
    }

    if (sock_ptr->receiveFeedbackThread != 0) {
        pthread_cancel(sock_ptr->receiveFeedbackThread);
        pthread_join(sock_ptr->receiveFeedbackThread, NULL);

        sock_ptr->receiveFeedbackThread = 0;
    }

    // The function is declared int; falling off the end would hand callers
    // an indeterminate value.
    return 0;
}

/**
 * Shut the socket down and release everything it owns.
 *
 * Interrupts the socket first unless it is already marked as closing, then
 * tears down, in this order: the data store and the packets in it, the
 * block store and the blocks in it, the receiver list and its entries, the
 * outgoing queue with its mutex and condition variable, the incoming queue
 * with its mutex and condition variable, the forward packet table, the
 * closing mutex, and finally both socket descriptors.
 *
 * @param sock_ptr socket to close
 * @return always 0
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    // closingMutex is held across the call to PrrtSocket_interrupt(), which
    // locks the same mutex again.
    pthread_mutex_lock(&sock_ptr->closingMutex);
    if (!sock_ptr->closing) {
        PrrtSocket_interrupt(sock_ptr);
    }
    pthread_mutex_unlock(&sock_ptr->closingMutex);

    // Destroy every packet in the data store, then the store itself.
    if (sock_ptr->dataStore != NULL) {
        List *storedPackets = List_create();
        BPTree_get_range(sock_ptr->dataStore, storedPackets, 0, SEQNO_SPACE-1);
        while (List_count(storedPackets) > 0) {
            PrrtPacket *storedPacket = List_shift(storedPackets);
            PrrtPacket_destroy(storedPacket);
        }
        List_destroy(storedPackets);

        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    // Same procedure for the block store.
    if (sock_ptr->blockStore != NULL) {
        List *storedBlocks = List_create();
        BPTree_get_range(sock_ptr->blockStore, storedBlocks, 0, SEQNO_SPACE-1);
        while (List_count(storedBlocks) > 0) {
            PrrtBlock *storedBlock = List_shift(storedBlocks);
            PrrtBlock_free(storedBlock);
        }
        List_destroy(storedBlocks);

        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
    }

    // Receiver records were allocated with calloc in PrrtSocket_connect().
    if (sock_ptr->receivers != NULL) {
        while (List_count(sock_ptr->receivers) > 0) {
            void *receiverEntry = List_shift(sock_ptr->receivers);
            free(receiverEntry);
        }
        List_destroy(sock_ptr->receivers);
        sock_ptr->receivers = NULL;
    }

    // Outgoing queue and its synchronisation objects.
    if (sock_ptr->outQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    // Incoming queue and its synchronisation objects.
    if (sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    // free(NULL) is a no-op, so no guard is required here.
    free(sock_ptr->forwardPacketTable);
    sock_ptr->forwardPacketTable = NULL;

    pthread_mutex_destroy(&sock_ptr->closingMutex);

    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);

    return 0;
}

233
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
234
235
236
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
237
238
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");
239

240
241
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
242
    debug("Received feedback %s:%d", remote_host, remote_port);
243

Andreas Schmidt's avatar
Andreas Schmidt committed
244
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
245
    check_mem(packet_ptr);
246
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
247
    return packet_ptr;
248
249
250

    error:
        return NULL;
251
}