socket.c 6.5 KB
Newer Older
1
2
3
4
5
#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
6
#include <stdlib.h>
7
#include <pthread.h>
8
#include <assert.h>
9
#include "../defines.h"
10
#include "socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
11
12
#include "processes/feedback_receiver.h"
#include "processes/data_transmitter.h"
13

14
int PrrtSocket_create(PrrtSocket *sock_ptr, const uint16_t port, const uint8_t is_sender) {
15
    sock_ptr->seqno_source = 1;
16
    sock_ptr->seqnoRedundancy = 1;
17

18
    // Create Data Socket
19
    if((sock_ptr->fd_data = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
20
21
22
23
24
        perror("cannot create socket");
        return -1;
    }

    // Create Feedback Socket
25
    if((sock_ptr->fd_feedback = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
26
27
28
29
        perror("cannot create socket");
        return -1;
    }

30
31
32
33
34
35
36
    if(is_sender) {
        // Bind Feedback Socket
        struct sockaddr_in address;
        memset((char*) &address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons((uint16_t) (port + 1)); // TODO: fail if port is 65535
37

38
39
40
41
        if(bind(sock_ptr->fd_feedback, (struct sockaddr *) &address, sizeof(address)) < 0) {
            perror("cannot bind socket");
            return -1;
        }
42

43
44
45
46
        pthread_mutex_init(&sock_ptr->out_queue_filled_mutex, NULL);
        pthread_cond_init(&sock_ptr->out_queue_filled_cv, NULL);

        sock_ptr->out_queue = List_create();
47
48

        int rc = pthread_create(&sock_ptr->receive_thread, NULL, receive_feedback_loop, (void *) sock_ptr);
49
50
51
52
53
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }

54
55
56
57
58
        rc = pthread_create(&sock_ptr->send_thread, NULL, send_data_loop, (void *) sock_ptr);
        if(rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
59
    } else {
60
61
62
63
64
65
66
67
68
69
70
        // Bind Data Socket
        struct sockaddr_in address;
        memset((char*) &address, 0, sizeof(address));
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = htonl(INADDR_ANY);
        address.sin_port = htons(port);

        if(bind(sock_ptr->fd_data, (struct sockaddr *) &address, sizeof(address)) < 0) {
            perror("cannot bind socket");
            return -1;
        }
71

72
        PrrtForwardPacketTable_create(sock_ptr->forwardPacketTable);
73
74
    }

75
76
77
    return 0;
}

78
int PrrtSocket_connect(PrrtSocket *sock_ptr, const char *host, const uint16_t port) {
79
    PrrtReceiver recv =  {host , port};
80

81
82
83
84
85
86
    if(sock_ptr->receiver_len < PRRT_MAX_RECEIVER_COUNT) {
        sock_ptr->receivers[sock_ptr->receiver_len] =recv;
        sock_ptr->receiver_len++;
    } else {
        return -1;
    }
87
88
89
90

    return 0;
}

91
int PrrtSocket_send(PrrtSocket *sock_ptr, const void *data, const size_t data_len) {
92
    pthread_mutex_lock(&sock_ptr->out_queue_filled_mutex);
93

94
    PrrtPacket * packet = malloc(sizeof(PrrtPacket));
95
    assert(packet != NULL);
96
    PrrtPacket_create_data_packet(packet, 5, data, data_len, sock_ptr->seqno_source++);
97

98
99
100
    List_push(sock_ptr->out_queue, packet);
    pthread_cond_signal(&sock_ptr->out_queue_filled_cv);
    pthread_mutex_unlock(&sock_ptr->out_queue_filled_mutex);
101

102
    return 0;
103
104
}

105
uint32_t PrrtSocket_recv(const PrrtSocket *sock_ptr, void *buf_ptr) {
106
107
    unsigned char buffer[MAX_PAYLOAD_LENGTH];

108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
    while(1)
    {
        // RECEIVE DATA
        ssize_t n;
        struct sockaddr_in remote;
        socklen_t addrlen = sizeof(remote);

        n = recvfrom(sock_ptr->fd_data, buffer, MAX_PAYLOAD_LENGTH, 0, (struct sockaddr *) &remote, &addrlen);
        uint16_t remote_port = ntohs(remote.sin_port);
        char *remote_host = inet_ntoa(remote.sin_addr);

        PrrtPacket *packet = malloc(sizeof(PrrtPacket));
        assert(packet != NULL);
        PrrtPacket_decode(buffer, (uint16_t) n, packet);

        // REPLY
        struct sockaddr_in targetaddr;
        memset((char*) &targetaddr, 0, sizeof(targetaddr));
        targetaddr.sin_family = AF_INET;
        targetaddr.sin_port = htons((uint16_t) (remote_port + 1));

        struct hostent *hp;
        hp = gethostbyname(remote_host);
        memcpy((void *)&targetaddr.sin_addr, hp->h_addr_list[0], (size_t) hp->h_length);

        PrrtPacket * feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, 19, 4715, 7, 3, 50, 4, 6, 8, 9, 5, 1);
        uint8_t buf[MAX_PAYLOAD_LENGTH];
        uint32_t length = PrrtPacket_size(feedback_pkt_ptr);

        if(PrrtPacket_encode(buf, MAX_PAYLOAD_LENGTH, feedback_pkt_ptr) < 0) {
            perror("BUF too small.");
        } else {
            if((sendto(sock_ptr->fd_feedback, buf, length, 0, (struct sockaddr *) &targetaddr, sizeof(targetaddr)) < 0)) {
                perror("sendto failed");
                return NULL;
            }
        }
145

146
147
        switch(PrrtPacket_type(packet)) {
            case PACKET_TYPE_DATA:
148
149
150
151
152
153
154
155
156
                // packet.timestamp + packet.timeout < now: break

                // if (FTP_is_irrelevant(packet)): break;

                // check incomplete_prrt_blocks for this seqno: insert if found
                // else: insert in data_packet_store

                // forward to application layer

157
158
159
160
161
                memcpy(buf_ptr, packet->payload + PRRT_PACKET_DATA_HEADER_SIZE, packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE);
                return packet->payload_len - PRRT_PACKET_DATA_HEADER_SIZE;
            default:
                PrrtPacket_print(packet);
                break;
162
163
        }

164
        // TODO: enough packets received?
165
    }
166
    return 0;
167
168
}

169
int PrrtSocket_close(const PrrtSocket *sock_ptr) {
170
171
    // TODO: shut down threads;

172
173
    // TODO: clean up all receivers

174
175
176
    pthread_mutex_destroy(&sock_ptr->out_queue_filled_mutex);
    pthread_cond_destroy(&sock_ptr->out_queue_filled_cv);
    List_destroy(sock_ptr->out_queue);
177

178
179
    close(sock_ptr->fd_data);
    close(sock_ptr->fd_feedback);
180
181
182
    return 0;
}

183
PrrtPacket *PrrtSocket_recv_feedback(const PrrtSocket *sock_ptr, void *bufin, const size_t length) {
184
185
186
187
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);

188
    n = recvfrom(sock_ptr->fd_feedback, bufin, length, 0, (struct sockaddr *) &remote, &addrlen);
189
    if(n < 0) {
190
        perror("RECVFROM FAIL");
191
192
        return NULL;
    }
193
194
    uint16_t remote_port = ntohs(remote.sin_port);
    char *remote_host = inet_ntoa(remote.sin_addr);
195
    printf("RECV FEEDBACK: %s:%d\n", remote_host, remote_port);
196

197
    PrrtPacket * packet_ptr = malloc(sizeof(PrrtPacket));
198
    assert(packet_ptr != NULL);
199
    PrrtPacket_decode(bufin, (uint16_t) n, packet_ptr);
200
    return packet_ptr;
201
}