diff --git a/config b/config index f2a7c4d..da61214 100644 --- a/config +++ b/config @@ -15,7 +15,6 @@ CORE_MODULES="$CORE_MODULES ngx_rtmp_relay_module \ ngx_rtmp_exec_module \ ngx_rtmp_auto_push_module \ - ngx_rtmp_enotify_module \ ngx_rtmp_notify_module \ ngx_rtmp_log_module \ ngx_rtmp_limit_module \ @@ -73,7 +72,6 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_bandwidth.c \ $ngx_addon_dir/ngx_rtmp_exec_module.c \ $ngx_addon_dir/ngx_rtmp_auto_push_module.c \ - $ngx_addon_dir/ngx_rtmp_enotify_module.c \ $ngx_addon_dir/ngx_rtmp_notify_module.c \ $ngx_addon_dir/ngx_rtmp_log_module.c \ $ngx_addon_dir/ngx_rtmp_limit_module.c \ diff --git a/ngx_rtmp_enotify_module.c b/ngx_rtmp_enotify_module.c index a201638..3960dd3 100644 --- a/ngx_rtmp_enotify_module.c +++ b/ngx_rtmp_enotify_module.c @@ -137,10 +137,11 @@ ngx_module_t ngx_rtmp_enotify_module = { static void -ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_enotify_eval_astr(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_enotify_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_enotify_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); if (ctx == NULL) { @@ -154,10 +155,11 @@ ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, static void -ngx_rtmp_enotify_eval_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_enotify_eval_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_enotify_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_enotify_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); if (ctx == NULL) { @@ -284,7 +286,8 @@ ngx_rtmp_enotify_exec(ngx_rtmp_session_t *s, ngx_rtmp_enotify_conf_t *ec) for (n = 0; n < ec->args.nelts; ++n, ++arg_in) { - ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a); + ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a, + s->connection->log); if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { continue; diff --git a/ngx_rtmp_eval.c b/ngx_rtmp_eval.c index f10fdd2..6a9ac4e 100644 --- a/ngx_rtmp_eval.c +++ b/ngx_rtmp_eval.c @@ -13,17 +13,17 @@ static void -ngx_rtmp_eval_session_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_eval_session_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - *ret = *(ngx_str_t *) ((u_char *) s + e->offset); + *ret = *(ngx_str_t *) ((u_char *) ctx + e->offset); } static void -ngx_rtmp_eval_connection_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_eval_connection_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { + ngx_rtmp_session_t *s = ctx; + *ret = *(ngx_str_t *) ((u_char *) s->connection + e->offset); } @@ -59,15 +59,14 @@ ngx_rtmp_eval_t ngx_rtmp_eval_session[] = { static void -ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b, - void *data, size_t len) +ngx_rtmp_eval_append(ngx_buf_t *b, void *data, size_t len, ngx_log_t *log) { size_t buf_len; if (b->last + len > b->end) { buf_len = 2 * (b->last - b->pos) + len; - b->start = ngx_palloc(s->connection->pool, buf_len); + b->start = ngx_alloc(buf_len, log); if (b->start == NULL) { return; } @@ -82,8 +81,8 @@ ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b, static void -ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, - ngx_rtmp_eval_t **e, ngx_str_t *name) +ngx_rtmp_eval_append_var(void *ctx, ngx_buf_t *b, ngx_rtmp_eval_t **e, + ngx_str_t *name, ngx_log_t *log) { ngx_uint_t k; ngx_str_t v; @@ -92,10 +91,10 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, for (; *e; ++e) { for (k = 0, ee = *e; ee->handler; ++k, ++ee) { if (ee->name.len == name->len && - ngx_memcmp(ee->name.data, name->data, name->len) == 0) + ngx_memcmp(ee->name.data, name->data, name->len) == 0) { - ee->handler(s, ee, &v); - ngx_rtmp_eval_append(s, b, v.data, v.len); + ee->handler(ctx, ee, &v); + ngx_rtmp_eval_append(b, v.data, v.len, log); } } } @@ -103,8 +102,8 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, ngx_int_t -ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, - ngx_str_t *out) +ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e, ngx_str_t *out, + ngx_log_t *log) { u_char c, *p; ngx_str_t name; @@ -118,8 +117,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, SNAME } state = NORMAL; - b.pos = b.last = b.start = ngx_palloc(s->connection->pool, - NGX_RTMP_EVAL_BUFLEN); + b.pos = b.last = b.start = ngx_alloc(NGX_RTMP_EVAL_BUFLEN, log); if (b.pos == NULL) { return NGX_ERROR; } @@ -138,7 +136,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); state = NORMAL; @@ -155,7 +153,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); case NORMAL: switch (c) { @@ -169,7 +167,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } case ESCAPE: - ngx_rtmp_eval_append(s, &b, &c, 1); + ngx_rtmp_eval_append(&b, &c, 1, log); state = NORMAL; break; @@ -179,11 +177,11 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, if (state == NAME) { p = &in->data[n]; name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); } c = 0; - ngx_rtmp_eval_append(s, &b, &c, 1); + ngx_rtmp_eval_append(&b, &c, 1, log); out->data = b.pos; out->len = b.last - b.pos - 1; diff --git a/ngx_rtmp_eval.h b/ngx_rtmp_eval.h index f8bddcb..b05d16b 100644 --- a/ngx_rtmp_eval.h +++ b/ngx_rtmp_eval.h @@ -16,7 +16,7 @@ typedef struct ngx_rtmp_eval_s ngx_rtmp_eval_t; -typedef void (* ngx_rtmp_eval_pt)(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, +typedef void (* ngx_rtmp_eval_pt)(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret); @@ -34,8 +34,8 @@ struct ngx_rtmp_eval_s { extern ngx_rtmp_eval_t ngx_rtmp_eval_session[]; -ngx_int_t ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, - ngx_rtmp_eval_t **e, ngx_str_t *out); +ngx_int_t ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e, + ngx_str_t *out, ngx_log_t *log); ngx_int_t ngx_rtmp_eval_streams(ngx_str_t *in); diff --git a/ngx_rtmp_exec_module.c b/ngx_rtmp_exec_module.c index b4efb77..4459ed9 100644 --- a/ngx_rtmp_exec_module.c +++ b/ngx_rtmp_exec_module.c @@ -7,6 +7,7 @@ #include #include #include "ngx_rtmp_cmd_module.h" +#include "ngx_rtmp_record_module.h" #include "ngx_rtmp_eval.h" #include @@ -17,7 +18,9 @@ #if !(NGX_WIN32) static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_play_pt next_play; static ngx_rtmp_close_stream_pt next_close_stream; +static ngx_rtmp_record_done_pt next_record_done; #endif @@ -28,9 +31,9 @@ static char * ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf); static void * ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child); -static char * ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, - void *conf); -static char * ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, +/*static char * ngx_rtmp_exec_block(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf);*/ +static char * ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static char *ngx_rtmp_exec_kill_signal(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); @@ -40,21 +43,46 @@ static char *ngx_rtmp_exec_kill_signal(ngx_conf_t *cf, ngx_command_t *cmd, #define NGX_RTMP_EXEC_KILL 0x02 +#define NGX_RTMP_EXEC_PUBLISHING 0x01 +#define NGX_RTMP_EXEC_PLAYING 0x02 + + +enum { + NGX_RTMP_EXEC_PUSH, + NGX_RTMP_EXEC_PULL, + + NGX_RTMP_EXEC_PUBLISH, + NGX_RTMP_EXEC_PUBLISH_DONE, + NGX_RTMP_EXEC_PLAY, + NGX_RTMP_EXEC_PLAY_DONE, + NGX_RTMP_EXEC_RECORD_DONE, + + NGX_RTMP_EXEC_MAX, + + NGX_RTMP_EXEC_STATIC +}; + + typedef struct { + ngx_str_t id; + ngx_uint_t type; ngx_str_t cmd; ngx_array_t args; /* ngx_str_t */ + ngx_array_t names; } ngx_rtmp_exec_conf_t; typedef struct { ngx_rtmp_exec_conf_t *conf; ngx_log_t *log; - ngx_rtmp_session_t *session; /* NULL for init execs */ + ngx_rtmp_eval_t **eval; + void *eval_ctx; unsigned active:1; + unsigned managed:1; ngx_pid_t pid; ngx_pid_t *save_pid; int pipefd; - ngx_connection_t dummy_conn; /*needed by ngx_xxx_event*/ + ngx_connection_t dummy_conn; /*needed by ngx_xxx_event*/ ngx_event_t read_evt, write_evt; ngx_event_t respawn_evt; ngx_msec_t respawn_timeout; @@ -63,23 +91,45 @@ typedef struct { typedef struct { - ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ - ngx_array_t execs; /* ngx_rtmp_exec_t */ + ngx_array_t static_conf; /* ngx_rtmp_exec_conf_t */ + ngx_array_t static_exec; /* ngx_rtmp_exec_t */ ngx_msec_t respawn_timeout; ngx_int_t kill_signal; + ngx_log_t *log; } ngx_rtmp_exec_main_conf_t; +typedef struct ngx_rtmp_exec_pull_ctx_s ngx_rtmp_exec_pull_ctx_t; + +struct ngx_rtmp_exec_pull_ctx_s { + ngx_pool_t *pool; + ngx_uint_t counter; + ngx_str_t name; + ngx_str_t app; + ngx_array_t pull_exec; /* ngx_rtmp_exec_t */ + ngx_rtmp_exec_pull_ctx_t *next; +}; + + typedef struct { - ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ + ngx_int_t active; + ngx_array_t conf[NGX_RTMP_EXEC_MAX]; + /* ngx_rtmp_exec_conf_t */ ngx_flag_t respawn; + ngx_flag_t options; + ngx_uint_t nbuckets; + ngx_rtmp_exec_pull_ctx_t **pull; } ngx_rtmp_exec_app_conf_t; typedef struct { + ngx_uint_t flags; + ngx_str_t path; + ngx_str_t recorder; u_char name[NGX_RTMP_MAX_NAME]; u_char args[NGX_RTMP_MAX_ARGS]; - ngx_array_t execs; + ngx_array_t push_exec; /* ngx_rtmp_exec_t */ + ngx_rtmp_exec_pull_ctx_t *pull; } ngx_rtmp_exec_ctx_t; @@ -91,21 +141,86 @@ static ngx_int_t ngx_rtmp_exec_run(ngx_rtmp_exec_t *e); static ngx_command_t ngx_rtmp_exec_commands[] = { - - { ngx_string("exec"), - NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, - ngx_rtmp_exec_exec, +/* + { ngx_string("exec_block"), + NGX_RTMP_APP_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS|NGX_CONF_TAKE1, + ngx_rtmp_exec_block, NGX_RTMP_APP_CONF_OFFSET, 0, NULL }, - - { ngx_string("exec_static"), +*/ + { ngx_string("exec"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, - ngx_rtmp_exec_exec_static, - NGX_RTMP_MAIN_CONF_OFFSET, - 0, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PUSH * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_push"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PUSH * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_pull"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PULL * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_publish"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PUBLISH * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_publish_done"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PUBLISH_DONE * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_play"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PLAY * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_play_done"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_PLAY_DONE * sizeof(ngx_array_t), + NULL }, + + { ngx_string("exec_record_done"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_RTMP_REC_CONF| + NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, conf) + + NGX_RTMP_EXEC_RECORD_DONE * sizeof(ngx_array_t), NULL }, + { ngx_string("exec_static"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_MAIN_CONF_OFFSET, + offsetof(ngx_rtmp_exec_main_conf_t, static_conf), + NULL }, + { ngx_string("respawn"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, ngx_conf_set_flag_slot, @@ -127,6 +242,13 @@ static ngx_command_t ngx_rtmp_exec_commands[] = { 0, NULL }, + { ngx_string("exec_options"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, options), + NULL }, + ngx_null_command }; @@ -160,10 +282,11 @@ ngx_module_t ngx_rtmp_exec_module = { static void -ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_exec_eval_ctx_cstr(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_exec_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); if (ctx == NULL) { @@ -176,23 +299,96 @@ ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, } -static ngx_rtmp_eval_t ngx_rtmp_exec_eval[] = { +static void +ngx_rtmp_exec_eval_ctx_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) +{ + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_exec_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx == NULL) { + ret->len = 0; + return; + } + + *ret = * (ngx_str_t *) ((u_char *) ctx + e->offset); +} + + +static void +ngx_rtmp_exec_eval_pctx_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) +{ + *ret = *(ngx_str_t *) ((u_char *) ctx + e->offset); +} + + +static ngx_rtmp_eval_t ngx_rtmp_exec_push_specific_eval[] = { { ngx_string("name"), - ngx_rtmp_exec_eval_astr, + ngx_rtmp_exec_eval_ctx_cstr, offsetof(ngx_rtmp_exec_ctx_t, name) }, { ngx_string("args"), - ngx_rtmp_exec_eval_astr, + ngx_rtmp_exec_eval_ctx_cstr, offsetof(ngx_rtmp_exec_ctx_t, args) }, ngx_rtmp_null_eval }; -static ngx_rtmp_eval_t * ngx_rtmp_exec_eval_p[] = { +static ngx_rtmp_eval_t * ngx_rtmp_exec_push_eval[] = { ngx_rtmp_eval_session, - ngx_rtmp_exec_eval, + ngx_rtmp_exec_push_specific_eval, + NULL +}; + + +static ngx_rtmp_eval_t ngx_rtmp_exec_pull_specific_eval[] = { + + { ngx_string("name"), + ngx_rtmp_exec_eval_pctx_str, + offsetof(ngx_rtmp_exec_pull_ctx_t, name) }, + + { ngx_string("app"), + ngx_rtmp_exec_eval_pctx_str, + offsetof(ngx_rtmp_exec_pull_ctx_t, app) }, + + ngx_rtmp_null_eval +}; + + +static ngx_rtmp_eval_t * ngx_rtmp_exec_pull_eval[] = { + ngx_rtmp_exec_pull_specific_eval, + NULL +}; + + +static ngx_rtmp_eval_t ngx_rtmp_exec_event_specific_eval[] = { + + { ngx_string("name"), + ngx_rtmp_exec_eval_ctx_cstr, + offsetof(ngx_rtmp_exec_ctx_t, name) }, + + { ngx_string("args"), + ngx_rtmp_exec_eval_ctx_cstr, + offsetof(ngx_rtmp_exec_ctx_t, args) }, + + { ngx_string("path"), + ngx_rtmp_exec_eval_ctx_str, + offsetof(ngx_rtmp_exec_ctx_t, path) }, + + { ngx_string("recorder"), + ngx_rtmp_exec_eval_ctx_str, + offsetof(ngx_rtmp_exec_ctx_t, recorder) }, + + ngx_rtmp_null_eval +}; + + +static ngx_rtmp_eval_t * ngx_rtmp_exec_event_eval[] = { + ngx_rtmp_eval_session, + ngx_rtmp_exec_event_specific_eval, NULL }; @@ -210,7 +406,7 @@ ngx_rtmp_exec_create_main_conf(ngx_conf_t *cf) emcf->respawn_timeout = NGX_CONF_UNSET_MSEC; emcf->kill_signal = NGX_CONF_UNSET; - if (ngx_array_init(&emcf->confs, cf->pool, 1, + if (ngx_array_init(&emcf->static_conf, cf->pool, 1, sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) { return NULL; @@ -238,23 +434,27 @@ ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf) } #endif - if (ngx_array_init(&emcf->execs, cf->pool, emcf->confs.nelts, + if (ngx_array_init(&emcf->static_exec, cf->pool, + emcf->static_conf.nelts, sizeof(ngx_rtmp_exec_t)) != NGX_OK) { return NGX_CONF_ERROR; } - e = ngx_array_push_n(&emcf->execs, emcf->confs.nelts); + e = ngx_array_push_n(&emcf->static_exec, emcf->static_conf.nelts); if (e == NULL) { return NGX_CONF_ERROR; } - ec = emcf->confs.elts; + emcf->log = &cf->cycle->new_log; - for (n = 0; n < emcf->confs.nelts; ++n, ++e, ++ec) { + ec = emcf->static_conf.elts; + + for (n = 0; n < emcf->static_conf.nelts; n++, e++, ec++) { ngx_memzero(e, sizeof(*e)); e->conf = ec; - e->log = &cf->cycle->new_log; + e->managed = 1; + e->log = emcf->log; e->respawn_timeout = emcf->respawn_timeout; e->kill_signal = emcf->kill_signal; } @@ -274,38 +474,71 @@ ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf) } eacf->respawn = NGX_CONF_UNSET; - - if (ngx_array_init(&eacf->confs, cf->pool, 1, - sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) - { - return NULL; - } + eacf->options = NGX_CONF_UNSET; + eacf->nbuckets = NGX_CONF_UNSET_UINT; return eacf; } +static ngx_int_t +ngx_rtmp_exec_merge_confs(ngx_array_t *conf, ngx_array_t *prev) +{ + size_t n; + ngx_rtmp_exec_conf_t *ec, *pec; + + if (prev->nelts == 0) { + return NGX_OK; + } + + if (conf->nelts == 0) { + *conf = *prev; + return NGX_OK; + } + + ec = ngx_array_push_n(conf, prev->nelts); + if (ec == NULL) { + return NGX_ERROR; + } + + pec = prev->elts; + for (n = 0; n < prev->nelts; n++, ec++, pec++) { + *ec = *pec; + } + + return NGX_OK; +} + + static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) { ngx_rtmp_exec_app_conf_t *prev = parent; ngx_rtmp_exec_app_conf_t *conf = child; - size_t n; - ngx_rtmp_exec_conf_t *ec, *pec; + + ngx_uint_t n; ngx_conf_merge_value(conf->respawn, prev->respawn, 1); + ngx_conf_merge_uint_value(conf->nbuckets, prev->nbuckets, 1024); - if (prev->confs.nelts) { - ec = ngx_array_push_n(&conf->confs, prev->confs.nelts); - if (ec == NULL) { + for (n = 0; n < NGX_RTMP_EXEC_MAX; n++) { + if (ngx_rtmp_exec_merge_confs(&conf->conf[n], &prev->conf[n]) != NGX_OK) + { return NGX_CONF_ERROR; } - pec = prev->confs.elts; - for (n = 0; n < prev->confs.nelts; ++n, ++ec, ++pec) { - *ec = *pec; + + if (conf->conf[n].nelts) { + conf->active = 1; } } - + + if (conf->conf[NGX_RTMP_EXEC_PULL].nelts > 0) { + conf->pull = ngx_pcalloc(cf->pool, sizeof(void *) * conf->nbuckets); + if (conf->pull == NULL) { + return NGX_CONF_ERROR; + } + } + return NGX_CONF_OK; } @@ -345,8 +578,8 @@ ngx_rtmp_exec_init_process(ngx_cycle_t *cycle) * when nginx worker is terminated. */ - e = emcf->execs.elts; - for (n = 0; n < emcf->execs.nelts; ++n, ++e) { + e = emcf->static_exec.elts; + for (n = 0; n < emcf->static_exec.nelts; ++n, ++e) { e->respawn_evt.data = e; e->respawn_evt.log = e->log; e->respawn_evt.handler = ngx_rtmp_exec_respawn; @@ -374,9 +607,9 @@ ngx_rtmp_exec_child_dead(ngx_event_t *ev) e = dummy_conn->data; - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, e->log, 0, - "exec: child %ui exited; %s", (ngx_int_t) e->pid, - e->respawn_timeout == NGX_CONF_UNSET_MSEC ? "respawning" : + ngx_log_error(NGX_LOG_INFO, e->log, 0, + "exec: child %ui exited; %s", (ngx_int_t) e->pid, + e->respawn_timeout == NGX_CONF_UNSET_MSEC ? "respawning" : "ignoring"); ngx_rtmp_exec_kill(e, 0); @@ -416,9 +649,8 @@ ngx_rtmp_exec_kill(ngx_rtmp_exec_t *e, ngx_int_t kill_signal) return NGX_OK; } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, e->log, 0, - "exec: terminating child %ui", - (ngx_int_t) e->pid); + ngx_log_error(NGX_LOG_INFO, e->log, 0, + "exec: terminating child %ui", (ngx_int_t) e->pid); e->active = 0; close(e->pipefd); @@ -445,63 +677,90 @@ ngx_rtmp_exec_kill(ngx_rtmp_exec_t *e, ngx_int_t kill_signal) static ngx_int_t ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) { - ngx_pid_t pid; - int fd, maxfd; - int pipefd[2]; - int ret; - ngx_rtmp_exec_conf_t *ec; - ngx_str_t *arg_in, a; - char **args, **arg_out; - ngx_uint_t n; + int fd, ret, maxfd, pipefd[2]; + char **args, **arg_out; + ngx_pid_t pid; + ngx_str_t *arg_in, a; + ngx_uint_t n; + ngx_rtmp_exec_conf_t *ec; ec = e->conf; - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, e->log, 0, - "exec: starting child '%V'", &ec->cmd); + ngx_log_error(NGX_LOG_INFO, e->log, 0, + "exec: starting %s child '%V'", + e->managed ? "managed" : "unmanaged", &ec->cmd); - if (e->active) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, e->log, 0, - "exec: already active '%V'", &ec->cmd); - return NGX_OK; - } + pipefd[0] = -1; + pipefd[1] = -1; - if (pipe(pipefd) == -1) { - ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, - "exec: pipe failed"); - return NGX_ERROR; - } + if (e->managed) { - /* make pipe write end survive through exec */ - ret = fcntl(pipefd[1], F_GETFD); - if (ret != -1) { - ret &= ~FD_CLOEXEC; - ret = fcntl(pipefd[1], F_SETFD, ret); - } - if (ret == -1) { - close(pipefd[0]); - close(pipefd[1]); - ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, - "exec: fcntl failed"); - return NGX_ERROR; + if (e->active) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, e->log, 0, + "exec: already active '%V'", &ec->cmd); + return NGX_OK; + } + + if (pipe(pipefd) == -1) { + ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, + "exec: pipe failed"); + return NGX_ERROR; + } + + /* make pipe write end survive through exec */ + + ret = fcntl(pipefd[1], F_GETFD); + + if (ret != -1) { + ret &= ~FD_CLOEXEC; + ret = fcntl(pipefd[1], F_SETFD, ret); + } + + if (ret == -1) { + + close(pipefd[0]); + close(pipefd[1]); + + ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, + "exec: fcntl failed"); + + return NGX_ERROR; + } } pid = fork(); + switch (pid) { + case -1: - close(pipefd[0]); - close(pipefd[1]); + + /* failure */ + + if (pipefd[0] != -1) { + close(pipefd[0]); + } + + if (pipefd[1] != -1) { + close(pipefd[1]); + } + ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, "exec: fork failed"); + return NGX_ERROR; case 0: + /* child */ #if (NGX_LINUX) - prctl(PR_SET_PDEATHSIG, e->kill_signal, 0, 0, 0); + if (e->managed) { + prctl(PR_SET_PDEATHSIG, e->kill_signal, 0, 0, 0); + } #endif /* close all descriptors but pipe write end */ + maxfd = sysconf(_SC_OPEN_MAX); for (fd = 0; fd < maxfd; ++fd) { if (fd == pipefd[1]) { @@ -528,10 +787,10 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) for (n = 0; n < ec->args.nelts; n++, ++arg_in) { - if (e->session == NULL) { + if (e->eval == NULL) { a = *arg_in; } else { - ngx_rtmp_eval(e->session, arg_in, ngx_rtmp_exec_eval_p, &a); + ngx_rtmp_eval(e->eval_ctx, arg_in, e->eval, &a, e->log); } if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { @@ -543,35 +802,66 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) *arg_out = NULL; +#if (NGX_DEBUG) + { + char **p; + + for (p = args; *p; p++) { + ngx_write_fd(STDERR_FILENO, "'", 1); + ngx_write_fd(STDERR_FILENO, *p, strlen(*p)); + ngx_write_fd(STDERR_FILENO, "' ", 2); + } + + ngx_write_fd(STDERR_FILENO, "\n", 1); + } +#endif + if (execvp((char *) ec->cmd.data, args) == -1) { + char *msg; + + msg = strerror(errno); + + ngx_write_fd(STDERR_FILENO, "execvp error: ", 14); + ngx_write_fd(STDERR_FILENO, msg, strlen(msg)); + ngx_write_fd(STDERR_FILENO, "\n", 1); + exit(1); } break; default: + /* parent */ - close(pipefd[1]); - e->active = 1; - e->pid = pid; - e->pipefd = pipefd[0]; - if (e->save_pid) { - *e->save_pid = pid; + + if (pipefd[1] != -1) { + close(pipefd[1]); } - e->dummy_conn.fd = e->pipefd; - e->dummy_conn.data = e; - e->dummy_conn.read = &e->read_evt; - e->dummy_conn.write = &e->write_evt; - e->read_evt.data = &e->dummy_conn; - e->write_evt.data = &e->dummy_conn; + if (pipefd[0] != -1) { + + e->active = 1; + e->pid = pid; + e->pipefd = pipefd[0]; - e->read_evt.log = e->log; - e->read_evt.handler = ngx_rtmp_exec_child_dead; + if (e->save_pid) { + *e->save_pid = pid; + } - if (ngx_add_event(&e->read_evt, NGX_READ_EVENT, 0) != NGX_OK) { - ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, - "exec: failed to add child control event"); + e->dummy_conn.fd = e->pipefd; + e->dummy_conn.data = e; + e->dummy_conn.read = &e->read_evt; + e->dummy_conn.write = &e->write_evt; + e->read_evt.data = &e->dummy_conn; + e->write_evt.data = &e->dummy_conn; + + e->read_evt.log = e->log; + e->read_evt.handler = ngx_rtmp_exec_child_dead; + + if (ngx_add_event(&e->read_evt, NGX_READ_EVENT, 0) != NGX_OK) { + ngx_log_error(NGX_LOG_INFO, e->log, ngx_errno, + "exec: failed to add child control event"); + } } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, e->log, 0, @@ -579,54 +869,288 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) &ec->cmd, (ngx_int_t) pid); break; } + return NGX_OK; } static ngx_int_t -ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) +ngx_rtmp_exec_init_ctx(ngx_rtmp_session_t *s, u_char name[NGX_RTMP_MAX_NAME], + u_char args[NGX_RTMP_MAX_ARGS], ngx_uint_t flags) { - ngx_rtmp_exec_app_conf_t *eacf; - ngx_rtmp_exec_ctx_t *ctx; - ngx_rtmp_exec_t *e; - size_t n; + ngx_uint_t n; + ngx_array_t *push_conf; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_main_conf_t *emcf; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + + if (ctx != NULL) { + goto done; + } + + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t)); + + if (ctx == NULL) { + return NGX_ERROR; + } + + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module); eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); - if (eacf == NULL) { - goto next; + + emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); + + push_conf = &eacf->conf[NGX_RTMP_EXEC_PUSH]; + + if (push_conf->nelts > 0) { + + if (ngx_array_init(&ctx->push_exec, s->connection->pool, + push_conf->nelts, + sizeof(ngx_rtmp_exec_t)) != NGX_OK) + { + return NGX_ERROR; + } + + e = ngx_array_push_n(&ctx->push_exec, push_conf->nelts); + + if (e == NULL) { + return NGX_ERROR; + } + + ec = push_conf->elts; + + for (n = 0; n < push_conf->nelts; n++, e++, ec++) { + ngx_memzero(e, sizeof(*e)); + e->conf = ec; + e->managed = 1; + e->log = s->connection->log; + e->eval = ngx_rtmp_exec_push_eval; + e->eval_ctx = s; + e->kill_signal = emcf->kill_signal; + e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : + NGX_CONF_UNSET_MSEC); + } + } + +done: + + ngx_memcpy(ctx->name, name, NGX_RTMP_MAX_NAME); + ngx_memcpy(ctx->args, args, NGX_RTMP_MAX_ARGS); + + ctx->flags |= flags; + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_exec_init_pull_ctx(ngx_rtmp_session_t *s, + u_char name[NGX_RTMP_MAX_NAME]) +{ + size_t len; + ngx_uint_t n; + ngx_pool_t *pool; + ngx_array_t *pull_conf; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx; + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_main_conf_t *emcf; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx->pull != NULL) { + return NGX_OK; + } + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + + pull_conf = &eacf->conf[NGX_RTMP_EXEC_PULL]; + + if (pull_conf->nelts == 0) { + return NGX_OK; + } + + emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); + + len = ngx_strlen(name); + + ppctx = &eacf->pull[ngx_hash_key(name, len) % eacf->nbuckets]; + + for (; *ppctx; ppctx = &(*ppctx)->next) { + pctx = *ppctx; + + if (pctx->name.len == len && + ngx_strncmp(name, pctx->name.data, len) == 0) + { + goto done; + } + } + + pool = ngx_create_pool(4096, emcf->log); + if (pool == NULL) { + return NGX_ERROR; + } + + pctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_exec_pull_ctx_t)); + if (pctx == NULL) { + goto error; + } + + pctx->pool = pool; + pctx->name.len = len; + pctx->name.data = ngx_palloc(pool, len); + + if (pctx->name.data == NULL) { + goto error; + } + + ngx_memcpy(pctx->name.data, name, len); + + pctx->app.len = s->app.len; + pctx->app.data = ngx_palloc(pool, s->app.len); + + if (pctx->app.data == NULL) { + goto error; + } + + ngx_memcpy(pctx->app.data, s->app.data, s->app.len); + + if (ngx_array_init(&pctx->pull_exec, pool, pull_conf->nelts, + sizeof(ngx_rtmp_exec_t)) != NGX_OK) + { + goto error; + } + + e = ngx_array_push_n(&pctx->pull_exec, pull_conf->nelts); + if (e == NULL) { + goto error; + } + + ec = pull_conf->elts; + for (n = 0; n < pull_conf->nelts; n++, e++, ec++) { + ngx_memzero(e, sizeof(*e)); + e->conf = ec; + e->managed = 1; + e->log = emcf->log; + e->eval = ngx_rtmp_exec_pull_eval; + e->eval_ctx = pctx; + e->kill_signal = emcf->kill_signal; + e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : + NGX_CONF_UNSET_MSEC); + } + + *ppctx = pctx; + +done: + + ctx->pull = pctx; + ctx->pull->counter++; + + return NGX_OK; + +error: + + ngx_destroy_pool(pool); + + return NGX_ERROR; +} + + +static ngx_int_t +ngx_rtmp_exec_filter(ngx_rtmp_session_t *s, ngx_rtmp_exec_conf_t *ec) +{ + size_t len; + ngx_str_t *v; + ngx_uint_t n; + ngx_rtmp_exec_ctx_t *ctx; + + if (ec->names.nelts == 0) { + return NGX_OK; } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); - if (ctx == NULL || ctx->execs.nelts == 0) { - goto next; + + len = ngx_strlen(ctx->name); + + v = ec->names.elts; + for (n = 0; n < ec->names.nelts; n++, s++) { + if (v->len == len && ngx_strncmp(v->data, ctx->name, len) == 0) { + return NGX_OK; + } } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "exec: delete %uz command(s)", ctx->execs.nelts); + return NGX_DECLINED; +} - e = ctx->execs.elts; - for (n = 0; n < ctx->execs.nelts; ++n, ++e) { - ngx_rtmp_exec_kill(e, e->kill_signal); + +static void +ngx_rtmp_exec_unmanaged(ngx_rtmp_session_t *s, ngx_array_t *e, const char *op) +{ + ngx_uint_t n; + ngx_rtmp_exec_t en; + ngx_rtmp_exec_conf_t *ec; + + if (e->nelts == 0) { + return; } -next: - return next_close_stream(s, v); + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: %s %uz unmanaged command(s)", op, e->nelts); + + ec = e->elts; + for (n = 0; n < e->nelts; n++, ec++) { + if (ngx_rtmp_exec_filter(s, ec) != NGX_OK) { + continue; + } + + ngx_memzero(&en, sizeof(ngx_rtmp_exec_t)); + + en.conf = ec; + en.eval = ngx_rtmp_exec_event_eval; + en.eval_ctx = s; + en.log = s->connection->log; + + ngx_rtmp_exec_run(&en); + } +} + + +static void +ngx_rtmp_exec_managed(ngx_rtmp_session_t *s, ngx_array_t *e, const char *op) +{ + ngx_uint_t n; + ngx_rtmp_exec_t *en; + + if (e->nelts == 0) { + return; + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: %s %uz managed command(s)", op, e->nelts); + + en = e->elts; + for (n = 0; n < e->nelts; n++, en++) { + if (ngx_rtmp_exec_filter(s, en->conf) == NGX_OK) { + ngx_rtmp_exec_run(en); + } + } } static ngx_int_t ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { - ngx_rtmp_exec_main_conf_t *emcf; - ngx_rtmp_exec_app_conf_t *eacf; - ngx_rtmp_exec_t *e; - ngx_rtmp_exec_conf_t *ec; - ngx_rtmp_exec_ctx_t *ctx; - size_t n; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_app_conf_t *eacf; - emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); - if (eacf == NULL || eacf->confs.nelts == 0) { + + if (eacf == NULL || !eacf->active) { goto next; } @@ -634,139 +1158,339 @@ ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) goto next; } + if (ngx_rtmp_exec_init_ctx(s, v->name, v->args, NGX_RTMP_EXEC_PUBLISHING) + != NGX_OK) + { + goto next; + } + + ngx_rtmp_exec_unmanaged(s, &eacf->conf[NGX_RTMP_EXEC_PUBLISH], "publish"); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); - if (ctx == NULL) { - ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t)); - if (ctx == NULL) { - return NGX_ERROR; - } - - ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module); - - if (ngx_array_init(&ctx->execs, s->connection->pool, eacf->confs.nelts, - sizeof(ngx_rtmp_exec_t)) != NGX_OK) - { - return NGX_ERROR; - } - - e = ngx_array_push_n(&ctx->execs, eacf->confs.nelts); - if (e == NULL) { - return NGX_ERROR; - } - - ec = eacf->confs.elts; - for (n = 0; n < eacf->confs.nelts; ++n, ++e, ++ec) { - ngx_memzero(e, sizeof(*e)); - e->conf = ec; - e->log = s->connection->log; - e->session = s; - e->kill_signal = emcf->kill_signal; - e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : - NGX_CONF_UNSET_MSEC); - } - } - - ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); - ngx_memcpy(ctx->args, v->args, NGX_RTMP_MAX_ARGS); - - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "exec: run %uz command(s)", ctx->execs.nelts); - - e = ctx->execs.elts; - for (n = 0; n < ctx->execs.nelts; ++n, ++e) { - ngx_rtmp_exec_run(e); - } + ngx_rtmp_exec_managed(s, &ctx->push_exec, "push"); next: return next_publish(s, v); } + + +static ngx_int_t +ngx_rtmp_exec_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) +{ + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_pull_ctx_t *pctx; + ngx_rtmp_exec_app_conf_t *eacf; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + + if (eacf == NULL || !eacf->active) { + goto next; + } + + if (ngx_rtmp_exec_init_ctx(s, v->name, v->args, NGX_RTMP_EXEC_PLAYING) + != NGX_OK) + { + goto next; + } + + ngx_rtmp_exec_unmanaged(s, &eacf->conf[NGX_RTMP_EXEC_PLAY], "play"); + + if (ngx_rtmp_exec_init_pull_ctx(s, v->name) != NGX_OK) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + pctx = ctx->pull; + + if (pctx && pctx->counter == 1) { + ngx_rtmp_exec_managed(s, &pctx->pull_exec, "pull"); + } + +next: + return next_play(s, v); +} + + +static ngx_int_t +ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) +{ + size_t n; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx; + ngx_rtmp_exec_app_conf_t *eacf; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + if (eacf == NULL) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx == NULL) { + goto next; + } + + if (ctx->flags & NGX_RTMP_EXEC_PUBLISHING) { + ngx_rtmp_exec_unmanaged(s, &eacf->conf[NGX_RTMP_EXEC_PUBLISH_DONE], + "publish_done"); + } else { + ngx_rtmp_exec_unmanaged(s, &eacf->conf[NGX_RTMP_EXEC_PLAY_DONE], + "play_done"); + } + + ctx->flags = 0; + + if (ctx->push_exec.nelts > 0) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: delete %uz push command(s)", + ctx->push_exec.nelts); + + e = ctx->push_exec.elts; + for (n = 0; n < ctx->push_exec.nelts; n++, e++) { + ngx_rtmp_exec_kill(e, e->kill_signal); + } + } + + pctx = ctx->pull; + + if (pctx && --pctx->counter == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: delete %uz pull command(s)", + pctx->pull_exec.nelts); + + e = pctx->pull_exec.elts; + for (n = 0; n < pctx->pull_exec.nelts; n++, e++) { + ngx_rtmp_exec_kill(e, e->kill_signal); + } + + ppctx = &eacf->pull[ngx_hash_key(pctx->name.data, pctx->name.len) % + eacf->nbuckets]; + + for (; *ppctx; ppctx = &(*ppctx)->next) { + if (pctx == *ppctx) { + *ppctx = pctx->next; + break; + } + } + + ngx_destroy_pool(pctx->pool); + } + + ctx->pull = NULL; + +next: + return next_close_stream(s, v); +} + + +static ngx_int_t +ngx_rtmp_exec_record_done(ngx_rtmp_session_t *s, ngx_rtmp_record_done_t *v) +{ + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_app_conf_t *eacf; + + if (s->auto_pushed) { + goto next; + } + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + if (eacf == NULL || !eacf->active) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx == NULL) { + goto next; + } + + ctx->recorder = v->recorder; + ctx->path = v->path; + + ngx_rtmp_exec_unmanaged(s, &eacf->conf[NGX_RTMP_EXEC_RECORD_DONE], + "record_done"); + + ngx_str_null(&v->recorder); + ngx_str_null(&v->path); + +next: + return next_record_done(s, v); +} #endif /* NGX_WIN32 */ static char * -ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { - ngx_str_t *value; - ngx_rtmp_exec_app_conf_t *eacf; - size_t n, nargs; - ngx_str_t *s; - ngx_rtmp_exec_conf_t *ec; + char *p = conf; + + size_t n, nargs; + ngx_str_t *s, *value, v; + ngx_array_t *confs; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_app_conf_t *eacf; + + confs = (ngx_array_t *) (p + cmd->offset); eacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_exec_module); + + if (confs->nalloc == 0 && ngx_array_init(confs, cf->pool, 1, + sizeof(ngx_rtmp_exec_conf_t)) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + value = cf->args->elts; - ec = ngx_array_push(&eacf->confs); + ec = ngx_array_push(confs); if (ec == NULL) { return NGX_CONF_ERROR; } + ngx_memzero(ec, sizeof(ngx_rtmp_exec_conf_t)); + + /* type is undefined for explicit execs */ + + ec->type = NGX_CONF_UNSET_UINT; ec->cmd = value[1]; + if (ngx_array_init(&ec->names, cf->pool, 1, sizeof(ngx_str_t)) != NGX_OK) { + return NGX_CONF_ERROR; + } + if (cf->args->nelts == 2) { return NGX_CONF_OK; } nargs = cf->args->nelts - 2; - if (ngx_array_init(&ec->args, cf->pool, nargs, - sizeof(ngx_str_t)) != NGX_OK) + if (ngx_array_init(&ec->args, cf->pool, nargs, sizeof(ngx_str_t)) != NGX_OK) { return NGX_CONF_ERROR; } - s = ngx_array_push_n(&ec->args, nargs); - for (n = 2; n < cf->args->nelts; ++n, ++s) { - *s = value[n]; + for (n = 2; n < cf->args->nelts; n++) { + + v = value[n]; + + if (eacf->options == 1) { + + if (v.len >= 5 && ngx_strncmp(v.data, "name=", 5) == 0) { + + s = ngx_array_push(&ec->names); + if (s == NULL) { + return NGX_CONF_ERROR; + } + + v.data += 5; + v.len -= 5; + + *s = v; + + continue; + } + } + + s = ngx_array_push(&ec->args); + if (s == NULL) { + return NGX_CONF_ERROR; + } + + *s = v; } return NGX_CONF_OK; } - +/* static char * -ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +ngx_rtmp_exec_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { - ngx_rtmp_exec_main_conf_t *emcf = conf; - + char *rv; ngx_str_t *value; - size_t n, nargs; - ngx_str_t *s; - ngx_rtmp_exec_conf_t *ec; + ngx_conf_t save; + ngx_array_t *confs; + ngx_rtmp_conf_ctx_t *ctx, *pctx; + ngx_rtmp_exec_conf_t *ec, *eec; + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_main_conf_t *emcf; value = cf->args->elts; - ec = ngx_array_push(&emcf->confs); + eacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_exec_module); + + emcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_exec_module); + + ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_conf_ctx_t)); + if (ctx == NULL) { + return NGX_CONF_ERROR; + } + + pctx = cf->ctx; + + ctx->main_conf = pctx->main_conf; + ctx->srv_conf = pctx->srv_conf; + + ctx->app_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_rtmp_max_module); + if (ctx->app_conf == NULL) { + return NGX_CONF_ERROR; + } + + ec = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_exec_conf_t)); if (ec == NULL) { return NGX_CONF_ERROR; } - ec->cmd = value[1]; + ec->id = value[1]; + ec->type = NGX_CONF_UNSET_UINT; - if (cf->args->nelts == 2) { - return NGX_CONF_OK; + ctx->app_conf[ngx_rtmp_exec_module.ctx_index] = ec; + + save = *cf; + + cf->ctx = ctx; + cf->cmd_type = NGX_RTMP_EXEC_CONF; + + rv = ngx_conf_parse(cf, NULL); + *cf= save; + + switch (ec->type) { + + case NGX_RTMP_EXEC_STATIC: + confs = &emcf->static_conf; + break; + + case NGX_CONF_UNSET_UINT: + return "unspecified exec type"; + + default: + confs = &eacf->conf[ec->type]; } - nargs = cf->args->nelts - 2; - if (ngx_array_init(&ec->args, cf->pool, nargs, - sizeof(ngx_str_t)) != NGX_OK) + if (confs->nalloc == 0 && ngx_array_init(confs, cf->pool, 1, + sizeof(ngx_rtmp_exec_conf_t)) + != NGX_OK) { return NGX_CONF_ERROR; } - s = ngx_array_push_n(&ec->args, nargs); - for (n = 2; n < cf->args->nelts; ++n, ++s) { - *s = value[n]; + eec = ngx_array_push(confs); + if (eec == NULL) { + return NGX_CONF_ERROR; } - return NGX_CONF_OK; -} + *eec = *ec; + return rv; +} +*/ static char * ngx_rtmp_exec_kill_signal(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_rtmp_exec_main_conf_t *emcf = conf; - ngx_str_t *value; + + ngx_str_t *value; value = cf->args->elts; value++; @@ -822,9 +1546,15 @@ ngx_rtmp_exec_postconfiguration(ngx_conf_t *cf) next_publish = ngx_rtmp_publish; ngx_rtmp_publish = ngx_rtmp_exec_publish; + next_play = ngx_rtmp_play; + ngx_rtmp_play = ngx_rtmp_exec_play; + next_close_stream = ngx_rtmp_close_stream; ngx_rtmp_close_stream = ngx_rtmp_exec_close_stream; + next_record_done = ngx_rtmp_record_done; + ngx_rtmp_record_done = ngx_rtmp_exec_record_done; + #endif /* NGX_WIN32 */ return NGX_OK; diff --git a/ngx_rtmp_version.h b/ngx_rtmp_version.h index 50ad3d5..74ce9d2 100644 --- a/ngx_rtmp_version.h +++ b/ngx_rtmp_version.h @@ -8,8 +8,8 @@ #define _NGX_RTMP_VERSION_H_INCLUDED_ -#define nginx_rtmp_version 1000006 -#define NGINX_RTMP_VERSION "1.0.6" +#define nginx_rtmp_version 1000007 +#define NGINX_RTMP_VERSION "1.0.7" #endif /* _NGX_RTMP_VERSION_H_INCLUDED_ */