socket.c 5.14 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
#include <arpa/inet.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <defines.h>
#include <prrt/packet.h>
#include <prrt/socket.h>
#include <prrt/processes/feedback_receiver.h>
#include <prrt/processes/data_transmitter.h>
12
#include <prrt/processes/data_receiver.h>
13
#include <util/dbg.h>
14

15

16
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
17
18
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;
19

20
21
    check(sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create data socket.")
    check(sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0), "Cannot create feedback socket.");
22

23
    if (is_sender) {
24
25
        // Bind Feedback Socket
        struct sockaddr_in address;
26
        memset((char *) &address, 0, sizeof(address));
27
28
29
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
30

31
        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind feedback socket.");
32

33
34
35
        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();
36

37
38
        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create send thread.");
39
    } else {
40
41
        // Bind Data Socket
        struct sockaddr_in address;
42
        memset((char *) &address, 0, sizeof(address));
43
44
45
46
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

47
        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == EXIT_SUCCESS, "Cannot bind data socket.");
48
        sock_ptr->forwardPacketTable = malloc(sizeof(PrrtForwardPacketTable));
49
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
50
51
52
53
54
55

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == EXIT_SUCCESS, "Cannot create data receiving thread.");
56
57
    }

58
59
60
61
62
63
64
    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        close(sock_ptr->dataSocketFd);
        close(sock_ptr->feedbackSocketFd);
        return EXIT_FAILURE;
65
66
}

67
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
68
    PrrtReceiver recv = {host, port};
69

70
71
72
    if (sock_ptr->receiverLength < PRRT_MAX_RECEIVER_COUNT) {
        sock_ptr->receivers[sock_ptr->receiverLength] = recv;
        sock_ptr->receiverLength++;
73
74
75
    } else {
        return -1;
    }
76
77
78
79

    return 0;
}

80
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
81
    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);
82

83
    PrrtPacket *packet = malloc(sizeof(PrrtPacket));
84
    check_mem(packet);
85
    PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->sequenceNumberSource++);
86

87
88
89
    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);
90

91
    return 0;
92
93
94

    error:
        return -1;
95
96
}

97
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
98
    pthread_mutex_t t = sock_ptr->inQueueFilledMutex;
99
    while (1) {
100
        pthread_mutex_lock(&t);
101
        while (List_count(sock_ptr->inQueue) == 0) {
102
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &t);
103
        }
104

105
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
106

107
108
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
        memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, len);
109

110
        pthread_mutex_unlock(&t);
111
        return len;
112
113
114
    }
}

115
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
116
117
    // TODO: shut down threads;

118
119
    // TODO: clean up all receivers

120
121
122
    pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
    pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
    List_destroy(sock_ptr->outQueue);
123

124
125
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
126
127
128
    return 0;
}

129
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
130
131
132
133
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

134
135
    check(n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen) >= 0, "Receiving feedback failed.");

136
137
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
138
    debug("Received feedback %s:%d", remote_host, remote_port);
139

140
    PrrtPacket *packet_ptr = malloc(sizeof(PrrtPacket));
141
    check_mem(packet_ptr);
142
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
143
    return packet_ptr;
144
145
146

    error:
        return NULL;
147
}