socket.c 5.83 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
7
8
9
10
11
12
13
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
14
#include "socket.h"
15

16

17
/**
 * Initialise a PRRT socket as either a sender or a receiver.
 *
 * Both roles get two UDP sockets (data and feedback). A sender binds the
 * feedback socket to `port + 1`, creates the outgoing queue and starts the
 * feedback-receiving and data-sending threads. A receiver binds the data
 * socket to `port`, allocates the forward packet table, creates the incoming
 * queue and starts the data-receiving thread.
 *
 * @param sock_ptr  Socket structure to initialise; must not be NULL.
 * @param port      Data port. For a sender the feedback port is `port + 1`,
 *                  so 65535 is rejected.
 * @param is_sender Non-zero to set up the sender role, zero for receiver.
 * @return EXIT_SUCCESS on success, EXIT_FAILURE otherwise (both descriptors
 *         are closed again on failure).
 */
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    struct sockaddr_in address;

    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;

    // PrrtSocket_close() and PrrtSocket_connect() read these fields, so give
    // them defined values even if the caller did not zero the structure.
    sock_ptr->dataStore = NULL;
    sock_ptr->blockStore = NULL;
    sock_ptr->outQueue = NULL;
    sock_ptr->inQueue = NULL;
    sock_ptr->receiverLength = 0;

    // Mark both descriptors as "not open" so the error path never closes an
    // unrelated descriptor.
    sock_ptr->dataSocketFd = -1;
    sock_ptr->feedbackSocketFd = -1;

    if (is_sender) {
        // port + 1 would wrap around to 0 and bind an arbitrary port.
        check(port < UINT16_MAX, "Port 65535 leaves no room for the feedback port.");
    }

    // socket() signals failure with -1; 0 is a valid descriptor.
    check((sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create data socket.");
    check((sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create feedback socket.");

    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);

    if (is_sender) {
        // Bind Feedback Socket
        address.sin_port = htons((uint16_t) (port + 1));
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");

        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();

        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
    } else {
        // Bind Data Socket
        address.sin_port = htons(port);
        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");

        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
        check_mem(sock_ptr->forwardPacketTable);
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
    }

    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        // NOTE(review): queues, mutexes and the forward packet table created
        // before the failure are not released here.
        if (sock_ptr->dataSocketFd >= 0) {
            close(sock_ptr->dataSocketFd);
            sock_ptr->dataSocketFd = -1;
        }
        if (sock_ptr->feedbackSocketFd >= 0) {
            close(sock_ptr->feedbackSocketFd);
            sock_ptr->feedbackSocketFd = -1;
        }
        return EXIT_FAILURE;
}

70
/**
 * Register a receiver (host/port pair) with the socket.
 *
 * The receiver is appended to `sock_ptr->receivers`; no network traffic is
 * generated here.
 *
 * @param sock_ptr Socket whose receiver list is extended.
 * @param host     Receiver host string. It is placed into the PrrtReceiver
 *                 initialiser as-is; presumably only the pointer is stored,
 *                 so the string should outlive the socket — TODO confirm.
 * @param port     Receiver port.
 * @return 0 on success, -1 when PRRT_MAX_RECEIVER_COUNT receivers are
 *         already registered.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // Reject the request when the receiver table is already full.
    if (!(sock_ptr->receiverLength < PRRT_MAX_RECEIVER_COUNT)) {
        return -1;
    }

    PrrtReceiver receiver = {host, port};
    sock_ptr->receivers[sock_ptr->receiverLength] = receiver;
    sock_ptr->receiverLength++;

    return 0;
}

83
/**
 * Queue a payload for transmission.
 *
 * Wraps `data` in a newly allocated data packet, appends it to the outgoing
 * queue and signals `outQueueFilledCv` so a waiting thread can pick it up.
 *
 * @param sock_ptr Sender socket.
 * @param data     Payload bytes handed to PrrtPacket_create_data_packet().
 * @param data_len Payload length in bytes (narrowed to uint32_t).
 * @return 0 on success, -1 if the packet could not be allocated.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate before taking the lock so that an allocation failure cannot
    // leave outQueueFilledMutex locked.
    PrrtPacket *packet = calloc(1, sizeof(PrrtPacket));
    check_mem(packet);

    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);

    // The sequence number is consumed while the lock is held.
    PrrtPacket_create_data_packet(packet, 5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);

    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

    return 0;

    error:
        return -1;
}

100
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
101
    pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
102
    while (1) {
103
        pthread_mutex_lock(&t);
104
        while (List_count(sock_ptr->inQueue) == 0) {
105
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
106
        }
107

108
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
109

110
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
111
        PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
112

113
114
        PrrtPacket_destroy(packet);

115
        pthread_mutex_unlock(&t);
116
        return len;
117
118
119
    }
}

120
/**
 * Release the resources held by a socket.
 *
 * Destroys the data and block stores when present, waits for the send
 * thread on sender sockets, destroys whichever queue (and its mutex and
 * condition variable) exists, and closes both descriptors.
 *
 * @param sock_ptr Socket to tear down.
 * @return Always 0.
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    if (sock_ptr->dataStore != NULL) {
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if (sock_ptr->blockStore != NULL) {
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
    }

    // TODO: shut down threads;
    // NOTE(review): the feedback and data receiving threads are neither
    // stopped nor joined, so they may still touch the objects destroyed
    // below.

    // TODO: clean up all receivers

    if (sock_ptr->outQueue != NULL) {
        // Only a sender socket has a send thread; joining a thread handle
        // that was never created is undefined behaviour.
        pthread_join(sock_ptr->sendThread, NULL);

        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    if (sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);

    return 0;
}

153
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
154
155
156
157
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

158
159
    check(n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen) >= 0, "Receiving feedback failed.");

160
161
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
162
    debug("Received feedback %s:%d", remote_host, remote_port);
163

Andreas Schmidt's avatar
Andreas Schmidt committed
164
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
165
    check_mem(packet_ptr);
166
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
167
    return packet_ptr;
168
169
170

    error:
        return NULL;
171
}