socket.c 4.87 KB
Newer Older
1
#include <prrt/processes/data_receiver.h>
2
#include "socket.h"
3

4

5
/**
 * Initialise a PRRT socket as either a sender or a receiver.
 *
 * Two UDP sockets (data and feedback) are always created.
 *  - Sender (is_sender != 0): binds the feedback socket to port + 1 on all
 *    interfaces, sets up the outgoing queue with its mutex/condition variable
 *    and starts the feedback-receiving and sending threads.
 *  - Receiver (is_sender == 0): binds the data socket to port on all
 *    interfaces, allocates the forward packet table, sets up the incoming
 *    queue with its mutex/condition variable and starts the data-receiving
 *    thread.
 *
 * NOTE(review): this relies on the project's check() macro jumping to the
 * `error` label when its condition is false - confirm against its definition.
 *
 * @param sock_ptr  Socket structure to initialise.
 * @param port      Data port; a sender additionally uses port + 1 for
 *                  feedback, so 65535 is rejected for senders.
 * @param is_sender Non-zero to set up a sender, zero for a receiver.
 * @return EXIT_SUCCESS on success, EXIT_FAILURE otherwise. On failure every
 *         descriptor opened here is closed again.
 */
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    sock_ptr->sequenceNumberSource = 1;
    sock_ptr->sequenceNumberRedundancy = 1;

    // Mark both descriptors as "not open" so the error path never closes a
    // descriptor value that was not obtained from socket().
    sock_ptr->dataSocketFd = -1;
    sock_ptr->feedbackSocketFd = -1;

    // socket() reports failure with -1 (which is truthy) and may legitimately
    // return descriptor 0, so the result must be compared against 0 explicitly.
    check((sock_ptr->dataSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create data socket.");
    check((sock_ptr->feedbackSocketFd = socket(AF_INET, SOCK_DGRAM, 0)) >= 0, "Cannot create feedback socket.");

    if (is_sender) {
        // The feedback port is port + 1; refuse the one value that would wrap to 0.
        check(port != UINT16_MAX, "Cannot derive feedback port from data port 65535.");

        // Bind Feedback Socket
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1));

        check(bind(sock_ptr->feedbackSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind feedback socket.");

        pthread_mutex_init(&sock_ptr->outQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->outQueueFilledCv, NULL);
        sock_ptr->outQueue = List_create();

        check(pthread_create(&sock_ptr->receiveFeedbackThread, NULL, receive_feedback_loop, (void *) sock_ptr) == 0, "Cannot create receive feedback thread.");
        check(pthread_create(&sock_ptr->sendThread, NULL, send_data_loop, (void *) sock_ptr) == 0, "Cannot create send thread.");
    } else {
        // Bind Data Socket
        struct sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

        check(bind(sock_ptr->dataSocketFd, (struct sockaddr *) &address, sizeof(address)) == 0, "Cannot bind data socket.");

        sock_ptr->forwardPacketTable = malloc(sizeof(PrrtForwardPacketTable));
        check(sock_ptr->forwardPacketTable != NULL, "Cannot allocate forward packet table.");
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);

        pthread_mutex_init(&sock_ptr->inQueueFilledMutex, NULL);
        pthread_cond_init(&sock_ptr->inQueueFilledMutexCv, NULL);
        sock_ptr->inQueue = List_create();

        check(pthread_create(&sock_ptr->receiveDataThread, NULL, receive_data_loop, (void *) sock_ptr) == 0, "Cannot create data receiving thread.");
    }

    return EXIT_SUCCESS;

    error:
        // TODO: cancel threads
        // TODO: release queues, mutexes and the forward packet table created above
        if (sock_ptr->dataSocketFd >= 0) {
            close(sock_ptr->dataSocketFd);
            sock_ptr->dataSocketFd = -1;
        }
        if (sock_ptr->feedbackSocketFd >= 0) {
            close(sock_ptr->feedbackSocketFd);
            sock_ptr->feedbackSocketFd = -1;
        }
        return EXIT_FAILURE;
}

56
/**
 * Register a receiver (host, port) with the socket.
 *
 * The receiver is appended to sock_ptr->receivers and receiverLength is
 * incremented. The host pointer itself is stored in the new entry; the string
 * is not copied here.
 *
 * @param sock_ptr Socket whose receiver list is extended.
 * @param host     Receiver host string.
 * @param port     Receiver port.
 * @return 0 on success, -1 if PRRT_MAX_RECEIVER_COUNT receivers are already
 *         registered.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // Reject the request up front when the receiver table is full.
    if (sock_ptr->receiverLength >= PRRT_MAX_RECEIVER_COUNT) {
        return -1;
    }

    PrrtReceiver entry = {host, port};
    sock_ptr->receivers[sock_ptr->receiverLength] = entry;
    sock_ptr->receiverLength += 1;

    return 0;
}

69
/**
 * Queue a block of data for transmission.
 *
 * A data packet is built from data/data_len with the next source sequence
 * number, pushed onto the outgoing queue, and the queue's condition variable
 * is signalled.
 *
 * @param sock_ptr Socket to send on.
 * @param data     Payload bytes handed to PrrtPacket_create_data_packet.
 * @param data_len Number of payload bytes.
 * @return 0 once the packet has been queued, -1 if the packet could not be
 *         allocated.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate before taking the lock: keeps the critical section short and
    // lets an allocation failure be reported without holding the mutex.
    // (An assert() would vanish under NDEBUG and leave a NULL dereference.)
    PrrtPacket *packet = malloc(sizeof *packet);
    if (packet == NULL) {
        return -1;
    }

    pthread_mutex_lock(&sock_ptr->outQueueFilledMutex);

    // The sequence counter is advanced under the queue mutex, as before.
    // NOTE(review): the literal 5 is passed through unchanged from the original;
    // its meaning is defined by PrrtPacket_create_data_packet.
    PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->sequenceNumberSource++);

    List_push(sock_ptr->outQueue, packet);
    pthread_cond_signal(&sock_ptr->outQueueFilledCv);
    pthread_mutex_unlock(&sock_ptr->outQueueFilledMutex);

    return 0;
}

83
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
84
    while (1) {
85
86
87
        pthread_mutex_lock(&sock_ptr->inQueueFilledMutex);
        while (List_count(sock_ptr->inQueue) == 0) {
            pthread_cond_wait(&sock_ptr->inQueueFilledMutexCv, &sock_ptr->inQueueFilledMutex);
88
        }
89

90
        PrrtPacket *packet = List_shift(sock_ptr->inQueue);
91

92
93
        uint32_t len = packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
        memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, len);
94

95
96
        pthread_mutex_unlock(&sock_ptr->inQueueFilledMutex);
        return len;
97
98
99
    }
}

100
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
101
102
    // TODO: shut down threads;

103
104
    // TODO: clean up all receivers

105
106
107
    pthread_mutex_destroy(&sock_ptr->outQueueFilledMutex);
    pthread_cond_destroy(&sock_ptr->outQueueFilledCv);
    List_destroy(sock_ptr->outQueue);
108

109
110
    close(sock_ptr->dataSocketFd);
    close(sock_ptr->feedbackSocketFd);
111
112
113
    return 0;
}

114
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
115
116
117
118
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

119
    n = recvfrom(sock_ptr->feedbackSocketFd, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
120
    if (n < 0) {
121
        perror("RECVFROM FAIL");
122
123
        return NULL;
    }
124
125
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
126
    printf("RECV FEEDBACK: %s:%d\n", remote_host, remote_port);
127

128
    PrrtPacket *packet_ptr = malloc(sizeof(PrrtPacket));
129
    assert(packet_ptr != NULL);
130
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
131
    return packet_ptr;
132
}