socket.c 5.97 KB
Newer Older
1
2
3
4
5
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
6
#include <stdlib.h>
7
#include <pthread.h>
8
#include <assert.h>
9
#include "../defines.h"
10
#include "socket.h"
11
12
#include "feedback_receiver.h"
#include "data_transmitter.h"
13

14
/*
 * Open a UDP/IPv4 socket bound to INADDR_ANY on the given port.
 *
 * Returns the descriptor on success. On failure the problem is reported via
 * perror(), any descriptor opened here is closed again, and -1 is returned.
 */
static int open_bound_udp_socket(const uint16_t port) {
    const int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if(fd < 0) {
        perror("cannot create socket");
        return -1;
    }

    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    if(bind(fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
        perror("cannot bind socket");
        close(fd); // do not leak the descriptor when binding fails
        return -1;
    }

    return fd;
}

/*
 * Initialise a PRRT socket.
 *
 * Binds the data socket to `port` and the feedback socket to `port + 1`,
 * resets the source sequence number to 1 and sets up the out-queue together
 * with its mutex/condition variable. If `is_sender` is non-zero, the
 * feedback-receiver and data-transmitter threads are started as well.
 *
 * Returns 0 on success and -1 if a socket could not be created or bound, or
 * if `port` is 65535 (there is no port left for the feedback socket). On
 * failure no descriptor stays open and both fd fields are -1 (an early
 * rejection of port 65535 leaves `*sock_ptr` untouched).
 * A failing pthread_create() terminates the process, as before.
 */
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    // port + 1 wraps to 0 for port 65535, which would make bind() silently
    // pick an arbitrary port for the feedback socket.
    const uint16_t feedback_port = (uint16_t) (port + 1);
    if(feedback_port == 0) {
        fprintf(stderr, "cannot create socket: port %u leaves no room for the feedback port\n", (unsigned) port);
        return -1;
    }

    sock_ptr->seqno_source = 1;
    sock_ptr->fd_feedback = -1;

    // Data socket
    sock_ptr->fd_data = open_bound_udp_socket(port);
    if(sock_ptr->fd_data < 0) {
        return -1;
    }

    // Feedback socket
    sock_ptr->fd_feedback = open_bound_udp_socket(feedback_port);
    if(sock_ptr->fd_feedback < 0) {
        close(sock_ptr->fd_data);
        sock_ptr->fd_data = -1;
        return -1;
    }

    // PrrtSocket_close() destroys these objects unconditionally, so they are
    // initialised for receivers as well, not only for senders.
    pthread_mutex_init(&sock_ptr->out_queue_filled_mutex, NULL);
    pthread_cond_init(&sock_ptr->out_queue_filled_cv, NULL);
    sock_ptr->out_queue = List_create();

    if(is_sender) {
        int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }

        rc = pthread_create(&sock_ptr->send_thread, NULL, send_data_loop, (void *) sock_ptr);
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    return 0;
}

76
/*
 * Append a receiver (host, port) to the socket's receiver table.
 *
 * Returns 0 when the receiver was stored and -1 when the table already
 * holds PRRT_MAX_RECEIVER_COUNT entries.
 *
 * NOTE(review): `host` is placed into the PrrtReceiver as passed in; it
 * presumably has to outlive the socket - verify against PrrtReceiver.
 */
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    // Guard: no free slot left in the receiver table.
    if(!(sock_ptr->receiver_len < PRRT_MAX_RECEIVER_COUNT)) {
        return -1;
    }

    PrrtReceiver new_receiver = {host, port};
    sock_ptr->receivers[sock_ptr->receiver_len] = new_receiver;
    sock_ptr->receiver_len++;

    return 0;
}

89
/*
 * Queue `data_len` bytes from `data` for transmission.
 *
 * A data packet is built with the next source sequence number, pushed onto
 * the out-queue and the queue's condition variable is signalled. The packet
 * is not freed here; it is handed over to the out-queue.
 *
 * Returns 0 on success and -1 if the packet could not be allocated.
 */
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate outside the critical section and fail gracefully: an assert()
    // is compiled out under NDEBUG and would let a NULL packet through.
    PrrtPacket *packet = malloc(sizeof *packet);
    if(packet == NULL) {
        perror("cannot allocate packet");
        return -1;
    }

    // The sequence number and the queue are shared state, so both are only
    // touched while holding the queue mutex.
    pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);

    // NOTE(review): the meaning of the constant 5 is not visible here -
    // confirm against PrrtPacket_create_data_packet().
    PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->seqno_source++);
    List_push(sock_ptr->out_queue, packet);

    pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
    pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);

    return 0;
}

103
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
104
105
    unsigned char buffer[MAX_PAYLOAD_LENGTH];

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
    while(1)
    {
        // RECEIVE DATA
        ssize_t n;
        struct sockaddr_in remote;
        socklen_t addrlen = sizeof(remote);

        n = recvfrom(sock_ptr->fd_data, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
        uint16_t remote_port = ntohs(remote.sin_port);
        char *remote_host = inet_ntoa(remote.sin_addr);

        PrrtPacket *packet = malloc(sizeof(PrrtPacket));
        assert(packet != NULL);
        PrrtPacket_decode(buffer, (uint16_t) n, packet);

        // REPLY
        struct sockaddr_in targetaddr;
        memset((char*) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

        struct hostent *hp;
        hp = gethostbyname(remote_host);
        memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        PrrtPacket * feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
        uint8_t buf[MAX_PAYLOAD_LENGTH];
        uint32_t length = PrrtPacket_size(feedback_pkt_ptr);

        if(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) < 0) {
            perror("BUF too small.");
        } else {
            if((sendto(sock_ptr->fd_feedback, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
                perror("sendto failed");
                return NULL;
            }
        }
143

144
145
146
147
148
149
150
        switch(PrrtPacket_type(packet)) {
            case PACKET_TYPE_DATA:
                memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
                return packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
            default:
                PrrtPacket_print(packet);
                break;
151
152
        }

153
        // TODO: enough packets received?
154
    }
155
    return 0;
156
157
}

158
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
159
160
    // TODO: shut down threads;

161
162
    // TODO: clean up all receivers

163
164
165
    pthread_mutex_destroy(&sock_ptr->out_queue_filled_mutex);
    pthread_cond_destroy(&sock_ptr->out_queue_filled_cv);
    List_destroy(sock_ptr->out_queue);
166

167
168
    close(sock_ptr->fd_data);
    close(sock_ptr->fd_feedback);
169
170
171
    return 0;
}

172
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
173
174
175
176
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

177
    n = recvfrom(sock_ptr->fd_feedback, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
178
    if(n < 0) {
179
        perror("RECVFROM FAIL");
180
181
        return NULL;
    }
182
183
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
184
    printf("RECV FEEDBACK: %s:%d\n", remote_host, remote_port);
185

186
    PrrtPacket * packet_ptr = malloc(sizeof(PrrtPacket));
187
    assert(packet_ptr != NULL);
188
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
189
    return packet_ptr;
190
}