socket.c 6.59 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
7
8
9
10
11
12
13
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
14
#include "socket.h"
15

16

17
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
18
19
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
20

21
22
    sock_ptr->dataStore = NULL;

23
24
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
25

26
27
    pthread_mutex_init(&sock_ptr->closingMutex, NULL);

28
    if (is_sender) {
29
30
        // Bind Feedback Socket
        struct sockaddr_in address;
31
        memset((char *) &address, 0, sizeof(address));
32
33
34
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
35

36
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");
37

38
39
40
        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();
41

42
43
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
44
45

        sock_ptr->receivers = List_create();
46
    } else {
47
48
        // Bind Data Socket
        struct sockaddr_in address;
49
        memset((char *) &address, 0, sizeof(address));
50
51
52
53
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

54
        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");
Andreas Schmidt's avatar
Andreas Schmidt committed
55
        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
56
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
57
58
59
60
61
62

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
63
64
    }

65
66
67
68
69
70
71
    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        close(sock_ptr->dataSocketFd);
        close(sock_ptr->feedbackSocketFd);
        return EXIT_FAILURE;
72
73
}

74
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
75
76
77
    PrrtReceiver* recv = calloc(1, sizeof(PrrtReceiver));
    recv->host_name = host;
    recv->port = port;
78

79
    List_push(sock_ptr->receivers, recv);
80
81
82
    return 0;
}

83
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
84
    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
85

Andreas Schmidt's avatar
Andreas Schmidt committed
86
    PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
87
    check_mem(packet);
88
    PrrtPacket_create_data_packet(packet, 5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);
89

90
91
92
    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
93

94
    return 0;
95
96
97

    error:
        return -1;
98
99
}

100
uint32_t PrrtSocket_recv(PrrtSocket *sock_ptr, void *buf_ptr) {
101
    pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
102
    while (1) {
103
        pthread_mutex_lock(&t);
104
        while (List_count(sock_ptr->inQueue) == 0) {
105
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
106
        }
107

108
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
109

110
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
111
        PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
112

113
114
        PrrtPacket_destroy(packet);

115
        pthread_mutex_unlock(&t);
116
        return len;
117
118
119
    }
}

120
int PrrtSocket_close(PrrtSocket *sock_ptr) {
121
122
123
124
    pthread_mutex_lock(&sock_ptr->closingMutex);
    sock_ptr->closing = TRUE;
    pthread_mutex_unlock(&sock_ptr->closingMutex);

125
    if(sock_ptr->dataStore != NULL) {
126
127
128
129
130
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if(sock_ptr->blockStore != NULL) {
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
131
132
    }

133
    if(sock_ptr->receivers != NULL) {
134
        while (List_count(sock_ptr->receivers) > 0) {
135
136
137
138
            free(List_shift(sock_ptr->receivers));
        }
        List_destroy(sock_ptr->receivers);
    }
139

140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
    void **res = NULL;
    if(sock_ptr->sendThread != 0) {
        pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
        pthread_cond_signal(&sock_ptr->outQueueFilledCv);
        pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

        pthread_join(sock_ptr->sendThread, res);
    }

    // TODO: shut down receiver threads
    
    if(sock_ptr->receiveFeedbackThread != 0) {
        pthread_cancel(sock_ptr->receiveFeedbackThread);
        pthread_join(sock_ptr->receiveFeedbackThread, res);
    }

156
157
158
159
160
161
162
163
164
165
166
    if(sock_ptr->outQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
    }

    if(sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
        List_destroy(sock_ptr->inQueue);
    }
167

168
169
    pthread_mutex_destroy(&sock_ptr->closingMutex);

170
171
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
172
173
174
    return 0;
}

175
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
176
177
178
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
179
180
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
    check(n >= 0, "Receiving feedback failed.");
181

182
183
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
184
    debug("Received feedback %s:%d", remote_host, remote_port);
185

Andreas Schmidt's avatar
Andreas Schmidt committed
186
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
187
    check_mem(packet_ptr);
188
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
189
    return packet_ptr;
190
191
192

    error:
        return NULL;
193
}