diff --git a/ngx_rtmp_enotify_module.c b/ngx_rtmp_enotify_module.c index ccfd93c..d888a33 100644 --- a/ngx_rtmp_enotify_module.c +++ b/ngx_rtmp_enotify_module.c @@ -136,10 +136,11 @@ ngx_module_t ngx_rtmp_enotify_module = { static void -ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_enotify_eval_astr(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_enotify_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_enotify_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); if (ctx == NULL) { @@ -153,10 +154,11 @@ ngx_rtmp_enotify_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, static void -ngx_rtmp_enotify_eval_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_enotify_eval_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_enotify_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_enotify_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_enotify_module); if (ctx == NULL) { @@ -283,7 +285,8 @@ ngx_rtmp_enotify_exec(ngx_rtmp_session_t *s, ngx_rtmp_enotify_conf_t *ec) for (n = 0; n < ec->args.nelts; ++n, ++arg_in) { - ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a); + ngx_rtmp_eval(s, arg_in, ngx_rtmp_enotify_eval_p, &a, + s->connection->log); if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { continue; diff --git a/ngx_rtmp_eval.c b/ngx_rtmp_eval.c index b2c29b3..0cb0f81 100644 --- a/ngx_rtmp_eval.c +++ b/ngx_rtmp_eval.c @@ -12,17 +12,17 @@ static void -ngx_rtmp_eval_session_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_eval_session_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - *ret = *(ngx_str_t *) ((u_char *) s + e->offset); + *ret = *(ngx_str_t *) ((u_char *) ctx + e->offset); } static void -ngx_rtmp_eval_connection_str(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_eval_connection_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { + ngx_rtmp_session_t *s = ctx; + *ret = *(ngx_str_t *) ((u_char *) s->connection + e->offset); } @@ -58,15 +58,14 @@ ngx_rtmp_eval_t ngx_rtmp_eval_session[] = { static void -ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b, - void *data, size_t len) +ngx_rtmp_eval_append(ngx_buf_t *b, void *data, size_t len, ngx_log_t *log) { size_t buf_len; if (b->last + len > b->end) { buf_len = 2 * (b->last - b->pos) + len; - b->start = ngx_palloc(s->connection->pool, buf_len); + b->start = ngx_alloc(buf_len, log); if (b->start == NULL) { return; } @@ -81,8 +80,8 @@ ngx_rtmp_eval_append(ngx_rtmp_session_t *s, ngx_buf_t *b, static void -ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, - ngx_rtmp_eval_t **e, ngx_str_t *name) +ngx_rtmp_eval_append_var(void *ctx, ngx_buf_t *b, ngx_rtmp_eval_t **e, + ngx_str_t *name, ngx_log_t *log) { ngx_uint_t k; ngx_str_t v; @@ -91,10 +90,10 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, for (; *e; ++e) { for (k = 0, ee = *e; ee->handler; ++k, ++ee) { if (ee->name.len == name->len && - ngx_memcmp(ee->name.data, name->data, name->len) == 0) + ngx_memcmp(ee->name.data, name->data, name->len) == 0) { - ee->handler(s, ee, &v); - ngx_rtmp_eval_append(s, b, v.data, v.len); + ee->handler(ctx, ee, &v); + ngx_rtmp_eval_append(b, v.data, v.len, log); } } } @@ -102,8 +101,8 @@ ngx_rtmp_eval_append_var(ngx_rtmp_session_t *s, ngx_buf_t *b, ngx_int_t -ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, - ngx_str_t *out) +ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e, ngx_str_t *out, + ngx_log_t *log) { u_char c, *p; ngx_str_t name; @@ -117,8 +116,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, SNAME } state = NORMAL; - b.pos = b.last = b.start = ngx_palloc(s->connection->pool, - NGX_RTMP_EVAL_BUFLEN); + b.pos = b.last = b.start = ngx_alloc(NGX_RTMP_EVAL_BUFLEN, log); if (b.pos == NULL) { return NGX_ERROR; } @@ -137,7 +135,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); state = NORMAL; @@ -154,7 +152,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); case NORMAL: switch (c) { @@ -168,7 +166,7 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, } case ESCAPE: - ngx_rtmp_eval_append(s, &b, &c, 1); + ngx_rtmp_eval_append(&b, &c, 1, log); state = NORMAL; break; @@ -178,11 +176,11 @@ ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, ngx_rtmp_eval_t **e, if (state == NAME) { p = &in->data[n]; name.len = p - name.data; - ngx_rtmp_eval_append_var(s, &b, e, &name); + ngx_rtmp_eval_append_var(ctx, &b, e, &name, log); } c = 0; - ngx_rtmp_eval_append(s, &b, &c, 1); + ngx_rtmp_eval_append(&b, &c, 1, log); out->data = b.pos; out->len = b.last - b.pos - 1; diff --git a/ngx_rtmp_eval.h b/ngx_rtmp_eval.h index a7ba7c2..1f48525 100644 --- a/ngx_rtmp_eval.h +++ b/ngx_rtmp_eval.h @@ -15,7 +15,7 @@ typedef struct ngx_rtmp_eval_s ngx_rtmp_eval_t; -typedef void (* ngx_rtmp_eval_pt)(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, +typedef void (* ngx_rtmp_eval_pt)(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret); @@ -33,8 +33,8 @@ struct ngx_rtmp_eval_s { extern ngx_rtmp_eval_t ngx_rtmp_eval_session[]; -ngx_int_t ngx_rtmp_eval(ngx_rtmp_session_t *s, ngx_str_t *in, - ngx_rtmp_eval_t **e, ngx_str_t *out); +ngx_int_t ngx_rtmp_eval(void *ctx, ngx_str_t *in, ngx_rtmp_eval_t **e, + ngx_str_t *out, ngx_log_t *log); ngx_int_t ngx_rtmp_eval_streams(ngx_str_t *in); diff --git a/ngx_rtmp_exec_module.c b/ngx_rtmp_exec_module.c index 7eb5ae2..e6e02bf 100644 --- a/ngx_rtmp_exec_module.c +++ b/ngx_rtmp_exec_module.c @@ -16,6 +16,7 @@ #if !(NGX_WIN32) static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_play_pt next_play; static ngx_rtmp_close_stream_pt next_close_stream; #endif @@ -27,7 +28,7 @@ static char * ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf); static void * ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child); -static char * ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, +static char * ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static char * ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); @@ -48,7 +49,8 @@ typedef struct { typedef struct { ngx_rtmp_exec_conf_t *conf; ngx_log_t *log; - ngx_rtmp_session_t *session; /* NULL for init execs */ + ngx_rtmp_eval_t **eval; + void *eval_ctx; unsigned active:1; ngx_pid_t pid; ngx_pid_t *save_pid; @@ -62,23 +64,40 @@ typedef struct { typedef struct { - ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ - ngx_array_t execs; /* ngx_rtmp_exec_t */ + ngx_array_t static_confs; /* ngx_rtmp_exec_conf_t */ + ngx_array_t static_execs; /* ngx_rtmp_exec_t */ ngx_msec_t respawn_timeout; ngx_int_t kill_signal; + ngx_log_t *log; } ngx_rtmp_exec_main_conf_t; +typedef struct ngx_rtmp_exec_pull_ctx_s ngx_rtmp_exec_pull_ctx_t; + +struct ngx_rtmp_exec_pull_ctx_s { + ngx_pool_t *pool; + ngx_uint_t counter; + ngx_str_t name; + ngx_str_t app; + ngx_array_t pull_execs; /* ngx_rtmp_exec_t */ + ngx_rtmp_exec_pull_ctx_t *next; +}; + + typedef struct { - ngx_array_t confs; /* ngx_rtmp_exec_conf_t */ + ngx_array_t push_confs; /* ngx_rtmp_exec_conf_t */ + ngx_array_t pull_confs; /* ngx_rtmp_exec_conf_t */ ngx_flag_t respawn; + ngx_uint_t nbuckets; + ngx_rtmp_exec_pull_ctx_t **pull; } ngx_rtmp_exec_app_conf_t; typedef struct { u_char name[NGX_RTMP_MAX_NAME]; u_char args[NGX_RTMP_MAX_ARGS]; - ngx_array_t execs; + ngx_array_t push_execs; /* ngx_rtmp_exec_t */ + ngx_rtmp_exec_pull_ctx_t *pull; } ngx_rtmp_exec_ctx_t; @@ -93,9 +112,23 @@ static ngx_command_t ngx_rtmp_exec_commands[] = { { ngx_string("exec"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, - ngx_rtmp_exec_exec, + ngx_rtmp_exec_conf, NGX_RTMP_APP_CONF_OFFSET, - 0, + offsetof(ngx_rtmp_exec_app_conf_t, push_confs), + NULL }, + + { ngx_string("exec_push"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, push_confs), + NULL }, + + { ngx_string("exec_pull"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_1MORE, + ngx_rtmp_exec_conf, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_exec_app_conf_t, pull_confs), NULL }, { ngx_string("exec_static"), @@ -159,10 +192,11 @@ ngx_module_t ngx_rtmp_exec_module = { static void -ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, - ngx_str_t *ret) +ngx_rtmp_exec_eval_ctx_str(void *sctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) { - ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_session_t *s = sctx; + + ngx_rtmp_exec_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); if (ctx == NULL) { @@ -175,23 +209,50 @@ ngx_rtmp_exec_eval_astr(ngx_rtmp_session_t *s, ngx_rtmp_eval_t *e, } -static ngx_rtmp_eval_t ngx_rtmp_exec_eval[] = { +static void +ngx_rtmp_exec_eval_pctx_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret) +{ + *ret = *(ngx_str_t *) ((u_char *) ctx + e->offset); +} + + +static ngx_rtmp_eval_t ngx_rtmp_exec_push_specific_eval[] = { { ngx_string("name"), - ngx_rtmp_exec_eval_astr, + ngx_rtmp_exec_eval_ctx_str, offsetof(ngx_rtmp_exec_ctx_t, name) }, { ngx_string("args"), - ngx_rtmp_exec_eval_astr, + ngx_rtmp_exec_eval_ctx_str, offsetof(ngx_rtmp_exec_ctx_t, args) }, ngx_rtmp_null_eval }; -static ngx_rtmp_eval_t * ngx_rtmp_exec_eval_p[] = { +static ngx_rtmp_eval_t * ngx_rtmp_exec_push_eval[] = { ngx_rtmp_eval_session, - ngx_rtmp_exec_eval, + ngx_rtmp_exec_push_specific_eval, + NULL +}; + + +static ngx_rtmp_eval_t ngx_rtmp_exec_pull_specific_eval[] = { + + { ngx_string("name"), + ngx_rtmp_exec_eval_pctx_str, + offsetof(ngx_rtmp_exec_pull_ctx_t, name) }, + + { ngx_string("app"), + ngx_rtmp_exec_eval_pctx_str, + offsetof(ngx_rtmp_exec_pull_ctx_t, app) }, + + ngx_rtmp_null_eval +}; + + +static ngx_rtmp_eval_t * ngx_rtmp_exec_pull_eval[] = { + ngx_rtmp_exec_pull_specific_eval, NULL }; @@ -209,7 +270,7 @@ ngx_rtmp_exec_create_main_conf(ngx_conf_t *cf) emcf->respawn_timeout = NGX_CONF_UNSET_MSEC; emcf->kill_signal = NGX_CONF_UNSET; - if (ngx_array_init(&emcf->confs, cf->pool, 1, + if (ngx_array_init(&emcf->static_confs, cf->pool, 1, sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) { return NULL; @@ -237,23 +298,26 @@ ngx_rtmp_exec_init_main_conf(ngx_conf_t *cf, void *conf) } #endif - if (ngx_array_init(&emcf->execs, cf->pool, emcf->confs.nelts, + if (ngx_array_init(&emcf->static_execs, cf->pool, + emcf->static_confs.nelts, sizeof(ngx_rtmp_exec_t)) != NGX_OK) { return NGX_CONF_ERROR; } - e = ngx_array_push_n(&emcf->execs, emcf->confs.nelts); + e = ngx_array_push_n(&emcf->static_execs, emcf->static_confs.nelts); if (e == NULL) { return NGX_CONF_ERROR; } - ec = emcf->confs.elts; + emcf->log = &cf->cycle->new_log; - for (n = 0; n < emcf->confs.nelts; ++n, ++e, ++ec) { + ec = emcf->static_confs.elts; + + for (n = 0; n < emcf->static_confs.nelts; ++n, ++e, ++ec) { ngx_memzero(e, sizeof(*e)); e->conf = ec; - e->log = &cf->cycle->new_log; + e->log = emcf->log; e->respawn_timeout = emcf->respawn_timeout; e->kill_signal = emcf->kill_signal; } @@ -273,8 +337,15 @@ ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf) } eacf->respawn = NGX_CONF_UNSET; + eacf->nbuckets = NGX_CONF_UNSET_UINT; - if (ngx_array_init(&eacf->confs, cf->pool, 1, + if (ngx_array_init(&eacf->push_confs, cf->pool, 1, + sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) + { + return NULL; + } + + if (ngx_array_init(&eacf->pull_confs, cf->pool, 1, sizeof(ngx_rtmp_exec_conf_t)) != NGX_OK) { return NULL; @@ -284,27 +355,56 @@ ngx_rtmp_exec_create_app_conf(ngx_conf_t *cf) } +static ngx_int_t +ngx_rtmp_exec_merge_confs(ngx_array_t *conf, ngx_array_t *prev) +{ + size_t n; + ngx_rtmp_exec_conf_t *ec, *pec; + + if (prev->nelts == 0) { + return NGX_OK; + } + + ec = ngx_array_push_n(conf, prev->nelts); + if (ec == NULL) { + return NGX_ERROR; + } + + pec = prev->elts; + for (n = 0; n < prev->nelts; n++, ec++, pec++) { + *ec = *pec; + } + + return NGX_OK; +} + + static char * ngx_rtmp_exec_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) { ngx_rtmp_exec_app_conf_t *prev = parent; ngx_rtmp_exec_app_conf_t *conf = child; - size_t n; - ngx_rtmp_exec_conf_t *ec, *pec; ngx_conf_merge_value(conf->respawn, prev->respawn, 1); + ngx_conf_merge_uint_value(conf->nbuckets, prev->nbuckets, 1024); - if (prev->confs.nelts) { - ec = ngx_array_push_n(&conf->confs, prev->confs.nelts); - if (ec == NULL) { - return NGX_CONF_ERROR; - } - pec = prev->confs.elts; - for (n = 0; n < prev->confs.nelts; ++n, ++ec, ++pec) { - *ec = *pec; - } + if (ngx_rtmp_exec_merge_confs(&conf->push_confs, &prev->push_confs) + != NGX_OK) + { + return NGX_CONF_ERROR; } - + + if (ngx_rtmp_exec_merge_confs(&conf->pull_confs, &prev->pull_confs) + != NGX_OK) + { + return NGX_CONF_ERROR; + } + + conf->pull = ngx_pcalloc(cf->pool, sizeof(void *) * conf->nbuckets); + if (conf->pull == NULL) { + return NGX_CONF_ERROR; + } + return NGX_CONF_OK; } @@ -344,8 +444,8 @@ ngx_rtmp_exec_init_process(ngx_cycle_t *cycle) * when nginx worker is terminated. */ - e = emcf->execs.elts; - for (n = 0; n < emcf->execs.nelts; ++n, ++e) { + e = emcf->static_execs.elts; + for (n = 0; n < emcf->static_execs.nelts; ++n, ++e) { e->respawn_evt.data = e; e->respawn_evt.log = e->log; e->respawn_evt.handler = ngx_rtmp_exec_respawn; @@ -527,10 +627,10 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) for (n = 0; n < ec->args.nelts; n++, ++arg_in) { - if (e->session == NULL) { + if (e->eval == NULL) { a = *arg_in; } else { - ngx_rtmp_eval(e->session, arg_in, ngx_rtmp_exec_eval_p, &a); + ngx_rtmp_eval(e->eval_ctx, arg_in, e->eval, &a, e->log); } if (ngx_rtmp_eval_streams(&a) != NGX_DONE) { @@ -585,10 +685,11 @@ ngx_rtmp_exec_run(ngx_rtmp_exec_t *e) static ngx_int_t ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { - ngx_rtmp_exec_app_conf_t *eacf; - ngx_rtmp_exec_ctx_t *ctx; - ngx_rtmp_exec_t *e; - size_t n; + size_t n; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx; + ngx_rtmp_exec_app_conf_t *eacf; eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); if (eacf == NULL) { @@ -596,36 +697,229 @@ ngx_rtmp_exec_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); - if (ctx == NULL || ctx->execs.nelts == 0) { + if (ctx == NULL) { goto next; } + + if (ctx->push_execs.nelts > 0) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: delete %uz push command(s)", + ctx->push_execs.nelts); - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "exec: delete %uz command(s)", ctx->execs.nelts); - - e = ctx->execs.elts; - for (n = 0; n < ctx->execs.nelts; ++n, ++e) { - ngx_rtmp_exec_kill(e, e->kill_signal); + e = ctx->push_execs.elts; + for (n = 0; n < ctx->push_execs.nelts; n++, e++) { + ngx_rtmp_exec_kill(e, e->kill_signal); + } } + pctx = ctx->pull; + + if (pctx && --pctx->counter == 0) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: delete %uz pull command(s)", + pctx->pull_execs.nelts); + + e = pctx->pull_execs.elts; + for (n = 0; n < pctx->pull_execs.nelts; n++, e++) { + ngx_rtmp_exec_kill(e, e->kill_signal); + } + + ppctx = &eacf->pull[ngx_hash_key(pctx->name.data, pctx->name.len) & + eacf->nbuckets]; + + for (; *ppctx; ppctx = &(*ppctx)->next) { + if (pctx == *ppctx) { + *ppctx = pctx->next; + break; + } + } + + ngx_destroy_pool(pctx->pool); + } + + ctx->pull = NULL; + next: return next_close_stream(s, v); } +static ngx_int_t +ngx_rtmp_exec_init_ctx(ngx_rtmp_session_t *s, u_char name[NGX_RTMP_MAX_NAME], + u_char args[NGX_RTMP_MAX_ARGS]) +{ + ngx_uint_t n; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_main_conf_t *emcf; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + + if (ctx != NULL) { + goto done; + } + + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t)); + if (ctx == NULL) { + return NGX_ERROR; + } + + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module); + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); + + if (ngx_array_init(&ctx->push_execs, s->connection->pool, + eacf->push_confs.nelts, + sizeof(ngx_rtmp_exec_t)) != NGX_OK) + { + return NGX_ERROR; + } + + e = ngx_array_push_n(&ctx->push_execs, eacf->push_confs.nelts); + if (e == NULL) { + return NGX_ERROR; + } + + ec = eacf->push_confs.elts; + for (n = 0; n < eacf->push_confs.nelts; n++, e++, ec++) { + ngx_memzero(e, sizeof(*e)); + e->conf = ec; + e->log = s->connection->log; + e->eval = ngx_rtmp_exec_push_eval; + e->eval_ctx = s; + e->kill_signal = emcf->kill_signal; + e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : + NGX_CONF_UNSET_MSEC); + } + +done: + + ngx_memcpy(ctx->name, name, NGX_RTMP_MAX_NAME); + ngx_memcpy(ctx->args, args, NGX_RTMP_MAX_ARGS); + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_exec_init_pull_ctx(ngx_rtmp_session_t *s, + u_char name[NGX_RTMP_MAX_NAME]) +{ + size_t len; + ngx_uint_t n; + ngx_pool_t *pool; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_conf_t *ec; + ngx_rtmp_exec_pull_ctx_t *pctx, **ppctx; + ngx_rtmp_exec_app_conf_t *eacf; + ngx_rtmp_exec_main_conf_t *emcf; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + if (ctx->pull != NULL) { + return NGX_OK; + } + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); + + len = ngx_strlen(name); + + ppctx = &eacf->pull[ngx_hash_key(name, len) & eacf->nbuckets]; + + for (; *ppctx; ppctx = &(*ppctx)->next) { + pctx = *ppctx; + + if (pctx->name.len == len && + ngx_strncmp(name, pctx->name.data, len) == 0) + { + goto done; + } + } + + pool = ngx_create_pool(4096, emcf->log); + if (pool == NULL) { + return NGX_ERROR; + } + + pctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_exec_pull_ctx_t)); + if (pctx == NULL) { + goto error; + } + + pctx->pool = pool; + pctx->name.len = len; + pctx->name.data = ngx_palloc(pool, len); + + if (pctx->name.data == NULL) { + goto error; + } + + ngx_memcpy(pctx->name.data, name, len); + + pctx->app.len = s->app.len; + pctx->app.data = ngx_palloc(pool, s->app.len); + + if (pctx->app.data == NULL) { + goto error; + } + + ngx_memcpy(pctx->app.data, s->app.data, s->app.len); + + if (ngx_array_init(&pctx->pull_execs, pool, eacf->pull_confs.nelts, + sizeof(ngx_rtmp_exec_t)) != NGX_OK) + { + goto error; + } + + e = ngx_array_push_n(&pctx->pull_execs, eacf->pull_confs.nelts); + if (e == NULL) { + goto error; + } + + ec = eacf->pull_confs.elts; + for (n = 0; n < eacf->pull_confs.nelts; n++, e++, ec++) { + ngx_memzero(e, sizeof(*e)); + e->conf = ec; + e->log = emcf->log; + e->eval = ngx_rtmp_exec_pull_eval; + e->eval_ctx = pctx; + e->kill_signal = emcf->kill_signal; + e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : + NGX_CONF_UNSET_MSEC); + } + + *ppctx = pctx; + +done: + + ctx->pull = pctx; + ctx->pull->counter++; + + return NGX_OK; + +error: + + ngx_destroy_pool(pool); + + return NGX_ERROR; +} + + static ngx_int_t ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { - ngx_rtmp_exec_main_conf_t *emcf; - ngx_rtmp_exec_app_conf_t *eacf; - ngx_rtmp_exec_t *e; - ngx_rtmp_exec_conf_t *ec; - ngx_rtmp_exec_ctx_t *ctx; - size_t n; + size_t n; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_app_conf_t *eacf; - emcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_exec_module); eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); - if (eacf == NULL || eacf->confs.nelts == 0) { + + if (eacf == NULL || eacf->push_confs.nelts == 0) { goto next; } @@ -633,69 +927,84 @@ ngx_rtmp_exec_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) goto next; } - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); - - if (ctx == NULL) { - ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_exec_ctx_t)); - if (ctx == NULL) { - return NGX_ERROR; - } - - ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_exec_module); - - if (ngx_array_init(&ctx->execs, s->connection->pool, eacf->confs.nelts, - sizeof(ngx_rtmp_exec_t)) != NGX_OK) - { - return NGX_ERROR; - } - - e = ngx_array_push_n(&ctx->execs, eacf->confs.nelts); - if (e == NULL) { - return NGX_ERROR; - } - - ec = eacf->confs.elts; - for (n = 0; n < eacf->confs.nelts; ++n, ++e, ++ec) { - ngx_memzero(e, sizeof(*e)); - e->conf = ec; - e->log = s->connection->log; - e->session = s; - e->kill_signal = emcf->kill_signal; - e->respawn_timeout = (eacf->respawn ? emcf->respawn_timeout : - NGX_CONF_UNSET_MSEC); - } + if (ngx_rtmp_exec_init_ctx(s, v->name, v->args) != NGX_OK) { + goto next; } - ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); - ngx_memcpy(ctx->args, v->args, NGX_RTMP_MAX_ARGS); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "exec: run %uz command(s)", ctx->execs.nelts); + "exec: push %uz command(s)", ctx->push_execs.nelts); - e = ctx->execs.elts; - for (n = 0; n < ctx->execs.nelts; ++n, ++e) { + e = ctx->push_execs.elts; + for (n = 0; n < ctx->push_execs.nelts; ++n, ++e) { ngx_rtmp_exec_run(e); } next: return next_publish(s, v); } + + +static ngx_int_t +ngx_rtmp_exec_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) +{ + size_t n; + ngx_rtmp_exec_t *e; + ngx_rtmp_exec_ctx_t *ctx; + ngx_rtmp_exec_pull_ctx_t *pctx; + ngx_rtmp_exec_app_conf_t *eacf; + + eacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_exec_module); + + if (eacf == NULL || eacf->pull_confs.nelts == 0) { + goto next; + } + + if (ngx_rtmp_exec_init_ctx(s, v->name, v->args) != NGX_OK) { + goto next; + } + + if (ngx_rtmp_exec_init_pull_ctx(s, v->name) != NGX_OK) { + goto next; + } + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_exec_module); + pctx = ctx->pull; + + if (pctx->counter != 1) { + goto next; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "exec: pull %uz command(s)", pctx->pull_execs.nelts); + + e = pctx->pull_execs.elts; + for (n = 0; n < pctx->pull_execs.nelts; n++, e++) { + ngx_rtmp_exec_run(e); + } + +next: + return next_play(s, v); +} #endif /* NGX_WIN32 */ static char * -ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +ngx_rtmp_exec_conf(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { - ngx_str_t *value; - ngx_rtmp_exec_app_conf_t *eacf; - size_t n, nargs; - ngx_str_t *s; - ngx_rtmp_exec_conf_t *ec; + char *p = conf; + + size_t n, nargs; + ngx_str_t *s, *value; + ngx_array_t *confs; + ngx_rtmp_exec_conf_t *ec; + + confs = (ngx_array_t *) (p + cmd->offset); - eacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_exec_module); value = cf->args->elts; - ec = ngx_array_push(&eacf->confs); + ec = ngx_array_push(confs); if (ec == NULL) { return NGX_CONF_ERROR; } @@ -707,8 +1016,7 @@ ngx_rtmp_exec_exec(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } nargs = cf->args->nelts - 2; - if (ngx_array_init(&ec->args, cf->pool, nargs, - sizeof(ngx_str_t)) != NGX_OK) + if (ngx_array_init(&ec->args, cf->pool, nargs, sizeof(ngx_str_t)) != NGX_OK) { return NGX_CONF_ERROR; } @@ -734,7 +1042,7 @@ ngx_rtmp_exec_exec_static(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) value = cf->args->elts; - ec = ngx_array_push(&emcf->confs); + ec = ngx_array_push(&emcf->static_confs); if (ec == NULL) { return NGX_CONF_ERROR; } @@ -821,6 +1129,9 @@ ngx_rtmp_exec_postconfiguration(ngx_conf_t *cf) next_publish = ngx_rtmp_publish; ngx_rtmp_publish = ngx_rtmp_exec_publish; + next_play = ngx_rtmp_play; + ngx_rtmp_play = ngx_rtmp_exec_play; + next_close_stream = ngx_rtmp_close_stream; ngx_rtmp_close_stream = ngx_rtmp_exec_close_stream;