socket.c 5.96 KB
Newer Older
1
2
3
4
5
6
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
7
8
9
10
11
12
13
#include <src/defines.h>
#include <src/prrt/packet.h>
#include <src/prrt/socket.h>
#include <src/prrt/processes/feedback_receiver.h>
#include <src/prrt/processes/data_transmitter.h>
#include <src/prrt/processes/data_receiver.h>
#include <src/util/dbg.h>
14
#include "socket.h"
15

16

17
/**
 * Initialise a PRRT socket in either sender or receiver mode.
 *
 * Two UDP sockets (data and feedback) are always created.
 *  - Sender mode (is_sender != 0): the feedback socket is bound to port + 1 on
 *    INADDR_ANY, the outgoing queue and its mutex/condition variable are set
 *    up, and the feedback-receiving and data-sending threads are started.
 *  - Receiver mode (is_sender == 0): the data socket is bound to port on
 *    INADDR_ANY, the forward packet table and incoming queue are set up, and
 *    the data-receiving thread is started.
 *
 * @param sock_ptr  Socket structure to initialise.
 * @param port      Data port; a sender additionally uses port + 1 for feedback.
 * @param is_sender Non-zero to set up a sender, zero to set up a receiver.
 * @return EXIT_SUCCESS on success, EXIT_FAILURE on any error (both socket
 *         descriptors that were opened are closed again).
 */
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;

    // PrrtSocket_close() compares these members against NULL, so every one of
    // them needs a defined value regardless of which mode is selected below.
    sock_ptr->dataStore = NULL;
    sock_ptr->blockStore = NULL;
    sock_ptr->receivers = NULL;
    sock_ptr->outQueue = NULL;
    sock_ptr->inQueue = NULL;

    // -1 marks "not open" so the error path only closes what was really opened.
    sock_ptr->dataSocketFd = -1;
    sock_ptr->feedbackSocketFd = -1;

    // socket() signals failure with -1 and may legitimately return 0, so the
    // descriptor has to be compared explicitly rather than tested for truth.
    check((sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create data socket.");
    check((sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create feedback socket.");

    if (is_sender) {
        // The feedback port is port + 1, which would wrap around to 0 for 65535.
        check(port < UINT16_MAX, "Cannot derive feedback port: data port must be below 65535.");

        // Bind Feedback Socket
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1));

        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind feedback socket.");

        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();

        // Create the receiver list before any thread is started so that the
        // worker threads never observe it uninitialised.
        sock_ptr->receivers = List_create();

        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == 0, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == 0, "Cannot create send thread.");
    } else {
        // Bind Data Socket
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind data socket.");

        sock_ptr->forwardPacketTable = calloc(1, sizeof(PrrtForwardPacketTable));
        check_mem(sock_ptr->forwardPacketTable);
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == 0, "Cannot create data receiving thread.");
    }

    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads and release queues that were already set up
        if (sock_ptr->dataSocketFd >= 0) {
            close(sock_ptr->dataSocketFd);
            sock_ptr->dataSocketFd = -1;
        }
        if (sock_ptr->feedbackSocketFd >= 0) {
            close(sock_ptr->feedbackSocketFd);
            sock_ptr->feedbackSocketFd = -1;
        }
        return EXIT_FAILURE;
}

72
/**
 * Register a receiver (host/port pair) with a sender socket.
 *
 * A new PrrtReceiver record is allocated and appended to sock_ptr->receivers.
 * The host string is NOT copied: only the pointer is stored, so the caller
 * must keep the string alive for as long as the receiver is registered.
 *
 * @param sock_ptr Socket whose receiver list is extended.
 * @param host     Host name or address of the receiver.
 * @param port     Port of the receiver.
 * @return 0 on success, -1 if the receiver record could not be allocated.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // Named "receiver" rather than "recv" to avoid shadowing the recv() socket call.
    PrrtReceiver *receiver = calloc(1, sizeof(*receiver));
    check_mem(receiver);

    receiver->host_name = host;
    receiver->port = port;

    List_push(sock_ptr->receivers, receiver);

    return 0;

    error:
        return -1;
}

81
/**
 * Queue a block of data for transmission.
 *
 * The data is wrapped into a newly allocated data packet carrying the next
 * source sequence number, appended to the outgoing queue, and the
 * outQueueFilledCv condition variable is signalled.
 *
 * @param sock_ptr Sender socket.
 * @param data     Payload to send.
 * @param data_len Payload length in bytes (narrowed to uint32_t).
 * @return 0 on success, -1 if the packet could not be allocated.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate before taking the lock: check_mem jumps to the error label on
    // failure, and doing that while holding the mutex would leave it locked
    // forever and block every later send.
    PrrtPacket *packet = calloc(1, sizeof(*packet));
    check_mem(packet);

    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);

    // The sequence number is consumed under the lock so concurrent senders
    // cannot obtain the same value.
    // NOTE(review): the literal 5 is passed through unchanged; its meaning is
    // defined by PrrtPacket_create_data_packet and is not visible here.
    PrrtPacket_create_data_packet(packet, 5, data, (uint32_t) data_len, sock_ptr->sequenceNumberSource++);

    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

    return 0;

    error:
        return -1;
}

98
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
99
    pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
100
    while (1) {
101
        pthread_mutex_lock(&t);
102
        while (List_count(sock_ptr->inQueue) == 0) {
103
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
104
        }
105

106
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
107

108
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
109
        PrrtPacket_copy_payload_to_buffer(buf_ptr, packet, PRRT_PACKET_DATA_HEADER_SIZE);
110

111
112
        PrrtPacket_destroy(packet);

113
        pthread_mutex_unlock(&t);
114
        return len;
115
116
117
    }
}

118
/**
 * Release the resources held by a socket: both B+ tree stores, the receiver
 * list (including its entries), the outgoing and incoming queues with their
 * mutexes and condition variables, and both socket descriptors.
 *
 * Every pointer that is released is reset to NULL afterwards, matching how the
 * tree stores are handled, so that a stale pointer is never left behind.
 *
 * NOTE(review): the worker threads are not stopped or joined here yet, so they
 * may still be running while these resources are torn down.
 *
 * @param sock_ptr Socket to tear down.
 * @return Always 0.
 */
int PrrtSocket_close(PrrtSocket *sock_ptr) {
    if (sock_ptr->dataStore != NULL) {
        sock_ptr->dataStore = BPTree_destroy(sock_ptr->dataStore);
    }

    if (sock_ptr->blockStore != NULL) {
        sock_ptr->blockStore = BPTree_destroy(sock_ptr->blockStore);
    }

    // TODO: shut down threads;

    if (sock_ptr->receivers != NULL) {
        // The list owns the receiver records allocated in PrrtSocket_connect().
        while (List_count(sock_ptr->receivers) > 0) {
            free(List_shift(sock_ptr->receivers));
        }

        List_destroy(sock_ptr->receivers);
        sock_ptr->receivers = NULL;
    }

    if (sock_ptr->outQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
        List_destroy(sock_ptr->outQueue);
        sock_ptr->outQueue = NULL;
    }

    if (sock_ptr->inQueue != NULL) {
        pthread_mutex_destroy(&sock_ptr->inQueueFilledMutex);
        pthread_cond_destroy(&sock_ptr->inQueueFilledMutexCv);
        List_destroy(sock_ptr->inQueue);
        sock_ptr->inQueue = NULL;
    }

    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);

    return 0;
}

157
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
158
159
160
161
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

162
163
    check(n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen) >= 0, "Receiving feedback failed.");

164
165
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
166
    debug("Received feedback %s:%d", remote_host, remote_port);
167

Andreas Schmidt's avatar
Andreas Schmidt committed
168
    PrrtPacket *packet_ptr = calloc(1, sizeof(PrrtPacket));
169
    check_mem(packet_ptr);
170
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
171
    return packet_ptr;
172
173
174

    error:
        return NULL;
175
}