implemenmted several optimizations

This commit is contained in:
Roman Arutyunyan 2012-04-19 09:53:18 +04:00
parent 40684b12b4
commit 6ae16bf9d8
3 changed files with 75 additions and 92 deletions

View file

@ -171,6 +171,9 @@ typedef struct {
} ngx_rtmp_stream_t;
#define NGX_RTMP_OUT_QUEUE 256
typedef struct {
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
@ -217,8 +220,8 @@ typedef struct {
ngx_int_t in_chunk_size_changing;
/* circular buffer of RTMP message pointers */
ngx_chain_t **out_start, **out_end;
ngx_chain_t **out_pos, **out_last;
size_t out_pos, out_last;
ngx_chain_t *out[NGX_RTMP_OUT_QUEUE];
ngx_chain_t *out_chain;
u_char *out_bpos;
} ngx_rtmp_session_t;
@ -362,13 +365,33 @@ ngx_int_t ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s,
/* Shared output buffers */
/* Store refcount in negative bytes of shared buffer */
#define NGX_RTMP_REFCOUNT_TYPE uint32_t
#define NGX_RTMP_REFCOUNT_BYTES sizeof(NGX_RTMP_REFCOUNT_TYPE)
#define ngx_rtmp_ref(b) \
*((NGX_RTMP_REFCOUNT_TYPE*)(b) - 1)
#define ngx_rtmp_ref_set(b, v) \
ngx_rtmp_ref(b) = v
#define ngx_rtmp_ref_get(b) \
++ngx_rtmp_ref(b)
#define ngx_rtmp_ref_put(b) \
--ngx_rtmp_ref(b)
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf);
void ngx_rtmp_acquire_shared_chain(ngx_chain_t *in);
void ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *in);
ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf,
ngx_chain_t *head, ngx_chain_t *in);
#define ngx_rtmp_acquire_shared_chain(in) \
ngx_rtmp_ref_get(in); \
/* Sending messages */
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,

View file

@ -243,13 +243,13 @@ ngx_rtmp_init_session(ngx_connection_t *c)
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
s->out_start = ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue);
/*s->out_start = s->out;ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue);
if (s->out_start == NULL) {
ngx_rtmp_close_connection(c);
return;
}
s->out_pos = s->out_last = s->out_start;
s->out_end = s->out_start + cscf->max_queue;
}*/
/*s->out_pos = s->last = s->out;s->out_last = s->out_start;*/
/*s->out_end = s->out_start + sizeof(s->out) / sizeof(s->out[0]);cscf->max_queue*/;
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
@ -824,6 +824,11 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_del_timer(wev);
}
if (s->out_chain == NULL && s->out_pos != s->out_last) {
s->out_chain = s->out[s->out_pos];
s->out_bpos = s->out_chain->buf->pos;
}
while (s->out_chain) {
n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos);
@ -844,15 +849,13 @@ ngx_rtmp_send(ngx_event_t *wev)
if (s->out_bpos == s->out_chain->buf->last) {
s->out_chain = s->out_chain->next;
if (s->out_chain == NULL) {
ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos]);
++s->out_pos;
if (s->out_pos == s->out_end) {
s->out_pos = s->out_start;
}
s->out_pos %= NGX_RTMP_OUT_QUEUE;
if (s->out_pos == s->out_last) {
break;
}
s->out_chain = *s->out_pos;
s->out_chain = s->out[s->out_pos];
}
s->out_bpos = s->out_chain->buf->pos;
}
@ -1012,48 +1015,34 @@ ngx_int_t
ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
ngx_uint_t priority)
{
ngx_connection_t *c;
ngx_rtmp_core_srv_conf_t *cscf;
size_t nmsg;
ngx_int_t nmsg;
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
nmsg = (s->out_pos <= s->out_last)
? s->out_last - s->out_pos
: (s->out_end - s->out_pos) + (s->out_last - s->out_start);
++nmsg;
nmsg = (s->out_last - s->out_pos) % NGX_RTMP_OUT_QUEUE + 1;
/* drop packet?
* Note we always leave 1 slot free */
if (nmsg >= (s->out_end - s->out_start) / (priority + 1)) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
if (nmsg * (priority + 1) >= NGX_RTMP_OUT_QUEUE) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP drop message bufs=%ui, priority=%ui",
nmsg, priority);
return NGX_AGAIN;
}
s->out[s->out_last++] = out;
s->out_last %= NGX_RTMP_OUT_QUEUE;
ngx_rtmp_acquire_shared_chain(out);
*s->out_last++ = out;
if (s->out_last >= s->out_end) {
s->out_last = s->out_start;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP send nmsg=%ui, priority=%ui",
nmsg, priority);
if (!s->connection->write->active) {
ngx_rtmp_send(s->connection->write);
/*return ngx_add_event(s->connection->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT);*/
}
if (s->out_chain == NULL) {
s->out_chain = out;
s->out_bpos = out->buf->pos;
}
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send nmsg=%ui, priority=%ui, ready=%d, active=%d",
nmsg, priority, c->write->ready, c->write->active);
if (!c->write->active) {
ngx_rtmp_send(c->write);
}
return c->destroyed ? NGX_ERROR : NGX_OK;
return NGX_OK;
}
@ -1255,11 +1244,8 @@ ngx_rtmp_close_session_handler(ngx_event_t *e)
}
while (s->out_pos != s->out_last) {
ngx_rtmp_free_shared_chain(cscf, *s->out_pos);
++s->out_pos;
if (s->out_pos == s->out_end) {
s->out_pos = s->out_start;
}
ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos++]);
s->out_pos %= NGX_RTMP_OUT_QUEUE;
}
ngx_rtmp_close_connection(c);

View file

@ -6,30 +6,13 @@
#include "ngx_rtmp.h"
/* Store refcount in negative bytes of shared buffer */
#define NGX_RTMP_REFCOUNT_TYPE uint32_t
#define NGX_RTMP_REFCOUNT_BYTES sizeof(NGX_RTMP_REFCOUNT_TYPE)
#define ngx_rtmp_ref(b) \
*((NGX_RTMP_REFCOUNT_TYPE*)(b) - 1)
#define ngx_rtmp_ref_set(b, v) \
ngx_rtmp_ref(b) = v
#define ngx_rtmp_ref_get(b) \
++ngx_rtmp_ref(b)
#define ngx_rtmp_ref_put(b) \
--ngx_rtmp_ref(b)
ngx_chain_t *
ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
{
ngx_chain_t *out;
ngx_buf_t *b;
size_t size;
u_char *p;
ngx_chain_t *out;
ngx_buf_t *b;
size_t size;
if (cscf->free) {
out = cscf->free;
@ -37,27 +20,25 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
} else {
out = ngx_alloc_chain_link(cscf->pool);
if (out == NULL) {
size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
p = ngx_pcalloc(cscf->pool, NGX_RTMP_REFCOUNT_BYTES
+ sizeof(ngx_chain_t)
+ sizeof(ngx_buf_t)
+ size);
if (p == NULL) {
return NULL;
}
out->buf = ngx_calloc_buf(cscf->pool);
if (out->buf == NULL) {
return NULL;
}
p += NGX_RTMP_REFCOUNT_BYTES;
out = (ngx_chain_t *)p;
size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER
+ NGX_RTMP_REFCOUNT_BYTES;
p += sizeof(ngx_chain_t);
out->buf = (ngx_buf_t *)p;
b = out->buf;
b->start = ngx_palloc(cscf->pool, size);
if (b->start == NULL) {
return NULL;
}
b->start += NGX_RTMP_REFCOUNT_BYTES;
b->end = b->start + size - NGX_RTMP_REFCOUNT_BYTES;
p += sizeof(ngx_buf_t);
out->buf->start = p;
out->buf->end = p + size;
}
out->next = NULL;
@ -66,25 +47,18 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
b->memory = 1;
/* buffer has refcount =1 when created! */
ngx_rtmp_ref_set(b->start, 1);
ngx_rtmp_ref_set(out, 1);
return out;
}
void
ngx_rtmp_acquire_shared_chain(ngx_chain_t *in)
{
ngx_rtmp_ref_get(in->buf->start);
}
void
ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *in)
{
ngx_chain_t *cl;
if (ngx_rtmp_ref_put(in->buf->start)) {
if (ngx_rtmp_ref_put(in)) {
return;
}