implemented deferred session close

This commit is contained in:
Roman Arutyunyan 2012-04-08 01:44:57 +04:00
parent a43412edd1
commit 90f985fa2f
3 changed files with 66 additions and 37 deletions

View file

@ -175,6 +175,7 @@ typedef struct {
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
ngx_connection_t *connection;
ngx_event_t close;
void **ctx;
void **main_conf;
@ -334,7 +335,7 @@ char* ngx_rtmp_user_message_type(uint16_t evt);
#endif
void ngx_rtmp_init_connection(ngx_connection_t *c);
void ngx_rtmp_close_connection(ngx_connection_t *c);
void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
uint32_t ngx_rtmp_get_timestamp();
ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size);

View file

@ -14,6 +14,7 @@
static void ngx_rtmp_init_session(ngx_connection_t *c);
static void ngx_rtmp_close_connection(ngx_connection_t *c);
static void ngx_rtmp_handshake_recv(ngx_event_t *rev);
static void ngx_rtmp_handshake_send(ngx_event_t *rev);
@ -241,10 +242,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
/*
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(4096, c->log);
*/
/* start handshake */
b = &s->hs_in_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
@ -267,7 +265,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
for(n = 0; n < ch->nelts; ++n, ++h) {
if (*h) {
if ((*h)(s, NULL, NULL) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
}
@ -310,7 +308,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
if (rev->timedout) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
c->timedout = 1;
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -327,14 +325,14 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_ERROR || n == 0) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN) {
ngx_add_timer(rev, cscf->timeout);
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
}
return;
}
@ -351,7 +349,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
if (*b->pos != NGX_RTMP_VERSION) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -428,7 +426,7 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -447,14 +445,14 @@ restart:
n = c->send(c, b->pos, b->last - b->pos);
if (n == NGX_ERROR) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN || n == 0) {
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
}
@ -540,7 +538,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
if (st->in == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"in buf alloc failed");
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
}
@ -570,13 +568,13 @@ ngx_rtmp_recv(ngx_event_t *rev)
n = c->recv(c, b->last, b->end - b->last);
if (n == NGX_ERROR || n == 0) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
if (n == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
}
return;
}
@ -592,7 +590,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
"sending RTMP ACK(%D)", s->in_bytes);
if (ngx_rtmp_send_ack(s, s->in_bytes)) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
}
@ -631,7 +629,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"RTMP in chunk stream too big: %D >= %D",
csid, cscf->max_streams);
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -733,7 +731,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
if (h->mlen > cscf->max_message) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"too big message: %uz", cscf->max_message);
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
}
@ -763,7 +761,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
st->len = 0;
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -809,7 +807,7 @@ ngx_rtmp_send(ngx_event_t *wev)
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -854,7 +852,7 @@ ngx_rtmp_send(ngx_event_t *wev)
out = c->send_chain(c, s->out, limit);
if (out == NGX_CHAIN_ERROR) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -862,7 +860,7 @@ ngx_rtmp_send(ngx_event_t *wev)
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
}
return;
}
@ -913,7 +911,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"RTMP out chunk stream too big: %D >= %D",
h->csid, cscf->max_streams);
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
return;
}
@ -1257,28 +1255,37 @@ ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s)
}
void
static void
ngx_rtmp_close_connection(ngx_connection_t *c)
{
ngx_rtmp_session_t *s;
ngx_pool_t *pool;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
pool = c->pool;
ngx_close_connection(c);
ngx_destroy_pool(pool);
}
static void
ngx_rtmp_close_session_handler(ngx_event_t *e)
{
ngx_rtmp_session_t *s;
ngx_connection_t *c;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_handler_pt *h;
ngx_array_t *dh;
size_t n;
if (c->destroyed) {
return;
}
s = e->data;
c = s->connection;
c->destroyed = 1;
s = c->data;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session");
if (s) {
dh = &cmcf->events[NGX_RTMP_DISCONNECT];
@ -1305,9 +1312,30 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
ngx_rtmp_free_shared_buf(cscf, s->out->buf);
}
pool = c->pool;
ngx_close_connection(c);
ngx_destroy_pool(pool);
ngx_rtmp_close_connection(c);
}
void
ngx_rtmp_finalize_session(ngx_rtmp_session_t *s)
{
ngx_event_t *e;
ngx_connection_t *c;
/* deferred session finalize;
* schedule handler here */
c = s->connection;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "finalize session");
c->destroyed = 1;
e = &s->close;
e->data = s;
e->handler = ngx_rtmp_close_session_handler;
e->log = c->log;
ngx_post_event(e, &ngx_posted_events);
}

View file

@ -293,7 +293,7 @@ ngx_rtmp_netcall_close(ngx_connection_t *cc)
if (cs->handle &&
cs->handle(s, cs->arg, cs->in) != NGX_OK)
{
ngx_rtmp_close_connection(c);
ngx_rtmp_finalize_session(s);
}
}