implemented frame dropping

This commit is contained in:
Roman Arutyunyan 2012-03-18 17:09:19 +04:00
parent 8204245eb2
commit 96ebed8573
7 changed files with 173 additions and 103 deletions

View file

@ -226,10 +226,12 @@ typedef struct ngx_rtmp_core_srv_conf_s {
ngx_uint_t ack_window;
ngx_int_t out_chunk_size;
ngx_pool_t *out_pool;
ngx_chain_t *out_free;
ngx_chain_t *out_free_chains;
ngx_int_t chunk_size;
ngx_pool_t *pool;
ngx_chain_t *free;
ngx_chain_t *free_chains;
size_t max_buf;
ngx_flag_t wait_key_frame;
ngx_rtmp_conf_ctx_t *ctx;
} ngx_rtmp_core_srv_conf_t;
@ -312,7 +314,12 @@ ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf,
/* Sending messages */
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_header_t *lh, ngx_chain_t *out);
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_uint_t priority);
/* Note on priorities:
* the bigger value the lower the priority.
* priority=0 is the highest */
#define NGX_RTMP_LIMIT_SOFT 0
#define NGX_RTMP_LIMIT_HARD 1

View file

@ -8,6 +8,20 @@
#include "ngx_rtmp.h"
/* Standard stream ids for broadcasting */
#define NGX_RTMP_BROADCAST_MSID 1
#define NGX_RTMP_BROADCAST_CSID_AMF0 5
#define NGX_RTMP_BROADCAST_CSID_AUDIO 6
#define NGX_RTMP_BROADCAST_CSID_VIDEO 7
/* Frame cutoff */
#define NGX_RTMP_CUTOFF_ALL 0
#define NGX_RTMP_CUTOFF_KEY 1
#define NGX_RTMP_CUTOFF_INTER 2
#define NGX_RTMP_CUTOFF_DISPOSABLE 3
static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf,
@ -93,7 +107,7 @@ ngx_module_t ngx_rtmp_broadcast_module = {
#define NGX_RTMP_BROADCAST_PUBLISHER 0x01
#define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02
#define NGX_RTMP_BROADCAST_WANT_KEYFRAME 0x04
#define NGX_RTMP_BROADCAST_KEYFRAME 0x04
#define NGX_RTMP_BROADCAST_DATA_FRAME 0x08
@ -101,9 +115,8 @@ typedef struct ngx_rtmp_broadcast_ctx_s {
ngx_str_t stream;
ngx_rtmp_session_t *session;
struct ngx_rtmp_broadcast_ctx_s *next;
ngx_uint_t flags; /* publisher/subscriber */
ngx_uint_t flags;
uint32_t csid;
ngx_rtmp_header_t lh; /* last a/v header */
ngx_chain_t *data_frame;
} ngx_rtmp_broadcast_ctx_t;
@ -233,6 +246,7 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s)
#define NGX_RTMP_VIDEO_KEY_FRAME 1
#define NGX_RTMP_VIDEO_INTER_FRAME 2
#define NGX_RTMP_VIDEO_DISPOSABLE_FRAME 3
#define NGX_RTMP_AUDIO_FRAME NGX_RTMP_VIDEO_KEY_FRAME
static ngx_int_t
@ -249,16 +263,30 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, *cctx;
ngx_chain_t *out;
ngx_int_t vftype;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_header_t sh;
ngx_rtmp_header_t sh;
ngx_rtmp_session_t *ss;
ngx_uint_t priority;
int keyframe;
c = s->connection;
sh = *h;
sh.csid = 4;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
sh = *h;
keyframe = 0;
if (h->type == NGX_RTMP_MSG_VIDEO) {
sh.csid = NGX_RTMP_BROADCAST_CSID_VIDEO;
priority = ngx_rtmp_get_video_frame_type(in);
if (priority == NGX_RTMP_VIDEO_KEY_FRAME) {
keyframe = 1;
}
} else {
sh.csid = NGX_RTMP_BROADCAST_CSID_AUDIO;
priority = NGX_RTMP_AUDIO_FRAME;
}
if (ctx == NULL
|| !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER))
{
@ -271,14 +299,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return NGX_OK;
}
vftype = 0;
if (sh.type == NGX_RTMP_MSG_VIDEO) {
vftype = ngx_rtmp_get_video_frame_type(in);
}
out = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &sh, &ctx->lh, out);
ngx_rtmp_prepare_message(s, &sh, NULL, out);
/* broadcast to all subscribers */
for (cctx = *ngx_rtmp_broadcast_get_head(s);
@ -290,6 +313,8 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
ctx->stream.len))
{
ss = cctx->session;
/* if we have metadata check if the subscriber
* has already received one */
if (ctx->data_frame
@ -298,28 +323,33 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"sending data_frame");
if (ngx_rtmp_send_message(cctx->session, ctx->data_frame)
!= NGX_OK)
{
ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0,
"error sending message");
switch (ngx_rtmp_send_message(ss, ctx->data_frame, 0)) {
case NGX_OK:
cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME;
break;
case NGX_AGAIN:
break;
default:
ngx_log_error(NGX_LOG_INFO, ss->connection->log, 0,
"error sending message");
}
cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME;
}
/* is the subscriber waiting for
* a key frame? */
if (sh.type == NGX_RTMP_MSG_VIDEO
&& cctx->flags & NGX_RTMP_BROADCAST_WANT_KEYFRAME)
/* waiting for a keyframe? */
if (cscf->wait_key_frame
&& sh.type == NGX_RTMP_MSG_VIDEO
&& !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME)
&& !keyframe)
{
if (vftype && vftype != NGX_RTMP_VIDEO_KEY_FRAME) {
continue;
}
cctx->flags &= ~NGX_RTMP_BROADCAST_WANT_KEYFRAME;
continue;
}
if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0,
if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK) {
if (keyframe) {
cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME;
}
} else {
ngx_log_error(NGX_LOG_INFO, ss->connection->log, 0,
"error sending message");
}
}
@ -404,7 +434,7 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return ngx_rtmp_send_ack_size(s, cscf->ack_window)
|| ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC)
|| ngx_rtmp_send_user_stream_begin(s, 0)
|| ngx_rtmp_send_chunk_size(s, cscf->out_chunk_size)
|| ngx_rtmp_send_chunk_size(s, cscf->chunk_size)
|| ngx_rtmp_send_amf0(s, h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_OK
@ -439,7 +469,7 @@ ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return NGX_ERROR;
}
stream = 1;
stream = NGX_RTMP_BROADCAST_MSID;
return ngx_rtmp_send_amf0(s, h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
@ -486,7 +516,9 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
"publish() called; pubName='%s' pubType='%s'",
pub_name, pub_type);
if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) {
if (ngx_rtmp_send_user_stream_begin(s,
NGX_RTMP_BROADCAST_MSID) != NGX_OK)
{
return NGX_ERROR;
}
@ -498,7 +530,7 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
memset(&sh, 0, sizeof(sh));
sh.type = NGX_RTMP_MSG_AMF0_CMD;
sh.csid = 5; /*FIXME*/
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
sh.msid = h->msid;
if (ngx_rtmp_send_amf0(s, &sh, out_elts,
@ -566,16 +598,17 @@ ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
"play() called; playame='%s'",
play_name);
if (ngx_rtmp_send_user_stream_begin(s, 1) != NGX_OK) {
if (ngx_rtmp_send_user_stream_begin(s,
NGX_RTMP_BROADCAST_MSID) != NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER
| NGX_RTMP_BROADCAST_WANT_KEYFRAME);
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER);
memset(&sh, 0, sizeof(sh));
sh.type = NGX_RTMP_MSG_AMF0_CMD;
sh.csid = 5; /*FIXME*/
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
sh.msid = h->msid;
ngx_str_set(&out_inf[0], "NetStream.Play.Reset");
@ -674,7 +707,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
memset(&sh, 0, sizeof(sh));
sh.csid = 5;
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
sh.msid = h->msid;
sh.type = h->type;

View file

@ -70,11 +70,25 @@ static ngx_command_t ngx_rtmp_core_commands[] = {
offsetof(ngx_rtmp_core_srv_conf_t, ack_window),
NULL },
{ ngx_string("out_chunk_size"),
{ ngx_string("chunk_size"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_core_srv_conf_t, out_chunk_size),
offsetof(ngx_rtmp_core_srv_conf_t, chunk_size),
NULL },
{ ngx_string("max_buf"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_core_srv_conf_t, max_buf),
NULL },
{ ngx_string("wait_key_frame"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_core_srv_conf_t, wait_key_frame),
NULL },
ngx_null_command
@ -147,8 +161,10 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf)
conf->timeout = NGX_CONF_UNSET_MSEC;
conf->so_keepalive = NGX_CONF_UNSET;
conf->max_streams = NGX_CONF_UNSET;
conf->out_chunk_size = NGX_CONF_UNSET;
conf->chunk_size = NGX_CONF_UNSET;
conf->ack_window = NGX_CONF_UNSET;
conf->max_buf = NGX_CONF_UNSET;
conf->wait_key_frame = NGX_CONF_UNSET;
return conf;
}
@ -164,17 +180,19 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0);
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16);
ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 4096);
ngx_conf_merge_value(conf->chunk_size, prev->chunk_size, 4096);
ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000);
ngx_conf_merge_size_value(conf->max_buf, prev->max_buf, 128 * 1024);
ngx_conf_merge_value(conf->wait_key_frame, prev->wait_key_frame, 1);
if (prev->out_pool == NULL) {
prev->out_pool = ngx_create_pool(8192, cf->log);
if (prev->out_pool == NULL) {
if (prev->pool == NULL) {
prev->pool = ngx_create_pool(8192, cf->log);
if (prev->pool == NULL) {
return NGX_CONF_ERROR;
}
}
conf->out_pool = prev->out_pool;
conf->pool = prev->pool;
return NGX_CONF_OK;
}

View file

@ -880,9 +880,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
fmt = 0;
if (lh && lh->csid && h->msid == lh->msid) {
++fmt;
if (h->type == lh->type
&& mlen == lh->mlen)
{
if (h->type == lh->type && mlen == lh->mlen) {
++fmt;
if (h->timestamp == lh->timestamp) {
++fmt;
@ -982,22 +980,36 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_uint_t priority)
{
ngx_chain_t *l, **ll, *cout;
size_t nbytes, nbufs, noutbytes, noutbufs;
ngx_connection_t *c;
ngx_buf_t *b;
ngx_chain_t *l, **ll;
ngx_connection_t *c;
ngx_buf_t *b;
ngx_rtmp_core_srv_conf_t *cscf;
size_t nbytes, nbufs, qbytes, qbufs;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
qbytes = 0;
qbufs = 0;
nbytes = 0;
nbufs = 0;
noutbytes = 0;
noutbufs = 0;
/* create locally-linked chain of shared buffers */
cout = NULL;
ll = &cout;
for(ll = &s->out; *ll; ll = &(*ll)->next) {
qbytes += (*ll)->buf->last - (*ll)->buf->pos;
++qbufs;
}
/* drop packet? */
if (qbytes > cscf->max_buf / (priority + 1)) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0,
"drop message bytes=%uz, bufs=%uz priority=%ui",
qbytes, qbufs, priority);
return NGX_AGAIN;
}
/* append locally-linked chain of shared buffers */
for(l = out; l; l = l->next) {
if (s->out_free_chains) {
@ -1028,18 +1040,11 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
}
*ll = NULL;
/* TODO: optimize lookup */
/* TODO: implement dropper */
for(ll = &s->out; *ll; ll = &(*ll)->next) {
noutbytes += (*ll)->buf->last - (*ll)->buf->pos;
++noutbufs;
}
*ll = cout;
ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send nbytes=%d (%d), nbufs=%d (%d) ready=%d; active=%d",
nbytes, noutbytes, nbufs, noutbufs, c->write->ready, c->write->active);
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send bytes=%uz+%uz, bufs=%uz+%uz, priority=%ui, "
"ready=%d, active=%d",
qbytes, nbytes, qbufs, nbufs, priority,
c->write->ready, c->write->active);
ngx_rtmp_send(c->write);

View file

@ -43,7 +43,7 @@
#define NGX_RTMP_USER_END(s) \
ngx_rtmp_prepare_message(s, &__h, NULL, __l); \
return ngx_rtmp_send_message(s, __l); \
return ngx_rtmp_send_message(s, __l, 0); \
/* Protocol control messages */
@ -226,7 +226,7 @@ ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (act.first) {
ngx_rtmp_prepare_message(s, h, NULL, act.first);
return ngx_rtmp_send_message(s, act.first);
return ngx_rtmp_send_message(s, act.first, 0);
}
return NGX_OK;

View file

@ -1,3 +1,8 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp.h"
@ -26,37 +31,37 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
ngx_buf_t *b;
size_t size;
if (cscf->out_free) {
out = cscf->out_free;
cscf->out_free = out->next;
if (cscf->free) {
out = cscf->free;
cscf->free = out->next;
} else {
if (cscf->out_free_chains) {
out = cscf->out_free_chains;
cscf->out_free_chains = out->next;
if (cscf->free_chains) {
out = cscf->free_chains;
cscf->free_chains = out->next;
} else {
out = ngx_alloc_chain_link(cscf->out_pool);
out = ngx_alloc_chain_link(cscf->pool);
if (out == NULL) {
return NULL;
}
out->buf = ngx_calloc_buf(cscf->out_pool);
out->buf = ngx_calloc_buf(cscf->pool);
if (out->buf == NULL) {
ngx_free_chain(cscf->out_pool, out);
ngx_free_chain(cscf->pool, out);
return NULL;
}
}
size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER
size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER
+ NGX_RTMP_REFCOUNT_BYTES;
b = out->buf;
b->start = ngx_palloc(cscf->out_pool, size);
b->start = ngx_palloc(cscf->pool, size);
if (b->start == NULL) {
out->next = cscf->out_free_chains;
cscf->out_free_chains = out;
out->next = cscf->free_chains;
cscf->free_chains = out;
return NULL;
}
@ -88,16 +93,16 @@ ngx_rtmp_free_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *out)
if (ngx_rtmp_ref_put(cl->buf->start) == 0) {
/* both chain & buf are free;
* put the whole chain in free list */
cl->next = cscf->out_free;
cscf->out_free = cl;
cl->next = cscf->free;
cscf->free = cl;
continue;
}
/* only chain is free;
* buf is still used by somebody & will
* be freed in ngx_rtmp_free_shared_buf */
cl->next = cscf->out_free_chains;
cscf->out_free_chains = cl;
cl->next = cscf->free_chains;
cscf->free_chains = cl;
}
}
@ -118,27 +123,27 @@ ngx_rtmp_free_shared_buf(ngx_rtmp_core_srv_conf_t *cscf, ngx_buf_t *b)
return;
}
if (cscf->out_free_chains) {
cl = cscf->out_free_chains;
cscf->out_free_chains = cl->next;
if (cscf->free_chains) {
cl = cscf->free_chains;
cscf->free_chains = cl->next;
} else {
cl = ngx_alloc_chain_link(cscf->out_pool);
cl = ngx_alloc_chain_link(cscf->pool);
if (cl == NULL) {
return;
}
cl->buf = ngx_calloc_buf(cscf->out_pool);
cl->buf = ngx_calloc_buf(cscf->pool);
if (cl->buf == NULL) {
ngx_free_chain(cscf->out_pool, cl);
ngx_free_chain(cscf->pool, cl);
return;
}
}
cl->buf->start = b->start;
cl->buf->end = b->end;
cl->next = cscf->out_free;
cscf->out_free = cl;
cl->next = cscf->free;
cscf->free = cl;
}

View file

@ -20,6 +20,8 @@ rtmp {
listen 1935;
wait_key_frame off;
}
}