implemented proper shared buffer to enable multi-user broadcasts

This commit is contained in:
Roman Arutyunyan 2012-03-17 10:12:19 +04:00
parent fd9e64c138
commit 12dc260fec
7 changed files with 108 additions and 203 deletions

1
config
View file

@ -7,6 +7,7 @@ CORE_MODULES="$CORE_MODULES
NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp.c \
$ngx_addon_dir/ngx_rtmp_shared.c \
$ngx_addon_dir/ngx_rtmp_handler.c \
$ngx_addon_dir/ngx_rtmp_core_module.c \
$ngx_addon_dir/ngx_rtmp_amf0.c \

View file

@ -165,7 +165,7 @@ typedef struct {
} ngx_rtmp_stream_t;
typedef struct ngx_rtmp_session_s {
typedef struct {
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
ngx_connection_t *connection;
@ -196,6 +196,7 @@ typedef struct ngx_rtmp_session_s {
uint32_t in_last_ack;
ngx_chain_t *out;
ngx_chain_t *out_free_chains;
} ngx_rtmp_session_t;
@ -217,7 +218,7 @@ typedef struct {
} ngx_rtmp_core_main_conf_t;
typedef struct {
typedef struct ngx_rtmp_core_srv_conf_s {
ngx_msec_t timeout;
ngx_flag_t so_keepalive;
ngx_int_t max_streams;
@ -227,6 +228,7 @@ typedef struct {
ngx_int_t out_chunk_size;
ngx_pool_t *out_pool;
ngx_chain_t *out_free;
ngx_chain_t *out_free_chains;
ngx_rtmp_conf_ctx_t *ctx;
} ngx_rtmp_core_srv_conf_t;
@ -294,14 +296,19 @@ ngx_int_t ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
/* Sending messages */
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_session_t *s,
ngx_chain_t *out, ngx_chain_t *in);
ngx_int_t ngx_rtmp_addref_shared_bufs(ngx_chain_t *in);
ngx_int_t ngx_rtmp_free_shared_buf(ngx_rtmp_session_t *s,
ngx_chain_t *out);
/* Shared output buffers */
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf);
void ngx_rtmp_free_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *out);
void ngx_rtmp_free_shared_buf(ngx_rtmp_core_srv_conf_t *cscf,
ngx_buf_t *b);
void ngx_rtmp_acquire_shared_buf(ngx_buf_t *b);
ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *head, ngx_chain_t *in);
/* Sending messages */
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_header_t *lh, ngx_chain_t *out);
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);

View file

@ -125,7 +125,7 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
if (b == NULL || b->last == b->end) {
ln = ctx->alloc(ctx->arg);
ln = ctx->alloc(ctx->cscf);
if (ln == NULL) {
return NGX_ERROR;
}

View file

@ -20,22 +20,23 @@
typedef struct {
ngx_int_t type;
char *name;
void *data;
size_t len;
ngx_int_t type;
char *name;
void *data;
size_t len;
} ngx_rtmp_amf0_elt_t;
struct ngx_rtmp_session_s;
struct ngx_rtmp_core_srv_conf_s;
typedef ngx_chain_t * (*ngx_rtmp_amf0_alloc_pt)(struct ngx_rtmp_session_s *s);
typedef ngx_chain_t * (*ngx_rtmp_amf0_alloc_pt)(struct ngx_rtmp_core_srv_conf_s
*cscf);
typedef struct {
ngx_chain_t *link, *first;
ngx_rtmp_amf0_alloc_pt alloc;
void *arg;
ngx_log_t *log;
ngx_chain_t *link, *first;
ngx_rtmp_amf0_alloc_pt alloc;
struct ngx_rtmp_core_srv_conf_s *cscf;
ngx_log_t *log;
} ngx_rtmp_amf0_ctx_t;

View file

@ -249,11 +249,12 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, *cctx;
ngx_chain_t *out;
size_t nsubs;
ngx_int_t vftype;
ngx_rtmp_core_srv_conf_t *cscf;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL
@ -273,12 +274,11 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
vftype = ngx_rtmp_get_video_frame_type(in);
}
out = ngx_rtmp_append_shared_bufs(s, NULL, in);
out = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, h, &ctx->lh, out);
/* broadcast to all subscribers */
nsubs = 0;
for (cctx = *ngx_rtmp_broadcast_get_head(s);
cctx; cctx = cctx->next)
{
@ -299,7 +299,8 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (ngx_rtmp_send_message(cctx->session, ctx->data_frame)
!= NGX_OK)
{
return NGX_ERROR;
ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0,
"error sending message");
}
cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME;
}
@ -316,20 +317,13 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) {
return NGX_ERROR;
ngx_log_error(NGX_LOG_INFO, cctx->session->connection->log, 0,
"error sending message");
}
++nsubs;
}
}
/* TODO: implement proper (refcount-based) buffer deletion */
/* no one subscriber? */
if (!nsubs
&& ngx_rtmp_free_shared_buf(s, out) != NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_free_shared_bufs(cscf, out);
return NGX_OK;
}
@ -620,6 +614,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_broadcast_ctx_t *ctx;
ngx_rtmp_amf0_ctx_t act;
ngx_rtmp_header_t sh;
ngx_rtmp_core_srv_conf_t *cscf;
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL,
@ -628,6 +623,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "data_frame arrived");
@ -640,13 +636,16 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
/* create full metadata chain for output */
memset(&act, 0, sizeof(act));
act.arg = s;
act.cscf = cscf;
act.alloc = ngx_rtmp_alloc_shared_buf;
act.log = c->log;
if (ngx_rtmp_amf0_write(&act, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
if (act.first) {
ngx_rtmp_free_shared_bufs(cscf, act.first);
}
return NGX_ERROR;
}
@ -656,7 +655,10 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ctx->data_frame = act.first;
if (ngx_rtmp_append_shared_bufs(s, ctx->data_frame, in) == NULL) {
if (ngx_rtmp_append_shared_bufs(cscf, ctx->data_frame, in) == NULL) {
if (ctx->data_frame) {
ngx_rtmp_free_shared_bufs(cscf, ctx->data_frame);
}
return NGX_ERROR;
}
@ -665,8 +667,6 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
sh.msid = h->msid;
sh.type = h->type;
ngx_rtmp_addref_shared_bufs(ctx->data_frame);
ngx_rtmp_prepare_message(s, h, NULL, ctx->data_frame);
return NGX_OK;

View file

@ -738,14 +738,6 @@ ngx_rtmp_recv(ngx_event_t *rev)
}
}
#define ngx_rtmp_buf_addref(b) \
(++*(int*)&((b)->tag))
#define ngx_rtmp_buf_release(b) \
(--*(int*)&((b)->tag))
static void
ngx_rtmp_send(ngx_event_t *wev)
{
@ -795,28 +787,19 @@ ngx_rtmp_send(ngx_event_t *wev)
}
while (s->out) {
l = s->out;
if (l->buf->pos < l->buf->last) {
/*l->buf->pos = l->buf->last;*/
break;
}
s->out = s->out->next;
l->next = NULL;
/* anyone still using this buffer? */
if (ngx_rtmp_buf_release(l->buf)) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"keeping shared buffer");
continue;
}
l->next = s->out_free_chains;
s->out_free_chains = l;
/* return buffer to core */
if (ngx_rtmp_free_shared_buf(s, l)) {
ngx_rtmp_close_connection(c);
return;
}
ngx_rtmp_free_shared_buf(cscf, l->buf);
if (s->out == out) {
break;
@ -828,139 +811,6 @@ ngx_rtmp_send(ngx_event_t *wev)
}
ngx_chain_t *
ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
{
ngx_chain_t *out;
ngx_buf_t *b;
size_t size;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_connection_t *c;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (cscf->out_free) {
out = cscf->out_free;
cscf->out_free = out->next;
} else {
out = ngx_alloc_chain_link(cscf->out_pool);
if (out == NULL) {
return NULL;
}
out->buf = ngx_calloc_buf(cscf->out_pool);
if (out->buf == NULL) {
return NULL;
}
size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
b = out->buf;
b->start = ngx_palloc(cscf->out_pool, size);
b->end = b->start + size;
}
out->next = NULL;
b = out->buf;
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
b->tag = (ngx_buf_tag_t)0;
b->memory = 1;
return out;
}
ngx_chain_t *
ngx_rtmp_append_shared_bufs(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_chain_t *in)
{
ngx_connection_t *c;
ngx_chain_t *l, **ll;
u_char *p;
size_t size;
c = s->connection;
ll = &out;
p = in->buf->pos;
l = out;
if (l) {
for(; l->next; l = l->next);
ll = &l->next;
}
for ( ;; ) {
if (l == NULL || l->buf->last == l->buf->end) {
l = ngx_rtmp_alloc_shared_buf(s);
if (l == NULL || l->buf == NULL) {
return NULL;
}
*ll = l;
ll = &l->next;
}
while (l->buf->end - l->buf->last >= in->buf->last - p) {
l->buf->last = ngx_cpymem(l->buf->last, p,
in->buf->last - p);
in = in->next;
if (in == NULL) {
goto done;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"copy link %p : %p", in, in->buf);
p = in->buf->pos;
}
size = l->buf->end - l->buf->last;
l->buf->last = ngx_cpymem(l->buf->last, p, size);
p += size;
}
done:
*ll = NULL;
return out;
}
ngx_int_t
ngx_rtmp_addref_shared_bufs(ngx_chain_t *in)
{
for(; in; in = in->next) {
ngx_rtmp_buf_addref(in->buf);
}
return NGX_OK;
}
ngx_int_t
ngx_rtmp_free_shared_buf(ngx_rtmp_session_t *s,
ngx_chain_t *out)
{
ngx_rtmp_core_srv_conf_t *cscf;
size_t nbufs;
ngx_chain_t *cl;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
for(nbufs = 1, cl = out; cl->next;
cl = cl->next, ++nbufs);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"release %d shared bufs", nbufs);
cl->next = cscf->out_free;
cscf->out_free = out;
return NGX_OK;
}
void
ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_header_t *lh, ngx_chain_t *out)
@ -1101,9 +951,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
{
ngx_chain_t *l, **ll;
ngx_chain_t *l, **ll, *cout;
size_t nbytes, nbufs, noutbytes, noutbufs;
ngx_connection_t *c;
ngx_buf_t *b;
c = s->connection;
nbytes = 0;
@ -1111,11 +962,42 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
noutbytes = 0;
noutbufs = 0;
/* create locally-linked chain of shared buffers */
cout = NULL;
ll = &cout;
for(l = out; l; l = l->next) {
ngx_rtmp_buf_addref(l->buf);
nbytes += (l->buf->last - l->buf->pos);
if (s->out_free_chains) {
*ll = s->out_free_chains;
s->out_free_chains = (*ll)->next;
} else {
*ll = ngx_alloc_chain_link(c->pool);
if (*ll == NULL) {
break;
}
(*ll)->buf = ngx_calloc_buf(c->pool);
if ((*ll)->buf == NULL) {
ngx_free_chain(c->pool, (*ll));
break;
}
}
b = (*ll)->buf;
*b = *l->buf;
if ((*ll)->buf->in_file) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "in file buf!!");
}
ngx_rtmp_acquire_shared_buf(b);
ll = &(*ll)->next;
nbytes += (b->last - b->pos);
++nbufs;
}
*ll = NULL;
/* TODO: optimize lookup */
/* TODO: implement dropper */
@ -1124,7 +1006,7 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
++noutbufs;
}
*ll = out;
*ll = cout;
ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send nbytes=%d (%d), nbufs=%d (%d) ready=%d; active=%d",
@ -1202,6 +1084,7 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
ngx_rtmp_session_t *s;
ngx_pool_t *pool;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_disconnect_handler_pt *h;
size_t n;
@ -1211,6 +1094,7 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
s = c->data;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
@ -1228,7 +1112,7 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
}
if (s->out) {
ngx_rtmp_free_shared_buf(s, s->out);
ngx_rtmp_free_shared_bufs(cscf, s->out);
s->out = NULL;
}

View file

@ -8,14 +8,20 @@
#define NGX_RTMP_USER_START(s, tp) \
ngx_rtmp_header_t __h; \
ngx_chain_t *__l; \
ngx_buf_t *__b; \
ngx_rtmp_header_t __h; \
ngx_chain_t *__l; \
ngx_buf_t *__b; \
ngx_rtmp_core_srv_conf_t *__cscf; \
\
__cscf = ngx_rtmp_get_module_srv_conf( \
s, ngx_rtmp_core_module); \
memset(&__h, 0, sizeof(__h)); \
__h.type = tp; \
__h.csid = 2; \
__l = ngx_rtmp_alloc_shared_buf(s); \
__l = ngx_rtmp_alloc_shared_buf(__cscf); \
if (__l->buf->in_file) { \
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "send in file buf!!"); \
} \
if (__l == NULL) { \
return NGX_ERROR; \
} \
@ -183,14 +189,20 @@ ngx_int_t
ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
{
ngx_rtmp_amf0_ctx_t act;
ngx_rtmp_amf0_ctx_t act;
ngx_rtmp_core_srv_conf_t *cscf;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
memset(&act, 0, sizeof(act));
act.arg = s;
act.cscf = cscf;
act.alloc = ngx_rtmp_alloc_shared_buf;
act.log = s->connection->log;
if (ngx_rtmp_amf0_write(&act, elts, nelts) != NGX_OK) {
if (act.first) {
ngx_rtmp_free_shared_bufs(cscf, act.first);
}
return NGX_ERROR;
}