implemented async control

This commit is contained in:
Roman Arutyunyan 2014-01-03 09:30:18 +04:00
parent 95075aaa2a
commit b8b055dd77

View file

@ -40,7 +40,7 @@ typedef struct {
ngx_str_t path;
ngx_uint_t filter;
ngx_str_t method;
ngx_rtmp_control_handler_t handler;
ngx_array_t sessions; /* ngx_rtmp_session_t * */
} ngx_rtmp_control_ctx_t;
@ -146,8 +146,9 @@ ngx_rtmp_control_record_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index];
ngx_str_null(&rec);
ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
if (ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec) != NGX_OK) {
rec.len = 0;
}
rn = ngx_rtmp_record_find(racf, &rec);
if (rn == NGX_CONF_UNSET_UINT) {
@ -203,10 +204,9 @@ ngx_rtmp_control_redirect_handler(ngx_http_request_t *r, ngx_rtmp_session_t *s)
ngx_rtmp_control_ctx_t *ctx;
ngx_rtmp_close_stream_t vc;
ngx_str_null(&name);
ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name);
if (name.len == 0) {
if (ngx_http_arg(r, (u_char *) "newname", sizeof("newname") - 1, &name)
!= NGX_OK)
{
return "newname not specified";
}
@ -260,7 +260,7 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
ngx_rtmp_live_ctx_t *lctx)
{
ngx_str_t addr, *paddr;
ngx_rtmp_session_t *s;
ngx_rtmp_session_t *s, **ss;
ngx_rtmp_control_ctx_t *ctx;
s = lctx->session;
@ -269,10 +269,9 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
return NGX_CONF_OK;
}
ngx_str_null(&addr);
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
if (addr.len) {
if (ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr)
== NGX_OK)
{
paddr = &s->connection->addr_text;
if (paddr->len != addr.len ||
ngx_strncmp(paddr->data, addr.data, addr.len))
@ -300,7 +299,14 @@ ngx_rtmp_control_walk_session(ngx_http_request_t *r,
break;
}
return ctx->handler(r, s);
ss = ngx_array_push(&ctx->sessions);
if (ss == NULL) {
return "allocation error";
}
*ss = s;
return NGX_CONF_OK;
}
@ -333,12 +339,10 @@ ngx_rtmp_control_walk_app(ngx_http_request_t *r,
ngx_rtmp_live_stream_t *ls;
ngx_rtmp_live_app_conf_t *lacf;
ngx_memzero(&name, sizeof(name));
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
if (name.len == 0) {
if (ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name) != NGX_OK)
{
for (n = 0; n < (ngx_uint_t) lacf->nbuckets; ++n) {
for (ls = lacf->streams[n]; ls; ls = ls->next) {
s = ngx_rtmp_control_walk_stream(r, ls);
@ -378,8 +382,9 @@ ngx_rtmp_control_walk_server(ngx_http_request_t *r,
const char *s;
ngx_rtmp_core_app_conf_t **pcacf;
ngx_memzero(&app, sizeof(app));
ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app);
if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
app.len = 0;
}
pcacf = cscf->applications.elts;
@ -401,12 +406,15 @@ ngx_rtmp_control_walk_server(ngx_http_request_t *r,
static const char *
ngx_rtmp_control_walk(ngx_http_request_t *r)
ngx_rtmp_control_walk(ngx_http_request_t *r, ngx_rtmp_control_handler_t h)
{
ngx_rtmp_core_main_conf_t *cmcf = ngx_rtmp_core_main_conf;
ngx_str_t srv;
ngx_uint_t sn;
ngx_uint_t sn, n;
const char *msg;
ngx_rtmp_session_t **s;
ngx_rtmp_control_ctx_t *ctx;
ngx_rtmp_core_srv_conf_t **pcscf;
sn = 0;
@ -421,7 +429,22 @@ ngx_rtmp_control_walk(ngx_http_request_t *r)
pcscf = cmcf->servers.elts;
pcscf += sn;
return ngx_rtmp_control_walk_server(r, *pcscf);
msg = ngx_rtmp_control_walk_server(r, *pcscf);
if (msg != NGX_CONF_OK) {
return msg;
}
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
s = ctx->sessions.elts;
for (n = 0; n < ctx->sessions.nelts; n++) {
msg = h(r, s[n]);
if (msg != NGX_CONF_OK) {
return msg;
}
}
return NGX_CONF_OK;
}
@ -434,11 +457,9 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
ngx_rtmp_control_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->filter = NGX_RTMP_CONTROL_FILTER_PUBLISHER;
ctx->handler = ngx_rtmp_control_record_handler;
msg = ngx_rtmp_control_walk(r);
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_record_handler);
if (msg != NGX_CONF_OK) {
goto error;
}
@ -501,9 +522,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
goto error;
}
ctx->handler = ngx_rtmp_control_drop_handler;
msg = ngx_rtmp_control_walk(r);
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_drop_handler);
if (msg != NGX_CONF_OK) {
goto error;
}
@ -555,7 +574,6 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
ngx_rtmp_control_ctx_t *ctx;
ctx = ngx_http_get_module_ctx(r, ngx_rtmp_control_module);
ctx->handler = ngx_rtmp_control_redirect_handler;
if (ctx->method.len == sizeof("publisher") - 1 &&
ngx_memcmp(ctx->method.data, "publisher", ctx->method.len) == 0)
@ -578,7 +596,7 @@ ngx_rtmp_control_redirect(ngx_http_request_t *r, ngx_str_t *method)
goto error;
}
msg = ngx_rtmp_control_walk(r);
msg = ngx_rtmp_control_walk(r, ngx_rtmp_control_redirect_handler);
if (msg != NGX_CONF_OK) {
goto error;
}
@ -666,6 +684,10 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
ngx_http_set_ctx(r, ctx, ngx_rtmp_control_module);
if (ngx_array_init(&ctx->sessions, r->pool, 1, sizeof(void *)) != NGX_OK) {
return NGX_ERROR;
}
ctx->method = method;
#define NGX_RTMP_CONTROL_SECTION(flag, secname) \
@ -689,7 +711,7 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
static void *
ngx_rtmp_control_create_loc_conf(ngx_conf_t *cf)
{
ngx_rtmp_control_loc_conf_t *conf;
ngx_rtmp_control_loc_conf_t *conf;
conf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_control_loc_conf_t));
if (conf == NULL) {
@ -705,8 +727,8 @@ ngx_rtmp_control_create_loc_conf(ngx_conf_t *cf)
static char *
ngx_rtmp_control_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_rtmp_control_loc_conf_t *prev = parent;
ngx_rtmp_control_loc_conf_t *conf = child;
ngx_rtmp_control_loc_conf_t *prev = parent;
ngx_rtmp_control_loc_conf_t *conf = child;
ngx_conf_merge_bitmask_value(conf->control, prev->control, 0);