socket.c 7.96 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
7
8
9
10
11
12
13
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
14
#include "socket.h"
15

16

17
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
18
19
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
20

21
22
    sock_ptr->dataStore = NULL;

23
24
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
25

26
27
28
29
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&sock_ptr->closingMutex, &attr);
30

31
    if (is_sender) {
32
33
        // Bind Feedback Socket
        struct sockaddr_in address;
34
        memset((char *) &address, 0, sizeof(address));
35
36
37
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
38

39
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");
40

41
42
43
        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();
44

45
46
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
47
48

        sock_ptr->receivers = List_create();
49
    } else {
50
51
        // Bind Data Socket
        struct sockaddr_in address;
52
        memset((char *) &address, 0, sizeof(address));
53
54
55
56
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

57
        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");
Andreas Schmidt's avatar
Andreas Schmidt committed
58
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
59
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
60
61
62
63
64
65

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
66
67
    }

68
69
70
71
72
73
74
    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        close(sock_ptr->dataSocketFd);
        close(sock_ptr->feedbackSocketFd);
        return EXIT_FAILURE;
75
76
}

77
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
78
79
80
    PrrtReceiver* recv = calloc(1, sizeof(PrrtReceiver));
    recv->host_name = host;
    recv->port = port;
81

82
    List_push(sock_ptr->receivers, recv);
83
84
85
    return 0;
}

86
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
87
    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
88

Andreas Schmidt's avatar
Andreas Schmidt committed
89
    PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
90
    check_mem(packet);
91
    PrrtPacket_create_data_packet(packet, 5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);
92

93
94
95
    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
96

97
    return 0;
98
99
100

    error:
        return -1;
101
102
}

103
int32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
104
    pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
105
    while (1) {
106
        pthread_mutex_lock(&t);
107
        while (List_count(sock_ptr->inQueue) == 0) {
108
109
110
111
112
113
114
115
116
            pthread_mutex_lock(&sock_ptr->closingMutex);
            if(sock_ptr->closing) {
                // TODO: proper close
                printf("CLOSE PROCESS");
                pthread_mutex_unlock(&sock_ptr->closingMutex);
                pthread_mutex_unlock(&t);
                return -1;
            }
            pthread_mutex_unlock(&sock_ptr->closingMutex);
117
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
118
        }
119

120
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
121

122
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
123
        PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
124

125
126
        PrrtPacket_destroy(packet);

127
        pthread_mutex_unlock(&t);
128
        return len;
129
130
131
    }
}

132
int PrrtSocket_interrupt(PrrtSocket *sock_ptr) {
133
134
135
136
    pthread_mutex_lock(&sock_ptr->closingMutex);
    sock_ptr->closing = TRUE;
    pthread_mutex_unlock(&sock_ptr->closingMutex);

137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
    void **res = NULL;
    if(sock_ptr->sendThread != 0) {
        pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
        pthread_cond_signal(&sock_ptr->outQueueFilledCv);
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

        pthread_join(sock_ptr->sendThread, res);
        sock_ptr->sendThread = 0;
    }

    if(sock_ptr->receiveDataThread != 0) {
        pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
        pthread_cond_signal(&sock_ptr->inQueueFilledMutexCv);
        pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);

        pthread_cancel(sock_ptr->receiveDataThread);
        pthread_join(sock_ptr->receiveDataThread, res);

        sock_ptr->receiveDataThread = 0;
    }

    if(sock_ptr->receiveFeedbackThread != 0) {
        pthread_cancel(sock_ptr->receiveFeedbackThread);
        pthread_join(sock_ptr->receiveFeedbackThread, res);

        sock_ptr->receiveFeedbackThread = 0;
    }
}

int PrrtSocket_close(PrrtSocket *sock_ptr) {
    pthread_mutex_lock(&sock_ptr->closingMutex);
    if(!sock_ptr->closing) {
        PrrtSocket_interrupt(sock_ptr);
    }
    pthread_mutex_unlock(&sock_ptr->closingMutex);

173
    if(sock_ptr->dataStore != NULL) {
174
175
176
177
178
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if(sock_ptr->blockStore != NULL) {
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
179
180
    }

181
    if(sock_ptr->receivers != NULL) {
182
        while (List_count(sock_ptr->receivers) > 0) {
183
184
185
            free(List_shift(sock_ptr->receivers));
        }
        List_destroy(sock_ptr->receivers);
186
        sock_ptr->receivers = NULL;
187
188
    }

189
190
191
192
    if(sock_ptr->outQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
193
        sock_ptr->outQueue = NULL;
194
195
196
197
198
199
    }

    if(sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
        List_destroy(sock_ptr->inQueue);
200
201
202
203
204
205
        sock_ptr->inQueue = NULL;
    }

    if(sock_ptr->forwardPacketTable != NULL ){
        free(sock_ptr->forwardPacketTable);
        sock_ptr->forwardPacketTable = NULL;
206
    }
207

208
209
    pthread_mutex_destroy(&sock_ptr->closingMutex);

210
211
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
212
213
214
    return 0;
}

215
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
216
217
218
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
219
220
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");
221

222
223
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
224
    debug("Received feedback %s:%d", remote_host, remote_port);
225

Andreas Schmidt's avatar
Andreas Schmidt committed
226
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
227
    check_mem(packet_ptr);
228
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
229
    return packet_ptr;
230
231
232

    error:
        return NULL;
233
}