implemented detached netcalls

This commit is contained in:
Roman Arutyunyan 2012-04-04 19:16:05 +04:00
parent 2e4ba3d102
commit 0945de3219

View file

@ -14,6 +14,7 @@ static char * ngx_rtmp_netcall_merge_app_conf(ngx_conf_t *cf,
void *parent, void *child); void *parent, void *child);
static void ngx_rtmp_netcall_close(ngx_connection_t *cc); static void ngx_rtmp_netcall_close(ngx_connection_t *cc);
static void ngx_rtmp_netcall_detach(ngx_connection_t *cc);
static void ngx_rtmp_netcall_recv(ngx_event_t *rev); static void ngx_rtmp_netcall_recv(ngx_event_t *rev);
static void ngx_rtmp_netcall_send(ngx_event_t *wev); static void ngx_rtmp_netcall_send(ngx_event_t *wev);
@ -38,6 +39,8 @@ typedef struct ngx_rtmp_netcall_session_s {
ngx_chain_t *in; ngx_chain_t *in;
ngx_chain_t *inlast; ngx_chain_t *inlast;
ngx_chain_t *out; ngx_chain_t *out;
ngx_msec_t timeout;
ngx_int_t detached;
} ngx_rtmp_netcall_session_t; } ngx_rtmp_netcall_session_t;
@ -125,7 +128,7 @@ ngx_rtmp_netcall_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (ctx) { if (ctx) {
while (ctx->cs) { while (ctx->cs) {
ngx_rtmp_netcall_close(ctx->cs->pc->connection); ngx_rtmp_netcall_detach(ctx->cs->pc->connection);
} }
} }
@ -159,6 +162,7 @@ ngx_rtmp_netcall_create(ngx_rtmp_session_t *s, ngx_rtmp_netcall_init_t *ci)
ngx_rtmp_netcall_ctx_t *ctx; ngx_rtmp_netcall_ctx_t *ctx;
ngx_peer_connection_t *pc; ngx_peer_connection_t *pc;
ngx_rtmp_netcall_session_t *cs; ngx_rtmp_netcall_session_t *cs;
ngx_rtmp_netcall_app_conf_t *cacf;
ngx_connection_t *c, *cc; ngx_connection_t *c, *cc;
ngx_pool_t *pool; ngx_pool_t *pool;
@ -201,6 +205,12 @@ ngx_rtmp_netcall_create(ngx_rtmp_session_t *s, ngx_rtmp_netcall_init_t *ci)
ngx_memcpy(cs->arg, ci->arg, ci->argsize); ngx_memcpy(cs->arg, ci->arg, ci->argsize);
} }
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_netcall_module);
if (cacf == NULL) {
return NGX_ERROR;
}
cs->timeout = cacf->timeout;
cs->url = ci->url; cs->url = ci->url;
cs->session = s; cs->session = s;
cs->handle = ci->handle; cs->handle = ci->handle;
@ -260,27 +270,30 @@ ngx_rtmp_netcall_close(ngx_connection_t *cc)
ngx_rtmp_netcall_ctx_t *ctx; ngx_rtmp_netcall_ctx_t *ctx;
cs = cc->data; cs = cc->data;
s = cs->session;
c = s->connection;
if (cc->destroyed) { if (cc->destroyed) {
return; return;
} }
cc->destroyed = 1; cc->destroyed = 1;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module);
for(css = &ctx->cs; *css; css = &((*css)->next)) { if (!cs->detached) {
if (*css == cs) { s = cs->session;
*css = cs->next; c = s->connection;
break; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module);
for(css = &ctx->cs; *css; css = &((*css)->next)) {
if (*css == cs) {
*css = cs->next;
break;
}
} }
}
if (cs->handle && if (cs->handle &&
cs->handle(s, cs->arg, cs->in) != NGX_OK) cs->handle(s, cs->arg, cs->in) != NGX_OK)
{ {
ngx_rtmp_close_connection(c); ngx_rtmp_close_connection(c);
}
} }
pool = cc->pool; pool = cc->pool;
@ -289,19 +302,26 @@ ngx_rtmp_netcall_close(ngx_connection_t *cc)
} }
static void
ngx_rtmp_netcall_detach(ngx_connection_t *cc)
{
ngx_rtmp_netcall_session_t *cs;
cs = cc->data;
cs->detached = 1;
}
static void static void
ngx_rtmp_netcall_recv(ngx_event_t *rev) ngx_rtmp_netcall_recv(ngx_event_t *rev)
{ {
ngx_rtmp_netcall_session_t *cs; ngx_rtmp_netcall_session_t *cs;
ngx_connection_t *cc; ngx_connection_t *cc;
ngx_rtmp_session_t *s;
ngx_rtmp_netcall_app_conf_t *cacf;
ngx_int_t n; ngx_int_t n;
ngx_buf_t *b; ngx_buf_t *b;
cc = rev->data; cc = rev->data;
cs = cc->data; cs = cc->data;
s = cs->session;
if (cc->destroyed) { if (cc->destroyed) {
return; return;
@ -352,8 +372,7 @@ ngx_rtmp_netcall_recv(ngx_event_t *rev)
} }
if (n == NGX_AGAIN) { if (n == NGX_AGAIN) {
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_netcall_module); ngx_add_timer(cc->read, cs->timeout);
ngx_add_timer(cc->read, cacf->timeout);
if (ngx_handle_write_event(cc->read, 0) != NGX_OK) { if (ngx_handle_write_event(cc->read, 0) != NGX_OK) {
ngx_rtmp_netcall_close(cc); ngx_rtmp_netcall_close(cc);
} }
@ -370,13 +389,10 @@ ngx_rtmp_netcall_send(ngx_event_t *wev)
{ {
ngx_rtmp_netcall_session_t *cs; ngx_rtmp_netcall_session_t *cs;
ngx_connection_t *cc; ngx_connection_t *cc;
ngx_rtmp_session_t *s;
ngx_chain_t *cl; ngx_chain_t *cl;
ngx_rtmp_netcall_app_conf_t *cacf;
cc = wev->data; cc = wev->data;
cs = cc->data; cs = cc->data;
s = cs->session;
if (cc->destroyed) { if (cc->destroyed) {
return; return;
@ -405,8 +421,7 @@ ngx_rtmp_netcall_send(ngx_event_t *wev)
/* more data to send? */ /* more data to send? */
if (cl) { if (cl) {
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_netcall_module); ngx_add_timer(cc->write, cs->timeout);
ngx_add_timer(cc->write, cacf->timeout);
if (ngx_handle_write_event(cc->write, 0) != NGX_OK) { if (ngx_handle_write_event(cc->write, 0) != NGX_OK) {
ngx_rtmp_netcall_close(cc); ngx_rtmp_netcall_close(cc);
} }