added auto-detectiion of size in amf0 writer && improved a lot of code in broadcaster

This commit is contained in:
Roman Arutyunyan 2012-03-19 19:55:46 +04:00
parent 96ebed8573
commit b9ee8dbe09
5 changed files with 143 additions and 143 deletions

View file

@ -405,6 +405,10 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx,
break;
case NGX_RTMP_AMF0_STRING:
if (len == 0 && data) {
len = ngx_strlen((u_char*)data);
}
if (ngx_rtmp_amf0_put(ctx,
ngx_rtmp_amf0_reverse_copy(buf,
&len, 2), 2) != NGX_OK)

View file

@ -365,79 +365,69 @@ static ngx_int_t
ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_connection_t *c;
ngx_rtmp_core_srv_conf_t *cscf;
static double trans;
static u_char app[1024];
static u_char url[1024];
static u_char acodecs[1024];
static ngx_str_t app_str;
static ngx_str_t stream;
static double capabilities = 31;
static double object_enc;
static ngx_rtmp_amf0_elt_t in_cmd[] = {
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
{ NGX_RTMP_AMF0_STRING, "tcUrl" , url, sizeof(url) },
{ NGX_RTMP_AMF0_STRING, "audiocodecs" , acodecs, sizeof(acodecs) },
static ngx_rtmp_amf0_elt_t in_cmd[] = {
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
};
static ngx_rtmp_amf0_elt_t out_obj[] = {
{ NGX_RTMP_AMF0_STRING, "fmsVer", "FMS/3,0,1,123" , sizeof("FMS/3,0,1,123")-1 },
{ NGX_RTMP_AMF0_NUMBER, "capabilities", &capabilities, sizeof(capabilities) },
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
{ NGX_RTMP_AMF0_NUMBER, "objectEncoding", &object_enc , sizeof(object_enc) },
static ngx_rtmp_amf0_elt_t out_obj[] = {
{ NGX_RTMP_AMF0_STRING, "fmsVer", "FMS/3,0,1,123", 0 },
{ NGX_RTMP_AMF0_NUMBER, "capabilities", &capabilities, 0 },
};
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) },
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "level", "status", 0 },
{ NGX_RTMP_AMF0_STRING, "code", "NetConnection.Connect.Success", 0 },
{ NGX_RTMP_AMF0_STRING, "description", "Connection succeeded.", 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_obj, sizeof(out_obj) },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_obj, sizeof(out_obj) },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* parse input */
app[0] = 0;
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"connect() called; app='%s'", app);
ngx_str_set(&out_inf[0], "status");
ngx_str_set(&out_inf[1], "NetConnection.Connect.Success");
ngx_str_set(&out_inf[2], "Connection succeeded.");
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"connect() called; app='%s' url='%s'",
app, url);
/*FIXME: app_str allocation!!!!!!! */
/*FIXME: add memsetting input data */
/* join stream */
ngx_str_set(&app_str, "preved");
/*
app_str.data = app;
app_str.len = ngx_strlen(app);
*/
ngx_rtmp_broadcast_join(s, &app_str, 0);
stream.len = ngx_strlen(app);
stream.data = ngx_palloc(c->pool, stream.len);
ngx_memcpy(stream.data, app, stream.len);
ngx_rtmp_broadcast_join(s, &stream, 0);
/* send all replies */
return ngx_rtmp_send_ack_size(s, cscf->ack_window)
|| ngx_rtmp_send_bandwidth(s, cscf->ack_window, NGX_RTMP_LIMIT_DYNAMIC)
|| ngx_rtmp_send_user_stream_begin(s, 0)
|| ngx_rtmp_send_chunk_size(s, cscf->chunk_size)
|| ngx_rtmp_send_amf0(s, h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_OK
? NGX_ERROR
: NGX_OK;
}
@ -446,31 +436,32 @@ static ngx_int_t
ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static double trans;
static double stream;
static double trans;
static double stream;
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) },
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) },
};
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"createStream() called");
/* parse input */
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
/* send result with standard stream */
stream = NGX_RTMP_BROADCAST_MSID;
return ngx_rtmp_send_amf0(s, h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
@ -480,54 +471,48 @@ static ngx_int_t
ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_header_t sh;
ngx_rtmp_header_t sh;
static double trans;
static u_char pub_name[1024];
static u_char pub_type[1024];
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static double trans;
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
{ NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) },
{ NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) },
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", "NetStream.Publish.Start", 0 },
{ NGX_RTMP_AMF0_STRING, "level", "status", 0 },
{ NGX_RTMP_AMF0_STRING, "description", "Publish succeeded.", 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
/* parse input */
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"publish() called; pubName='%s' pubType='%s'",
pub_name, pub_type);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"publish() called");
/* mark current session as publisher */
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER);
/* start stream */
if (ngx_rtmp_send_user_stream_begin(s,
NGX_RTMP_BROADCAST_MSID) != NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER);
ngx_str_set(&out_inf[0], "NetStream.Publish.Start");
ngx_str_set(&out_inf[1], "status");
ngx_str_set(&out_inf[2], "Publish succeeded.");
/* send onStatus reply */
memset(&sh, 0, sizeof(sh));
sh.type = NGX_RTMP_MSG_AMF0_CMD;
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
@ -547,101 +532,105 @@ static ngx_int_t
ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_header_t sh;
ngx_rtmp_header_t sh;
static double trans;
static u_char play_name[1024];
static int bfalse;
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out2_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
};
static double trans;
static uint8_t bfalse;
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
{ NGX_RTMP_AMF0_STRING, NULL, play_name, sizeof(play_name) },
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, 0 },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", "NetStream.Play.Reset", 0 },
{ NGX_RTMP_AMF0_STRING, "level", "status", 0 },
{ NGX_RTMP_AMF0_STRING, "description", "Playing and resetting.", 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
static ngx_rtmp_amf0_elt_t out2_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", "NetStream.Play.Start", 0 },
{ NGX_RTMP_AMF0_STRING, "level", "status", 0 },
{ NGX_RTMP_AMF0_STRING, "description", "Started playing.", 0 },
};
static ngx_rtmp_amf0_elt_t out2_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out2_inf, sizeof(out2_inf) },
};
static ngx_rtmp_amf0_elt_t out3_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "|RtmpSampleAccess", sizeof("|RtmpSampleAccess") - 1 },
{ NGX_RTMP_AMF0_BOOLEAN, NULL, &bfalse, sizeof(bfalse) },
{ NGX_RTMP_AMF0_STRING, NULL, "|RtmpSampleAccess", 0 },
{ NGX_RTMP_AMF0_BOOLEAN,NULL, &bfalse, 0 },
{ NGX_RTMP_AMF0_BOOLEAN,NULL, &bfalse, 0 },
};
static ngx_rtmp_amf0_elt_t out4_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", "NetStream.Data.Start", 0 },
};
static ngx_rtmp_amf0_elt_t out4_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out4_inf, sizeof(out4_inf) },
};
/* parse input */
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
play_name[0] = 0;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"play() called; playame='%s'",
play_name);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"play() called");
/* mark session as subscriber */
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER);
/* start stream */
if (ngx_rtmp_send_user_stream_begin(s,
NGX_RTMP_BROADCAST_MSID) != NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER);
/* send onStatus reply */
memset(&sh, 0, sizeof(sh));
sh.type = NGX_RTMP_MSG_AMF0_CMD;
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
sh.msid = h->msid;
ngx_str_set(&out_inf[0], "NetStream.Play.Reset");
ngx_str_set(&out_inf[1], "status");
ngx_str_set(&out_inf[2], "Playing and resetting.");
if (ngx_rtmp_send_amf0(s, &sh, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
ngx_str_set(&out_inf[0], "NetStream.Play.Start");
ngx_str_set(&out_inf[1], "status");
ngx_str_set(&out_inf[2], "Started playing.");
if (ngx_rtmp_send_amf0(s, &sh, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
/* send sample access meta message FIXME */
if (ngx_rtmp_send_amf0(s, &sh, out2_elts,
sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
ngx_str_set(&out2_inf[0], "NetStream.Data.Start");
/* send data start meta message */
sh.type = NGX_RTMP_MSG_AMF0_META;
if (ngx_rtmp_send_amf0(s, &sh, out3_elts,
sizeof(out3_elts) / sizeof(out3_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_rtmp_send_amf0(s, &sh, out2_elts,
sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK)
if (ngx_rtmp_send_amf0(s, &sh, out4_elts,
sizeof(out4_elts) / sizeof(out4_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
@ -661,17 +650,17 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_core_srv_conf_t *cscf;
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL,
"@setDataFrame", sizeof("@setDataFrame") - 1 },
{ NGX_RTMP_AMF0_STRING, NULL, "@setDataFrame", 0 },
};
c = s->connection;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "data_frame arrived");
/* TODO: allow sending more meta packages to change broadcast content */
if (ctx->data_frame) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"duplicate data_frame");
@ -721,32 +710,35 @@ static ngx_int_t
ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static double trans;
ngx_rtmp_header_t sh;
static double trans;
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
{ NGX_RTMP_AMF0_STRING, NULL, "_result", 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
};
/* parse input */
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
return ngx_rtmp_send_amf0(s, h, out_elts,
memset(&sh, 0, sizeof(sh));
sh.csid = NGX_RTMP_BROADCAST_CSID_AMF0;
sh.type = NGX_RTMP_MSG_AMF0_META;
sh.msid = h->msid;
/* send simple _result */
return ngx_rtmp_send_amf0(s, &sh, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}

View file

@ -20,7 +20,11 @@ rtmp {
listen 1935;
wait_key_frame off;
wait_key_frame on;
chunk_size 128;
max_buf 1000000;
}
}

View file

@ -5,13 +5,13 @@
<div id="container">Loading the player ...</div>
<script type="text/javascript">
jwplayer("container").setup({
image: "http://192.168.0.100/showme.jpg",
image: "http://10.31.1.78/showme.jpg",
modes: [
{ type: "flash",
src: "/jwplayer/player.swf",
config: {
file: "video.mp4",
streamer: "rtmp://192.168.0.100/helo/p",
streamer: "rtmp://10.31.1.78/helo",
provider: "rtmp"
}
}

View file

@ -6,7 +6,7 @@
<script type="text/javascript">
var flashvars =
{
'streamer': 'rtmp://192.168.0.100/helo/p',
'streamer': 'rtmp://10.31.1.78/helo',
'file': 'livestream',
'type': 'camera',
'controlbar': 'bottom',