dataReceiver.c 16.5 KB
Newer Older
1
#include <netdb.h>
Andreas Schmidt's avatar
Andreas Schmidt committed
2
#include <stdio.h>
3
#include <arpa/inet.h>
4 5 6
#include "../../defines.h"
#include "../../util/dbg.h"
#include "../../util/common.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
7
#include "../../util/time.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
8
#include "../types/lossStatistics.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
9
#include "../types/block.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
10 11
#include "../clock.h"
#include "../socket.h"
Andreas Schmidt's avatar
Andreas Schmidt committed
12
#include "dataReceiver.h"
13

Andreas Schmidt's avatar
Andreas Schmidt committed
14 15 16 17
/**
 * Move buffered data packets that belong to a repair block into that block.
 *
 * Every packet that PrrtDataPacketStore_remove_range() hands back for the
 * sequence number range base_seqno .. base_seqno + k - 1 is inserted into
 * the block.
 *
 * A packet the block refuses is destroyed here: it has already been taken
 * out of the store, so nobody else would release it. The remaining packets
 * are still tried, so one rejected packet does not cost the block the rest
 * of its data.
 *
 * @param sock_ptr   socket whose data packet store is drained
 * @param base_seqno first sequence number of the block
 * @param k          number of data packets in the block
 * @param block      block receiving the packets (modified despite the const
 *                   qualifier, which is kept for interface compatibility)
 */
static void retrieve_data_packets_for_block(PrrtSocket *sock_ptr,
                                            prrtSequenceNumber_t base_seqno,
                                            uint8_t k,
                                            const PrrtBlock *block) {
    List *res = List_create();
    prrtSequenceNumber_t last_seqno = (prrtSequenceNumber_t) (base_seqno + k - 1);
    bool all_inserted = true;

    debug(DEBUG_BLOCK, "Size: %d", PrrtDataPacketStore_size(sock_ptr->dataPacketStore));
    PrrtDataPacketStore_remove_range(sock_ptr->dataPacketStore, res, base_seqno,
                                     last_seqno);
    debug(DEBUG_BLOCK, "Retrieve %d packets in range: %u-%u.", List_count(res), base_seqno, last_seqno);

    LIST_FOREACH(res, first, next, cur) {
        PrrtPacket *packet = cur->value;
        if (!PrrtBlock_insert_data_packet((PrrtBlock *) block, packet)) {
            // Ownership stays with us when the insert fails (handle_data_packet
            // treats a failed insert the same way), so release the packet.
            PrrtPacket_destroy(packet);
            all_inserted = false;
        }
    }

    if (!all_inserted) {
        PERROR("Insert failed.");
    }
    List_destroy(res);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
36 37
/**
 * Decode a repair block once it is ready and pass its packets on.
 *
 * Nothing happens for a NULL block or a block that is not decode-ready.
 * If the delivered-packet table still considers the block relevant, it is
 * decoded and its data packets are drained: packets whose sequence number
 * is still relevant go to the packet delivery store, all others are
 * destroyed. Afterwards the block is deleted from the repair block store
 * and destroyed.
 *
 * When decoding fails, the error is logged and the function returns without
 * deleting or destroying the block.
 *
 * @param sock_ptr socket owning the tables and stores involved
 * @param block    block to decode; may be NULL
 */
static void decode_block(PrrtSocket *sock_ptr, PrrtBlock *block) {
    if (block == NULL || !PrrtBlock_decode_ready(block)) {
        return;
    }

    if (PrrtDeliveredPacketTable_test_is_block_relevant(sock_ptr->deliveredPacketTable,
                                                        block->baseSequenceNumber,
                                                        block->codingParams->n)) {
        check(PrrtBlock_decode(block), "Decoding failed");

        while (List_count(block->dataPackets) > 0) {
            PrrtPacket *decoded = List_shift(block->dataPackets);
            bool still_wanted = PrrtDeliveredPacketTable_test_set_is_number_relevant(
                    sock_ptr->deliveredPacketTable, decoded->sequenceNumber);

            if (!still_wanted) {
                PrrtPacket_destroy(decoded);
                continue;
            }
            PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, decoded);
        }
    }

    // The block is finished either way: drop it from the store and free it.
    PrrtRepairBlockStore_delete(sock_ptr->repairBlockStore, block->baseSequenceNumber);
    PrrtBlock_destroy(block);
    return;

    error:
    PERROR("Decoding failed.%s", "");
}
62

Andreas Schmidt's avatar
Andreas Schmidt committed
63 64 65 66 67 68 69
/**
 * Build a feedback packet for a received packet and send it to its sender.
 *
 * The feedback carries the socket's current loss statistics, the forward
 * trip time derived from the last sent/received timestamps, and the larger
 * of the two local effective paces (application delivery vs. PRRT receive).
 *
 * Thread cancellation is disabled only for the duration of the sendto()
 * call and is enabled again afterwards, whether or not the send succeeded.
 *
 * @param sock_ptr      socket to send on; its feedback sequence number is
 *                      incremented
 * @param remote        address the feedback is sent to
 * @param seqno         sequence number of the packet being acknowledged
 * @param receiveStamp  time the acknowledged packet was received
 * @param sentTimestamp time the acknowledged packet was sent
 * @param type          type of the acknowledged packet
 * @return true if the whole feedback packet was sent, false otherwise
 */
static bool send_feedback(PrrtSocket *sock_ptr,
                          struct sockaddr_in remote,
                          prrtSequenceNumber_t seqno,
                          prrtTimestamp_t receiveStamp,
                          prrtTimestamp_t sentTimestamp,
                          prrtPacketType_t type) {
    // Declared up front so the error path never reads an uninitialised pointer.
    void *buf = NULL;
    PrrtPacket *feedback_pkt_ptr = NULL;
    ssize_t sent_bytes = -1;

    enum XlapTimestampPacketKind kind = ts_data_packet;
    if (type == PACKET_TYPE_REDUNDANCY) {
        kind = ts_redundancy_packet;
    }

    XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackStart);

    prrtFeedback_t feedback = {
            .seqNo = seqno,
            .type = type,
            .receivedTime = receiveStamp,
            .sentTime = sentTimestamp
    };

    prrtTimestamp_t forwardTripTime = (uint32_t) (((int64_t) PrrtClock_get_current_time_us()) +
                                                  (sock_ptr->lastSentTimestamp - sock_ptr->lastReceivedTimestamp));

    PrrtLossStatistics stats = sock_ptr->lossStatistics;

    int group_RTT = 0; // TODO: To be determined.
    uint32_t local_bottleneck_pace = MAX(PrrtPace_get_effective(sock_ptr->appDeliverPace), PrrtPace_get_effective(sock_ptr->prrtReceivePace));
    feedback_pkt_ptr = PrrtPacket_create_feedback_packet(0, sock_ptr->sequenceNumberFeedback++, group_RTT,
                                                         stats.gapLength, stats.gapCount, stats.burstLength,
                                                         stats.burstCount, forwardTripTime,
                                                         stats.erasureCount, stats.packetCount,
                                                         feedback.seqNo, feedback.type,
                                                         local_bottleneck_pace);
    check(feedback_pkt_ptr != NULL, "Could not create feedback packet.");

    prrtPacketLength_t length = PrrtPacket_size(feedback_pkt_ptr);
    buf = calloc(1, length);
    check_mem(buf);

    // Report the real allocation size: the buffer holds exactly `length`
    // bytes, so the encoder must not be told it may write more than that.
    check(PrrtPacket_encode(buf, length, feedback_pkt_ptr), "Buffer for encoding feedback is too small");

    // Re-enable cancellation before evaluating the result, so a failed send
    // cannot leave the thread with cancellation disabled.
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    sent_bytes = sendto(sock_ptr->socketFd, buf, length, 0, (struct sockaddr *) &remote, sizeof(remote));
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    check(sent_bytes >= 0 && (prrtPacketLength_t) sent_bytes == length, "Sending feedback failed.");

    free(buf);
    PrrtPacket_destroy(feedback_pkt_ptr);
    XlapTimeStampCycle(sock_ptr, kind, seqno, SendFeedbackEnd);

    return true;

    error:
    free(buf);
    if (feedback_pkt_ptr != NULL) { PrrtPacket_destroy(feedback_pkt_ptr); }
    return false;
}

Andreas Schmidt's avatar
Andreas Schmidt committed
122
/**
 * Process a freshly received data packet.
 *
 * Updates the socket clock and the last-sent timestamp from the payload,
 * registers a timeout entry for the packet and marks its sequence number as
 * received. Then one of three things happens:
 *  - the packet's timeout has already passed: it is marked in the
 *    delivered-packet table and destroyed;
 *  - the delivered-packet table says it is not relevant: it is destroyed;
 *  - otherwise it is forwarded to the delivery store, and a copy is kept
 *    for repair (in the matching repair block if one exists, else in the
 *    data packet store).
 *
 * @param sock_ptr socket the packet arrived on
 * @param packet   decoded data packet; this function takes ownership
 */
static void handle_data_packet(PrrtSocket *sock_ptr, PrrtPacket *packet) {
    PrrtPacketDataPayload *payload = packet->payload;
    prrtTimestamp_t sentTimestamp = payload->timestamp;
    sock_ptr->lastSentTimestamp = sentTimestamp;
    PrrtClock_update(&sock_ptr->clock, sentTimestamp, payload->groupRTprop_us);
    debug(DEBUG_DATARECEIVER, "Timeout: %lu", (unsigned long) payload->packetTimeout_us);

    // NOTE(review): if this insert fails, neither packetTimeout nor packet is
    // released here; who owns them at that point is not visible from this file.
    PrrtPacketTimeout *packetTimeout = PrrtPacketTimeout_create(packet);
    check(PrrtPacketTimeoutTable_insert(sock_ptr->packetTimeoutTable, packetTimeout), "Could not insert data packet.");

    prrtSequenceNumber_t seqno = packet->sequenceNumber;
    PrrtReceptionTable_mark_received(sock_ptr->dataReceptionTable, seqno);

    prrtTimestamp_t now = PrrtClock_get_current_time_us();
    if (PrrtTimestamp_cmp(now, payload->packetTimeout_us) > 0) {
        debug(DEBUG_DATARECEIVER, "Timeout data packet %u (%lu > %lu)", seqno, (unsigned long) now,
              (unsigned long) payload->packetTimeout_us);
        PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable, packet->sequenceNumber);
        PrrtPacket_destroy(packet);
    } else if (!PrrtDeliveredPacketTable_test_set_is_number_relevant(sock_ptr->deliveredPacketTable,
                                                                    packet->sequenceNumber)) {
        debug(DEBUG_DATARECEIVER, "Not relevant: %u", seqno);
        PrrtPacket_destroy(packet);
    } else {
        PrrtChannelStateInformation_update_delivery_rate(sock_ptr->senderChannelStateInformation, payload->btlbw);
        sock_ptr->send_peer_btl_pace = payload->btl_pace;
        prrtSequenceNumber_t baseSequenceNumber = packet->sequenceNumber - packet->index;

        // Copy BEFORE handing the packet over: once it is in the delivery
        // store it is no longer ours and must not be read any more.
        PrrtPacket *reference = PrrtPacket_copy(packet);

        // forward to application layer (exactly once)
        debug(DEBUG_DATARECEIVER, "Forward: %u", seqno);
        PrrtPacketDeliveryStore_insert(sock_ptr->packetDeliveryStore, packet);

        if (reference == NULL) {
            // The packet itself was delivered; only its repair copy is missing.
            PERROR("Could not copy data packet %u for repair.", seqno);
        } else {
            PrrtBlock *block = PrrtRepairBlockStore_get_block(sock_ptr->repairBlockStore, baseSequenceNumber);
            if (block != NULL) {
                if (PrrtBlock_insert_data_packet(block, reference)) {
                    decode_block(sock_ptr, block);
                } else {
                    PrrtPacket_destroy(reference);
                }
            } else {
                debug(DEBUG_DATARECEIVER, "Inserting data packet %d for later.", reference->sequenceNumber);
                if (PrrtDataStore_insert(sock_ptr->dataPacketStore, reference) == false) {
                    debug(DEBUG_DATARECEIVER, "Failed to insert %d.", reference->sequenceNumber);
                    PrrtPacket_destroy(reference);
                } else {
                    debug(DEBUG_DATARECEIVER, "Inserted %d.", reference->sequenceNumber);
                }
            }
        }

        XlapTimeStampClock(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
        XlapTimeStampCycle(sock_ptr, ts_data_packet, seqno, PrrtReturnPackage);
    }
    return;

    error:
    PERROR("Handling data packet failed%s.", "");
}

Andreas Schmidt's avatar
Andreas Schmidt committed
186
/**
 * Process a freshly received redundancy packet.
 *
 * Records the sender timestamp and marks the packet in the redundancy
 * reception table. If the delivered-packet table no longer considers the
 * packet's block relevant, the packet is destroyed. Otherwise the matching
 * repair block is looked up; if none exists, one is created from the
 * packet's coding parameters, stored, and filled with the data packets
 * buffered for it. The redundancy packet is then inserted into the block
 * and decoding is attempted; a packet the block rejects is destroyed.
 *
 * @param socket socket the packet arrived on
 * @param packet decoded redundancy packet; this function takes ownership
 */
static void handle_redundancy_packet(PrrtSocket *socket, PrrtPacket *packet) {
    PrrtPacketRedundancyPayload *redundancy = packet->payload;
    prrtTimestamp_t senderStamp = redundancy->timestamp;
    socket->lastSentTimestamp = senderStamp;

    PrrtReceptionTable_mark_received(socket->redundancyReceptionTable, packet->sequenceNumber);

    bool block_relevant = PrrtDeliveredPacketTable_test_is_block_relevant(socket->deliveredPacketTable,
                                                                          redundancy->baseSequenceNumber,
                                                                          redundancy->n);
    if (!block_relevant) {
        PrrtPacket_destroy(packet);
        return;
    }

    PrrtBlock *block = PrrtRepairBlockStore_get_block(socket->repairBlockStore,
                                                      redundancy->baseSequenceNumber);
    socket->send_peer_btl_pace = redundancy->btl_pace;

    if (block == NULL) {
        // First redundancy packet of this block: build the block from the
        // coding parameters carried in the packet (a single cycle of n - k).
        uint8_t n_cycle[1] = {redundancy->n - redundancy->k};
        PrrtCodingConfiguration *codingParams = PrrtCodingConfiguration_create(redundancy->k,
                                                                               redundancy->n, 1, n_cycle);

        block = PrrtBlock_create(codingParams, PrrtSocket_get_matching_coder(socket, codingParams),
                                 redundancy->baseSequenceNumber);
        PrrtRepairBlockStore_insert(socket->repairBlockStore, block);

        retrieve_data_packets_for_block(socket, redundancy->baseSequenceNumber, block->codingParams->k, block);
    }

    if (!PrrtBlock_insert_redundancy_packet(block, packet)) {
        PrrtPacket_destroy(packet);
        return;
    }
    decode_block(socket, block);
}

Andreas Schmidt's avatar
Andreas Schmidt committed
224 225
/**
 * Process a received feedback packet.
 *
 * Computes the round-trip time as the difference between the receive time
 * and the forward trip timestamp carried in the feedback, stores the peer's
 * bottleneck pace on the socket, and passes the feedback on to the
 * receiver's ack handling.
 *
 * The packet is not destroyed here.
 *
 * @param socket      socket the feedback arrived on
 * @param packet      decoded feedback packet; a NULL packet is logged as an
 *                    error and otherwise ignored
 * @param receiveTime time at which the feedback was received
 */
void handle_feedback_packet(PrrtSocket *socket, PrrtPacket *packet, prrtTimestamp_t receiveTime) {
    check(packet != NULL, "Cannot be null");
    debug(DEBUG_DATARECEIVER, "handle_feedback_packet");

    PrrtPacketFeedbackPayload *fb = (PrrtPacketFeedbackPayload *) packet->payload;
    prrtTimestamp_t echoedStamp = fb->forwardTripTimestamp_us;
    prrtTimedelta_t rtt = (prrtTimedelta_t) (receiveTime - echoedStamp);

    socket->recv_peer_btl_pace = fb->btl_pace;
    PrrtReceiver_on_ack(socket->receiver, fb, receiveTime, rtt, socket->applicationConstraints);
    return;

    error:
    PERROR("handle_feedback_packet failed.");
}

240
/**
 * Receive one datagram from the socket together with its receive timestamps.
 *
 * With socket_ptr->isHardwareTimestamping set, the datagram is read with
 * recvmsg() and the timestamp is taken from the SO_TIMESTAMPNS control
 * message. If the receive fails or no such control message is attached, the
 * current CLOCK_REALTIME time is used instead, so the timestamp is always
 * initialised on return. Without that flag, recvfrom() is used and the
 * timestamp is read from CLOCK_REALTIME right after the call.
 *
 * @param socket_ptr            socket to read from
 * @param buffer_ptr            receive buffer; at most MAX_PAYLOAD_LENGTH
 *                              bytes are written
 * @param received_size         out: return value of recvmsg()/recvfrom()
 *                              (negative on failure)
 * @param remote_ptr            out: address of the sender
 * @param remote_len_ptr        in: size of *remote_ptr; the recvfrom() path
 *                              also writes the actual address length back
 * @param packet_timestamp_ptr  out: receive timestamp
 * @param packet_cyclestamp_ptr out: cycle counter value read after receiving
 */
void receive_from_socket(PrrtSocket *socket_ptr, unsigned char buffer_ptr[65528], ssize_t *received_size,
                         struct sockaddr_in *remote_ptr, socklen_t *remote_len_ptr,
                         struct timespec *packet_timestamp_ptr,
                         uint64_t *packet_cyclestamp_ptr) {
    if (socket_ptr->isHardwareTimestamping) {
        struct msghdr msg;
        struct iovec entry;
        bool timestamp_found = false;

        struct {
            struct cmsghdr cm;
            char control[512];
        } control;

        memset(&msg, 0, sizeof(msg));
        memset(&control, 0, sizeof(control));
        msg.msg_iov = &entry;
        msg.msg_iovlen = 1;
        entry.iov_base = buffer_ptr;
        entry.iov_len = MAX_PAYLOAD_LENGTH;
        msg.msg_name = remote_ptr;
        msg.msg_namelen = *remote_len_ptr;
        msg.msg_control = &control;
        msg.msg_controllen = sizeof(control);

        *received_size = recvmsg(socket_ptr->socketFd, &msg, 0);

        // Only look at control data the kernel actually filled in: after a
        // failed recvmsg() msg_controllen still describes the whole buffer.
        if (*received_size >= 0) {
            for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
                if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMPNS) {
                    memcpy(packet_timestamp_ptr, CMSG_DATA(cmsg), sizeof(struct timespec));
                    timestamp_found = true;
                } else {
                    PERROR("Unknown Control Msg Type: %d", cmsg->cmsg_type);
                }
            }
        }

        if (!timestamp_found) {
            // Never hand an uninitialised timestamp back to the caller.
            clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
        }
    } else {
        *received_size = recvfrom(socket_ptr->socketFd, buffer_ptr, MAX_PAYLOAD_LENGTH, 0,
                                  (struct sockaddr *) remote_ptr, remote_len_ptr);
        clock_gettime(CLOCK_REALTIME, packet_timestamp_ptr);
    }
    *packet_cyclestamp_ptr = __builtin_ia32_rdtsc();
}

Andreas Schmidt's avatar
Andreas Schmidt committed
285
void *receive_data_loop(void *ptr) {
286 287 288 289
    ssize_t n;
    struct sockaddr_in remote;
    socklen_t addrlen = sizeof(remote);
    unsigned char buffer[MAX_PAYLOAD_LENGTH];
290
    PrrtSocket *s = ptr;
291

Andreas Schmidt's avatar
Andreas Schmidt committed
292
    while (1) {
Andreas Schmidt's avatar
Andreas Schmidt committed
293
        PrrtPace_track_start(s->prrtReceivePace);
294
        debug(DEBUG_DATARECEIVER, "About to receive.");
Andreas Schmidt's avatar
Andreas Schmidt committed
295 296 297 298 299 300
        XlapTimestampPlaceholder tsph1;
        XlapTimestampPlaceholder tsph2;
        XlapTimestampPlaceholder tsph3;
        XlapTimestampPlaceholderInitialize(&tsph1);
        XlapTimestampPlaceholderInitialize(&tsph2);
        XlapTimestampPlaceholderInitialize(&tsph3);
Stefan Reif's avatar
Stefan Reif committed
301

302 303
        struct timespec packet_recv_timestamp;
        uint64_t packet_recv_cyclestamp = 0;
Andreas Schmidt's avatar
Andreas Schmidt committed
304 305
        PrrtPace_track_pause(s->prrtReceivePace);
        receive_from_socket(s, buffer, &n, &remote, &addrlen, &packet_recv_timestamp, &packet_recv_cyclestamp);
Andreas Schmidt's avatar
Andreas Schmidt committed
306
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
Andreas Schmidt's avatar
Andreas Schmidt committed
307
        PrrtPace_track_resume(s->prrtReceivePace);
Andreas Schmidt's avatar
Andreas Schmidt committed
308
        if (atomic_load_explicit(&s->closing, memory_order_acquire)) {
309 310 311
            break;
        }

312 313
        debug(DEBUG_HARDSTAMPING, "Packet TS:\t%ld.%09ld; Who? %s", (long) packet_recv_timestamp.tv_sec,
              packet_recv_timestamp.tv_nsec, inet_ntoa(remote.sin_addr));
Andreas Schmidt's avatar
Andreas Schmidt committed
314 315
        XlapTimeStampClock(&tsph1, ts_any_packet, 0, LinkReceive);
        XlapTimeStampCycle(&tsph1, ts_any_packet, 0, LinkReceive);
316

317
        PrrtPacket *packet = (PrrtPacket *) calloc(1, sizeof(PrrtPacket));
318
        check_mem(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
319
        XlapTimeStampCycle(&tsph2, ts_any_packet, 0, DecodeStart);
320
        check(PrrtPacket_decode(buffer, (uint16_t) n, packet), "Decode failed.");
Andreas Schmidt's avatar
Andreas Schmidt committed
321 322
        XlapTimeStampCycle(&tsph3, ts_any_packet, 0, DecodeEnd);
        prrtSequenceNumber_t seqno = packet->sequenceNumber;
323
        prrtTimestamp_t prrt_recv_timestamp = PrrtClock_TimespecToPrrtTimestamp(packet_recv_timestamp);
324
        memcpy(&packet->sender_addr, &remote, addrlen);
325

326
        prrtPacketType_t packetType = PrrtPacket_type(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
327
        debug(DEBUG_DATARECEIVER, "received packet %d:%u", (int) packetType, seqno);
328
        packet->channelReceive = packet_recv_timestamp;
329

Andreas Schmidt's avatar
Andreas Schmidt committed
330
        enum XlapTimestampPacketKind kind = ts_any_packet;
331
        prrtTimestamp_t sentTimestamp;
Andreas Schmidt's avatar
Andreas Schmidt committed
332
        if (packetType == PACKET_TYPE_DATA) {
Andreas Schmidt's avatar
Andreas Schmidt committed
333
            kind = ts_data_packet;
334
            sentTimestamp = PrrtPacket_get_data_timestamp(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
335
        } else if (packetType == PACKET_TYPE_REDUNDANCY) {
Andreas Schmidt's avatar
Andreas Schmidt committed
336
            kind = ts_redundancy_packet;
337
            sentTimestamp = PrrtPacket_get_redundancy_timestamp(packet);
338 339
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
            kind = ts_feedback_packet;
Andreas Schmidt's avatar
Andreas Schmidt committed
340 341
        }
        if (packetType == PACKET_TYPE_DATA || packetType == PACKET_TYPE_REDUNDANCY) {
342
            s->lastReceivedTimestamp = prrt_recv_timestamp;
343

344 345
            XlapCycleStampValue(s, kind, seqno, ChannelReceive, packet_recv_cyclestamp);
            XlapTimeStampValue(s, kind, seqno, ChannelReceive, packet_recv_timestamp);
Andreas Schmidt's avatar
Andreas Schmidt committed
346

347 348 349
            XlapTimestampPlaceholderUse(s, kind, seqno, &tsph1);
            XlapTimestampPlaceholderUse(s, kind, seqno, &tsph2);
            XlapTimestampPlaceholderUse(s, kind, seqno, &tsph3);
Andreas Schmidt's avatar
Andreas Schmidt committed
350

351
            XlapTimeStampCycle(s, kind, seqno, HandlePacketStart);
352

Andreas Schmidt's avatar
Andreas Schmidt committed
353
            if (packetType == PACKET_TYPE_DATA) {
354
                handle_data_packet(s, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
355
            } else {
Andreas Schmidt's avatar
Andreas Schmidt committed
356
                handle_redundancy_packet(s, packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
357
            }
358 359
            send_feedback(s, remote, seqno, prrt_recv_timestamp, sentTimestamp, packetType);
            XlapTimeStampCycle(s, kind, seqno, HandlePacketEnd);
360
        } else if (packetType == PACKET_TYPE_FEEDBACK) {
361
            handle_feedback_packet(s, packet, prrt_recv_timestamp);
362
            PrrtPacket_destroy(packet);
Andreas Schmidt's avatar
Andreas Schmidt committed
363 364 365
        } else {
            PrrtPacket_print(packet);
            PrrtPacket_destroy(packet);
366
        }
367

368
        debug(DEBUG_DATARECEIVER, "Cleanup");
369
        PrrtSocket_cleanup(s);
370
        debug(DEBUG_DATARECEIVER, "Cleaned");
Andreas Schmidt's avatar
Andreas Schmidt committed
371
        PrrtPace_track_end(s->prrtReceivePace);
Andreas Schmidt's avatar
Andreas Schmidt committed
372
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
373 374 375
    }

    error:
Andreas Schmidt's avatar
Andreas Schmidt committed
376
    PERROR("receive_data_loop() ended unexpectedly.");
377
    PrrtSocket_cleanup(s);
378
    return NULL;
Andreas Schmidt's avatar
Andreas Schmidt committed
379
}