implemented connection dropped in control module

This commit is contained in:
Roman Arutyunyan 2012-10-23 10:46:44 +04:00
parent c86e30fd27
commit f65f07deb3

View file

@ -17,8 +17,22 @@ static char * ngx_rtmp_control_merge_loc_conf(ngx_conf_t *cf,
void *parent, void *child);
typedef struct {
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_core_app_conf_t *cacf;
} ngx_rtmp_control_core_t;
typedef struct {
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_stream_t *ls;
} ngx_rtmp_control_live_t;
#define NGX_RTMP_CONTROL_ALL 0xff
#define NGX_RTMP_CONTROL_RECORD 0x01
#define NGX_RTMP_CONTROL_DROP 0x02
typedef struct {
@ -27,9 +41,10 @@ typedef struct {
static ngx_conf_bitmask_t ngx_rtmp_control_masks[] = {
{ ngx_string("all"), NGX_RTMP_CONTROL_ALL },
{ ngx_string("record"), NGX_RTMP_CONTROL_RECORD },
{ ngx_null_string, 0 }
{ ngx_string("all"), NGX_RTMP_CONTROL_ALL },
{ ngx_string("record"), NGX_RTMP_CONTROL_RECORD },
{ ngx_string("drop"), NGX_RTMP_CONTROL_DROP },
{ ngx_null_string, 0 }
};
@ -77,6 +92,131 @@ ngx_module_t ngx_rtmp_control_module = {
};
static ngx_int_t
ngx_rtmp_control_output_error(ngx_http_request_t *r, const char *msg)
{
size_t len;
ngx_buf_t *b;
ngx_chain_t cl;
len = ngx_strlen(msg);
r->headers_out.status = NGX_HTTP_BAD_REQUEST;
r->headers_out.content_length_n = len;
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
return NGX_ERROR;
}
ngx_memzero(&cl, sizeof(cl));
cl.buf = b;
b->start = b->pos = (u_char *) msg;
b->end = b->last = (u_char *) msg + len;
b->memory = 1;
b->last_buf = 1;
ngx_http_send_header(r);
return ngx_http_output_filter(r, &cl);
}
static const char *
ngx_rtmp_control_parse_core(ngx_http_request_t *r,
ngx_rtmp_control_core_t *core)
{
ngx_str_t srv, app;
ngx_uint_t sn, n;
ngx_rtmp_core_srv_conf_t **pcscf;
ngx_rtmp_core_app_conf_t **pcacf;
core->cmcf = ngx_rtmp_core_main_conf;
if (core->cmcf == NULL) {
return "Missing main RTMP conf";
}
/* find server */
sn = 0;
if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
sn = ngx_atoi(srv.data, srv.len);
}
if (sn >= core->cmcf->servers.nelts) {
return "Server index out of range";
}
pcscf = core->cmcf->servers.elts;
pcscf += sn;
core->cscf = *pcscf;
/* find application */
if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"rtmp_control: app not specified");
return "Application not specified";
}
core->cacf = NULL;
pcacf = core->cscf->applications.elts;
for (n = 0; n < core->cscf->applications.nelts; ++n, ++pcacf) {
if ((*pcacf)->name.len == app.len &&
ngx_strncmp((*pcacf)->name.data, app.data, app.len) == 0)
{
core->cacf = *pcacf;
break;
}
}
if (core->cacf == NULL) {
return "Application not found";
}
return NGX_CONF_OK;
}
static const char *
ngx_rtmp_control_parse_live(ngx_http_request_t *r,
ngx_rtmp_control_core_t *core,
ngx_rtmp_control_live_t *live)
{
ngx_str_t name;
size_t len;
ngx_memzero(&name, sizeof(name));
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
live->lacf = core->cacf->app_conf[ngx_rtmp_live_module.ctx_index];
/* find live stream by name */
for (live->ls = live->lacf->streams[ngx_hash_key(name.data, name.len) %
live->lacf->nbuckets];
live->ls; live->ls = live->ls->next)
{
len = ngx_strlen(live->ls->name);
if (name.len == len && ngx_strncmp(name.data, live->ls->name, name.len)
== 0)
{
break;
}
}
if (live->ls == NULL) {
return "Live stream not found";
}
return NGX_CONF_OK;
}
/* /record arguments:
* srv - server index (optional)
* app - application name
@ -88,116 +228,55 @@ ngx_module_t ngx_rtmp_control_module = {
static ngx_int_t
ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
{
ngx_rtmp_control_core_t core;
ngx_rtmp_control_live_t live;
ngx_rtmp_record_app_conf_t *racf;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t **pcscf, *cscf;
ngx_rtmp_core_app_conf_t **pcacf, *cacf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_stream_t *ls;
ngx_rtmp_live_ctx_t *lctx;
ngx_rtmp_session_t *s;
ngx_chain_t cl;
ngx_uint_t sn, rn, n;
ngx_str_t srv, app, rec, name, path;
ngx_str_t msg;
ngx_uint_t rn;
ngx_str_t rec, path;
ngx_buf_t *b;
ngx_int_t rc;
size_t len;
const char *msg;
sn = 0;
if (ngx_http_arg(r, (u_char *) "srv", sizeof("srv") - 1, &srv) == NGX_OK) {
sn = ngx_atoi(srv.data, srv.len);
}
if (ngx_http_arg(r, (u_char *) "app", sizeof("app") - 1, &app) != NGX_OK) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"rtmp_control: app not specified");
ngx_str_set(&msg, "Application not specified");
msg = ngx_rtmp_control_parse_core(r, &core);
if (msg != NGX_CONF_OK) {
goto error;
}
ngx_memzero(&rec, sizeof(rec));
ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
ngx_memzero(&name, sizeof(name));
ngx_http_arg(r, (u_char *) "name", sizeof("name") - 1, &name);
cmcf = ngx_rtmp_core_main_conf;
if (cmcf == NULL) {
ngx_str_set(&msg, "Missing main RTMP conf");
goto error;
}
/* find server */
if (sn >= cmcf->servers.nelts) {
ngx_str_set(&msg, "Server index out of range");
goto error;
}
pcscf = cmcf->servers.elts;
pcscf += sn;
cscf = *pcscf;
/* find application */
pcacf = cscf->applications.elts;
cacf = NULL;
for (n = 0; n < cscf->applications.nelts; ++n, ++pcacf) {
if ((*pcacf)->name.len == app.len &&
ngx_strncmp((*pcacf)->name.data, app.data, app.len) == 0)
{
cacf = *pcacf;
break;
}
}
if (cacf == NULL) {
ngx_str_set(&msg, "Application not found");
goto error;
}
lacf = cacf->app_conf[ngx_rtmp_live_module.ctx_index];
racf = cacf->app_conf[ngx_rtmp_record_module.ctx_index];
/* find live stream by name */
for (ls = lacf->streams[ngx_hash_key(name.data, name.len) % lacf->nbuckets];
ls; ls = ls->next)
{
len = ngx_strlen(ls->name);
if (name.len == len && ngx_strncmp(name.data, ls->name, name.len)
== 0)
{
break;
}
}
if (ls == NULL) {
ngx_str_set(&msg, "Live stream not found");
msg = ngx_rtmp_control_parse_live(r, &core, &live);
if (msg != NGX_CONF_OK) {
goto error;
}
/* find publisher context */
for (lctx = ls->ctx; lctx; lctx = lctx->next) {
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
break;
}
}
if (lctx == NULL) {
ngx_str_set(&msg, "No publisher");
msg = "No publisher";
goto error;
}
s = lctx->session;
/* find recorder */
ngx_memzero(&rec, sizeof(rec));
ngx_http_arg(r, (u_char *) "rec", sizeof("rec") - 1, &rec);
racf = core.cacf->app_conf[ngx_rtmp_record_module.ctx_index];
rn = ngx_rtmp_record_find(racf, &rec);
if (rn == NGX_CONF_UNSET_UINT) {
ngx_str_set(&msg, "Recorder not found");
msg = "Recorder not found";
goto error;
}
/* call the method */
ngx_memzero(&path, sizeof(path));
if (method->len == sizeof("start") - 1 &&
@ -211,12 +290,12 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
rc = ngx_rtmp_record_close(s, rn, &path);
} else {
ngx_str_set(&msg, "Undefined method");
msg = "Undefined method";
goto error;
}
if (rc == NGX_ERROR) {
ngx_str_set(&msg, "Recorder error");
msg = "Recorder error";
goto error;
}
@ -245,25 +324,105 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
return ngx_http_output_filter(r, &cl);
error:
r->headers_out.status = NGX_HTTP_BAD_REQUEST;
r->headers_out.content_length_n = msg.len;
return ngx_rtmp_control_output_error(r, msg);
}
static ngx_int_t
ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
{
ngx_rtmp_control_core_t core;
ngx_rtmp_control_live_t live;
ngx_rtmp_live_ctx_t *lctx;
ngx_str_t addr, *paddr;
const char *msg;
ngx_uint_t ndropped;
size_t len;
u_char *p;
ngx_buf_t *b;
ngx_chain_t cl;
msg = ngx_rtmp_control_parse_core(r, &core);
if (msg != NGX_CONF_OK) {
goto error;
}
msg = ngx_rtmp_control_parse_live(r, &core, &live);
if (msg != NGX_CONF_OK) {
goto error;
}
ndropped = 0;
if (method->len == sizeof("publisher") - 1 &&
ngx_strncmp(method->data, "publisher", method->len) == 0)
{
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
ngx_rtmp_finalize_session(lctx->session);
++ndropped;
break;
}
}
} else if (method->len == sizeof("client") - 1 &&
ngx_strncmp(method->data, "client", method->len) == 0)
{
ngx_memzero(&addr, sizeof(addr));
ngx_http_arg(r, (u_char *) "addr", sizeof("addr") - 1, &addr);
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (addr.len && lctx->session && lctx->session->connection) {
paddr = &lctx->session->connection->addr_text;
if (paddr->len != addr.len ||
ngx_strncmp(paddr->data, addr.data, addr.len))
{
continue;
}
}
ngx_rtmp_finalize_session(lctx->session);
++ndropped;
}
} else {
msg = "Undefined method";
goto error;
}
/* output ndropped */
len = NGX_OFF_T_LEN;
p = ngx_palloc(r->connection->pool, len);
if (p == NULL) {
return NGX_ERROR;
}
len = (size_t) (ngx_snprintf(p, len, "%ui", ndropped) - p);
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = len;
b = ngx_calloc_buf(r->pool);
if (b == NULL) {
return NGX_ERROR;
}
b->start = b->pos = p;
b->end = b->last = p + len;
b->temporary = 1;
b->last_buf = 1;
ngx_memzero(&cl, sizeof(cl));
cl.buf = b;
b->start = b->pos = msg.data;
b->end = b->last = msg.data + msg.len;
b->memory = 1;
b->last_buf = 1;
ngx_http_send_header(r);
return ngx_http_output_filter(r, &cl);
error:
return ngx_rtmp_control_output_error(r, msg);
}
@ -315,6 +474,7 @@ ngx_rtmp_control_handler(ngx_http_request_t *r)
}
NGX_RTMP_CONTROL_SECTION(RECORD, record);
NGX_RTMP_CONTROL_SECTION(DROP, drop);
#undef NGX_RTMP_CONTROL_SECTION