From b9ee8dbe097b2ab9cd60786c1cfa9a48237c51dd Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 19 Mar 2012 19:55:46 +0400 Subject: [PATCH] added auto-detectiion of size in amf0 writer && improved a lot of code in broadcaster --- ngx_rtmp_amf0.c | 4 + ngx_rtmp_broadcast_module.c | 270 +++++++++++++++++------------------- test/nginx.conf | 6 +- test/www/index.html | 4 +- test/www/record.html | 2 +- 5 files changed, 143 insertions(+), 143 deletions(-) diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index d5c27a1..3cc862e 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -405,6 +405,10 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, break; case NGX_RTMP_AMF0_STRING: + if (len == 0 && data) { + len = ngx_strlen((u_char*)data); + } + if (ngx_rtmp_amf0_put(ctx, ngx_rtmp_amf0_reverse_copy(buf, &len, 2), 2) != NGX_OK) diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index c9812ee..cffe94a 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -365,79 +365,69 @@ static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { + ngx_connection_t *c; ngx_rtmp_core_srv_conf_t *cscf; static double trans; static u_char app[1024]; - static u_char url[1024]; - static u_char acodecs[1024]; - static ngx_str_t app_str; + static ngx_str_t stream; static double capabilities = 31; - static double object_enc; - static ngx_rtmp_amf0_elt_t in_cmd[] = { - { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, - { NGX_RTMP_AMF0_STRING, "tcUrl" , url, sizeof(url) }, - { NGX_RTMP_AMF0_STRING, "audiocodecs" , acodecs, sizeof(acodecs) }, + static ngx_rtmp_amf0_elt_t in_cmd[] = { + { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, }; - static ngx_rtmp_amf0_elt_t out_obj[] = { - { NGX_RTMP_AMF0_STRING, "fmsVer", "FMS/3,0,1,123" , sizeof("FMS/3,0,1,123")-1 }, - { NGX_RTMP_AMF0_NUMBER, "capabilities", &capabilities, sizeof(capabilities) }, + static ngx_rtmp_amf0_elt_t in_elts[] = { + { NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) }, }; - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, - { NGX_RTMP_AMF0_NUMBER, "objectEncoding", &object_enc , sizeof(object_enc) }, + static ngx_rtmp_amf0_elt_t out_obj[] = { + { NGX_RTMP_AMF0_STRING, "fmsVer", "FMS/3,0,1,123", 0 }, + { NGX_RTMP_AMF0_NUMBER, "capabilities", &capabilities, 0 }, }; - static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) }, + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "level", "status", 0 }, + { NGX_RTMP_AMF0_STRING, "code", "NetConnection.Connect.Success", 0 }, + { NGX_RTMP_AMF0_STRING, "description", "Connection succeeded.", 0 }, }; - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_OBJECT, NULL, out_obj, sizeof(out_obj) }, - { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "_result", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_obj, sizeof(out_obj) }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, }; + c = s->connection; + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + /* parse input */ + app[0] = 0; if (ngx_rtmp_receive_amf0(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "connect() called; app='%s'", app); - ngx_str_set(&out_inf[0], "status"); - ngx_str_set(&out_inf[1], "NetConnection.Connect.Success"); - ngx_str_set(&out_inf[2], "Connection succeeded."); - - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "connect() called; app='%s' url='%s'", - app, url); - - /*FIXME: app_str allocation!!!!!!! */ - /*FIXME: add memsetting input data */ /* join stream */ - ngx_str_set(&app_str, "preved"); - /* - app_str.data = app; - app_str.len = ngx_strlen(app); - */ - ngx_rtmp_broadcast_join(s, &app_str, 0); + stream.len = ngx_strlen(app); + stream.data = ngx_palloc(c->pool, stream.len); + ngx_memcpy(stream.data, app, stream.len); + ngx_rtmp_broadcast_join(s, &stream, 0); + /* send all replies */ return ngx_rtmp_send_ack_size(s, cscf->ack_window) || ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC) || ngx_rtmp_send_user_stream_begin(s, 0) || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) || ngx_rtmp_send_amf0(s, h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) - ? NGX_OK + ? NGX_ERROR : NGX_OK; } @@ -446,31 +436,32 @@ static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - static double trans; - static double stream; + static double trans; + static double stream; - static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, + static ngx_rtmp_amf0_elt_t in_elts[] = { + { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, }; - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) }, + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "_result", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) }, }; ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "createStream() called"); + /* parse input */ if (ngx_rtmp_receive_amf0(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } + /* send result with standard stream */ stream = NGX_RTMP_BROADCAST_MSID; - return ngx_rtmp_send_amf0(s, h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])); } @@ -480,54 +471,48 @@ static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_header_t sh; + ngx_rtmp_header_t sh; - static double trans; - static u_char pub_name[1024]; - static u_char pub_type[1024]; - - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, - }; + static double trans; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, - { NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) }, - { NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) }, + { NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", "NetStream.Publish.Start", 0 }, + { NGX_RTMP_AMF0_STRING, "level", "status", 0 }, + { NGX_RTMP_AMF0_STRING, "description", "Publish succeeded.", 0 }, }; static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, }; + /* parse input */ if (ngx_rtmp_receive_amf0(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "publish() called; pubName='%s' pubType='%s'", - pub_name, pub_type); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "publish() called"); + /* mark current session as publisher */ + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER); + + /* start stream */ if (ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_BROADCAST_MSID) != NGX_OK) { return NGX_ERROR; } - ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER); - - ngx_str_set(&out_inf[0], "NetStream.Publish.Start"); - ngx_str_set(&out_inf[1], "status"); - ngx_str_set(&out_inf[2], "Publish succeeded."); - + /* send onStatus reply */ memset(&sh, 0, sizeof(sh)); sh.type = NGX_RTMP_MSG_AMF0_CMD; sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; @@ -547,101 +532,105 @@ static ngx_int_t ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_header_t sh; + ngx_rtmp_header_t sh; - static double trans; - static u_char play_name[1024]; - static int bfalse; - - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, - }; - - static ngx_rtmp_amf0_elt_t out2_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - }; + static double trans; + static uint8_t bfalse; static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, - { NGX_RTMP_AMF0_STRING, NULL, play_name, sizeof(play_name) }, + { NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", "NetStream.Play.Reset", 0 }, + { NGX_RTMP_AMF0_STRING, "level", "status", 0 }, + { NGX_RTMP_AMF0_STRING, "description", "Playing and resetting.", 0 }, }; static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + }; + + static ngx_rtmp_amf0_elt_t out2_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", "NetStream.Play.Start", 0 }, + { NGX_RTMP_AMF0_STRING, "level", "status", 0 }, + { NGX_RTMP_AMF0_STRING, "description", "Started playing.", 0 }, }; static ngx_rtmp_amf0_elt_t out2_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, { NGX_RTMP_AMF0_OBJECT, NULL, out2_inf, sizeof(out2_inf) }, }; - + static ngx_rtmp_amf0_elt_t out3_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "|RtmpSampleAccess", sizeof("|RtmpSampleAccess") - 1 }, - { NGX_RTMP_AMF0_BOOLEAN, NULL, &bfalse, sizeof(bfalse) }, + { NGX_RTMP_AMF0_STRING, NULL, "|RtmpSampleAccess", 0 }, + { NGX_RTMP_AMF0_BOOLEAN,NULL, &bfalse, 0 }, + { NGX_RTMP_AMF0_BOOLEAN,NULL, &bfalse, 0 }, }; + static ngx_rtmp_amf0_elt_t out4_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", "NetStream.Data.Start", 0 }, + }; + static ngx_rtmp_amf0_elt_t out4_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out4_inf, sizeof(out4_inf) }, + }; + + /* parse input */ if (ngx_rtmp_receive_amf0(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - play_name[0] = 0; - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "play() called; playame='%s'", - play_name); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "play() called"); + /* mark session as subscriber */ + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER); + + /* start stream */ if (ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_BROADCAST_MSID) != NGX_OK) { return NGX_ERROR; } - ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER); - + /* send onStatus reply */ memset(&sh, 0, sizeof(sh)); sh.type = NGX_RTMP_MSG_AMF0_CMD; sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; sh.msid = h->msid; - ngx_str_set(&out_inf[0], "NetStream.Play.Reset"); - ngx_str_set(&out_inf[1], "status"); - ngx_str_set(&out_inf[2], "Playing and resetting."); - if (ngx_rtmp_send_amf0(s, &sh, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) { return NGX_ERROR; } - ngx_str_set(&out_inf[0], "NetStream.Play.Start"); - ngx_str_set(&out_inf[1], "status"); - ngx_str_set(&out_inf[2], "Started playing."); - - if (ngx_rtmp_send_amf0(s, &sh, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + /* send sample access meta message FIXME */ + if (ngx_rtmp_send_amf0(s, &sh, out2_elts, + sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK) { return NGX_ERROR; } - ngx_str_set(&out2_inf[0], "NetStream.Data.Start"); + /* send data start meta message */ sh.type = NGX_RTMP_MSG_AMF0_META; - if (ngx_rtmp_send_amf0(s, &sh, out3_elts, sizeof(out3_elts) / sizeof(out3_elts[0])) != NGX_OK) { return NGX_ERROR; } - if (ngx_rtmp_send_amf0(s, &sh, out2_elts, - sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK) + if (ngx_rtmp_send_amf0(s, &sh, out4_elts, + sizeof(out4_elts) / sizeof(out4_elts[0])) != NGX_OK) { return NGX_ERROR; } @@ -661,17 +650,17 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_core_srv_conf_t *cscf; static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, - "@setDataFrame", sizeof("@setDataFrame") - 1 }, + { NGX_RTMP_AMF0_STRING, NULL, "@setDataFrame", 0 }, }; c = s->connection; - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "data_frame arrived"); + /* TODO: allow sending more meta packages to change broadcast content */ + if (ctx->data_frame) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "duplicate data_frame"); @@ -721,32 +710,35 @@ static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - static double trans; + ngx_rtmp_header_t sh; + + static double trans; static ngx_rtmp_amf0_elt_t in_elts[] = { { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, }; - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, - }; - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, - { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + { NGX_RTMP_AMF0_STRING, NULL, "_result", 0 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, }; + /* parse input */ if (ngx_rtmp_receive_amf0(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - return ngx_rtmp_send_amf0(s, h, out_elts, + memset(&sh, 0, sizeof(sh)); + sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0; + sh.type = NGX_RTMP_MSG_AMF0_META; + sh.msid = h->msid; + + /* send simple _result */ + return ngx_rtmp_send_amf0(s, &sh, out_elts, sizeof(out_elts) / sizeof(out_elts[0])); } diff --git a/test/nginx.conf b/test/nginx.conf index 6e0c473..30c7ad5 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -20,7 +20,11 @@ rtmp { listen 1935; - wait_key_frame off; + wait_key_frame on; + + chunk_size 128; + + max_buf 1000000; } } diff --git a/test/www/index.html b/test/www/index.html index a045afe..7e6711a 100644 --- a/test/www/index.html +++ b/test/www/index.html @@ -5,13 +5,13 @@
Loading the player ...