From caec91b85772dc5e078a3152cb6ed436c26bfbc3 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 29 Mar 2012 14:07:57 +0400 Subject: [PATCH] added HTTP callback engine && implemented on_publish, on_play, on_record_done --- config | 4 + ngx_rtmp.h | 2 + ngx_rtmp_netcall_module.c | 519 ++++++++++++++++++++++++++++++++++++++ ngx_rtmp_netcall_module.h | 41 +++ ngx_rtmp_notify_module.c | 400 +++++++++++++++++++++++++++++ ngx_rtmp_record_module.c | 141 ++++++++++- test/nginx.conf | 21 +- 7 files changed, 1125 insertions(+), 3 deletions(-) create mode 100644 ngx_rtmp_netcall_module.c create mode 100644 ngx_rtmp_netcall_module.h create mode 100644 ngx_rtmp_notify_module.c diff --git a/config b/config index 36bdeb1..703baa1 100644 --- a/config +++ b/config @@ -7,6 +7,8 @@ CORE_MODULES="$CORE_MODULES ngx_rtmp_access_module \ ngx_rtmp_live_module \ ngx_rtmp_record_module \ + ngx_rtmp_netcall_module \ + ngx_rtmp_notify_module \ " NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ @@ -21,4 +23,6 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_access_module.c \ $ngx_addon_dir/ngx_rtmp_live_module.c \ $ngx_addon_dir/ngx_rtmp_record_module.c \ + $ngx_addon_dir/ngx_rtmp_netcall_module.c \ + $ngx_addon_dir/ngx_rtmp_notify_module.c \ " diff --git a/ngx_rtmp.h b/ngx_rtmp.h index f2eb1b2..895a65f 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -321,6 +321,8 @@ typedef struct { ((ngx_rtmp_conf_ctx_t *) cf->ctx)->main_conf[module.ctx_index] #define ngx_rtmp_conf_get_module_srv_conf(cf, module) \ ((ngx_rtmp_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index] +#define ngx_rtmp_conf_get_module_app_conf(cf, module) \ + ((ngx_rtmp_conf_ctx_t *) cf->ctx)->app_conf[module.ctx_index] #ifdef NGX_DEBUG diff --git a/ngx_rtmp_netcall_module.c b/ngx_rtmp_netcall_module.c new file mode 100644 index 0000000..0b54479 --- /dev/null +++ b/ngx_rtmp_netcall_module.c @@ -0,0 +1,519 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include +#include +#include "ngx_rtmp_netcall_module.h" + + +static ngx_int_t ngx_rtmp_netcall_postconfiguration(ngx_conf_t *cf); +static void * ngx_rtmp_netcall_create_app_conf(ngx_conf_t *cf); +static char * ngx_rtmp_netcall_merge_app_conf(ngx_conf_t *cf, + void *parent, void *child); + +static void ngx_rtmp_netcall_close(ngx_connection_t *cc); + +static void ngx_rtmp_netcall_recv(ngx_event_t *rev); +static void ngx_rtmp_netcall_send(ngx_event_t *wev); + + +typedef struct { + ngx_msec_t timeout; +} ngx_rtmp_netcall_app_conf_t; + + +typedef struct ngx_rtmp_netcall_session_s { + ngx_rtmp_session_t *session; + ngx_peer_connection_t *pc; + ngx_url_t *url; + struct ngx_rtmp_netcall_session_s *next; + void *arg; + ngx_rtmp_netcall_handle_pt handle; + ngx_chain_t *in; + ngx_chain_t *inlast; + ngx_chain_t *out; +} ngx_rtmp_netcall_session_t; + + +typedef struct { + ngx_rtmp_netcall_session_t *cs; +} ngx_rtmp_netcall_ctx_t; + + +static ngx_command_t ngx_rtmp_netcall_commands[] = { + + { ngx_string("netcall_timeout"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_RTMP_SRV_CONF_OFFSET, + offsetof(ngx_rtmp_netcall_app_conf_t, timeout), + NULL }, + + ngx_null_command +}; + + +static ngx_rtmp_module_t ngx_rtmp_netcall_module_ctx = { + NULL, /* preconfiguration */ + ngx_rtmp_netcall_postconfiguration, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + ngx_rtmp_netcall_create_app_conf, /* create app configuration */ + ngx_rtmp_netcall_merge_app_conf /* merge app configuration */ +}; + + +ngx_module_t ngx_rtmp_netcall_module = { + NGX_MODULE_V1, + &ngx_rtmp_netcall_module_ctx, /* module context */ + ngx_rtmp_netcall_commands, /* module directives */ + NGX_RTMP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void * +ngx_rtmp_netcall_create_app_conf(ngx_conf_t *cf) +{ + ngx_rtmp_netcall_app_conf_t *nacf; + + nacf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_netcall_app_conf_t)); + if (nacf == NULL) { + return NULL; + } + + nacf->timeout = NGX_CONF_UNSET_MSEC; + + return nacf; +} + + +static char * +ngx_rtmp_netcall_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_rtmp_netcall_app_conf_t *prev = parent; + ngx_rtmp_netcall_app_conf_t *conf = child; + + ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 10000); + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_rtmp_netcall_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + ngx_rtmp_netcall_ctx_t *ctx; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module); + + while (ctx->cs) { + ngx_rtmp_netcall_close(ctx->cs->pc->connection); + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_netcall_get_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_rtmp_netcall_session_t *cs = data; + + pc->sockaddr =(struct sockaddr *)&cs->url->sockaddr; + pc->socklen = cs->url->socklen; + pc->name = &cs->url->host; + + return NGX_OK; +} + + +static void +ngx_rtmp_netcall_free_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ +} + + +ngx_int_t +ngx_rtmp_netcall_create(ngx_rtmp_session_t *s, ngx_rtmp_netcall_init_t *ci) +{ + ngx_rtmp_netcall_ctx_t *ctx; + ngx_peer_connection_t *pc; + ngx_rtmp_netcall_session_t *cs; + ngx_connection_t *c, *cc; + ngx_pool_t *pool; + + pool = NULL; + c = s->connection; + + /* get module context */ + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module); + if (ctx == NULL) { + ctx = ngx_pcalloc(c->pool, + sizeof(ngx_rtmp_netcall_ctx_t)); + if (ctx == NULL) { + return NGX_ERROR; + } + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_netcall_module); + } + + /* create netcall pool, connection, session */ + pool = ngx_create_pool(4096, s->connection->log); + if (pool == NULL) { + goto error; + } + + pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t)); + if (pc == NULL) { + goto error; + } + + cs = ngx_pcalloc(pool, sizeof(ngx_rtmp_netcall_session_t)); + if (cs == NULL) { + goto error; + } + + /* copy arg to connection pool */ + cs->arg = ngx_pcalloc(pool, ci->argsize); + if (cs->arg == NULL) { + goto error; + } + ngx_memcpy(cs->arg, ci->arg, ci->argsize); + + cs->url = ci->url; + cs->session = s; + cs->handle = ci->handle; + + pc->log = s->connection->log; + pc->get = ngx_rtmp_netcall_get_peer; + pc->free = ngx_rtmp_netcall_free_peer; + pc->data = cs; + + /* connect */ + if (ngx_event_connect_peer(pc) == NGX_ERROR) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "netcall: connection failed"); + ngx_close_connection(pc->connection); + goto error; + } + + cc = pc->connection; + cc->data = cs; + cc->pool = pool; + cs->pc = pc; + + cs->out = ci->create(s, ci->arg, pool); + if (cs->out == NULL) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "netcall: creation failed"); + ngx_close_connection(pc->connection); + goto error; + } + + cc->write->handler = ngx_rtmp_netcall_send; + cc->read->handler = ngx_rtmp_netcall_recv; + + cs->next = ctx->cs; + ctx->cs = cs; + + ngx_rtmp_netcall_send(cc->write); + + return c->destroyed ? NGX_ERROR : NGX_OK; + +error: + if (pool) { + ngx_destroy_pool(pool); + } + + return NGX_ERROR; +} + + +static void +ngx_rtmp_netcall_close(ngx_connection_t *cc) +{ + ngx_rtmp_netcall_session_t *cs, **css; + ngx_pool_t *pool; + ngx_rtmp_session_t *s; + ngx_connection_t *c; + ngx_rtmp_netcall_ctx_t *ctx; + + cs = cc->data; + s = cs->session; + c = s->connection; + + if (cc->destroyed) { + return; + } + + cc->destroyed = 1; + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module); + + for(css = &ctx->cs; *css; css = &((*css)->next)) { + if (*css == cs) { + *css = cs->next; + break; + } + } + + if (cs->handle && + cs->handle(s, cs->arg, cs->in) != NGX_OK) + { + ngx_rtmp_close_connection(c); + } + + pool = cc->pool; + ngx_close_connection(cc); + ngx_destroy_pool(pool); +} + + +static void +ngx_rtmp_netcall_recv(ngx_event_t *rev) +{ + ngx_rtmp_netcall_session_t *cs; + ngx_connection_t *cc; + ngx_rtmp_session_t *s; + ngx_rtmp_netcall_app_conf_t *cacf; + ngx_int_t n; + ngx_buf_t *b; + + cc = rev->data; + cs = cc->data; + s = cs->session; + + if (cc->destroyed) { + return; + } + + if (rev->timedout) { + ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT, + "netcall: client recv timed out"); + cc->timedout = 1; + ngx_rtmp_netcall_close(cc); + return; + } + + if (rev->timer_set) { + ngx_del_timer(rev); + } + + for ( ;; ) { + + if (cs->inlast == NULL + || cs->inlast->buf->last == cs->inlast->buf->end) + { + cs->inlast = ngx_alloc_chain_link(cc->pool); + if (cs->inlast == NULL) { + ngx_rtmp_netcall_close(cc); + return; + } + + cs->inlast->buf = ngx_create_temp_buf(cc->pool, 1024); + if (cs->inlast->buf == NULL) { + ngx_rtmp_netcall_close(cc); + return; + } + + if (cs->in == NULL) { + cs->in = cs->inlast; + } + } + + b = cs->inlast->buf; + + n = cc->recv(cc, b->last, b->end - b->last); + + if (n == NGX_ERROR || n == 0) { + ngx_rtmp_netcall_close(cc); + return; + } + + if (n == NGX_AGAIN) { + cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_netcall_module); + ngx_add_timer(cc->read, cacf->timeout); + if (ngx_handle_write_event(cc->read, 0) != NGX_OK) { + ngx_rtmp_netcall_close(cc); + } + return; + } + + b->last += n; + } +} + + +static void +ngx_rtmp_netcall_send(ngx_event_t *wev) +{ + ngx_rtmp_netcall_session_t *cs; + ngx_connection_t *cc; + ngx_rtmp_session_t *s; + ngx_chain_t *cl; + ngx_rtmp_netcall_app_conf_t *cacf; + + cc = wev->data; + cs = cc->data; + s = cs->session; + + if (cc->destroyed) { + return; + } + + if (wev->timedout) { + ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT, + "netcall: client send timed out"); + cc->timedout = 1; + ngx_rtmp_netcall_close(cc); + return; + } + + if (wev->timer_set) { + ngx_del_timer(wev); + } + + cl = cc->send_chain(cc, cs->out, 0); + + if (cl == NGX_CHAIN_ERROR) { + ngx_rtmp_netcall_close(cc); + return; + } + + cs->out = cl; + + /* more data to send? */ + if (cl) { + cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_netcall_module); + ngx_add_timer(cc->write, cacf->timeout); + if (ngx_handle_write_event(cc->write, 0) != NGX_OK) { + ngx_rtmp_netcall_close(cc); + } + return; + } + + /* we've sent everything we had. + * now receive reply */ + ngx_del_event(wev, NGX_WRITE_EVENT, 0); + + ngx_rtmp_netcall_recv(cc->read); +} + + +ngx_chain_t * +ngx_rtmp_netcall_http_format_header(ngx_url_t *url, ngx_pool_t *pool, + size_t content_length) +{ + ngx_chain_t *cl; + ngx_buf_t *b; + + static char rq_tmpl[] = + "POST %V HTTP/1.0\r\n" + "Host: %V\r\n" + "Content-Type: application/x-www-form-urlencoded\r\n" + "Connection: Close\r\n" + "Content-Length: %uz\r\n" + "\r\n" + ; + + cl = ngx_alloc_chain_link(pool); + if (cl == NULL) { + return NULL; + } + + b = ngx_create_temp_buf(pool, sizeof(rq_tmpl) + + url->uri.len + + url->host.len + + 5); + + if (b == NULL) { + return NULL; + } + + cl->buf = b; + + b->last = ngx_snprintf(b->last, b->end - b->last, rq_tmpl, + &url->uri, &url->host, content_length); + + return cl; +} + + +ngx_chain_t * +ngx_rtmp_netcall_http_format_session(ngx_rtmp_session_t *s, ngx_pool_t *pool) +{ + ngx_chain_t *cl; + ngx_buf_t *b; + + cl = ngx_alloc_chain_link(pool); + if (cl == NULL) { + return NULL; + } + + b = ngx_create_temp_buf(pool, + sizeof("app=") + s->app.len * 3 + + sizeof("&flashver=") + s->flashver.len * 3 + + sizeof("&swfurl=") + s->swf_url.len * 3 + + sizeof("&tcurl=") + s->tc_url.len * 3 + + sizeof("&pageurl=") + s->page_url.len * 3 + ); + + if (b == NULL) { + return NULL; + } + + cl->buf = b; + + b->last = ngx_cpymem(b->last, (u_char*)"app=", sizeof("app=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, s->app.data, s->app.len, 0); + + b->last = ngx_cpymem(b->last, (u_char*)"&flashver=", + sizeof("&flashver=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, s->flashver.data, + s->flashver.len, 0); + + b->last = ngx_cpymem(b->last, (u_char*)"&swfurl=", + sizeof("&swfurl=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, s->swf_url.data, + s->swf_url.len, 0); + + b->last = ngx_cpymem(b->last, (u_char*)"&tcurl=", + sizeof("&tcurl=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, s->tc_url.data, + s->tc_url.len, 0); + + b->last = ngx_cpymem(b->last, (u_char*)"&pageurl=", + sizeof("&pageurl=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, s->page_url.data, + s->page_url.len, 0); + + return cl; +} + + +static ngx_int_t +ngx_rtmp_netcall_postconfiguration(ngx_conf_t *cf) +{ + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_handler_pt *h; + + cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); + + h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]); + *h = ngx_rtmp_netcall_disconnect; + + return NGX_OK; +} + diff --git a/ngx_rtmp_netcall_module.h b/ngx_rtmp_netcall_module.h new file mode 100644 index 0000000..9b1192f --- /dev/null +++ b/ngx_rtmp_netcall_module.h @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#ifndef _NGX_RTMP_NETCALL_H_INCLUDED_ +#define _NGX_RTMP_NETCALL_H_INCLUDED_ + + +#include +#include +#include "ngx_rtmp.h" + + +typedef ngx_chain_t * (*ngx_rtmp_netcall_create_pt)(ngx_rtmp_session_t *s, + void *arg, ngx_pool_t *pool); +typedef ngx_int_t (*ngx_rtmp_netcall_handle_pt)(ngx_rtmp_session_t *s, + void *arg, ngx_chain_t *in); + + +typedef struct { + ngx_url_t *url; + ngx_rtmp_netcall_create_pt create; + ngx_rtmp_netcall_handle_pt handle; + void *arg; + size_t argsize; +} ngx_rtmp_netcall_init_t; + + +ngx_int_t ngx_rtmp_netcall_create(ngx_rtmp_session_t *s, + ngx_rtmp_netcall_init_t *ci); + + +/* HTTP formatting */ +ngx_chain_t * ngx_rtmp_netcall_http_format_session(ngx_rtmp_session_t *s, + ngx_pool_t *pool); +ngx_chain_t * ngx_rtmp_netcall_http_format_header(ngx_url_t *url, + ngx_pool_t *pool, size_t content_length); + + +#endif /* _NGX_RTMP_NETCALL_H_INCLUDED_ */ diff --git a/ngx_rtmp_notify_module.c b/ngx_rtmp_notify_module.c new file mode 100644 index 0000000..070f3f7 --- /dev/null +++ b/ngx_rtmp_notify_module.c @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include +#include +#include "ngx_rtmp.h" + +#include "ngx_rtmp_cmd_module.h" +#include "ngx_rtmp_netcall_module.h" + + +static ngx_rtmp_publish_pt next_publish; +static ngx_rtmp_play_pt next_play; + + +static char *ngx_rtmp_notify_on_event(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static ngx_int_t ngx_rtmp_notify_postconfiguration(ngx_conf_t *cf); +static void * ngx_rtmp_notify_create_app_conf(ngx_conf_t *cf); +static char * ngx_rtmp_notify_merge_app_conf(ngx_conf_t *cf, + void *parent, void *child); + + +typedef struct { + ngx_url_t *publish_url; + ngx_url_t *play_url; +} ngx_rtmp_notify_app_conf_t; + + +static ngx_command_t ngx_rtmp_notify_commands[] = { + + { ngx_string("on_publish"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_rtmp_notify_on_event, + NGX_RTMP_APP_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("on_play"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_rtmp_notify_on_event, + NGX_RTMP_APP_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_rtmp_module_t ngx_rtmp_notify_module_ctx = { + NULL, /* preconfiguration */ + ngx_rtmp_notify_postconfiguration, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + ngx_rtmp_notify_create_app_conf, /* create app configuration */ + ngx_rtmp_notify_merge_app_conf /* merge app configuration */ +}; + + +ngx_module_t ngx_rtmp_notify_module = { + NGX_MODULE_V1, + &ngx_rtmp_notify_module_ctx, /* module context */ + ngx_rtmp_notify_commands, /* module directives */ + NGX_RTMP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void * +ngx_rtmp_notify_create_app_conf(ngx_conf_t *cf) +{ + ngx_rtmp_notify_app_conf_t *nacf; + + nacf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_notify_app_conf_t)); + if (nacf == NULL) { + return NULL; + } + + return nacf; +} + + +static char * +ngx_rtmp_notify_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_rtmp_notify_app_conf_t *prev = parent; + ngx_rtmp_notify_app_conf_t *conf = child; + + ngx_conf_merge_ptr_value(conf->publish_url, prev->publish_url, 0); + ngx_conf_merge_ptr_value(conf->play_url, prev->play_url, 0); + + return NGX_CONF_OK; +} + + +static ngx_chain_t * +ngx_rtmp_notify_publish_create(ngx_rtmp_session_t *s, void *arg, + ngx_pool_t *pool) +{ + ngx_rtmp_publish_t *v = arg; + + ngx_rtmp_notify_app_conf_t *nacf; + ngx_chain_t *hl, *cl, *pl; + ngx_buf_t *b; + size_t name_len, type_len; + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + + /* common variables */ + cl = ngx_rtmp_netcall_http_format_session(s, pool); + + if (cl == NULL) { + return NULL; + } + + /* publish variables */ + pl = ngx_alloc_chain_link(pool); + + if (pl == NULL) { + return NULL; + } + + name_len = ngx_strlen(v->name); + type_len = ngx_strlen(v->type); + + b = ngx_create_temp_buf(pool, + sizeof("&call=publish") + + sizeof("&name=") + name_len * 3 + + sizeof("&type=") + type_len * 3); + if (b == NULL) { + return NULL; + } + + pl->buf = b; + + b->last = ngx_cpymem(b->last, (u_char*)"&call=publish", + sizeof("&call=publish") - 1); + + b->last = ngx_cpymem(b->last, (u_char*)"&name=", sizeof("&name=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, v->name, name_len, 0); + + b->last = ngx_cpymem(b->last, (u_char*)"&type=", sizeof("&type=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, v->type, type_len, 0); + + /* HTTP header */ + hl = ngx_rtmp_netcall_http_format_header(nacf->publish_url, pool, + cl->buf->last - cl->buf->pos + + (pl->buf->last - pl->buf->pos)); + + if (hl == NULL) { + return NULL; + } + + hl->next = cl; + cl->next = pl; + + return hl; +} + + +static ngx_chain_t * +ngx_rtmp_notify_play_create(ngx_rtmp_session_t *s, void *arg, + ngx_pool_t *pool) +{ + ngx_rtmp_play_t *v = arg; + + ngx_rtmp_notify_app_conf_t *nacf; + ngx_chain_t *hl, *cl, *pl; + ngx_buf_t *b; + size_t name_len; + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + + /* common variables */ + cl = ngx_rtmp_netcall_http_format_session(s, pool); + + if (cl == NULL) { + return NULL; + } + + /* play variables */ + pl = ngx_alloc_chain_link(pool); + + if (pl == NULL) { + return NULL; + } + + name_len = ngx_strlen(v->name); + + b = ngx_create_temp_buf(pool, + sizeof("&call=play") + + sizeof("&name=") + name_len * 3 + + 10 * 2 + 1); + if (b == NULL) { + return NULL; + } + + pl->buf = b; + + b->last = ngx_cpymem(b->last, (u_char*)"&call=play", + sizeof("&call=play") - 1); + + b->last = ngx_cpymem(b->last, (u_char*)"&name=", sizeof("&name=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, v->name, name_len, 0); + + b->last = ngx_snprintf(b->last, b->end - b->last, + "&start=%uD&duration=%uD&reset=%d", + (uint32_t)v->start, (uint32_t)v->duration, v->reset & 1); + + /* HTTP header */ + hl = ngx_rtmp_netcall_http_format_header(nacf->play_url, pool, + cl->buf->last - cl->buf->pos + + (pl->buf->last - pl->buf->pos)); + + if (hl == NULL) { + return NULL; + } + + hl->next = cl; + cl->next = pl; + + return hl; +} + + +static ngx_int_t +ngx_rtmp_notify_parse_http_retcode(ngx_rtmp_session_t *s, + ngx_chain_t *in) +{ + ngx_buf_t *b; + ngx_int_t n; + u_char c; + + /* find 10th character */ + + n = 9; + while (in) { + b = in->buf; + if (b->last - b->pos > n) { + c = b->pos[n]; + if (c >= (u_char)'0' && c <= (u_char)'9') { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "notify: HTTP retcode: %dxx", (int)(c - '0')); + return c == (u_char)'2' ? NGX_OK : NGX_ERROR; + } + + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: invalid HTTP retcode: %d..", (int)c); + + return NGX_ERROR; + } + n -= (b->last - b->pos); + in = in->next; + } + + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: invalid HTTP response"); + + return NGX_ERROR; +} + + +static ngx_int_t +ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s, + void *arg, ngx_chain_t *in) +{ + if (ngx_rtmp_notify_parse_http_retcode(s, in) != NGX_OK) { + return NGX_ERROR; + } + + return next_publish(s, (ngx_rtmp_publish_t *)arg); +} + + +static ngx_int_t +ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, + void *arg, ngx_chain_t *in) +{ + if (ngx_rtmp_notify_parse_http_retcode(s, in) != NGX_OK) { + return NGX_ERROR; + } + + return next_play(s, (ngx_rtmp_play_t *)arg); +} + + +static ngx_int_t +ngx_rtmp_notify_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) +{ + ngx_rtmp_notify_app_conf_t *nacf; + ngx_rtmp_netcall_init_t ci; + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + if (nacf == NULL || nacf->publish_url == NULL) { + return next_publish(s, v); + } + + ci.url = nacf->publish_url; + ci.create = ngx_rtmp_notify_publish_create; + ci.handle = ngx_rtmp_notify_publish_handle; + ci.arg = v; + ci.argsize = sizeof(*v); + + return ngx_rtmp_netcall_create(s, &ci); +} + + +static ngx_int_t +ngx_rtmp_notify_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) +{ + ngx_rtmp_notify_app_conf_t *nacf; + ngx_rtmp_netcall_init_t ci; + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + if (nacf == NULL || nacf->play_url == NULL) { + return next_play(s, v); + } + + ci.url = nacf->play_url; + ci.create = ngx_rtmp_notify_play_create; + ci.handle = ngx_rtmp_notify_play_handle; + ci.arg = v; + ci.argsize = sizeof(*v); + + return ngx_rtmp_netcall_create(s, &ci); +} + + +static char * +ngx_rtmp_notify_on_event(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_rtmp_notify_app_conf_t *nacf; + ngx_str_t *url, *name; + ngx_url_t *u; + size_t add; + ngx_str_t *value; + + value = cf->args->elts; + name = &value[0]; + url = &value[1]; + + add = 0; + + u = ngx_pcalloc(cf->pool, sizeof(ngx_url_t)); + if (u == NULL) { + return NGX_CONF_ERROR; + } + + if (ngx_strncasecmp(url->data, (u_char *) "http://", 7) == 0) { + add = 7; + } + + u->url.len = url->len - add; + u->url.data = url->data + add; + u->default_port = 80; + u->uri_part = 1; + + if (ngx_parse_url(cf->pool, u) != NGX_OK) { + if (u->err) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "%s in url \"%V\"", u->err, &u->url); + } + return NGX_CONF_ERROR; + } + + nacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_notify_module); + + if (name->len == sizeof("on_play") - 1) { + nacf->play_url = u; + } else { /* on_publish */ + nacf->publish_url = u; + } + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_rtmp_notify_postconfiguration(ngx_conf_t *cf) +{ + next_publish = ngx_rtmp_publish; + ngx_rtmp_publish = ngx_rtmp_notify_publish; + + next_play = ngx_rtmp_play; + ngx_rtmp_play = ngx_rtmp_notify_play; + + return NGX_OK; +} + diff --git a/ngx_rtmp_record_module.c b/ngx_rtmp_record_module.c index 084f87e..7da39b9 100644 --- a/ngx_rtmp_record_module.c +++ b/ngx_rtmp_record_module.c @@ -7,12 +7,15 @@ #include #include "ngx_rtmp.h" #include "ngx_rtmp_cmd_module.h" +#include "ngx_rtmp_netcall_module.h" static ngx_rtmp_publish_pt next_publish; static ngx_rtmp_delete_stream_pt next_delete_stream; +static char * ngx_rtmp_notify_on_record_done(ngx_conf_t *cf, + ngx_command_t *cmd, void *conf); static ngx_int_t ngx_rtmp_record_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_record_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_record_merge_app_conf(ngx_conf_t *cf, @@ -22,6 +25,7 @@ static char * ngx_rtmp_record_merge_app_conf(ngx_conf_t *cf, typedef struct { ngx_str_t root; size_t max_size; + ngx_url_t *url; } ngx_rtmp_record_app_conf_t; @@ -41,6 +45,13 @@ static ngx_command_t ngx_rtmp_record_commands[] = { offsetof(ngx_rtmp_record_app_conf_t, max_size), NULL }, + { ngx_string("on_record_done"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_rtmp_notify_on_record_done, + NGX_RTMP_APP_CONF_OFFSET, + 0, + NULL }, + ngx_null_command }; @@ -220,11 +231,94 @@ next: } +static ngx_chain_t * +ngx_rtmp_record_notify_create(ngx_rtmp_session_t *s, void *arg, + ngx_pool_t *pool) +{ + ngx_str_t *path = arg; + + ngx_rtmp_record_app_conf_t *racf; + ngx_chain_t *hl, *cl, *pl; + ngx_buf_t *b; + + racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_record_module); + + if (path == NULL) { + return NGX_OK; + } + + /* common variables */ + cl = ngx_rtmp_netcall_http_format_session(s, pool); + + if (cl == NULL) { + return NULL; + } + + /* publish variables */ + pl = ngx_alloc_chain_link(pool); + + if (pl == NULL) { + return NULL; + } + + b = ngx_create_temp_buf(pool, + sizeof("&call=record_done") + + sizeof("&path=") + path->len * 3); + if (b == NULL) { + return NULL; + } + + pl->buf = b; + + b->last = ngx_cpymem(b->last, (u_char*)"&call=record_done", + sizeof("&call=record_done") - 1); + + b->last = ngx_cpymem(b->last, (u_char*)"&path=", sizeof("&path=") - 1); + b->last = (u_char*)ngx_escape_uri(b->last, path->data, path->len, 0); + + /* HTTP header */ + hl = ngx_rtmp_netcall_http_format_header(racf->url, pool, + cl->buf->last - cl->buf->pos + + (pl->buf->last - pl->buf->pos)); + + if (hl == NULL) { + return NULL; + } + + hl->next = cl; + cl->next = pl; + + return hl; +} + + +static ngx_int_t +ngx_rtmp_record_notify(ngx_rtmp_session_t *s, ngx_str_t *path) +{ + ngx_rtmp_record_app_conf_t *racf; + ngx_rtmp_netcall_init_t ci; + + racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_record_module); + if (racf == NULL || racf->url == NULL) { + return NGX_OK; + } + + ngx_memzero(&ci, sizeof(ci)); + + ci.url = racf->url; + ci.create = ngx_rtmp_record_notify_create; + ci.arg = path; + + return ngx_rtmp_netcall_create(s, &ci); +} + + static ngx_int_t ngx_rtmp_record_close(ngx_rtmp_session_t *s) { ngx_rtmp_record_ctx_t *ctx; ngx_err_t err; + ngx_str_t path; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_record_module); @@ -232,6 +326,7 @@ ngx_rtmp_record_close(ngx_rtmp_session_t *s) return NGX_OK; } + path = ctx->path; ngx_str_null(&ctx->path); if (ngx_close_file(ctx->file.fd) == NGX_FILE_ERROR) { @@ -243,7 +338,7 @@ ngx_rtmp_record_close(ngx_rtmp_session_t *s) ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "record: closed"); - return NGX_OK; + return ngx_rtmp_record_notify(s, &path); } @@ -358,6 +453,50 @@ ngx_rtmp_record_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } +static char * +ngx_rtmp_notify_on_record_done(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_rtmp_record_app_conf_t *racf; + ngx_str_t *url; + ngx_url_t *u; + size_t add; + ngx_str_t *value; + + value = cf->args->elts; + url = &value[1]; + + add = 0; + + u = ngx_pcalloc(cf->pool, sizeof(ngx_url_t)); + if (u == NULL) { + return NGX_CONF_ERROR; + } + + if (ngx_strncasecmp(url->data, (u_char *) "http://", 7) == 0) { + add = 7; + } + + u->url.len = url->len - add; + u->url.data = url->data + add; + u->default_port = 80; + u->uri_part = 1; + + if (ngx_parse_url(cf->pool, u) != NGX_OK) { + if (u->err) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "%s in url \"%V\"", u->err, &u->url); + } + return NGX_CONF_ERROR; + } + + racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_record_module); + + racf->url = u; + + return NGX_CONF_OK; +} + + static ngx_int_t ngx_rtmp_record_postconfiguration(ngx_conf_t *cf) { diff --git a/test/nginx.conf b/test/nginx.conf index a641d1e..c6d0945 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -30,9 +30,13 @@ rtmp { live on; - record /tmp/av; + record /tmp; - record_size 10000000; + record_size 1000000; + + on_publish http://localhost:8080/publish; + on_play http://localhost:8080/play; + on_record_done http://localhost:8080/record_done; #wait_key_frame on; @@ -51,9 +55,22 @@ http { listen 8080; + location /publish { + return 201; + } + + location /play { + return 202; + } + + location /record_done { + return 203; + } + location / { root /home/rarutyunyan/nginx-rtmp-module/test/www; } + } }