From 96ebed857347b368cb78749de3515797eb0d9364 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Sun, 18 Mar 2012 17:09:19 +0400 Subject: [PATCH] implemented frame dropping --- ngx_rtmp.h | 17 ++++-- ngx_rtmp_broadcast_module.c | 109 +++++++++++++++++++++++------------- ngx_rtmp_core_module.c | 34 ++++++++--- ngx_rtmp_handler.c | 55 +++++++++--------- ngx_rtmp_send.c | 4 +- ngx_rtmp_shared.c | 55 +++++++++--------- test/nginx.conf | 2 + 7 files changed, 173 insertions(+), 103 deletions(-) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 433bdab..0f2f216 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -226,10 +226,12 @@ typedef struct ngx_rtmp_core_srv_conf_s { ngx_uint_t ack_window; - ngx_int_t out_chunk_size; - ngx_pool_t *out_pool; - ngx_chain_t *out_free; - ngx_chain_t *out_free_chains; + ngx_int_t chunk_size; + ngx_pool_t *pool; + ngx_chain_t *free; + ngx_chain_t *free_chains; + size_t max_buf; + ngx_flag_t wait_key_frame; ngx_rtmp_conf_ctx_t *ctx; } ngx_rtmp_core_srv_conf_t; @@ -312,7 +314,12 @@ ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, /* Sending messages */ void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_header_t *lh, ngx_chain_t *out); -ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out); +ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, + ngx_uint_t priority); + +/* Note on priorities: + * the bigger value the lower the priority. + * priority=0 is the highest */ #define NGX_RTMP_LIMIT_SOFT 0 #define NGX_RTMP_LIMIT_HARD 1 diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index 4bb7054..c9812ee 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -8,6 +8,20 @@ #include "ngx_rtmp.h" +/* Standard stream ids for broadcasting */ +#define NGX_RTMP_BROADCAST_MSID 1 +#define NGX_RTMP_BROADCAST_CSID_AMF0 5 +#define NGX_RTMP_BROADCAST_CSID_AUDIO 6 +#define NGX_RTMP_BROADCAST_CSID_VIDEO 7 + + +/* Frame cutoff */ +#define NGX_RTMP_CUTOFF_ALL 0 +#define NGX_RTMP_CUTOFF_KEY 1 +#define NGX_RTMP_CUTOFF_INTER 2 +#define NGX_RTMP_CUTOFF_DISPOSABLE 3 + + static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf); static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, @@ -93,7 +107,7 @@ ngx_module_t ngx_rtmp_broadcast_module = { #define NGX_RTMP_BROADCAST_PUBLISHER 0x01 #define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02 -#define NGX_RTMP_BROADCAST_WANT_KEYFRAME 0x04 +#define NGX_RTMP_BROADCAST_KEYFRAME 0x04 #define NGX_RTMP_BROADCAST_DATA_FRAME 0x08 @@ -101,9 +115,8 @@ typedef struct ngx_rtmp_broadcast_ctx_s { ngx_str_t stream; ngx_rtmp_session_t *session; struct ngx_rtmp_broadcast_ctx_s *next; - ngx_uint_t flags; /* publisher/subscriber */ + ngx_uint_t flags; uint32_t csid; - ngx_rtmp_header_t lh; /* last a/v header */ ngx_chain_t *data_frame; } ngx_rtmp_broadcast_ctx_t; @@ -233,6 +246,7 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) #define NGX_RTMP_VIDEO_KEY_FRAME 1 #define NGX_RTMP_VIDEO_INTER_FRAME 2 #define NGX_RTMP_VIDEO_DISPOSABLE_FRAME 3 +#define NGX_RTMP_AUDIO_FRAME NGX_RTMP_VIDEO_KEY_FRAME static ngx_int_t @@ -249,16 +263,30 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_connection_t *c; ngx_rtmp_broadcast_ctx_t *ctx, *cctx; ngx_chain_t *out; - ngx_int_t vftype; ngx_rtmp_core_srv_conf_t *cscf; - ngx_rtmp_header_t sh; + ngx_rtmp_header_t sh; + ngx_rtmp_session_t *ss; + ngx_uint_t priority; + int keyframe; c = s->connection; - sh = *h; - sh.csid = 4; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + sh = *h; + keyframe = 0; + if (h->type == NGX_RTMP_MSG_VIDEO) { + sh.csid = NGX_RTMP_BROADCAST_CSID_VIDEO; + priority = ngx_rtmp_get_video_frame_type(in); + if (priority == NGX_RTMP_VIDEO_KEY_FRAME) { + keyframe = 1; + } + + } else { + sh.csid = NGX_RTMP_BROADCAST_CSID_AUDIO; + priority = NGX_RTMP_AUDIO_FRAME; + } + if (ctx == NULL || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) { @@ -271,14 +299,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_OK; } - vftype = 0; - if (sh.type == NGX_RTMP_MSG_VIDEO) { - vftype = ngx_rtmp_get_video_frame_type(in); - } - out = ngx_rtmp_append_shared_bufs(cscf, NULL, in); - ngx_rtmp_prepare_message(s, &sh, &ctx->lh, out); + ngx_rtmp_prepare_message(s, &sh, NULL, out); /* broadcast to all subscribers */ for (cctx = *ngx_rtmp_broadcast_get_head(s); @@ -290,6 +313,8 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, && !ngx_strncmp(cctx->stream.data, ctx->stream.data, ctx->stream.len)) { + ss = cctx->session; + /* if we have metadata check if the subscriber * has already received one */ if (ctx->data_frame @@ -298,28 +323,33 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "sending data_frame"); - if (ngx_rtmp_send_message(cctx->session, ctx->data_frame) - != NGX_OK) - { - ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0, - "error sending message"); + switch (ngx_rtmp_send_message(ss, ctx->data_frame, 0)) { + case NGX_OK: + cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME; + break; + case NGX_AGAIN: + break; + default: + ngx_log_error(NGX_LOG_INFO, ss->connection->log, 0, + "error sending message"); } - cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME; } - /* is the subscriber waiting for - * a key frame? */ - if (sh.type == NGX_RTMP_MSG_VIDEO - && cctx->flags & NGX_RTMP_BROADCAST_WANT_KEYFRAME) + /* waiting for a keyframe? */ + if (cscf->wait_key_frame + && sh.type == NGX_RTMP_MSG_VIDEO + && !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME) + && !keyframe) { - if (vftype && vftype != NGX_RTMP_VIDEO_KEY_FRAME) { - continue; - } - cctx->flags &= ~NGX_RTMP_BROADCAST_WANT_KEYFRAME; + continue; } - if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) { - ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0, + if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK) { + if (keyframe) { + cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME; + } + } else { + ngx_log_error(NGX_LOG_INFO, ss->connection->log, 0, "error sending message"); } } @@ -404,7 +434,7 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return ngx_rtmp_send_ack_size(s, cscf->ack_window) || ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC) || ngx_rtmp_send_user_stream_begin(s, 0) - || ngx_rtmp_send_chunk_size(s, cscf->out_chunk_size) + || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) || ngx_rtmp_send_amf0(s, h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) ? NGX_OK @@ -439,7 +469,7 @@ ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_ERROR; } - stream = 1; + stream = NGX_RTMP_BROADCAST_MSID; return ngx_rtmp_send_amf0(s, h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])); @@ -486,7 +516,9 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, "publish() called; pubName='%s' pubType='%s'", pub_name, pub_type); - if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) { + if (ngx_rtmp_send_user_stream_begin(s, + NGX_RTMP_BROADCAST_MSID) != NGX_OK) + { return NGX_ERROR; } @@ -498,7 +530,7 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, memset(&sh, 0, sizeof(sh)); sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.csid = 5; /*FIXME*/ + sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; sh.msid = h->msid; if (ngx_rtmp_send_amf0(s, &sh, out_elts, @@ -566,16 +598,17 @@ ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, "play() called; playame='%s'", play_name); - if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) { + if (ngx_rtmp_send_user_stream_begin(s, + NGX_RTMP_BROADCAST_MSID) != NGX_OK) + { return NGX_ERROR; } - ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER - | NGX_RTMP_BROADCAST_WANT_KEYFRAME); + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER); memset(&sh, 0, sizeof(sh)); sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.csid = 5; /*FIXME*/ + sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; sh.msid = h->msid; ngx_str_set(&out_inf[0], "NetStream.Play.Reset"); @@ -674,7 +707,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } memset(&sh, 0, sizeof(sh)); - sh.csid = 5; + sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; sh.msid = h->msid; sh.type = h->type; diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index b4f714e..8a5db97 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -70,11 +70,25 @@ static ngx_command_t ngx_rtmp_core_commands[] = { offsetof(ngx_rtmp_core_srv_conf_t, ack_window), NULL }, - { ngx_string("out_chunk_size"), + { ngx_string("chunk_size"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_num_slot, NGX_RTMP_SRV_CONF_OFFSET, - offsetof(ngx_rtmp_core_srv_conf_t, out_chunk_size), + offsetof(ngx_rtmp_core_srv_conf_t, chunk_size), + NULL }, + + { ngx_string("max_buf"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_RTMP_SRV_CONF_OFFSET, + offsetof(ngx_rtmp_core_srv_conf_t, max_buf), + NULL }, + + { ngx_string("wait_key_frame"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_RTMP_SRV_CONF_OFFSET, + offsetof(ngx_rtmp_core_srv_conf_t, wait_key_frame), NULL }, ngx_null_command @@ -147,8 +161,10 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf) conf->timeout = NGX_CONF_UNSET_MSEC; conf->so_keepalive = NGX_CONF_UNSET; conf->max_streams = NGX_CONF_UNSET; - conf->out_chunk_size = NGX_CONF_UNSET; + conf->chunk_size = NGX_CONF_UNSET; conf->ack_window = NGX_CONF_UNSET; + conf->max_buf = NGX_CONF_UNSET; + conf->wait_key_frame = NGX_CONF_UNSET; return conf; } @@ -164,17 +180,19 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0); ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16); - ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 4096); + ngx_conf_merge_value(conf->chunk_size, prev->chunk_size, 4096); ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000); + ngx_conf_merge_size_value(conf->max_buf, prev->max_buf, 128 * 1024); + ngx_conf_merge_value(conf->wait_key_frame, prev->wait_key_frame, 1); - if (prev->out_pool == NULL) { - prev->out_pool = ngx_create_pool(8192, cf->log); - if (prev->out_pool == NULL) { + if (prev->pool == NULL) { + prev->pool = ngx_create_pool(8192, cf->log); + if (prev->pool == NULL) { return NGX_CONF_ERROR; } } - conf->out_pool = prev->out_pool; + conf->pool = prev->pool; return NGX_CONF_OK; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 949d281..18d9df2 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -880,9 +880,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, fmt = 0; if (lh && lh->csid && h->msid == lh->msid) { ++fmt; - if (h->type == lh->type - && mlen == lh->mlen) - { + if (h->type == lh->type && mlen == lh->mlen) { ++fmt; if (h->timestamp == lh->timestamp) { ++fmt; @@ -982,22 +980,36 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_int_t -ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) +ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, + ngx_uint_t priority) { - ngx_chain_t *l, **ll, *cout; - size_t nbytes, nbufs, noutbytes, noutbufs; - ngx_connection_t *c; - ngx_buf_t *b; + ngx_chain_t *l, **ll; + ngx_connection_t *c; + ngx_buf_t *b; + ngx_rtmp_core_srv_conf_t *cscf; + size_t nbytes, nbufs, qbytes, qbufs; c = s->connection; + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + qbytes = 0; + qbufs = 0; nbytes = 0; nbufs = 0; - noutbytes = 0; - noutbufs = 0; - /* create locally-linked chain of shared buffers */ - cout = NULL; - ll = &cout; + for(ll = &s->out; *ll; ll = &(*ll)->next) { + qbytes += (*ll)->buf->last - (*ll)->buf->pos; + ++qbufs; + } + + /* drop packet? */ + if (qbytes > cscf->max_buf / (priority + 1)) { + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0, + "drop message bytes=%uz, bufs=%uz priority=%ui", + qbytes, qbufs, priority); + return NGX_AGAIN; + } + + /* append locally-linked chain of shared buffers */ for(l = out; l; l = l->next) { if (s->out_free_chains) { @@ -1028,18 +1040,11 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) } *ll = NULL; - /* TODO: optimize lookup */ - /* TODO: implement dropper */ - for(ll = &s->out; *ll; ll = &(*ll)->next) { - noutbytes += (*ll)->buf->last - (*ll)->buf->pos; - ++noutbufs; - } - - *ll = cout; - - ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP send nbytes=%d (%d), nbufs=%d (%d) ready=%d; active=%d", - nbytes, noutbytes, nbufs, noutbufs, c->write->ready, c->write->active); + ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP send bytes=%uz+%uz, bufs=%uz+%uz, priority=%ui, " + "ready=%d, active=%d", + qbytes, nbytes, qbufs, nbufs, priority, + c->write->ready, c->write->active); ngx_rtmp_send(c->write); diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index 49ba754..098f662 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -43,7 +43,7 @@ #define NGX_RTMP_USER_END(s) \ ngx_rtmp_prepare_message(s, &__h, NULL, __l); \ - return ngx_rtmp_send_message(s, __l); \ + return ngx_rtmp_send_message(s, __l, 0); \ /* Protocol control messages */ @@ -226,7 +226,7 @@ ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, if (act.first) { ngx_rtmp_prepare_message(s, h, NULL, act.first); - return ngx_rtmp_send_message(s, act.first); + return ngx_rtmp_send_message(s, act.first, 0); } return NGX_OK; diff --git a/ngx_rtmp_shared.c b/ngx_rtmp_shared.c index 0f6ae94..421e7b6 100644 --- a/ngx_rtmp_shared.c +++ b/ngx_rtmp_shared.c @@ -1,3 +1,8 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + #include "ngx_rtmp.h" @@ -26,37 +31,37 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf) ngx_buf_t *b; size_t size; - if (cscf->out_free) { - out = cscf->out_free; - cscf->out_free = out->next; + if (cscf->free) { + out = cscf->free; + cscf->free = out->next; } else { - if (cscf->out_free_chains) { - out = cscf->out_free_chains; - cscf->out_free_chains = out->next; + if (cscf->free_chains) { + out = cscf->free_chains; + cscf->free_chains = out->next; } else { - out = ngx_alloc_chain_link(cscf->out_pool); + out = ngx_alloc_chain_link(cscf->pool); if (out == NULL) { return NULL; } - out->buf = ngx_calloc_buf(cscf->out_pool); + out->buf = ngx_calloc_buf(cscf->pool); if (out->buf == NULL) { - ngx_free_chain(cscf->out_pool, out); + ngx_free_chain(cscf->pool, out); return NULL; } } - size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER + size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER + NGX_RTMP_REFCOUNT_BYTES; b = out->buf; - b->start = ngx_palloc(cscf->out_pool, size); + b->start = ngx_palloc(cscf->pool, size); if (b->start == NULL) { - out->next = cscf->out_free_chains; - cscf->out_free_chains = out; + out->next = cscf->free_chains; + cscf->free_chains = out; return NULL; } @@ -88,16 +93,16 @@ ngx_rtmp_free_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *out) if (ngx_rtmp_ref_put(cl->buf->start) == 0) { /* both chain & buf are free; * put the whole chain in free list */ - cl->next = cscf->out_free; - cscf->out_free = cl; + cl->next = cscf->free; + cscf->free = cl; continue; } /* only chain is free; * buf is still used by somebody & will * be freed in ngx_rtmp_free_shared_buf */ - cl->next = cscf->out_free_chains; - cscf->out_free_chains = cl; + cl->next = cscf->free_chains; + cscf->free_chains = cl; } } @@ -118,27 +123,27 @@ ngx_rtmp_free_shared_buf(ngx_rtmp_core_srv_conf_t *cscf, ngx_buf_t *b) return; } - if (cscf->out_free_chains) { - cl = cscf->out_free_chains; - cscf->out_free_chains = cl->next; + if (cscf->free_chains) { + cl = cscf->free_chains; + cscf->free_chains = cl->next; } else { - cl = ngx_alloc_chain_link(cscf->out_pool); + cl = ngx_alloc_chain_link(cscf->pool); if (cl == NULL) { return; } - cl->buf = ngx_calloc_buf(cscf->out_pool); + cl->buf = ngx_calloc_buf(cscf->pool); if (cl->buf == NULL) { - ngx_free_chain(cscf->out_pool, cl); + ngx_free_chain(cscf->pool, cl); return; } } cl->buf->start = b->start; cl->buf->end = b->end; - cl->next = cscf->out_free; - cscf->out_free = cl; + cl->next = cscf->free; + cscf->free = cl; } diff --git a/test/nginx.conf b/test/nginx.conf index 3b3f28c..6e0c473 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -20,6 +20,8 @@ rtmp { listen 1935; + wait_key_frame off; + } }