diff --git a/toxav/av_test.c b/toxav/av_test.c index dab1f6efd..c20d459b7 100644 --- a/toxav/av_test.c +++ b/toxav/av_test.c @@ -1,5 +1,31 @@ +/** av_test.c + * + * Copyright (C) 2013-2015 Tox project All Rights Reserved. + * + * This file is part of Tox. + * + * Tox is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Tox is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Tox. If not, see . + * + * Compile with (Linux only; in newly created directory toxcore/dir_name): + * gcc -o av_test ../toxav/av_test.c ../build/.libs/libtox*.a -lopencv_core \ + * -lopencv_highgui -lopencv_imgproc -lsndfile -pthread -lvpx -lopus -lsodium -lportaudio + */ + + #include "../toxav/toxav.h" #include "../toxcore/tox.h" +#include "../toxcore/util.h" /* Playing audio data */ #include @@ -11,7 +37,6 @@ #include #include - #include #include #include @@ -63,8 +88,8 @@ struct toxav_thread_data { int32_t sig; }; -const char* vdout = "AV Test"; -PaStream* adout = NULL; +const char* vdout = "AV Test"; /* Video output */ +PaStream* adout = NULL; /* Audio output */ const char* stringify_state(TOXAV_CALL_STATE s) { @@ -230,7 +255,7 @@ void* iterate_toxav (void * data) toxav_iterate(data_cast->BobAV); int rc = MIN(toxav_iteration_interval(data_cast->AliceAV), toxav_iteration_interval(data_cast->BobAV)); -// cvWaitKey(rc); +// cvWaitKey(10); c_sleep(10); } @@ -306,7 +331,26 @@ int print_help (const char* name) int main (int argc, char** argv) { + RingBuffer* rb = rb_new(4); + int a[5] = {0, 1, 2, 3, 4}; + int* x; + rb_write(rb, a + 0); + rb_write(rb, a + 1); + rb_write(rb, a + 2); + rb_write(rb, a + 3); +// rb_write(rb, a + 4); + + x = rb_write(rb, a + 4); + while (rb_read(rb, (void**) &x)) +// rb_read(rb, (void**)&x); + printf("%d ", *x); + + printf("\n"); +// int r = 43; +// printf("%d\n", r >= 40 ? 3 : r / 10); + return 0; Pa_Initialize(); + struct stat st; /* AV files for testing */ @@ -395,53 +439,6 @@ int main (int argc, char** argv) return 1; } - if (0) { - SNDFILE* af_handle; - SF_INFO af_info; - - /* Open audio file */ - af_handle = sf_open(af_name, SFM_READ, &af_info); - if (af_handle == NULL) { - printf("Failed to open the file.\n"); - exit(1); - } - - int frame_size = (af_info.samplerate * audio_frame_duration / 1000) * af_info.channels; - - struct PaStreamParameters output; - output.device = audio_out_dev_idx; /* default output device */ - output.channelCount = af_info.channels; - output.sampleFormat = paInt16; - output.suggestedLatency = audio_dev->defaultHighOutputLatency; - output.hostApiSpecificStreamInfo = NULL; - - - PaError err = Pa_OpenStream(&adout, NULL, &output, af_info.samplerate, frame_size, paNoFlag, NULL, NULL); - assert(err == paNoError); - - err = Pa_StartStream(adout); - assert(err == paNoError); - - int16_t PCM[frame_size]; - - time_t start_time = time(NULL); - time_t expected_time = af_info.frames / af_info.samplerate + 2; - - printf("Sample rate %d\n", af_info.samplerate); - while ( start_time + expected_time > time(NULL) ) { - - int64_t count = sf_read_short(af_handle, PCM, frame_size); - if (count > 0) { - t_toxav_receive_audio_frame_cb(NULL, 0, PCM, count, af_info.channels, af_info.samplerate, NULL); - } - - c_sleep(audio_frame_duration / 2); - } - - Pa_Terminate(); - return 0; - } - printf("Using audio device: %s\n", audio_dev->name); printf("Using audio file: %s\n", af_name); printf("Using video file: %s\n", vf_name); @@ -758,17 +755,15 @@ int main (int argc, char** argv) printf("Sample rate %d\n", af_info.samplerate); while ( start_time + expected_time > time(NULL) ) { - int64_t count = sf_read_short(af_handle, PCM, frame_size); if (count > 0) { TOXAV_ERR_SEND_FRAME rc; if (toxav_send_audio_frame(AliceAV, 0, PCM, count/af_info.channels, af_info.channels, af_info.samplerate, &rc) == false) { printf("Error sending frame of size %ld: %d\n", count, rc); -// exit(1); } } iterate_tox(bootstrap, AliceAV, BobAV); - c_sleep(30); + c_sleep(53); } @@ -794,6 +789,8 @@ int main (int argc, char** argv) while(data.sig != 1) pthread_yield(); + Pa_StopStream(adout); + printf("Success!"); } @@ -890,5 +887,7 @@ int main (int argc, char** argv) tox_kill(bootstrap); printf("\nTest successful!\n"); + + Pa_Terminate(); return 0; } diff --git a/toxav/codec.c b/toxav/codec.c index d55cc3456..671be1ac1 100644 --- a/toxav/codec.c +++ b/toxav/codec.c @@ -46,78 +46,13 @@ #define MAX_VIDEOFRAME_SIZE 0x40000 /* 256KiB */ #define VIDEOFRAME_HEADER_SIZE 0x2 -/* FIXME: Might not be enough */ +/* FIXME: Might not be enough? NOTE: I think it is enough */ #define VIDEO_DECODE_BUFFER_SIZE 20 #define ARRAY(TYPE__) struct { uint16_t size; TYPE__ data[]; } typedef ARRAY(uint8_t) Payload; -typedef struct { - uint16_t size; /* Max size */ - uint16_t start; - uint16_t end; - Payload **packets; -} PayloadBuffer; - -static bool buffer_full(const PayloadBuffer *b) -{ - return (b->end + 1) % b->size == b->start; -} - -static bool buffer_empty(const PayloadBuffer *b) -{ - return b->end == b->start; -} - -static void buffer_write(PayloadBuffer *b, Payload *p) -{ - b->packets[b->end] = p; - b->end = (b->end + 1) % b->size; - - if (b->end == b->start) b->start = (b->start + 1) % b->size; /* full, overwrite */ -} - -static void buffer_read(PayloadBuffer *b, Payload **p) -{ - *p = b->packets[b->start]; - b->start = (b->start + 1) % b->size; -} - -static void buffer_clear(PayloadBuffer *b) -{ - while (!buffer_empty(b)) { - Payload *p; - buffer_read(b, &p); - free(p); - } -} - -static PayloadBuffer *buffer_new(int size) -{ - PayloadBuffer *buf = calloc(sizeof(PayloadBuffer), 1); - - if (!buf) return NULL; - - buf->size = size + 1; /* include empty elem */ - - if (!(buf->packets = calloc(buf->size, sizeof(Payload *)))) { - free(buf); - return NULL; - } - - return buf; -} - -static void buffer_free(PayloadBuffer *b) -{ - if (b) { - buffer_clear(b); - free(b->packets); - free(b); - } -} - /* JITTER BUFFER WORK */ typedef struct JitterBuffer_s { RTPMessage **queue; @@ -318,7 +253,7 @@ bool reconfigure_audio_decoder(CSession* cs, int32_t sampling_rate, int8_t chann int status; OpusDecoder* new_dec = opus_decoder_create(sampling_rate, channels, &status ); if ( status != OPUS_OK ) { - LOGGER_ERROR("Error while starting audio decoder: %s", opus_strerror(status)); + LOGGER_ERROR("Error while starting audio decoder(%d %d): %s", sampling_rate, channels, opus_strerror(status)); return false; } @@ -336,7 +271,6 @@ bool reconfigure_audio_decoder(CSession* cs, int32_t sampling_rate, int8_t chann } /* PUBLIC */ - void cs_do(CSession *cs) { /* Codec session should always be protected by call mutex so no need to check for cs validity @@ -416,9 +350,9 @@ void cs_do(CSession *cs) } /********************* VIDEO *********************/ - if (cs->vbuf_raw && !buffer_empty(cs->vbuf_raw)) { + if (cs->vbuf_raw && !rb_empty(cs->vbuf_raw)) { /* Decode video */ - buffer_read(cs->vbuf_raw, &p); + rb_read(cs->vbuf_raw, (void**)&p); /* Leave space for (possibly) other thread to queue more data after we read it here */ LOGGED_UNLOCK(cs->queue_mutex); @@ -447,7 +381,6 @@ void cs_do(CSession *cs) LOGGED_UNLOCK(cs->queue_mutex); } - CSession *cs_new(uint32_t peer_video_frame_piece_size) { CSession *cs = calloc(sizeof(CSession), 1); @@ -510,7 +443,7 @@ CSession *cs_new(uint32_t peer_video_frame_piece_size) goto AUDIO_DECODER_CLEANUP; } - if ( !(cs->vbuf_raw = buffer_new(VIDEO_DECODE_BUFFER_SIZE)) ) { + if ( !(cs->vbuf_raw = rb_new(VIDEO_DECODE_BUFFER_SIZE)) ) { free(cs->frame_buf); vpx_codec_destroy(cs->v_decoder); goto AUDIO_DECODER_CLEANUP; @@ -542,7 +475,7 @@ CSession *cs_new(uint32_t peer_video_frame_piece_size) return cs; VIDEO_DECODER_CLEANUP: - buffer_free(cs->vbuf_raw); + rb_free(cs->vbuf_raw); free(cs->frame_buf); vpx_codec_destroy(cs->v_decoder); AUDIO_DECODER_CLEANUP: @@ -553,7 +486,6 @@ FAILURE: free(cs); return NULL; } - void cs_kill(CSession *cs) { if (!cs) @@ -567,22 +499,21 @@ void cs_kill(CSession *cs) vpx_codec_destroy(cs->v_decoder); opus_encoder_destroy(cs->audio_encoder); opus_decoder_destroy(cs->audio_decoder); - buffer_free(cs->vbuf_raw); + rb_free(cs->vbuf_raw); jbuf_free(cs->j_buf); free(cs->frame_buf); + free(cs->split_video_frame); pthread_mutex_destroy(cs->queue_mutex); LOGGER_DEBUG("Terminated codec state: %p", cs); free(cs); } - void cs_init_video_splitter_cycle(CSession* cs) { cs->split_video_frame[0] = cs->frameid_out++; cs->split_video_frame[1] = 0; } - int cs_update_video_splitter_cycle(CSession *cs, const uint8_t *payload, uint16_t length) { cs->processing_video_frame = payload; @@ -590,7 +521,6 @@ int cs_update_video_splitter_cycle(CSession *cs, const uint8_t *payload, uint16_ return ((length - 1) / VIDEOFRAME_PIECE_SIZE) + 1; } - const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size) { if (!cs || !size) return NULL; @@ -616,9 +546,6 @@ const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size) return cs->split_video_frame; } - - - int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width, uint16_t height) { vpx_codec_enc_cfg_t cfg = *cs->v_encoder[0].config.enc; @@ -637,7 +564,6 @@ int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width, return 0; } - int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling_rate, uint8_t channels) { /* Values are checked in toxav.c */ @@ -667,8 +593,6 @@ int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling LOGGER_DEBUG ("Reconfigured audio encoder br: %d sr: %d cc:%d", bitrate, sampling_rate, channels); return 0; } - - /* Called from RTP */ void queue_message(RTPSession *session, RTPMessage *msg) { @@ -705,10 +629,10 @@ void queue_message(RTPSession *session, RTPMessage *msg) if (p) { LOGGED_LOCK(cs->queue_mutex); - if (buffer_full(cs->vbuf_raw)) { + if (rb_full(cs->vbuf_raw)) { LOGGER_DEBUG("Dropped video frame"); Payload *tp; - buffer_read(cs->vbuf_raw, &tp); + rb_read(cs->vbuf_raw, (void**)&tp); free(tp); } else { p->size = cs->frame_size; @@ -720,7 +644,7 @@ void queue_message(RTPSession *session, RTPMessage *msg) cs->lcfd = t_lcfd > 100 ? cs->lcfd : t_lcfd; cs->linfts = current_time_monotonic(); - buffer_write(cs->vbuf_raw, p); + rb_write(cs->vbuf_raw, p); LOGGED_UNLOCK(cs->queue_mutex); } else { LOGGER_WARNING("Allocation failed! Program might misbehave!"); diff --git a/toxav/codec.h b/toxav/codec.h index 830dbbf66..7cc9b15d7 100644 --- a/toxav/codec.h +++ b/toxav/codec.h @@ -25,6 +25,8 @@ #include "toxav.h" #include "rtp.h" +#include "../toxcore/util.h" + #include #include #include @@ -40,8 +42,6 @@ /* Audio encoding/decoding */ #include -#define PAIR(TYPE1__, TYPE2__) struct { TYPE1__ first; TYPE2__ second; } - #define PACKED_AUDIO_SIZE(x) (x + 5) #define UNPACKED_AUDIO_SIZE(x) (x - 5) @@ -125,8 +125,4 @@ const uint8_t *cs_iterate_split_video_frame(CSession *cs, uint16_t *size); int cs_reconfigure_video_encoder(CSession* cs, int32_t bitrate, uint16_t width, uint16_t height); int cs_reconfigure_audio_encoder(CSession* cs, int32_t bitrate, int32_t sampling_rate, uint8_t channels); - - -/* Internal. Called from rtp_handle_message */ -void queue_message(RTPSession *session, RTPMessage *msg); #endif /* CODEC_H */ diff --git a/toxav/msi.h b/toxav/msi.h index 8404df191..7d82afc81 100644 --- a/toxav/msi.h +++ b/toxav/msi.h @@ -29,7 +29,7 @@ #include "../toxcore/Messenger.h" /** Preconfigured value for video splitting */ -#define VIDEOFRAME_PIECE_SIZE 500 /* 1.25 KiB*/ +#define VIDEOFRAME_PIECE_SIZE 500 /** * Error codes. @@ -42,7 +42,7 @@ typedef enum { msi_EStrayMessage, msi_ESystem, msi_EHandle, - msi_EUndisclosed, /* NOTE: must be last enum otherwise parsing wont work */ + msi_EUndisclosed, /* NOTE: must be last enum otherwise parsing will not work */ } MSIError; /** diff --git a/toxav/rtp.c b/toxav/rtp.c index e5f453103..77fce0569 100644 --- a/toxav/rtp.c +++ b/toxav/rtp.c @@ -28,9 +28,9 @@ #include "rtp.h" #include -void queue_message(RTPSession *_session, RTPMessage *_msg); #define size_32 4 +#define RTCP_REPORT_INTERVAL_MS 500 #define ADD_FLAG_VERSION(_h, _v) do { ( _h->flags ) &= 0x3F; ( _h->flags ) |= ( ( ( _v ) << 6 ) & 0xC0 ); } while(0) #define ADD_FLAG_PADDING(_h, _v) do { if ( _v > 0 ) _v = 1; ( _h->flags ) &= 0xDF; ( _h->flags ) |= ( ( ( _v ) << 5 ) & 0x20 ); } while(0) @@ -46,24 +46,236 @@ void queue_message(RTPSession *_session, RTPMessage *_msg); #define GET_SETTING_MARKER(_h) (( _h->marker_payloadt ) >> 7) #define GET_SETTING_PAYLOAD(_h) ((_h->marker_payloadt) & 0x7f) -/** - * Checks if message came in late. - */ -static int check_late_message (RTPSession *session, RTPMessage *msg) + +typedef struct { + uint64_t timestamp; /* in ms */ + + uint32_t packets_missing; + uint32_t expected_packets; + /* ... other stuff in the future */ +} RTCPReport; + +typedef struct RTCPSession_s { + uint8_t prefix; + uint64_t last_sent_report_ts; + uint32_t last_missing_packets; + uint32_t last_expected_packets; + + RingBuffer* pl_stats; /* Packet loss stats over time */ +} RTCPSession; + + + +/* queue_message() is defined in codec.c */ +void queue_message(RTPSession *session, RTPMessage *msg); +RTPHeader *parse_header_in ( const uint8_t *payload, int length ); +RTPExtHeader *parse_ext_header_in ( const uint8_t *payload, uint16_t length ); +RTPMessage *msg_parse ( const uint8_t *data, int length ); +uint8_t *parse_header_out ( const RTPHeader* header, uint8_t* payload ); +uint8_t *parse_ext_header_out ( const RTPExtHeader* header, uint8_t* payload ); +void build_header ( RTPSession* session, RTPHeader* header ); +void send_rtcp_report ( RTCPSession* session, Messenger* m, int32_t friendnumber ); +int handle_rtp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object ); +int handle_rtcp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object ); + + + + +RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num ) { - /* - * Check Sequence number. If this new msg has lesser number then the session->rsequnum - * it shows that the message came in late. Also check timestamp to be 100% certain. - * - */ - return ( msg->header->sequnum < session->rsequnum && msg->header->timestamp < session->timestamp ) ? 0 : -1; + RTPSession *retu = calloc(1, sizeof(RTPSession)); + + if ( !retu ) { + LOGGER_WARNING("Alloc failed! Program might misbehave!"); + return NULL; + } + + retu->version = RTP_VERSION; /* It's always 2 */ + retu->padding = 0; /* If some additional data is needed about the packet */ + retu->extension = 0; /* If extension to header is needed */ + retu->cc = 1; /* Amount of contributors */ + retu->csrc = NULL; /* Container */ + retu->ssrc = random_int(); + retu->marker = 0; + retu->payload_type = payload_type % 128; + + retu->m = messenger; + retu->dest = friend_num; + retu->rsequnum = retu->sequnum = 0; + retu->ext_header = NULL; /* When needed allocate */ + + if ( !(retu->csrc = calloc(1, sizeof(uint32_t))) ) { + LOGGER_WARNING("Alloc failed! Program might misbehave!"); + free(retu); + return NULL; + } + + retu->csrc[0] = retu->ssrc; /* Set my ssrc to the list receive */ + + /* Also set payload type as prefix */ + retu->prefix = payload_type; + + + /* Initialize rtcp session */ + if (!(retu->rtcp = calloc(1, sizeof(RTCPSession)))) { + LOGGER_WARNING("Alloc failed! Program might misbehave!"); + free(retu->csrc); + free(retu); + return NULL; + } + + retu->rtcp->prefix = 222 + payload_type % 192; + retu->rtcp->pl_stats = rb_new(4); + + return retu; +} +void rtp_kill ( RTPSession *session ) +{ + if ( !session ) return; + + rtp_stop_receiving (session); + + free ( session->ext_header ); + free ( session->csrc ); + + void* t; + while (!rb_empty(session->rtcp->pl_stats)) { + rb_read(session->rtcp->pl_stats, (void**) &t); + free(t); + } + rb_free(session->rtcp->pl_stats); + + LOGGER_DEBUG("Terminated RTP session: %p", session); + + /* And finally free session */ + free ( session ); +} +void rtp_do(RTPSession *session) +{ + if (!session || !session->rtcp) + return; + + if (current_time_monotonic() - session->rtcp->last_sent_report_ts >= RTCP_REPORT_INTERVAL_MS) { + send_rtcp_report(session->rtcp, session->m, session->dest); + } + + if (rb_full(session->rtcp->pl_stats)) { + RTCPReport* reports[4]; + + int i = 0; + for (; rb_read(session->rtcp->pl_stats, (void**) reports + i); i++); + + /* Check for timed out reports (> 6 sec) */ + uint64_t now = current_time_monotonic(); + for (i = 0; i < 4 && now - reports[i]->timestamp < 6000; i ++); + for (; i < 4; i ++) { + rb_write(session->rtcp->pl_stats, reports[i]); + reports[i] = NULL; + } + if (!rb_empty(session->rtcp->pl_stats)) { + for (i = 0; reports[i] != NULL; i ++) + free(reports[i]); + return; /* As some reports are timed out, we need more... */ + } + + /* We have 4 on-time reports so we can proceed */ + uint32_t quality_loss = 0; + for (i = 0; i < 4; i++) { + uint32_t idx = reports[i]->packets_missing * 100 / reports[i]->expected_packets; + quality_loss += idx; + } + + if (quality_loss > 40) { + LOGGER_DEBUG("Packet loss detected"); + } + } +} +int rtp_start_receiving(RTPSession* session) +{ + if (session == NULL) + return -1; + + if (custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, + handle_rtp_packet, session) == -1) { + LOGGER_WARNING("Failed to register rtp receive handler"); + return -1; + } + if (custom_lossy_packet_registerhandler(session->m, session->dest, session->rtcp->prefix, + handle_rtcp_packet, session->rtcp) == -1) { + LOGGER_WARNING("Failed to register rtcp receive handler"); + custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, NULL, NULL); + return -1; + } + + return 0; +} +int rtp_stop_receiving(RTPSession* session) +{ + if (session == NULL) + return -1; + + custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, NULL, NULL); + custom_lossy_packet_registerhandler(session->m, session->dest, session->rtcp->prefix, NULL, NULL); /* RTCP */ + + return 0; +} +int rtp_send_msg ( RTPSession *session, const uint8_t *data, uint16_t length ) +{ + if ( !session ) { + LOGGER_WARNING("No session!"); + return -1; + } + + uint8_t parsed[MAX_RTP_SIZE]; + uint8_t *it; + + RTPHeader header[1]; + build_header(session, header); + + uint32_t parsed_len = length + header->length + 1; + + parsed[0] = session->prefix; + + it = parse_header_out ( header, parsed + 1 ); + + if ( session->ext_header ) { + parsed_len += ( 4 /* Minimum ext header len */ + session->ext_header->length * size_32 ); + it = parse_ext_header_out ( session->ext_header, it ); + } + + memcpy ( it, data, length ); + + if ( -1 == send_custom_lossy_packet(session->m, session->dest, parsed, parsed_len) ) { + LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", length, strerror(errno)); + return -1; + } + + /* Set sequ number */ + session->sequnum = session->sequnum >= MAX_SEQU_NUM ? 0 : session->sequnum + 1; + return 0; +} +void rtp_free_msg ( RTPSession *session, RTPMessage *msg ) +{ + if ( !session ) { + if ( msg->ext_header ) { + free ( msg->ext_header->table ); + free ( msg->ext_header ); + } + } else { + if ( msg->ext_header && session->ext_header != msg->ext_header ) { + free ( msg->ext_header->table ); + free ( msg->ext_header ); + } + } + + free ( msg->header ); + free ( msg ); } -/** - * Extracts header from payload. - */ -RTPHeader *extract_header ( const uint8_t *payload, int length ) + + +RTPHeader *parse_header_in ( const uint8_t *payload, int length ) { if ( !payload || !length ) { LOGGER_WARNING("No payload to extract!"); @@ -111,8 +323,6 @@ RTPHeader *extract_header ( const uint8_t *payload, int length ) return NULL; } - memset(retu->csrc, 0, 16 * sizeof (uint32_t)); - retu->marker_payloadt = *it; ++it; retu->length = total; @@ -125,7 +335,6 @@ RTPHeader *extract_header ( const uint8_t *payload, int length ) retu->ssrc = ntohl(retu->ssrc); uint8_t x; - for ( x = 0; x < cc; x++ ) { it += 4; memcpy(&retu->csrc[x], it, sizeof(retu->csrc[x])); @@ -134,11 +343,7 @@ RTPHeader *extract_header ( const uint8_t *payload, int length ) return retu; } - -/** - * Extracts external header from payload. Must be called AFTER extract_header()! - */ -RTPExtHeader *extract_ext_header ( const uint8_t *payload, uint16_t length ) +RTPExtHeader *parse_ext_header_in ( const uint8_t *payload, uint16_t length ) { const uint8_t *it = payload; @@ -182,11 +387,47 @@ RTPExtHeader *extract_ext_header ( const uint8_t *payload, uint16_t length ) return retu; } +RTPMessage *msg_parse ( const uint8_t *data, int length ) +{ + RTPMessage *retu = calloc(1, sizeof (RTPMessage)); -/** - * Adds header to payload. Make sure _payload_ has enough space. - */ -uint8_t *add_header ( RTPHeader *header, uint8_t *payload ) + retu->header = parse_header_in ( data, length ); /* It allocates memory and all */ + + if ( !retu->header ) { + LOGGER_WARNING("Header failed to extract!"); + free(retu); + return NULL; + } + + uint16_t from_pos = retu->header->length; + retu->length = length - from_pos; + + if ( GET_FLAG_EXTENSION ( retu->header ) ) { + retu->ext_header = parse_ext_header_in ( data + from_pos, length ); + + if ( retu->ext_header ) { + retu->length -= ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 ); + from_pos += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 ); + } else { /* Error */ + LOGGER_WARNING("Ext Header failed to extract!"); + rtp_free_msg(NULL, retu); + return NULL; + } + } else { + retu->ext_header = NULL; + } + + if ( length - from_pos <= MAX_RTP_SIZE ) + memcpy ( retu->data, data + from_pos, length - from_pos ); + else { + LOGGER_WARNING("Invalid length!"); + rtp_free_msg(NULL, retu); + return NULL; + } + + return retu; +} +uint8_t *parse_header_out ( const RTPHeader *header, uint8_t *payload ) { uint8_t cc = GET_FLAG_CSRCC ( header ); uint8_t *it = payload; @@ -223,11 +464,7 @@ uint8_t *add_header ( RTPHeader *header, uint8_t *payload ) return it + 4; } - -/** - * Adds extension header to payload. Make sure _payload_ has enough space. - */ -uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload ) +uint8_t *parse_ext_header_out ( const RTPExtHeader *header, uint8_t *payload ) { uint8_t *it = payload; uint16_t length; @@ -242,9 +479,7 @@ uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload ) it -= 2; /* Return to 0 position */ if ( header->table ) { - uint16_t x; - for ( x = 0; x < header->length; x++ ) { it += 4; entry = htonl(header->table[x]); @@ -254,92 +489,45 @@ uint8_t *add_ext_header ( RTPExtHeader *header, uint8_t *payload ) return it + 4; } - -/** - * Builds header from control session values. - */ -RTPHeader *build_header ( RTPSession *session ) +void build_header ( RTPSession *session, RTPHeader *header ) { - RTPHeader *retu = calloc ( 1, sizeof (RTPHeader) ); + ADD_FLAG_VERSION ( header, session->version ); + ADD_FLAG_PADDING ( header, session->padding ); + ADD_FLAG_EXTENSION ( header, session->extension ); + ADD_FLAG_CSRCC ( header, session->cc ); + ADD_SETTING_MARKER ( header, session->marker ); + ADD_SETTING_PAYLOAD ( header, session->payload_type ); - if ( !retu ) { - LOGGER_WARNING("Alloc failed! Program might misbehave!"); - return NULL; - } - - ADD_FLAG_VERSION ( retu, session->version ); - ADD_FLAG_PADDING ( retu, session->padding ); - ADD_FLAG_EXTENSION ( retu, session->extension ); - ADD_FLAG_CSRCC ( retu, session->cc ); - ADD_SETTING_MARKER ( retu, session->marker ); - ADD_SETTING_PAYLOAD ( retu, session->payload_type ); - - retu->sequnum = session->sequnum; - retu->timestamp = current_time_monotonic(); /* milliseconds */ - retu->ssrc = session->ssrc; + header->sequnum = session->sequnum; + header->timestamp = current_time_monotonic(); /* milliseconds */ + header->ssrc = session->ssrc; int i; - for ( i = 0; i < session->cc; i++ ) - retu->csrc[i] = session->csrc[i]; + header->csrc[i] = session->csrc[i]; - retu->length = 12 /* Minimum header len */ + ( session->cc * size_32 ); - - return retu; + header->length = 12 /* Minimum header len */ + ( session->cc * size_32 ); } - - -/** - * Parses data into RTPMessage struct. Stores headers separately from the payload data - * and so the length variable is set accordingly. - */ -RTPMessage *msg_parse ( const uint8_t *data, int length ) +void send_rtcp_report(RTCPSession* session, Messenger* m, int32_t friendnumber) { - RTPMessage *retu = calloc(1, sizeof (RTPMessage)); - - retu->header = extract_header ( data, length ); /* It allocates memory and all */ - - if ( !retu->header ) { - LOGGER_WARNING("Header failed to extract!"); - free(retu); - return NULL; - } - - uint16_t from_pos = retu->header->length; - retu->length = length - from_pos; - - - - if ( GET_FLAG_EXTENSION ( retu->header ) ) { - retu->ext_header = extract_ext_header ( data + from_pos, length ); - - if ( retu->ext_header ) { - retu->length -= ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 ); - from_pos += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 ); - } else { /* Error */ - LOGGER_WARNING("Ext Header failed to extract!"); - rtp_free_msg(NULL, retu); - return NULL; - } - } else { - retu->ext_header = NULL; - } - - if ( length - from_pos <= MAX_RTP_SIZE ) - memcpy ( retu->data, data + from_pos, length - from_pos ); - else { - LOGGER_WARNING("Invalid length!"); - rtp_free_msg(NULL, retu); - return NULL; - } - - return retu; + if (session->last_expected_packets == 0) + return; + + uint8_t parsed[9]; + parsed[0] = session->prefix; + + uint32_t packets_missing = htonl(session->last_missing_packets); + uint32_t expected_packets = htonl(session->last_expected_packets); + + memcpy(parsed + 1, &packets_missing, 4); + memcpy(parsed + 5, &expected_packets, 4); + + if (-1 == send_custom_lossy_packet(m, friendnumber, parsed, sizeof(parsed))) + LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", sizeof(parsed), strerror(errno)); + else + session->last_sent_report_ts = current_time_monotonic(); } - -/** - * Callback for networking core. - */ -int rtp_handle_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object ) +int handle_rtp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object ) { RTPSession *session = object; RTPMessage *msg; @@ -357,178 +545,37 @@ int rtp_handle_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, } /* Check if message came in late */ - if ( check_late_message(session, msg) < 0 ) { /* Not late */ + if ( msg->header->sequnum > session->rsequnum && msg->header->timestamp > session->rtimestamp ) { + /* Not late */ session->rsequnum = msg->header->sequnum; - session->timestamp = msg->header->timestamp; + session->rtimestamp = msg->header->timestamp; } queue_message(session, msg); return 0; } - -/** - * Allocate message and store data there - */ -RTPMessage *rtp_new_message ( RTPSession *session, const uint8_t *data, uint32_t length ) +int handle_rtcp_packet ( Messenger *m, int32_t friendnumber, const uint8_t *data, uint32_t length, void *object ) { - if ( !session ) { - LOGGER_WARNING("No session!"); - return NULL; - } - - uint8_t *from_pos; - RTPMessage *retu = calloc(1, sizeof (RTPMessage)); - - if ( !retu ) { - LOGGER_WARNING("Alloc failed! Program might misbehave!"); - return NULL; - } - - /* Sets header values and copies the extension header in retu */ - retu->header = build_header ( session ); /* It allocates memory and all */ - retu->ext_header = session->ext_header; - - - uint32_t total_length = length + retu->header->length + 1; - - retu->data[0] = session->prefix; - - if ( retu->ext_header ) { - total_length += ( 4 /* Minimum ext header len */ + retu->ext_header->length * size_32 ); - - from_pos = add_header ( retu->header, retu->data + 1 ); - from_pos = add_ext_header ( retu->ext_header, from_pos + 1 ); - } else { - from_pos = add_header ( retu->header, retu->data + 1 ); - } - - /* - * Parses the extension header into the message - * Of course if any - */ - - /* Appends data on to retu->data */ - memcpy ( from_pos, data, length ); - - retu->length = total_length; - - return retu; -} - - - -RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num ) -{ - RTPSession *retu = calloc(1, sizeof(RTPSession)); - - if ( !retu ) { - LOGGER_WARNING("Alloc failed! Program might misbehave!"); - return NULL; - } - - retu->version = RTP_VERSION; /* It's always 2 */ - retu->padding = 0; /* If some additional data is needed about the packet */ - retu->extension = 0; /* If extension to header is needed */ - retu->cc = 1; /* Amount of contributors */ - retu->csrc = NULL; /* Container */ - retu->ssrc = random_int(); - retu->marker = 0; - retu->payload_type = payload_type % 128; - - retu->dest = friend_num; - - retu->rsequnum = retu->sequnum = 0; - - retu->ext_header = NULL; /* When needed allocate */ - - - if ( !(retu->csrc = calloc(1, sizeof (uint32_t))) ) { - LOGGER_WARNING("Alloc failed! Program might misbehave!"); - free(retu); - return NULL; - } - - retu->csrc[0] = retu->ssrc; /* Set my ssrc to the list receive */ - - /* Also set payload type as prefix */ - retu->prefix = payload_type; - - retu->m = messenger; - /* - * - */ - return retu; -} - -void rtp_kill ( RTPSession *session ) -{ - if ( !session ) return; - - rtp_stop_receiving (session); - - free ( session->ext_header ); - free ( session->csrc ); - - LOGGER_DEBUG("Terminated RTP session: %p", session); - - /* And finally free session */ - free ( session ); -} - -int rtp_start_receiving(RTPSession* session) -{ - if (session == NULL) - return 0; - - LOGGER_DEBUG("Registering packet handler: pt: %d; friend: %d", session->prefix, session->dest); - return custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, - rtp_handle_packet, session); -} - -int rtp_stop_receiving(RTPSession* session) -{ - if (session == NULL) - return 0; - - LOGGER_DEBUG("Unregistering packet handler: pt: %d; friend: %d", session->prefix, session->dest); - return custom_lossy_packet_registerhandler(session->m, session->dest, session->prefix, - NULL, NULL); -} - -int rtp_send_msg ( RTPSession *session, const uint8_t *data, uint16_t length ) -{ - RTPMessage *msg = rtp_new_message (session, data, length); - - if ( !msg ) return -1; - - if ( -1 == send_custom_lossy_packet(session->m, session->dest, msg->data, msg->length) ) { - LOGGER_WARNING("Failed to send full packet (len: %d)! std error: %s", length, strerror(errno)); - rtp_free_msg ( session, msg ); + if (length < 9) return -1; + + RTCPSession* session = object; + RTCPReport* report = malloc(sizeof(RTCPReport)); + + memcpy(&report->packets_missing, data + 1, 4); + memcpy(&report->expected_packets, data + 5, 4); + + report->packets_missing = ntohl(report->packets_missing); + report->expected_packets = ntohl(report->expected_packets); + + /* This would cause undefined behaviour */ + if (report->expected_packets == 0) { + free(report); + return 0; } + report->timestamp = current_time_monotonic(); - /* Set sequ number */ - session->sequnum = session->sequnum >= MAX_SEQU_NUM ? 0 : session->sequnum + 1; - rtp_free_msg ( session, msg ); - + free(rb_write(session->pl_stats, report)); return 0; -} - -void rtp_free_msg ( RTPSession *session, RTPMessage *msg ) -{ - if ( !session ) { - if ( msg->ext_header ) { - free ( msg->ext_header->table ); - free ( msg->ext_header ); - } - } else { - if ( msg->ext_header && session->ext_header != msg->ext_header ) { - free ( msg->ext_header->table ); - free ( msg->ext_header ); - } - } - - free ( msg->header ); - free ( msg ); -} +} \ No newline at end of file diff --git a/toxav/rtp.h b/toxav/rtp.h index 6b796d5a9..fa5af9fe1 100644 --- a/toxav/rtp.h +++ b/toxav/rtp.h @@ -23,8 +23,6 @@ #define RTP_H #define RTP_VERSION 2 -#include -// #include #include "../toxcore/Messenger.h" @@ -51,8 +49,8 @@ typedef enum { rtp_TypeVideo } RTPPayloadType; -/** - * Standard rtp header +/** + * Standard rtp header. */ typedef struct { uint8_t flags; /* Version(2),Padding(1), Ext(1), Cc(4) */ @@ -62,17 +60,14 @@ typedef struct { uint32_t ssrc; /* SSRC */ uint32_t csrc[16]; /* CSRC's table */ uint32_t length; /* Length of the header in payload string. */ - } RTPHeader; - -/** +/** * Standard rtp extension header. */ typedef struct { uint16_t type; /* Extension profile */ uint16_t length; /* Number of extensions */ uint32_t *table; /* Extension's table */ - } RTPExtHeader; /** @@ -90,31 +85,32 @@ typedef struct { * RTP control session. */ typedef struct { - uint8_t version; - uint8_t padding; - uint8_t extension; - uint8_t cc; - uint8_t marker; - uint8_t payload_type; - uint16_t sequnum; /* Set when sending */ - uint16_t rsequnum; /* Check when recving msg */ - uint32_t timestamp; - uint32_t ssrc; - uint32_t *csrc; + uint8_t version; + uint8_t padding; + uint8_t extension; + uint8_t cc; + uint8_t marker; + uint8_t payload_type; + uint16_t sequnum; /* Sending sequence number */ + uint16_t rsequnum; /* Receiving sequence number */ + uint32_t rtimestamp; + uint32_t ssrc; + uint32_t *csrc; /* If some additional data must be sent via message * apply it here. Only by allocating this member you will be * automatically placing it within a message. */ - RTPExtHeader *ext_header; + RTPExtHeader *ext_header; /* Msg prefix for core to know when recving */ - uint8_t prefix; + uint8_t prefix; - int dest; + int dest; - struct CSession_s *cs; - Messenger *m; + struct RTCPSession_s *rtcp; + struct CSession_s *cs; + Messenger *m; } RTPSession; @@ -128,6 +124,11 @@ RTPSession *rtp_new ( int payload_type, Messenger *messenger, int friend_num ); */ void rtp_kill ( RTPSession* session ); +/** + * Do periodical rtp work. + */ +void rtp_do(RTPSession *session); + /** * By default rtp is not in receiving state */ diff --git a/toxav/toxav.c b/toxav/toxav.c index bd788d7d7..721b9d91f 100644 --- a/toxav/toxav.c +++ b/toxav/toxav.c @@ -220,6 +220,9 @@ void toxav_iterate(ToxAV* av) LOGGED_UNLOCK(av->mutex); cs_do(i->cs); + rtp_do(i->rtps[0]); + rtp_do(i->rtps[1]); + if (i->last_self_capabilities & msi_CapRAudio) /* Receiving audio */ rc = MIN(i->cs->last_packet_frame_duration, rc); if (i->last_self_capabilities & msi_CapRVideo) /* Receiving video */ diff --git a/toxcore/util.c b/toxcore/util.c index 5a72c4a48..d6db946da 100644 --- a/toxcore/util.c +++ b/toxcore/util.c @@ -185,3 +185,76 @@ int create_recursive_mutex(pthread_mutex_t *mutex) return 0; } + + +struct RingBuffer { + uint16_t size; /* Max size */ + uint16_t start; + uint16_t end; + void **data; +}; + +bool rb_full(const RingBuffer *b) +{ + return (b->end + 1) % b->size == b->start; +} +bool rb_empty(const RingBuffer *b) +{ + return b->end == b->start; +} +void* rb_write(RingBuffer *b, void *p) +{ + void* rc = NULL; + if ((b->end + 1) % b->size == b->start) /* full */ + rc = b->data[b->start]; + + b->data[b->end] = p; + b->end = (b->end + 1) % b->size; + + if (b->end == b->start) + b->start = (b->start + 1) % b->size; + + return rc; +} +bool rb_read(RingBuffer *b, void **p) +{ + if (b->end == b->start) { /* Empty */ + *p = NULL; + return false; + } + + *p = b->data[b->start]; + b->start = (b->start + 1) % b->size; + return true; +} +void rb_clear(RingBuffer *b) +{ + while (!rb_empty(b)) { + void *p; + rb_read(b, &p); + free(p); + } +} +RingBuffer *rb_new(int size) +{ + RingBuffer *buf = calloc(sizeof(RingBuffer), 1); + + if (!buf) return NULL; + + buf->size = size + 1; /* include empty elem */ + + if (!(buf->data = calloc(buf->size, sizeof(void *)))) { + free(buf); + return NULL; + } + + return buf; +} +void rb_free(RingBuffer *b) +{ + if (b) { + rb_clear(b); + free(b->data); + free(b); + } +} \ No newline at end of file diff --git a/toxcore/util.h b/toxcore/util.h index fab26e294..6c3d3b389 100644 --- a/toxcore/util.h +++ b/toxcore/util.h @@ -30,6 +30,7 @@ #include #define MIN(a,b) (((a)<(b))?(a):(b)) +#define PAIR(TYPE1__, TYPE2__) struct { TYPE1__ first; TYPE2__ second; } void unix_time_update(); uint64_t unix_time(); @@ -56,4 +57,13 @@ int load_state(load_state_callback_func load_state_callback, void *outer, /* Returns -1 if failed or 0 if success */ int create_recursive_mutex(pthread_mutex_t *mutex); +/* Ring buffer */ +typedef struct RingBuffer RingBuffer; +bool rb_full(const RingBuffer *b); +bool rb_empty(const RingBuffer *b); +void* rb_write(RingBuffer* b, void* p); +bool rb_read(RingBuffer* b, void** p); +void rb_clear(RingBuffer *b); +RingBuffer *rb_new(int size); +void rb_free(RingBuffer *b); #endif /* __UTIL_H__ */