From 0a4296b9a22613fa5783c1842ab1120f81dda209 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 17 Dec 2012 21:22:51 +0400 Subject: [PATCH] implemented rtmp aggregate message support; fixed rtmp acks --- ngx_rtmp.c | 4 ++ ngx_rtmp.h | 6 ++ ngx_rtmp_codec_module.c | 2 +- ngx_rtmp_handler.c | 9 ++- ngx_rtmp_receive.c | 152 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 167 insertions(+), 6 deletions(-) diff --git a/ngx_rtmp.c b/ngx_rtmp.c index dbd8ab6..1962f5e 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -417,6 +417,10 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]); *eh = ngx_rtmp_user_message_handler; + /* aggregate to audio/video map */ + eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AGGREGATE]); + *eh = ngx_rtmp_aggregate_message_handler; + /* init amf callbacks */ ngx_array_init(&cmcf->amf_arrays, cf->pool, 1, sizeof(ngx_hash_key_t)); diff --git a/ngx_rtmp.h b/ngx_rtmp.h index aa22db3..635a982 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -139,6 +139,7 @@ typedef struct { #define NGX_RTMP_USER_PING_REQUEST 6 #define NGX_RTMP_USER_PING_RESPONSE 7 #define NGX_RTMP_USER_UNKNOWN 8 +#define NGX_RTMP_USER_BUFFER_END 31 /* Chunk header: @@ -183,6 +184,7 @@ typedef struct { /* client buffer time in msec */ uint32_t buflen; + uint32_t ack_size; /* connection parameters */ ngx_str_t app; @@ -408,10 +410,14 @@ ngx_rtmp_r64(uint64_t n) /* Receiving messages */ +ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); ngx_int_t ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); ngx_int_t ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); +ngx_int_t ngx_rtmp_aggregate_message_handler(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); ngx_int_t ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); ngx_int_t ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s, diff --git a/ngx_rtmp_codec_module.c b/ngx_rtmp_codec_module.c index 3f2be90..dd50e35 100644 --- a/ngx_rtmp_codec_module.c +++ b/ngx_rtmp_codec_module.c @@ -187,7 +187,7 @@ ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } /* save AVC/AAC header */ - if (in->buf->last - in->buf->pos < 2) { + if (in->buf->last - in->buf->pos < 3) { return NGX_OK; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index d9d8b49..485db63 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -10,8 +10,6 @@ static void ngx_rtmp_recv(ngx_event_t *rev); static void ngx_rtmp_send(ngx_event_t *rev); static void ngx_rtmp_ping(ngx_event_t *rev); -static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); static ngx_int_t ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s); @@ -64,6 +62,7 @@ ngx_rtmp_user_message_type(uint16_t evt) "stream dry", "set_buflen", "recorded", + "", "ping_request", "ping_response", }; @@ -267,7 +266,7 @@ ngx_rtmp_recv(ngx_event_t *rev) b->last += n; s->in_bytes += n; - if (s->in_bytes - s->in_last_ack >= cscf->ack_window) { + if (s->ack_size && s->in_bytes - s->in_last_ack >= s->ack_size) { s->in_last_ack = s->in_bytes; @@ -736,7 +735,7 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, } -static ngx_int_t +ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { @@ -764,7 +763,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, } #endif - if (h->type >= NGX_RTMP_MSG_MAX) { + if (h->type > NGX_RTMP_MSG_MAX) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "unexpected RTMP message type: %d", (int)h->type); return NGX_OK; diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index 661aa02..eb8a569 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -53,6 +53,7 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, /* receive window size =val */ ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "receive ack_size=%uD", val); + s->ack_size = val; break; case NGX_RTMP_MSG_BANDWIDTH: @@ -212,6 +213,157 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } +static ngx_int_t +ngx_rtmp_fetch(ngx_chain_t **in, u_char *ret) +{ + while (*in && (*in)->buf->pos >= (*in)->buf->last) { + *in = (*in)->next; + } + + if (*in == NULL) { + return NGX_DONE; + } + + *ret = *(*in)->buf->pos++; + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_fetch_uint8(ngx_chain_t **in, uint8_t *ret) +{ + return ngx_rtmp_fetch(in, (u_char *) ret); +} + + +static ngx_int_t +ngx_rtmp_fetch_uint32(ngx_chain_t **in, uint32_t *ret, ngx_int_t n) +{ + u_char *r = (u_char *) ret; + ngx_int_t rc; + + *ret = 0; + + while (--n >= 0) { + rc = ngx_rtmp_fetch(in, &r[n]); + if (rc != NGX_OK) { + return rc; + } + } + + return NGX_OK; +} + + +ngx_int_t +ngx_rtmp_aggregate_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + uint32_t base_time, timestamp, prev_size; + size_t len; + ngx_int_t first; + u_char *last; + ngx_int_t rc; + ngx_buf_t *b; + ngx_chain_t *cl, *next; + ngx_rtmp_header_t ch; + + ch = *h; + + first = 1; + base_time = 0; + + while (in) { + if (ngx_rtmp_fetch_uint8(&in, &ch.type) != NGX_OK) { + return NGX_OK; + } + + if (ngx_rtmp_fetch_uint32(&in, &ch.mlen, 3) != NGX_OK) { + return NGX_ERROR; + } + + if (ngx_rtmp_fetch_uint32(&in, ×tamp, 3) != NGX_OK) { + return NGX_ERROR; + } + + if (ngx_rtmp_fetch_uint8(&in, (uint8_t *) ×tamp + 3) != NGX_OK) + { + return NGX_ERROR; + } + + if (ngx_rtmp_fetch_uint32(&in, &ch.msid, 3) != NGX_OK) + { + return NGX_ERROR; + } + + if (first) { + base_time = timestamp; + first = 0; + } + + ngx_log_debug6(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP aggregate %s (%d) len=%uD time=%uD (+%D) msid=%uD", + ngx_rtmp_message_type(ch.type), + (ngx_int_t) ch.type, ch.mlen, ch.timestamp, + timestamp - base_time, ch.msid); + + /* limit chain */ + + len = 0; + cl = in; + while (cl) { + b = cl->buf; + len += (b->last - b->pos); + if (len > ch.mlen) { + break; + } + cl = cl->next; + } + + if (cl == NULL) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "RTMP error parsing aggregate"); + return NGX_ERROR; + } + + next = cl->next; + cl->next = NULL; + b = cl->buf; + last = b->last; + b->last -= (len - ch.mlen); + + /* handle aggregated message */ + + ch.timestamp = h->timestamp + timestamp - base_time; + + rc = ngx_rtmp_receive_message(s, &ch, in); + + /* restore chain before checking the result */ + + in = cl; + in->next = next; + b->pos = b->last; + b->last = last; + + if (rc != NGX_OK) { + return rc; + } + + /* read 32-bit previous tag size */ + + if (ngx_rtmp_fetch_uint32(&in, &prev_size, 4) != NGX_OK) { + return NGX_OK; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "RTMP aggregate prev_size=%uD", prev_size); + } + + return NGX_OK; +} + + ngx_int_t ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in)