implemented segmented live streaming; moved specific code from cmd module to live/play modules

This commit is contained in:
Roman Arutyunyan 2012-11-13 00:22:57 +04:00
parent 903abb6646
commit fd99086834
12 changed files with 627 additions and 758 deletions

View file

@ -462,6 +462,17 @@ ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
#define NGX_RTMP_LIMIT_DYNAMIC 2
/* Protocol control messages */
ngx_chain_t * ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s,
uint32_t chunk_size);
ngx_chain_t * ngx_rtmp_create_abort(ngx_rtmp_session_t *s,
uint32_t csid);
ngx_chain_t * ngx_rtmp_create_ack(ngx_rtmp_session_t *s,
uint32_t seq);
ngx_chain_t * ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s,
uint32_t ack_size);
ngx_chain_t * ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s,
uint32_t ack_size, uint8_t limit_type);
ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s,
uint32_t chunk_size);
ngx_int_t ngx_rtmp_send_abort(ngx_rtmp_session_t *s,
@ -474,37 +485,60 @@ ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s,
uint32_t ack_size, uint8_t limit_type);
/* User control messages */
ngx_int_t ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_stream_dry(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_set_buflen(ngx_rtmp_session_t *s,
uint32_t msid, uint32_t buflen_msec);
ngx_int_t ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_recorded(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_ping_request(ngx_rtmp_session_t *s,
uint32_t timestamp);
ngx_int_t ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s,
ngx_chain_t * ngx_rtmp_create_ping_response(ngx_rtmp_session_t *s,
uint32_t timestamp);
ngx_int_t ngx_rtmp_send_user_unknown(ngx_rtmp_session_t *s,
ngx_int_t ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_stream_dry(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_set_buflen(ngx_rtmp_session_t *s,
uint32_t msid, uint32_t buflen_msec);
ngx_int_t ngx_rtmp_send_recorded(ngx_rtmp_session_t *s,
uint32_t msid);
ngx_int_t ngx_rtmp_send_ping_request(ngx_rtmp_session_t *s,
uint32_t timestamp);
ngx_int_t ngx_rtmp_send_ping_response(ngx_rtmp_session_t *s,
uint32_t timestamp);
/* AMF sender/receiver */
ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s,
ngx_chain_t **first, ngx_chain_t **last,
ngx_rtmp_amf_elt_t *elts, size_t nelts);
ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts);
ngx_int_t ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in,
ngx_rtmp_amf_elt_t *elts, size_t nelts);
ngx_chain_t * ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts);
ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts);
/* AMF status sender */
ngx_chain_t * ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code,
char* level, char *desc);
ngx_chain_t * ngx_rtmp_create_play_status(ngx_rtmp_session_t *s, char *code,
char* level, ngx_uint_t duration, ngx_uint_t bytes);
ngx_chain_t * ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s);
ngx_int_t ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code,
char* level, char *desc);
ngx_int_t ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code,
char* level, ngx_uint_t duration, ngx_uint_t bytes);
ngx_int_t ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s);
/* Frame types */

View file

@ -14,15 +14,8 @@ ngx_rtmp_connect_pt ngx_rtmp_connect;
ngx_rtmp_create_stream_pt ngx_rtmp_create_stream;
ngx_rtmp_close_stream_pt ngx_rtmp_close_stream;
ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream;
ngx_rtmp_publish_pt ngx_rtmp_publish;
ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish;
ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish;
ngx_rtmp_play_pt ngx_rtmp_play;
ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe;
ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe;
ngx_rtmp_seek_pt ngx_rtmp_seek;
ngx_rtmp_pause_pt ngx_rtmp_pause;
@ -124,9 +117,7 @@ ngx_rtmp_cmd_connect_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
v.app[len - 1] = 0;
}
return ngx_rtmp_connect
? ngx_rtmp_connect(s, &v)
: NGX_OK;
return ngx_rtmp_connect(s, &v);
}
@ -250,27 +241,25 @@ ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
if (s->app_conf == NULL) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"connect: application not found: '%s'", v->app);
"connect: application not found: '%s'", v->app);
return NGX_ERROR;
}
object_encoding = v->object_encoding;
/* send all replies */
return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
|| ngx_rtmp_send_bandwidth(s, cscf->ack_window,
NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK
|| ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
|| ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
? NGX_ERROR
: NGX_OK;
return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
ngx_rtmp_send_bandwidth(s, cscf->ack_window,
NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
!= NGX_OK ? NGX_ERROR : NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
ngx_chain_t *in)
{
static ngx_rtmp_create_stream_t v;
@ -287,9 +276,9 @@ ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return NGX_ERROR;
}
return ngx_rtmp_create_stream
? ngx_rtmp_create_stream(s, &v)
: NGX_OK;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");
return ngx_rtmp_create_stream(s, &v);
}
@ -324,23 +313,19 @@ ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
stream = NGX_RTMP_MSID;
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"createStream");
/* send result with standard stream */
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK
? NGX_DONE
: NGX_ERROR;
sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
NGX_DONE : NGX_ERROR;
}
static ngx_int_t
ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
ngx_chain_t *in)
{
static ngx_rtmp_close_stream_t v;
@ -352,34 +337,27 @@ ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
};
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"closeStream");
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "closeStream");
return ngx_rtmp_close_stream
? ngx_rtmp_close_stream(s, &v)
: NGX_OK;
return ngx_rtmp_close_stream(s, &v);
}
static ngx_int_t
ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
{
ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID);
/* Whatever happens return OK
* since we should be careful with destruction */
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
ngx_chain_t *in)
{
static ngx_rtmp_delete_stream_t v;
@ -399,14 +377,12 @@ ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
};
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
return ngx_rtmp_delete_stream
? ngx_rtmp_delete_stream(s, &v)
: NGX_OK;
return ngx_rtmp_delete_stream(s, &v);
}
@ -415,14 +391,11 @@ ngx_rtmp_cmd_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v)
{
ngx_rtmp_close_stream_t cv;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"deleteStream");
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "deleteStream");
/* chain close_stream */
cv.stream = 0;
return ngx_rtmp_close_stream
? ngx_rtmp_close_stream(s, &cv)
: NGX_OK;
return ngx_rtmp_close_stream(s, &cv);
}
@ -470,186 +443,35 @@ ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_rtmp_cmd_fill_args(v.name, v.args);
return ngx_rtmp_publish
? ngx_rtmp_publish(s, &v)
: NGX_OK;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"publish: name='%s' args='%s' type=%s silent=%d",
v.name, v.args, v.type, v.silent);
return ngx_rtmp_publish(s, &v);
}
static ngx_int_t
ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_header_t h;
static double trans;
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Publish.Start", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Publish succeeded.", 0 },
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onStatus", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"publish: name='%s' args='%s' type=%s silent=%d",
v->name, v->args, v->type, v->silent);
if (v->silent) {
return NGX_OK;
}
/* send onStatus reply */
memset(&h, 0, sizeof(h));
h.type = NGX_RTMP_MSG_AMF_CMD;
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
if (ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_fcpublish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_fcpublish_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.name, sizeof(v.name) },
};
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
return ngx_rtmp_fcpublish
? ngx_rtmp_fcpublish(s, &v)
: NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_fcpublish(ngx_rtmp_session_t *s, ngx_rtmp_fcpublish_t *v)
{
ngx_rtmp_header_t h;
static double trans;
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Publish.Start", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"FCPublish succeeded.", 0 },
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onFCPublish", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"fcpublish: name='%s'", v->name);
/* send onFCPublish reply */
memset(&h, 0, sizeof(h));
h.type = NGX_RTMP_MSG_AMF_CMD;
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
if (ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_play_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
@ -679,279 +501,28 @@ ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
ngx_rtmp_cmd_fill_args(v.name, v.args);
return ngx_rtmp_play
? ngx_rtmp_play(s, &v)
: NGX_OK;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"play: name='%s' args='%s' start=%i duration=%i "
"reset=%i silent=%i",
v.name, v.args, (ngx_int_t) v.start,
(ngx_int_t) v.duration, (ngx_int_t) v.reset,
(ngx_int_t) v.silent);
return ngx_rtmp_play(s, &v);
}
static ngx_int_t
ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
ngx_rtmp_header_t h;
static double trans;
static int access = 1;
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Play.Start", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Playback started.", 0 },
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onStatus", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf, sizeof(out_inf) },
};
static ngx_rtmp_amf_elt_t out2_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Play.Start", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Started playing.", 0 },
};
static ngx_rtmp_amf_elt_t out2_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onStatus", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out2_inf,
sizeof(out2_inf) },
};
static ngx_rtmp_amf_elt_t out3_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"|RtmpSampleAccess", 0 },
{ NGX_RTMP_AMF_BOOLEAN,
ngx_null_string,
&access, 0 },
{ NGX_RTMP_AMF_BOOLEAN,
ngx_null_string,
&access, 0 },
};
static ngx_rtmp_amf_elt_t out4_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Data.Start", 0 },
};
static ngx_rtmp_amf_elt_t out4_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onStatus", 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out4_inf, sizeof(out4_inf) },
};
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"play name='%s' args='%s' start=%i duration=%i "
"reset=%i silent=%i",
v->name, v->args, (ngx_int_t) v->start,
(ngx_int_t) v->duration, (ngx_int_t) v->reset,
(ngx_int_t) v->silent);
if (v->silent) {
return NGX_OK;
}
/* send onStatus reply */
memset(&h, 0, sizeof(h));
h.type = NGX_RTMP_MSG_AMF_CMD;
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
/*
if (ngx_rtmp_send_user_recorded(s, NGX_RTMP_MSID) != NGX_OK) {
return NGX_ERROR;
}*/
if (ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
/* send sample access meta message FIXME */
if (ngx_rtmp_send_amf(s, &h, out2_elts,
sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
/* send data start meta message */
h.type = NGX_RTMP_MSG_AMF_META;
if (ngx_rtmp_send_amf(s, &h, out3_elts,
sizeof(out3_elts) / sizeof(out3_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_rtmp_send_amf(s, &h, out4_elts,
sizeof(out4_elts) / sizeof(out4_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_fcsubscribe_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
static ngx_rtmp_fcsubscribe_t v;
static ngx_rtmp_amf_elt_t in_elts[] = {
/* transaction is always 0 */
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
&v.name, sizeof(v.name) },
};
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
return ngx_rtmp_fcsubscribe
? ngx_rtmp_fcsubscribe(s, &v)
: NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_fcsubscribe(ngx_rtmp_session_t *s, ngx_rtmp_fcsubscribe_t *v)
{
ngx_rtmp_header_t h;
static double trans;
static ngx_rtmp_amf_elt_t out_inf[] = {
{ NGX_RTMP_AMF_STRING,
ngx_string("code"),
"NetStream.Play.Start", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("level"),
"status", 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("description"),
"Started playing.", 0 },
};
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"onFCSubscribe", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_OBJECT,
ngx_null_string,
out_inf,
sizeof(out_inf) },
};
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"fcsubscribe: name='%s'", v->name);
/* send onFCSubscribe reply */
memset(&h, 0, sizeof(h));
h.type = NGX_RTMP_MSG_AMF_CMD;
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
if (ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
@ -983,7 +554,6 @@ ngx_rtmp_cmd_pause_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
@ -991,42 +561,27 @@ ngx_rtmp_cmd_pause_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"cmd: pause pause=%i position=%i",
(ngx_int_t)v.pause, (ngx_int_t)v.position);
"pause: pause=%i position=%i",
(ngx_int_t) v.pause, (ngx_int_t) v.position);
return ngx_rtmp_pause
? ngx_rtmp_pause(s, &v)
: NGX_OK;
return ngx_rtmp_pause(s, &v);
}
static ngx_int_t
ngx_rtmp_cmd_pause(ngx_rtmp_session_t *s, ngx_rtmp_pause_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"pause: state='%i' position=%i",
v->pause, (ngx_int_t) v->position);
if (v->pause) {
return ngx_rtmp_send_status(s, "NetStream.Pause.Notify", "status",
"Paused");
} else {
return ngx_rtmp_send_status(s, "NetStream.Unpause.Notify", "status",
"Unpaused");
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
ngx_chain_t *in)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"disconnect");
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "disconnect");
return ngx_rtmp_delete_stream
? ngx_rtmp_delete_stream(s, NULL)
: NGX_OK;
return ngx_rtmp_delete_stream(s, NULL);
}
@ -1054,49 +609,33 @@ ngx_rtmp_cmd_seek_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_memzero(&v, sizeof(v));
/* parse input */
if (ngx_rtmp_receive_amf(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
return ngx_rtmp_seek
? ngx_rtmp_seek(s, &v)
: NGX_OK;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"seek: offset=%i", (ngx_int_t) v.offset);
return ngx_rtmp_seek(s, &v);
}
static ngx_int_t
ngx_rtmp_cmd_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v)
{
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"seek: offset=%i", (ngx_int_t) v->offset);
return (ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID) != NGX_OK
|| ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_MSID) != NGX_OK
|| ngx_rtmp_send_status(s, "NetStream.Seek.Notify", "status",
"Seeking"))
? NGX_ERROR
: NGX_OK;
return NGX_OK;
}
static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = {
{ ngx_string("connect"), ngx_rtmp_cmd_connect_init },
{ ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init },
{ ngx_string("closeStream"), ngx_rtmp_cmd_close_stream_init },
{ ngx_string("deleteStream"), ngx_rtmp_cmd_delete_stream_init },
{ ngx_string("publish"), ngx_rtmp_cmd_publish_init },
{ ngx_string("fcpublish"), ngx_rtmp_cmd_fcpublish_init },
/*{ ngx_string("fcunpublish"), ngx_rtmp_cmd_fcunpublish_init },*/
{ ngx_string("play"), ngx_rtmp_cmd_play_init },
{ ngx_string("fcsubscribe"), ngx_rtmp_cmd_fcsubscribe_init },
/*{ ngx_string("fcunsubscribe"), ngx_rtmp_cmd_fcunsubscribe_init },*/
{ ngx_string("seek"), ngx_rtmp_cmd_seek_init },
{ ngx_string("pause"), ngx_rtmp_cmd_pause_init },
{ ngx_string("pauseraw"), ngx_rtmp_cmd_pause_init },
@ -1116,35 +655,37 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf)
/* redirect disconnects to deleteStream
* to free client modules from registering
* disconnect callback */
h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
*h = ngx_rtmp_cmd_disconnect;
/* register AMF callbacks */
ncalls = sizeof(ngx_rtmp_cmd_map) / sizeof(ngx_rtmp_cmd_map[0]);
ch = ngx_array_push_n(&cmcf->amf, ncalls);
h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
if (h == NULL) {
return NGX_ERROR;
}
*h = ngx_rtmp_cmd_disconnect;
/* register AMF callbacks */
ncalls = sizeof(ngx_rtmp_cmd_map) / sizeof(ngx_rtmp_cmd_map[0]);
ch = ngx_array_push_n(&cmcf->amf, ncalls);
if (ch == NULL) {
return NGX_ERROR;
}
bh = ngx_rtmp_cmd_map;
for(n = 0; n < ncalls; ++n, ++ch, ++bh) {
*ch = *bh;
}
/* set initial handlers */
ngx_rtmp_connect = ngx_rtmp_cmd_connect;
ngx_rtmp_create_stream = ngx_rtmp_cmd_create_stream;
ngx_rtmp_close_stream = ngx_rtmp_cmd_close_stream;
ngx_rtmp_delete_stream = ngx_rtmp_cmd_delete_stream;
ngx_rtmp_publish = ngx_rtmp_cmd_publish;
ngx_rtmp_fcpublish = ngx_rtmp_cmd_fcpublish;
/*ngx_rtmp_fcunpublish = ngx_rtmp_cmd_fcunpublish;*/
ngx_rtmp_play = ngx_rtmp_cmd_play;
ngx_rtmp_fcsubscribe = ngx_rtmp_cmd_fcsubscribe;
/*ngx_rtmp_fcunsubscribe = ngx_rtmp_cmd_fcunsubsrcibe;*/
ngx_rtmp_seek = ngx_rtmp_cmd_seek;
ngx_rtmp_pause = ngx_rtmp_cmd_pause;

View file

@ -58,16 +58,6 @@ typedef struct {
} ngx_rtmp_publish_t;
typedef struct {
u_char name[NGX_RTMP_MAX_NAME];
} ngx_rtmp_fcpublish_t;
typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunpublish_t;
typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcsubscribe_t;
typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunsubscribe_t;
typedef struct {
u_char name[NGX_RTMP_MAX_NAME];
u_char args[NGX_RTMP_MAX_ARGS];
@ -100,17 +90,9 @@ typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s,
typedef ngx_int_t (*ngx_rtmp_publish_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
typedef ngx_int_t (*ngx_rtmp_fcpublish_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_fcpublish_t *v);
typedef ngx_int_t (*ngx_rtmp_fcunpublish_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_fcunpublish_t *v);
typedef ngx_int_t (*ngx_rtmp_play_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_play_t *v);
typedef ngx_int_t (*ngx_rtmp_fcsubscribe_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_fcsubscribe_t *v);
typedef ngx_int_t (*ngx_rtmp_fcunsubscribe_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_fcunsubscribe_t *v);
typedef ngx_int_t (*ngx_rtmp_seek_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_seek_t *v);
@ -122,15 +104,8 @@ extern ngx_rtmp_connect_pt ngx_rtmp_connect;
extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream;
extern ngx_rtmp_close_stream_pt ngx_rtmp_close_stream;
extern ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream;
extern ngx_rtmp_publish_pt ngx_rtmp_publish;
extern ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish;
extern ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish;
extern ngx_rtmp_play_pt ngx_rtmp_play;
extern ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe;
extern ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe;
extern ngx_rtmp_seek_pt ngx_rtmp_seek;
extern ngx_rtmp_pause_pt ngx_rtmp_pause;

View file

@ -252,7 +252,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method)
/* find publisher context */
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
if (lctx->publishing) {
break;
}
}
@ -358,7 +358,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method)
ngx_strncmp(method->data, "publisher", method->len) == 0)
{
for (lctx = live.ls->ctx; lctx; lctx = lctx->next) {
if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
if (lctx->publishing) {
ngx_rtmp_finalize_session(lctx->session);
++ndropped;
break;

View file

@ -174,9 +174,7 @@ ngx_rtmp_ping(ngx_event_t *pev)
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"ping: schedule %Mms", cscf->ping_timeout);
if (ngx_rtmp_send_user_ping_request(s, (uint32_t)ngx_current_msec)
!= NGX_OK)
{
if (ngx_rtmp_send_ping_request(s, (uint32_t)ngx_current_msec) != NGX_OK) {
ngx_rtmp_finalize_session(s);
return;
}

View file

@ -198,8 +198,138 @@ ngx_rtmp_live_get_stream(ngx_rtmp_session_t *s, u_char *name, int create)
static void
ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
ngx_uint_t flags)
ngx_rtmp_live_set_status(ngx_rtmp_session_t *s, ngx_chain_t *control,
ngx_chain_t **status, size_t nstatus,
unsigned active)
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_chain_t **cl;
size_t n;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: set active=%ui", active);
if (ctx->active == active) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: unchanged active=%ui", active);
return;
}
ctx->active = active;
if (ctx->publishing) {
/* publisher */
ctx->stream->active = active;
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx->publishing == 0) {
ngx_rtmp_live_set_status(pctx->session, control, status,
nstatus, active);
}
}
return;
}
/* subscriber */
if (control && ngx_rtmp_send_message(s, control, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
return;
}
if (!ctx->silent) {
cl = status;
for (n = 0; n < nstatus; ++n, ++cl) {
if (*cl && ngx_rtmp_send_message(s, *cl, 0) != NGX_OK) {
ngx_rtmp_finalize_session(s);
return;
}
}
}
ctx->cs[0].active = 0;
ctx->cs[0].dropped = 0;
ctx->cs[1].active = 0;
ctx->cs[1].dropped = 0;
}
/*TODO: intercept stream begin/eof for publishers & call these functions */
static void
ngx_rtmp_live_stream_begin(ngx_rtmp_session_t *s)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *control;
ngx_chain_t *status[3];
size_t n, nstatus;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID);
nstatus = 0;
status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start",
"status", "Start live");
status[nstatus++] = ngx_rtmp_create_sample_access(s);
/*status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.PublishNotify",
"status", "Start publishing");
*/
ngx_rtmp_live_set_status(s, control, status, nstatus, 1);
if (control) {
ngx_rtmp_free_shared_chain(cscf, control);
}
for (n = 0; n < nstatus; ++n) {
ngx_rtmp_free_shared_chain(cscf, status[n]);
}
}
static void
ngx_rtmp_live_stream_eof(ngx_rtmp_session_t *s)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *control;
ngx_chain_t *status[3];
size_t n, nstatus;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
control = ngx_rtmp_create_stream_eof(s, NGX_RTMP_MSID);
nstatus = 0;
status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Stop",
"status", "Stop live");
/*status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.UnpublishNotify",
"status", "Start publishing");
*/
ngx_rtmp_live_set_status(s, control, status, nstatus, 0);
if (control) {
ngx_rtmp_free_shared_chain(cscf, control);
}
for (n = 0; n < nstatus; ++n) {
ngx_rtmp_free_shared_chain(cscf, status[n]);
}
}
static void
ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher)
{
ngx_rtmp_live_ctx_t *ctx;
ngx_rtmp_live_stream_t **stream;
@ -212,8 +342,8 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx && ctx->stream) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log,
0, "live: already joined");
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: already joined");
return;
}
@ -223,26 +353,31 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
}
ngx_memzero(ctx, sizeof(*ctx));
ctx->session = s;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: join '%s'", name);
"live: join '%s'", name);
stream = ngx_rtmp_live_get_stream(s, name, 1);
if (stream == NULL) {
return;
}
if (flags & NGX_RTMP_LIVE_PUBLISHING) {
if ((*stream)->flags & NGX_RTMP_LIVE_PUBLISHING) {
if (publisher) {
if ((*stream)->publishing) {
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"live: already publishing");
"live: already publishing");
return;
}
(*stream)->flags |= NGX_RTMP_LIVE_PUBLISHING;
(*stream)->publishing = 1;
}
ctx->stream = *stream;
ctx->flags = flags;
ctx->publishing = publisher;
ctx->next = (*stream)->ctx;
(*stream)->ctx = ctx;
if (lacf->buflen) {
@ -251,93 +386,10 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
ctx->cs[0].csid = NGX_RTMP_MSG_AUDIO;
ctx->cs[1].csid = NGX_RTMP_MSG_VIDEO;
}
static ngx_int_t
ngx_rtmp_live_sync_streams(ngx_rtmp_session_t *s)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch, lh;
ngx_chain_t *pkt;
ngx_uint_t csidx;
/* Stream synchronization:
*
* - both streams active
* advance lagging stream
*
* - one stream actvie
* add absolute frame for inactive stream
* (the active stream became active
* during current publisher session)
*
* - none active
* quit
*/
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf->interleave) {
return NGX_OK;
if (ctx->publishing || ctx->stream->active) {
ngx_rtmp_live_stream_begin(s);
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx->cs[0].active == 0 && ctx->cs[1].active == 0) {
return NGX_OK;
}
if (ctx->cs[0].active == 0) {
csidx = 0;
} else if (ctx->cs[1].active == 0) {
csidx = 1;
} else {
csidx = (ctx->cs[0].timestamp > ctx->cs[1].timestamp);
}
ngx_memzero(&ch, sizeof(ch));
ch.msid = NGX_RTMP_MSID;
ch.type = csidx ? NGX_RTMP_MSG_VIDEO : NGX_RTMP_MSG_AUDIO;
ch.csid = ctx->cs[csidx].csid;
ch.timestamp = ctx->cs[1 - csidx].timestamp;
lh = ch;
lh.timestamp = ctx->cs[csidx].timestamp;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
pkt = ngx_rtmp_alloc_shared_buf(cscf);
if (pkt == NULL) {
return NGX_ERROR;
}
ngx_rtmp_prepare_message(s, &ch, ctx->cs[csidx].active ? &lh : NULL, pkt);
for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
if (pctx->cs[1 - csidx].active == 0) {
continue;
}
ss = pctx->session;
if (ngx_rtmp_send_message(ss, pkt, 0) != NGX_OK) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sync failed");
ngx_rtmp_finalize_session(ss);
}
pctx->cs[csidx].timestamp += ch.timestamp;
pctx->cs[csidx].active = 1;
}
ngx_rtmp_free_shared_chain(cscf, pkt);
return NGX_OK;
}
@ -360,17 +412,15 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
if (ctx->stream == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: not joined ");
"live: not joined");
goto next;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: leave '%s'", ctx->stream->name);
if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING
&& ctx->flags & NGX_RTMP_LIVE_PUBLISHING)
{
ctx->stream->flags &= ~NGX_RTMP_LIVE_PUBLISHING;
if (ctx->stream->publishing && ctx->publishing) {
ctx->stream->publishing = 0;
}
for (cctx = &ctx->stream->ctx; *cctx; cctx = &(*cctx)->next) {
@ -380,9 +430,8 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
}
}
if (ngx_rtmp_live_sync_streams(s) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"live: sync error");
if (ctx->publishing || ctx->stream->active) {
ngx_rtmp_live_stream_eof(s);
}
if (ctx->stream->ctx) {
@ -391,11 +440,12 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: delete empty stream '%s'", ctx->stream->name);
"live: delete empty stream '%s'",
ctx->stream->name);
stream = ngx_rtmp_live_get_stream(s, ctx->stream->name, 0);
if (stream == NULL) {
return NGX_ERROR;
goto next;
}
*stream = (*stream)->next;
@ -426,7 +476,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_uint_t meta_version;
ngx_uint_t csidx;
uint32_t delta;
ngx_rtmp_live_chunk_stream_t *cs, *acs;
ngx_rtmp_live_chunk_stream_t *cs;
#ifdef NGX_DEBUG
const char *type_s;
@ -447,7 +497,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
return NGX_OK;
}
if ((ctx->flags & NGX_RTMP_LIVE_PUBLISHING) == 0) {
if (ctx->publishing == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", type_s);
return NGX_OK;
@ -472,7 +522,6 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
cs = &ctx->cs[csidx];
acs = &ctx->cs[1 - csidx];
ngx_memzero(&ch, sizeof(ch));
@ -483,11 +532,8 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
lh = ch;
if (cs->active || cs->secondary) {
if (cs->active) {
lh.timestamp = cs->timestamp;
} else if (!acs->active) {
acs->secondary = 1;
acs->timestamp = h->timestamp;
}
cs->active = 1;
@ -567,8 +613,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sync %s dropped=%uD", type_s, cs->dropped);
/*TODO send empty delta packet */
cs->active = 0;
cs->dropped = 0;
}
@ -675,6 +720,7 @@ static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
@ -683,11 +729,24 @@ ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: publish: name='%s' type='%s'",
v->name, v->type);
"live: publish: name='%s' type='%s'",
v->name, v->type);
/* join stream as publisher */
ngx_rtmp_live_join(s, v->name, NGX_RTMP_LIVE_PUBLISHING);
ngx_rtmp_live_join(s, v->name, 1);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL || !ctx->publishing) {
goto next;
}
ctx->silent = v->silent;
if (!ctx->silent) {
ngx_rtmp_send_status(s, "NetStream.Publish.Start",
"status", "Start publishing");
}
next:
return next_publish(s, v);
@ -698,6 +757,7 @@ static ngx_int_t
ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_live_ctx_t *ctx;
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
@ -710,9 +770,17 @@ ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
v->name, (uint32_t)v->start,
(uint32_t)v->duration, (uint32_t)v->reset);
/* join stream as player */
/* join stream as subscriber */
ngx_rtmp_live_join(s, v->name, 0);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (ctx == NULL) {
goto next;
}
ctx->silent = v->silent;
next:
return next_play(s, v);
}

View file

@ -13,17 +13,12 @@
#include "ngx_rtmp_streams.h"
/* session flags */
#define NGX_RTMP_LIVE_PUBLISHING 0x01
typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t;
typedef struct ngx_rtmp_live_stream_s ngx_rtmp_live_stream_t;
typedef struct {
unsigned active:1;
unsigned secondary:1;
uint32_t timestamp;
uint32_t csid;
uint32_t dropped;
@ -34,10 +29,12 @@ struct ngx_rtmp_live_ctx_s {
ngx_rtmp_session_t *session;
ngx_rtmp_live_stream_t *stream;
ngx_rtmp_live_ctx_t *next;
ngx_uint_t flags;
ngx_uint_t ndropped;
ngx_rtmp_live_chunk_stream_t cs[2];
ngx_uint_t meta_version;
unsigned active:1;
unsigned publishing:1;
unsigned silent:1;
};
@ -45,10 +42,11 @@ struct ngx_rtmp_live_stream_s {
u_char name[NGX_RTMP_MAX_NAME];
ngx_rtmp_live_stream_t *next;
ngx_rtmp_live_ctx_t *ctx;
ngx_uint_t flags;
ngx_rtmp_bandwidth_t bw_in;
ngx_rtmp_bandwidth_t bw_out;
ngx_msec_t epoch;
unsigned active:1;
unsigned publishing:1;
};

View file

@ -188,7 +188,7 @@ ngx_rtmp_play_send(ngx_event_t *e)
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"play: send done");
ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID);
ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID);
ngx_rtmp_send_play_status(s, "NetStream.Play.Complete", "status", ts, 0);
@ -349,6 +349,11 @@ ngx_rtmp_play_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
if (ctx->file.fd != NGX_INVALID_FILE) {
ngx_close_file(ctx->file.fd);
ctx->file.fd = NGX_INVALID_FILE;
ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID);
ngx_rtmp_send_status(s, "NetStream.Play.Stop", "status",
"Stop video on demand");
}
next:
@ -366,8 +371,22 @@ ngx_rtmp_play_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v)
goto next;
}
if (ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID) != NGX_OK) {
return NGX_ERROR;
}
ngx_rtmp_play_do_seek(s, v->offset);
if (ngx_rtmp_send_status(s, "NetStream.Seek.Notify", "status", "Seeking")
!= NGX_OK)
{
return NGX_ERROR;
}
if (ngx_rtmp_send_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) {
return NGX_ERROR;
}
next:
return next_seek(s, v);
}
@ -389,8 +408,23 @@ ngx_rtmp_play_pause(ngx_rtmp_session_t *s, ngx_rtmp_pause_t *v)
(ngx_int_t) v->pause, v->position);
if (v->pause) {
if (ngx_rtmp_send_status(s, "NetStream.Pause.Notify", "status",
"Paused video on demand")
!= NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_play_do_stop(s);
} else {
if (ngx_rtmp_send_status(s, "NetStream.Unpause.Notify", "status",
"Unpaused video on demand")
!= NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_play_do_start(s); /*TODO: v->position? */
}
@ -563,12 +597,23 @@ ngx_rtmp_play_open(ngx_rtmp_session_t *s, double start)
return NGX_ERROR;
}
if (ngx_rtmp_send_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_rtmp_send_status(s, "NetStream.Play.Start", "status",
"Start video on demand")
!= NGX_OK)
{
return NGX_ERROR;
}
e = &ctx->send_evt;
e->data = s;
e->handler = ngx_rtmp_play_send;
e->log = s->connection->log;
ngx_rtmp_send_user_recorded(s, 1);
ngx_rtmp_send_recorded(s, 1);
if (ngx_rtmp_play_do_init(s) != NGX_OK) {
return NGX_ERROR;

View file

@ -113,10 +113,14 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
switch(evt) {
case NGX_RTMP_USER_STREAM_BEGIN:
/* use =val as stream id which started */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"stream begin msid=%uD", val);
break;
case NGX_RTMP_USER_STREAM_EOF:
/* use =val as stream id which is over */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"stream eof msid=%uD", val);
break;
case NGX_RTMP_USER_STREAM_DRY:
@ -144,7 +148,7 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
break;
case NGX_RTMP_USER_PING_REQUEST:
ngx_rtmp_send_user_ping_response(s, val);
ngx_rtmp_send_ping_response(s, val);
break;
case NGX_RTMP_USER_PING_RESPONSE:

View file

@ -952,7 +952,7 @@ ngx_rtmp_relay_send_play(ngx_rtmp_session_t *s)
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
|| ngx_rtmp_send_user_set_buflen(s, NGX_RTMP_RELAY_MSID,
|| ngx_rtmp_send_set_buflen(s, NGX_RTMP_RELAY_MSID,
racf->buflen) != NGX_OK
? NGX_ERROR
: NGX_OK;

View file

@ -13,7 +13,6 @@
ngx_chain_t *__l; \
ngx_buf_t *__b; \
ngx_rtmp_core_srv_conf_t *__cscf; \
ngx_int_t rc; \
\
__cscf = ngx_rtmp_get_module_srv_conf( \
s, ngx_rtmp_core_module); \
@ -22,7 +21,7 @@
__h.csid = 2; \
__l = ngx_rtmp_alloc_shared_buf(__cscf); \
if (__l == NULL) { \
return NGX_ERROR; \
return NULL; \
} \
__b = __l->buf;
@ -42,17 +41,36 @@
#define NGX_RTMP_USER_END(s) \
ngx_rtmp_prepare_message(s, &__h, NULL, __l); \
rc = ngx_rtmp_send_message(s, __l, 0); \
ngx_rtmp_free_shared_chain(__cscf, __l); \
return __l;
static ngx_int_t
ngx_rtmp_send_shared_packet(ngx_rtmp_session_t *s, ngx_chain_t *cl)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_int_t rc;
if (cl == NULL) {
return NGX_ERROR;
}
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
rc = ngx_rtmp_send_message(s, cl, 0);
ngx_rtmp_free_shared_chain(cscf, cl);
return rc;
}
/* Protocol control messages */
ngx_int_t
ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
ngx_chain_t *
ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"send chunk_size=%uD", chunk_size);
"chunk_size=%uD", chunk_size);
NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE);
@ -63,8 +81,19 @@ ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
ngx_int_t
ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid)
ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_chunk_size(s, chunk_size));
}
ngx_chain_t *
ngx_rtmp_create_abort(ngx_rtmp_session_t *s, uint32_t csid)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: abort csid=%uD", csid);
NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE);
NGX_RTMP_USER_OUT4(csid);
@ -74,10 +103,18 @@ ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid)
ngx_int_t
ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq)
ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_abort(s, csid));
}
ngx_chain_t *
ngx_rtmp_create_ack(ngx_rtmp_session_t *s, uint32_t seq)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"send ack seq=%uD", seq);
"create: ack seq=%uD", seq);
NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK);
@ -88,10 +125,18 @@ ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq)
ngx_int_t
ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_ack(s, seq));
}
ngx_chain_t *
ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"send ack_size=%uD", ack_size);
"create: ack_size=%uD", ack_size);
NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE);
@ -102,12 +147,20 @@ ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
ngx_int_t
ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
uint8_t limit_type)
ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_ack_size(s, ack_size));
}
ngx_chain_t *
ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
uint8_t limit_type)
{
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"send bandwidth ack_size=%uD limit=%d",
ack_size, (int)limit_type);
"create: bandwidth ack_size=%uD limit=%d",
ack_size, (int)limit_type);
NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH);
@ -118,10 +171,23 @@ ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
}
/* User control messages */
ngx_int_t
ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
uint8_t limit_type)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_bandwidth(s, ack_size, limit_type));
}
/* User control messages */
ngx_chain_t *
ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: stream_begin msid=%uD", msid);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN);
NGX_RTMP_USER_OUT4(msid);
@ -131,8 +197,19 @@ ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
ngx_int_t
ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_stream_begin(s, msid));
}
ngx_chain_t *
ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: stream_end msid=%uD", msid);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF);
NGX_RTMP_USER_OUT4(msid);
@ -142,8 +219,19 @@ ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
ngx_int_t
ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, uint32_t msid)
ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_stream_eof(s, msid));
}
ngx_chain_t *
ngx_rtmp_create_stream_dry(ngx_rtmp_session_t *s, uint32_t msid)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: stream_dry msid=%uD", msid);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_DRY);
NGX_RTMP_USER_OUT4(msid);
@ -153,9 +241,21 @@ ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, uint32_t msid)
ngx_int_t
ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, uint32_t msid,
uint32_t buflen_msec)
ngx_rtmp_send_stream_dry(ngx_rtmp_session_t *s, uint32_t msid)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_stream_dry(s, msid));
}
ngx_chain_t *
ngx_rtmp_create_set_buflen(ngx_rtmp_session_t *s, uint32_t msid,
uint32_t buflen_msec)
{
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: set_buflen msid=%uD buflen=%uD",
msid, buflen_msec);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_SET_BUFLEN);
NGX_RTMP_USER_OUT4(msid);
@ -166,8 +266,20 @@ ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, uint32_t msid,
ngx_int_t
ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, uint32_t msid)
ngx_rtmp_send_set_buflen(ngx_rtmp_session_t *s, uint32_t msid,
uint32_t buflen_msec)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_set_buflen(s, msid, buflen_msec));
}
ngx_chain_t *
ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: recorded msid=%uD", msid);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED);
NGX_RTMP_USER_OUT4(msid);
@ -177,8 +289,19 @@ ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, uint32_t msid)
ngx_int_t
ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp)
ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, uint32_t msid)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_recorded(s, msid));
}
ngx_chain_t *
ngx_rtmp_create_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: ping_request timestamp=%uD", timestamp);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_REQUEST);
NGX_RTMP_USER_OUT4(timestamp);
@ -188,8 +311,19 @@ ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp)
ngx_int_t
ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp)
ngx_rtmp_send_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_ping_request(s, timestamp));
}
ngx_chain_t *
ngx_rtmp_create_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp)
{
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: ping_response timestamp=%uD", timestamp);
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_RESPONSE);
NGX_RTMP_USER_OUT4(timestamp);
@ -199,20 +333,10 @@ ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp)
ngx_int_t
ngx_rtmp_send_user_unknown(ngx_rtmp_session_t *s, uint32_t timestamp)
ngx_rtmp_send_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp)
{
static uint32_t zero;
static uint32_t one = 1;
uint32_t val;
NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_UNKNOWN);
NGX_RTMP_USER_OUT4(zero);
NGX_RTMP_USER_OUT4(one);
val = timestamp & 0x7fffffff;
NGX_RTMP_USER_OUT4(val);
NGX_RTMP_USER_END(s);
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_ping_response(s, timestamp));
}
@ -228,8 +352,8 @@ ngx_rtmp_alloc_amf_buf(void *arg)
/* NOTE: this function does not free shared bufs on error */
ngx_int_t
ngx_rtmp_append_amf(ngx_rtmp_session_t *s,
ngx_chain_t **first, ngx_chain_t **last,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
ngx_chain_t **first, ngx_chain_t **last,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
ngx_rtmp_amf_ctx_t act;
ngx_rtmp_core_srv_conf_t *cscf;
@ -264,35 +388,48 @@ ngx_rtmp_append_amf(ngx_rtmp_session_t *s,
}
ngx_int_t
ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
ngx_chain_t *
ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
ngx_chain_t *first;
ngx_int_t rc;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: amf nelts=%ui", nelts);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
first = NULL;
rc = ngx_rtmp_append_amf(s, &first, NULL, elts, nelts);
if (rc != NGX_OK || first == NULL) {
goto done;
if (rc != NGX_OK && first) {
ngx_rtmp_free_shared_chain(cscf, first);
first = NULL;
}
ngx_rtmp_prepare_message(s, h, NULL, first);
if (first) {
ngx_rtmp_prepare_message(s, h, NULL, first);
}
rc = ngx_rtmp_send_message(s, first, 0);
done:
ngx_rtmp_free_shared_chain(cscf, first);
return rc;
return first;
}
ngx_int_t
ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_amf(s, h, elts, nelts));
}
ngx_chain_t *
ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, char* level,
char *desc)
{
ngx_rtmp_header_t h;
static double trans;
@ -332,6 +469,9 @@ ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
sizeof(out_inf) },
};
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: status code='%s' level='%s' desc='%s'",
code, level, desc);
out_inf[0].data = code;
out_inf[1].data = level;
@ -343,14 +483,22 @@ ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
return ngx_rtmp_create_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
ngx_int_t
ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level,
ngx_uint_t duration, ngx_uint_t bytes)
ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_status(s, code, level, desc));
}
ngx_chain_t *
ngx_rtmp_create_play_status(ngx_rtmp_session_t *s, char *code, char* level,
ngx_uint_t duration, ngx_uint_t bytes)
{
ngx_rtmp_header_t h;
static double dduration;
@ -387,6 +535,10 @@ ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level,
sizeof(out_inf) },
};
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"create: play_status code='%s' level='%s' "
"duration=%ui bytes=%ui",
code, level, duration, bytes);
out_inf[0].data = code;
out_inf[1].data = level;
@ -401,6 +553,56 @@ ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level,
h.msid = NGX_RTMP_MSID;
h.timestamp = duration;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
return ngx_rtmp_create_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
ngx_int_t
ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level,
ngx_uint_t duration, ngx_uint_t bytes)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_play_status(s, code, level, duration, bytes));
}
ngx_chain_t *
ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s)
{
ngx_rtmp_header_t h;
static int access = 1;
static ngx_rtmp_amf_elt_t access_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"|RtmpSampleAccess", 0 },
{ NGX_RTMP_AMF_BOOLEAN,
ngx_null_string,
&access, 0 },
{ NGX_RTMP_AMF_BOOLEAN,
ngx_null_string,
&access, 0 },
};
memset(&h, 0, sizeof(h));
h.type = NGX_RTMP_MSG_AMF_META;
h.csid = NGX_RTMP_CSID_AMF;
h.msid = NGX_RTMP_MSID;
return ngx_rtmp_create_amf(s, &h, access_elts,
sizeof(access_elts) / sizeof(access_elts[0]));
}
ngx_int_t
ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s)
{
return ngx_rtmp_send_shared_packet(s,
ngx_rtmp_create_sample_access(s));
}

View file

@ -335,13 +335,17 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll,
NGX_RTMP_STAT_L("</swfurl>");
}
if (ctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
if (ctx->publishing) {
NGX_RTMP_STAT_L("<publishing/>");
}
if (ctx->active) {
NGX_RTMP_STAT_L("<active/>");
}
NGX_RTMP_STAT_L("</client>\r\n");
}
if (ctx->flags & NGX_RTMP_LIVE_PUBLISHING) {
if (ctx->publishing) {
publishing = 1;
codec = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
}