implemented rtmp aggregate message support; fixed rtmp acks

This commit is contained in:
Roman Arutyunyan 2012-12-17 21:22:51 +04:00
parent 5c947253af
commit 0a4296b9a2
5 changed files with 167 additions and 6 deletions

View file

@ -417,6 +417,10 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]);
*eh = ngx_rtmp_user_message_handler;
/* aggregate to audio/video map */
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AGGREGATE]);
*eh = ngx_rtmp_aggregate_message_handler;
/* init amf callbacks */
ngx_array_init(&cmcf->amf_arrays, cf->pool, 1, sizeof(ngx_hash_key_t));

View file

@ -139,6 +139,7 @@ typedef struct {
#define NGX_RTMP_USER_PING_REQUEST 6
#define NGX_RTMP_USER_PING_RESPONSE 7
#define NGX_RTMP_USER_UNKNOWN 8
#define NGX_RTMP_USER_BUFFER_END 31
/* Chunk header:
@ -183,6 +184,7 @@ typedef struct {
/* client buffer time in msec */
uint32_t buflen;
uint32_t ack_size;
/* connection parameters */
ngx_str_t app;
@ -408,10 +410,14 @@ ngx_rtmp_r64(uint64_t n)
/* Receiving messages */
ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_aggregate_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s,

View file

@ -187,7 +187,7 @@ ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
/* save AVC/AAC header */
if (in->buf->last - in->buf->pos < 2) {
if (in->buf->last - in->buf->pos < 3) {
return NGX_OK;
}

View file

@ -10,8 +10,6 @@
static void ngx_rtmp_recv(ngx_event_t *rev);
static void ngx_rtmp_send(ngx_event_t *rev);
static void ngx_rtmp_ping(ngx_event_t *rev);
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s);
@ -64,6 +62,7 @@ ngx_rtmp_user_message_type(uint16_t evt)
"stream dry",
"set_buflen",
"recorded",
"",
"ping_request",
"ping_response",
};
@ -267,7 +266,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
b->last += n;
s->in_bytes += n;
if (s->in_bytes - s->in_last_ack >= cscf->ack_window) {
if (s->ack_size && s->in_bytes - s->in_last_ack >= s->ack_size) {
s->in_last_ack = s->in_bytes;
@ -736,7 +735,7 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
}
static ngx_int_t
ngx_int_t
ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
@ -764,7 +763,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
}
#endif
if (h->type >= NGX_RTMP_MSG_MAX) {
if (h->type > NGX_RTMP_MSG_MAX) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"unexpected RTMP message type: %d", (int)h->type);
return NGX_OK;

View file

@ -53,6 +53,7 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
/* receive window size =val */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive ack_size=%uD", val);
s->ack_size = val;
break;
case NGX_RTMP_MSG_BANDWIDTH:
@ -212,6 +213,157 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
static ngx_int_t
ngx_rtmp_fetch(ngx_chain_t **in, u_char *ret)
{
while (*in && (*in)->buf->pos >= (*in)->buf->last) {
*in = (*in)->next;
}
if (*in == NULL) {
return NGX_DONE;
}
*ret = *(*in)->buf->pos++;
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_fetch_uint8(ngx_chain_t **in, uint8_t *ret)
{
return ngx_rtmp_fetch(in, (u_char *) ret);
}
static ngx_int_t
ngx_rtmp_fetch_uint32(ngx_chain_t **in, uint32_t *ret, ngx_int_t n)
{
u_char *r = (u_char *) ret;
ngx_int_t rc;
*ret = 0;
while (--n >= 0) {
rc = ngx_rtmp_fetch(in, &r[n]);
if (rc != NGX_OK) {
return rc;
}
}
return NGX_OK;
}
ngx_int_t
ngx_rtmp_aggregate_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
uint32_t base_time, timestamp, prev_size;
size_t len;
ngx_int_t first;
u_char *last;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *cl, *next;
ngx_rtmp_header_t ch;
ch = *h;
first = 1;
base_time = 0;
while (in) {
if (ngx_rtmp_fetch_uint8(&in, &ch.type) != NGX_OK) {
return NGX_OK;
}
if (ngx_rtmp_fetch_uint32(&in, &ch.mlen, 3) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_rtmp_fetch_uint32(&in, &timestamp, 3) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_rtmp_fetch_uint8(&in, (uint8_t *) &timestamp + 3) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_rtmp_fetch_uint32(&in, &ch.msid, 3) != NGX_OK)
{
return NGX_ERROR;
}
if (first) {
base_time = timestamp;
first = 0;
}
ngx_log_debug6(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP aggregate %s (%d) len=%uD time=%uD (+%D) msid=%uD",
ngx_rtmp_message_type(ch.type),
(ngx_int_t) ch.type, ch.mlen, ch.timestamp,
timestamp - base_time, ch.msid);
/* limit chain */
len = 0;
cl = in;
while (cl) {
b = cl->buf;
len += (b->last - b->pos);
if (len > ch.mlen) {
break;
}
cl = cl->next;
}
if (cl == NULL) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"RTMP error parsing aggregate");
return NGX_ERROR;
}
next = cl->next;
cl->next = NULL;
b = cl->buf;
last = b->last;
b->last -= (len - ch.mlen);
/* handle aggregated message */
ch.timestamp = h->timestamp + timestamp - base_time;
rc = ngx_rtmp_receive_message(s, &ch, in);
/* restore chain before checking the result */
in = cl;
in->next = next;
b->pos = b->last;
b->last = last;
if (rc != NGX_OK) {
return rc;
}
/* read 32-bit previous tag size */
if (ngx_rtmp_fetch_uint32(&in, &prev_size, 4) != NGX_OK) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP aggregate prev_size=%uD", prev_size);
}
return NGX_OK;
}
ngx_int_t
ngx_rtmp_amf_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)