From f35e965033df09c55009621d5b8c5c2b6fd03a32 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 22 Apr 2013 20:20:19 +0400 Subject: [PATCH] reimplemented vod stats --- ngx_rtmp_play_module.c | 98 ++++++++++++++++++++++++- ngx_rtmp_play_module.h | 14 +++- ngx_rtmp_stat_module.c | 159 +++++++++++++++++++++++++++++++---------- 3 files changed, 230 insertions(+), 41 deletions(-) diff --git a/ngx_rtmp_play_module.c b/ngx_rtmp_play_module.c index 8fcae69..272e594 100644 --- a/ngx_rtmp_play_module.c +++ b/ngx_rtmp_play_module.c @@ -140,6 +140,8 @@ ngx_rtmp_play_create_app_conf(ngx_conf_t *cf) return NULL; } + pacf->nbuckets = 1024; + return pacf; } @@ -155,12 +157,12 @@ ngx_rtmp_play_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_str_value(conf->local_path, prev->local_path, ""); if (prev->entries.nelts == 0) { - return NGX_CONF_OK; + goto done; } if (conf->entries.nelts == 0) { conf->entries = prev->entries; - return NGX_CONF_OK; + goto done; } ppe = ngx_array_push_n(&conf->entries, prev->entries.nelts); @@ -170,10 +172,91 @@ ngx_rtmp_play_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_memcpy(ppe, prev->entries.elts, prev->entries.nelts * sizeof(void *)); +done: + + if (conf->entries.nelts == 0) { + return NGX_CONF_OK; + } + + conf->ctx = ngx_pcalloc(cf->pool, sizeof(void *) * conf->nbuckets); + if (conf->ctx == NULL) { + return NGX_CONF_ERROR; + } + return NGX_CONF_OK; } +static ngx_int_t +ngx_rtmp_play_join(ngx_rtmp_session_t *s) +{ + ngx_rtmp_play_ctx_t *ctx, **pctx; + ngx_rtmp_play_app_conf_t *pacf; + ngx_uint_t h; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "play: join"); + + pacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_play_module); + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); + if (ctx == NULL || ctx->joined) { + return NGX_ERROR; + } + + h = ngx_hash_key(ctx->name, ngx_strlen(ctx->name)); + pctx = &pacf->ctx[h % pacf->nbuckets]; + + while (*pctx) { + if (!ngx_strncmp((*pctx)->name, ctx->name, NGX_RTMP_MAX_NAME)) { + break; + } + pctx = &(*pctx)->next; + } + + ctx->next = *pctx; + *pctx = ctx; + ctx->joined = 1; + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_play_leave(ngx_rtmp_session_t *s) +{ + ngx_rtmp_play_ctx_t *ctx, **pctx; + ngx_rtmp_play_app_conf_t *pacf; + ngx_uint_t h; + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "play: leave"); + + pacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_play_module); + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_play_module); + if (ctx == NULL || !ctx->joined) { + return NGX_ERROR; + } + + h = ngx_hash_key(ctx->name, ngx_strlen(ctx->name)); + pctx = &pacf->ctx[h % pacf->nbuckets]; + + while (*pctx && *pctx != ctx) { + pctx = &(*pctx)->next; + } + + if (*pctx == NULL) { + return NGX_ERROR; + } + + *pctx = (*pctx)->next; + ctx->joined = 0; + + return NGX_OK; +} + + static void ngx_rtmp_play_send(ngx_event_t *e) { @@ -475,6 +558,8 @@ ngx_rtmp_play_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) ngx_rtmp_play_cleanup_local_file(s); } + ngx_rtmp_play_leave(s); + next: return next_close_stream(s, v); } @@ -644,11 +729,14 @@ ngx_rtmp_play_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) ngx_memzero(ctx, sizeof(*ctx)); + ctx->session = s; ctx->aindex = ngx_rtmp_play_parse_index('a', v->args); ctx->vindex = ngx_rtmp_play_parse_index('v', v->args); - + ctx->file.log = s->connection->log; + ngx_memcpy(ctx->name, v->name, NGX_RTMP_MAX_NAME); + name.len = ngx_strlen(v->name); name.data = v->name; @@ -813,6 +901,10 @@ ngx_rtmp_play_open(ngx_rtmp_session_t *s, double start) return NGX_ERROR; } + if (ngx_rtmp_play_join(s) != NGX_OK) { + return NGX_ERROR; + } + e = &ctx->send_evt; e->data = s; e->handler = ngx_rtmp_play_send; diff --git a/ngx_rtmp_play_module.h b/ngx_rtmp_play_module.h index 3c09a37..8fdc332 100644 --- a/ngx_rtmp_play_module.h +++ b/ngx_rtmp_play_module.h @@ -8,6 +8,7 @@ #include "ngx_rtmp.h" +#include "ngx_rtmp_cmd_module.h" typedef ngx_int_t (*ngx_rtmp_play_init_pt) (ngx_rtmp_session_t *s, @@ -38,12 +39,17 @@ typedef struct { } ngx_rtmp_play_fmt_t; -typedef struct { +typedef struct ngx_rtmp_play_ctx_s ngx_rtmp_play_ctx_t; + + +struct ngx_rtmp_play_ctx_s { + ngx_rtmp_session_t *session; ngx_file_t file; ngx_rtmp_play_fmt_t *fmt; ngx_event_t send_evt; unsigned playing:1; unsigned opened:1; + unsigned joined:1; ngx_uint_t ncrs; ngx_uint_t nheader; ngx_uint_t nbody; @@ -53,7 +59,9 @@ typedef struct { ngx_int_t aindex, vindex; ngx_uint_t nentry; ngx_uint_t post_seek; -} ngx_rtmp_play_ctx_t; + u_char name[NGX_RTMP_MAX_NAME]; + ngx_rtmp_play_ctx_t *next; +}; typedef struct { @@ -66,6 +74,8 @@ typedef struct { ngx_str_t temp_path; ngx_str_t local_path; ngx_array_t entries; /* ngx_rtmp_play_entry_t * */ + ngx_uint_t nbuckets; + ngx_rtmp_play_ctx_t **ctx; } ngx_rtmp_play_app_conf_t; diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index e4337ff..fe5b25c 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -8,6 +8,7 @@ #include "ngx_rtmp.h" #include "ngx_rtmp_live_module.h" +#include "ngx_rtmp_play_module.h" #include "ngx_rtmp_codec_module.h" @@ -25,6 +26,7 @@ static time_t start_time; #define NGX_RTMP_STAT_GLOBAL 0x01 #define NGX_RTMP_STAT_LIVE 0x02 #define NGX_RTMP_STAT_CLIENTS 0x04 +#define NGX_RTMP_STAT_PLAY 0x08 /* * global: stat-{bufs-{total,free,used}, total bytes in/out, bw in/out} - cscf @@ -293,6 +295,50 @@ ngx_rtmp_stat_dump_pool(ngx_http_request_t *r, ngx_chain_t ***lll, #endif + +static void +ngx_rtmp_stat_client(ngx_http_request_t *r, ngx_chain_t ***lll, + ngx_rtmp_session_t *s) +{ + u_char buf[NGX_OFF_T_LEN + 1]; + +#ifdef NGX_RTMP_POOL_DEBUG + ngx_rtmp_stat_dump_pool(r, lll, s->connection->pool); +#endif + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), "%ui", + (ngx_uint_t) s->connection->number) - buf); + NGX_RTMP_STAT_L(""); + + NGX_RTMP_STAT_L("
"); + NGX_RTMP_STAT_S(&s->connection->addr_text); + NGX_RTMP_STAT_L("
"); + + NGX_RTMP_STAT_L(""); + + if (s->flashver.len) { + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_ES(&s->flashver); + NGX_RTMP_STAT_L(""); + } + + if (s->page_url.len) { + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_ES(&s->page_url); + NGX_RTMP_STAT_L(""); + } + + if (s->swf_url.len) { + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_ES(&s->swf_url); + NGX_RTMP_STAT_L(""); + } +} + + static void ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, ngx_rtmp_live_app_conf_t *lacf) @@ -307,6 +353,10 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, ngx_rtmp_stat_loc_conf_t *slcf; u_char *cname; + if (!lacf->live) { + return; + } + slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); NGX_RTMP_STAT_L("\r\n"); @@ -334,24 +384,7 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, if (slcf->stat & NGX_RTMP_STAT_CLIENTS) { NGX_RTMP_STAT_L(""); -#ifdef NGX_RTMP_POOL_DEBUG - ngx_rtmp_stat_dump_pool(r, lll, s->connection->pool); -#endif - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), "%ui", - (ngx_uint_t) s->connection->number) - - buf); - NGX_RTMP_STAT_L(""); - - NGX_RTMP_STAT_L("
"); - NGX_RTMP_STAT_S(&s->connection->addr_text); - NGX_RTMP_STAT_L("
"); - - NGX_RTMP_STAT_L(""); + ngx_rtmp_stat_client(r, lll, s); NGX_RTMP_STAT_L(""); NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), @@ -368,24 +401,6 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, } NGX_RTMP_STAT_L(""); - if (s->flashver.len) { - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT_ES(&s->flashver); - NGX_RTMP_STAT_L(""); - } - - if (s->page_url.len) { - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT_ES(&s->page_url); - NGX_RTMP_STAT_L(""); - } - - if (s->swf_url.len) { - NGX_RTMP_STAT_L(""); - NGX_RTMP_STAT_ES(&s->swf_url); - NGX_RTMP_STAT_L(""); - } - if (ctx->publishing) { NGX_RTMP_STAT_L(""); } @@ -464,6 +479,73 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, } +static void +ngx_rtmp_stat_play(ngx_http_request_t *r, ngx_chain_t ***lll, + ngx_rtmp_play_app_conf_t *pacf) +{ + ngx_rtmp_play_ctx_t *ctx, *sctx; + ngx_rtmp_session_t *s; + ngx_uint_t n; + size_t nclients, total_nclients; + u_char buf[NGX_OFF_T_LEN + 1]; + ngx_rtmp_stat_loc_conf_t *slcf; + + if (pacf->entries.nelts == 0) { + return; + } + + slcf = ngx_http_get_module_loc_conf(r, ngx_rtmp_stat_module); + + NGX_RTMP_STAT_L("\r\n"); + + total_nclients = 0; + for (n = 0; n < pacf->nbuckets; ++n) { + for (ctx = pacf->ctx[n]; ctx; ) { + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_ECS(ctx->name); + NGX_RTMP_STAT_L("\r\n"); + + nclients = 0; + sctx = ctx; + for (; ctx; ctx = ctx->next) { + if (ngx_strcmp(ctx->name, sctx->name)) { + break; + } + + ++nclients; + + s = ctx->session; + if (slcf->stat & NGX_RTMP_STAT_CLIENTS) { + NGX_RTMP_STAT_L(""); + + ngx_rtmp_stat_client(r, lll, s); + + NGX_RTMP_STAT_L("\r\n"); + } + } + total_nclients += nclients; + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), + "%uz", nclients) - buf); + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT_L("\r\n"); + } + } + + NGX_RTMP_STAT_L(""); + NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf), + "%uz", total_nclients) - buf); + NGX_RTMP_STAT_L("\r\n"); + + NGX_RTMP_STAT_L("\r\n"); +} + + static void ngx_rtmp_stat_application(ngx_http_request_t *r, ngx_chain_t ***lll, ngx_rtmp_core_app_conf_t *cacf) @@ -482,6 +564,11 @@ ngx_rtmp_stat_application(ngx_http_request_t *r, ngx_chain_t ***lll, cacf->app_conf[ngx_rtmp_live_module.ctx_index]); } + if (slcf->stat & NGX_RTMP_STAT_PLAY) { + ngx_rtmp_stat_play(r, lll, + cacf->app_conf[ngx_rtmp_play_module.ctx_index]); + } + NGX_RTMP_STAT_L("\r\n"); }