socket.c 6.32 KB
Newer Older
1
2
3
4
5
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netdb.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "../defines.h"
#include "socket.h"
#include "processes/feedback_receiver.h"
#include "processes/data_transmitter.h"
13

14
// Open a UDP socket and bind it to `port` on all local interfaces.
// Returns the descriptor, or -1 after printing the failure with perror();
// a socket that was opened but could not be bound is closed again.
static int open_bound_udp_socket(const uint16_t port) {
    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if(fd < 0) {
        perror("cannot create socket");
        return -1;
    }

    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(port);

    if(bind(fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
        // Report first: close() could overwrite the errno left by bind().
        perror("cannot bind socket");
        close(fd);
        return -1;
    }

    return fd;
}

// Initialise a PRRT socket.
//
// Binds the data socket to `port` and the feedback socket to `port + 1`.
// When `is_sender` is non-zero, additionally initialises the outgoing-queue
// mutex/condition variable, creates the outgoing queue and starts the
// feedback-receiver and data-transmitter threads.
//
// Parameters:
//   sock_ptr   socket structure to initialise
//   port       UDP port for data; must be below 65535 because port + 1 is
//              needed for feedback
//   is_sender  non-zero to set up the sending side
//
// Returns 0 on success and -1 on failure; on failure no descriptor opened by
// this call is left open. A pthread_create() failure terminates the process.
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
    // port + 1 would wrap around to 0 and bind the feedback socket to an
    // arbitrary port the peer cannot know.
    if(port == UINT16_MAX) {
        fprintf(stderr, "port %u leaves no room for the feedback port\n", (unsigned) port);
        return -1;
    }

    sock_ptr->seqno_source = 1;
    sock_ptr->seqnoRedundancy = 1;
    // PrrtSocket_connect() uses receiver_len as the next free index, so it
    // has to start from a defined value.
    sock_ptr->receiver_len = 0;

    // Create and bind Data Socket
    sock_ptr->fd_data = open_bound_udp_socket(port);
    if(sock_ptr->fd_data < 0) {
        return -1;
    }

    // Create and bind Feedback Socket
    sock_ptr->fd_feedback = open_bound_udp_socket((uint16_t) (port + 1));
    if(sock_ptr->fd_feedback < 0) {
        close(sock_ptr->fd_data);
        sock_ptr->fd_data = -1;
        return -1;
    }

    if(is_sender) {
        pthread_mutex_init(&sock_ptr->out_queue_filled_mutex, NULL);
        pthread_cond_init(&sock_ptr->out_queue_filled_cv, NULL);

        sock_ptr->out_queue = List_create();

        // NOTE(review): exiting the whole process from library code is harsh;
        // kept because a clean rollback would need a way to stop a thread
        // that has already been started.
        int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }

        rc = pthread_create(&sock_ptr->send_thread, NULL, send_data_loop, (void *) sock_ptr);
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    return 0;
}

77
// Register a receiver (host, port) with the socket.
//
// The entry is appended to sock_ptr->receivers and receiver_len is
// incremented. The `host` pointer itself is placed into the entry; no copy
// of the string is made here.
//
// Returns 0 when the receiver was added, -1 when PRRT_MAX_RECEIVER_COUNT
// receivers are already registered.
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
    if(sock_ptr->receiver_len >= PRRT_MAX_RECEIVER_COUNT) {
        return -1;
    }

    PrrtReceiver target = {host, port};
    sock_ptr->receivers[sock_ptr->receiver_len++] = target;

    return 0;
}

90
// Queue `data_len` bytes at `data` for transmission.
//
// A packet is allocated, filled through PrrtPacket_create_data_packet() with
// the next source sequence number, pushed onto the outgoing queue, and the
// queue's condition variable is signalled.
//
// Returns 0 once the packet is queued, -1 if the packet could not be
// allocated (nothing is queued and the sequence number is not consumed).
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
    // Allocate before taking the lock to keep the critical section short, and
    // report failure instead of relying on assert(), which is compiled out
    // when NDEBUG is defined.
    PrrtPacket *packet = malloc(sizeof *packet);
    if(packet == NULL) {
        perror("cannot allocate packet");
        return -1;
    }

    pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);

    // seqno_source is advanced under the mutex, like the queue push.
    // NOTE(review): the literal 5 is passed through unchanged; its meaning is
    // defined by PrrtPacket_create_data_packet() - consider a named constant.
    PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->seqno_source++);

    List_push(sock_ptr->out_queue, packet);
    pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
    pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);

    return 0;
}

104
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
105
106
    unsigned char buffer[MAX_PAYLOAD_LENGTH];

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
    while(1)
    {
        // RECEIVE DATA
        ssize_t n;
        struct sockaddr_in remote;
        socklen_t addrlen = sizeof(remote);

        n = recvfrom(sock_ptr->fd_data, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
        uint16_t remote_port = ntohs(remote.sin_port);
        char *remote_host = inet_ntoa(remote.sin_addr);

        PrrtPacket *packet = malloc(sizeof(PrrtPacket));
        assert(packet != NULL);
        PrrtPacket_decode(buffer, (uint16_t) n, packet);

        // REPLY
        struct sockaddr_in targetaddr;
        memset((char*) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

        struct hostent *hp;
        hp = gethostbyname(remote_host);
        memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        PrrtPacket * feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
        uint8_t buf[MAX_PAYLOAD_LENGTH];
        uint32_t length = PrrtPacket_size(feedback_pkt_ptr);

        if(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) < 0) {
            perror("BUF too small.");
        } else {
            if((sendto(sock_ptr->fd_feedback, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
                perror("sendto failed");
                return NULL;
            }
        }
144

145
146
        switch(PrrtPacket_type(packet)) {
            case PACKET_TYPE_DATA:
147
148
149
150
151
152
153
154
155
                // packet.timestamp + packet.timeout < now: break

                // if (FTP_is_irrelevant(packet)): break;

                // check incomplete_prrt_blocks for this seqno: insert if found
                // else: insert in data_packet_store

                // forward to application layer

156
157
158
159
160
                memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
                return packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
            default:
                PrrtPacket_print(packet);
                break;
161
162
        }

163
        // TODO: enough packets received?
164
    }
165
    return 0;
166
167
}

168
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
169
170
    // TODO: shut down threads;

171
172
    // TODO: clean up all receivers

173
174
175
    pthread_mutex_destroy(&sock_ptr->out_queue_filled_mutex);
    pthread_cond_destroy(&sock_ptr->out_queue_filled_cv);
    List_destroy(sock_ptr->out_queue);
176

177
178
    close(sock_ptr->fd_data);
    close(sock_ptr->fd_feedback);
179
180
181
    return 0;
}

182
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
183
184
185
186
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

187
    n = recvfrom(sock_ptr->fd_feedback, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
188
    if(n < 0) {
189
        perror("RECVFROM FAIL");
190
191
        return NULL;
    }
192
193
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
194
    printf("RECV FEEDBACK: %s:%d\n", remote_host, remote_port);
195

196
    PrrtPacket * packet_ptr = malloc(sizeof(PrrtPacket));
197
    assert(packet_ptr != NULL);
198
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
199
    return packet_ptr;
200
}