diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 95f87b2..99498e8 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -462,6 +462,17 @@ ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, #define NGX_RTMP_LIMIT_DYNAMIC 2 /* Protocol control messages */ +ngx_chain_t * ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, + uint32_t chunk_size); +ngx_chain_t * ngx_rtmp_create_abort(ngx_rtmp_session_t *s, + uint32_t csid); +ngx_chain_t * ngx_rtmp_create_ack(ngx_rtmp_session_t *s, + uint32_t seq); +ngx_chain_t * ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, + uint32_t ack_size); +ngx_chain_t * ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, + uint32_t ack_size, uint8_t limit_type); + ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size); ngx_int_t ngx_rtmp_send_abort(ngx_rtmp_session_t *s, @@ -474,37 +485,60 @@ ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, uint8_t limit_type); /* User control messages */ -ngx_int_t ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid); -ngx_int_t ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid); -ngx_int_t ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_stream_dry(ngx_rtmp_session_t *s, uint32_t msid); -ngx_int_t ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, uint32_t buflen_msec); -ngx_int_t ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid); -ngx_int_t ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp); -ngx_int_t ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, +ngx_chain_t * ngx_rtmp_create_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp); -ngx_int_t ngx_rtmp_send_user_unknown(ngx_rtmp_session_t *s, + +ngx_int_t ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, + uint32_t msid); +ngx_int_t ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, + uint32_t msid); +ngx_int_t ngx_rtmp_send_stream_dry(ngx_rtmp_session_t *s, + uint32_t msid); +ngx_int_t ngx_rtmp_send_set_buflen(ngx_rtmp_session_t *s, + uint32_t msid, uint32_t buflen_msec); +ngx_int_t ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, + uint32_t msid); +ngx_int_t ngx_rtmp_send_ping_request(ngx_rtmp_session_t *s, + uint32_t timestamp); +ngx_int_t ngx_rtmp_send_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp); /* AMF sender/receiver */ ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s, ngx_chain_t **first, ngx_chain_t **last, ngx_rtmp_amf_elt_t *elts, size_t nelts); -ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_rtmp_amf_elt_t *elts, size_t nelts); ngx_int_t ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in, ngx_rtmp_amf_elt_t *elts, size_t nelts); +ngx_chain_t * ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_rtmp_amf_elt_t *elts, size_t nelts); +ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_rtmp_amf_elt_t *elts, size_t nelts); + /* AMF status sender */ +ngx_chain_t * ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, + char* level, char *desc); +ngx_chain_t * ngx_rtmp_create_play_status(ngx_rtmp_session_t *s, char *code, + char* level, ngx_uint_t duration, ngx_uint_t bytes); +ngx_chain_t * ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s); + ngx_int_t ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc); ngx_int_t ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, ngx_uint_t duration, ngx_uint_t bytes); +ngx_int_t ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s); /* Frame types */ diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index db0cd86..224942f 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -14,15 +14,8 @@ ngx_rtmp_connect_pt ngx_rtmp_connect; ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; ngx_rtmp_close_stream_pt ngx_rtmp_close_stream; ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; - ngx_rtmp_publish_pt ngx_rtmp_publish; -ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish; -ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish; - ngx_rtmp_play_pt ngx_rtmp_play; -ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe; -ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe; - ngx_rtmp_seek_pt ngx_rtmp_seek; ngx_rtmp_pause_pt ngx_rtmp_pause; @@ -124,9 +117,7 @@ ngx_rtmp_cmd_connect_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, v.app[len - 1] = 0; } - return ngx_rtmp_connect - ? ngx_rtmp_connect(s, &v) - : NGX_OK; + return ngx_rtmp_connect(s, &v); } @@ -250,27 +241,25 @@ ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v) if (s->app_conf == NULL) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "connect: application not found: '%s'", v->app); + "connect: application not found: '%s'", v->app); return NGX_ERROR; } object_encoding = v->object_encoding; - /* send all replies */ - return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK - || ngx_rtmp_send_bandwidth(s, cscf->ack_window, - NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK - || ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK - || ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK - ? NGX_ERROR - : NGX_OK; + return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK || + ngx_rtmp_send_bandwidth(s, cscf->ack_window, + NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK || + ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK || + ngx_rtmp_send_amf(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) + != NGX_OK ? NGX_ERROR : NGX_OK; } static ngx_int_t ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) + ngx_chain_t *in) { static ngx_rtmp_create_stream_t v; @@ -287,9 +276,9 @@ ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_ERROR; } - return ngx_rtmp_create_stream - ? ngx_rtmp_create_stream(s, &v) - : NGX_OK; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream"); + + return ngx_rtmp_create_stream(s, &v); } @@ -324,23 +313,19 @@ ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v) stream = NGX_RTMP_MSID; ngx_memzero(&h, sizeof(h)); + h.csid = NGX_RTMP_CSID_AMF_INI; h.type = NGX_RTMP_MSG_AMF_CMD; - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "createStream"); - - /* send result with standard stream */ return ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK - ? NGX_DONE - : NGX_ERROR; + sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ? + NGX_DONE : NGX_ERROR; } static ngx_int_t ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) + ngx_chain_t *in) { static ngx_rtmp_close_stream_t v; @@ -352,34 +337,27 @@ ngx_rtmp_cmd_close_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, }; if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) + sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "closeStream"); + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "closeStream"); - return ngx_rtmp_close_stream - ? ngx_rtmp_close_stream(s, &v) - : NGX_OK; + return ngx_rtmp_close_stream(s, &v); } static ngx_int_t ngx_rtmp_cmd_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) { - ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID); - - /* Whatever happens return OK - * since we should be careful with destruction */ return NGX_OK; } static ngx_int_t ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) + ngx_chain_t *in) { static ngx_rtmp_delete_stream_t v; @@ -399,14 +377,12 @@ ngx_rtmp_cmd_delete_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, }; if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) + sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - return ngx_rtmp_delete_stream - ? ngx_rtmp_delete_stream(s, &v) - : NGX_OK; + return ngx_rtmp_delete_stream(s, &v); } @@ -415,14 +391,11 @@ ngx_rtmp_cmd_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) { ngx_rtmp_close_stream_t cv; - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "deleteStream"); + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "deleteStream"); - /* chain close_stream */ cv.stream = 0; - return ngx_rtmp_close_stream - ? ngx_rtmp_close_stream(s, &cv) - : NGX_OK; + + return ngx_rtmp_close_stream(s, &cv); } @@ -470,186 +443,35 @@ ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_memzero(&v, sizeof(v)); - /* parse input */ if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) + sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_rtmp_cmd_fill_args(v.name, v.args); - return ngx_rtmp_publish - ? ngx_rtmp_publish(s, &v) - : NGX_OK; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "publish: name='%s' args='%s' type=%s silent=%d", + v.name, v.args, v.type, v.silent); + + return ngx_rtmp_publish(s, &v); } static ngx_int_t ngx_rtmp_cmd_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { - ngx_rtmp_header_t h; - - static double trans; - - static ngx_rtmp_amf_elt_t out_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Publish.Start", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "Publish succeeded.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onStatus", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out_inf, sizeof(out_inf) }, - }; - - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "publish: name='%s' args='%s' type=%s silent=%d", - v->name, v->args, v->type, v->silent); - - if (v->silent) { - return NGX_OK; - } - - /* send onStatus reply */ - memset(&h, 0, sizeof(h)); - h.type = NGX_RTMP_MSG_AMF_CMD; - h.csid = NGX_RTMP_CSID_AMF; - h.msid = NGX_RTMP_MSID; - - if (ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - return NGX_OK; } - -static ngx_int_t -ngx_rtmp_cmd_fcpublish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - static ngx_rtmp_fcpublish_t v; - - static ngx_rtmp_amf_elt_t in_elts[] = { - - /* transaction is always 0 */ - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - &v.name, sizeof(v.name) }, - }; - - ngx_memzero(&v, sizeof(v)); - - /* parse input */ - if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - - return ngx_rtmp_fcpublish - ? ngx_rtmp_fcpublish(s, &v) - : NGX_OK; -} - - -static ngx_int_t -ngx_rtmp_cmd_fcpublish(ngx_rtmp_session_t *s, ngx_rtmp_fcpublish_t *v) -{ - ngx_rtmp_header_t h; - - static double trans; - - static ngx_rtmp_amf_elt_t out_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Publish.Start", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "FCPublish succeeded.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onFCPublish", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out_inf, sizeof(out_inf) }, - }; - - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "fcpublish: name='%s'", v->name); - - /* send onFCPublish reply */ - memset(&h, 0, sizeof(h)); - h.type = NGX_RTMP_MSG_AMF_CMD; - h.csid = NGX_RTMP_CSID_AMF; - h.msid = NGX_RTMP_MSID; - - if (ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - return NGX_OK; -} - - static ngx_int_t ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { static ngx_rtmp_play_t v; - static ngx_rtmp_amf_elt_t in_elts[] = { + static ngx_rtmp_amf_elt_t in_elts[] = { /* transaction is always 0 */ { NGX_RTMP_AMF_NUMBER, @@ -679,279 +501,28 @@ ngx_rtmp_cmd_play_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_memzero(&v, sizeof(v)); - /* parse input */ if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) + sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } ngx_rtmp_cmd_fill_args(v.name, v.args); - return ngx_rtmp_play - ? ngx_rtmp_play(s, &v) - : NGX_OK; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "play: name='%s' args='%s' start=%i duration=%i " + "reset=%i silent=%i", + v.name, v.args, (ngx_int_t) v.start, + (ngx_int_t) v.duration, (ngx_int_t) v.reset, + (ngx_int_t) v.silent); + + return ngx_rtmp_play(s, &v); } static ngx_int_t ngx_rtmp_cmd_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { - ngx_rtmp_header_t h; - - static double trans; - static int access = 1; - - static ngx_rtmp_amf_elt_t out_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Play.Start", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "Playback started.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onStatus", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out_inf, sizeof(out_inf) }, - }; - - static ngx_rtmp_amf_elt_t out2_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Play.Start", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "Started playing.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out2_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onStatus", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out2_inf, - sizeof(out2_inf) }, - }; - - static ngx_rtmp_amf_elt_t out3_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "|RtmpSampleAccess", 0 }, - - { NGX_RTMP_AMF_BOOLEAN, - ngx_null_string, - &access, 0 }, - - { NGX_RTMP_AMF_BOOLEAN, - ngx_null_string, - &access, 0 }, - }; - - static ngx_rtmp_amf_elt_t out4_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Data.Start", 0 }, - }; - - static ngx_rtmp_amf_elt_t out4_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onStatus", 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out4_inf, sizeof(out4_inf) }, - }; - - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "play name='%s' args='%s' start=%i duration=%i " - "reset=%i silent=%i", - v->name, v->args, (ngx_int_t) v->start, - (ngx_int_t) v->duration, (ngx_int_t) v->reset, - (ngx_int_t) v->silent); - - if (v->silent) { - return NGX_OK; - } - - /* send onStatus reply */ - memset(&h, 0, sizeof(h)); - h.type = NGX_RTMP_MSG_AMF_CMD; - h.csid = NGX_RTMP_CSID_AMF; - h.msid = NGX_RTMP_MSID; - - /* - if (ngx_rtmp_send_user_recorded(s, NGX_RTMP_MSID) != NGX_OK) { - return NGX_ERROR; - }*/ - - if (ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) { - return NGX_ERROR; - } - - if (ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - /* send sample access meta message FIXME */ - if (ngx_rtmp_send_amf(s, &h, out2_elts, - sizeof(out2_elts) / sizeof(out2_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - /* send data start meta message */ - h.type = NGX_RTMP_MSG_AMF_META; - if (ngx_rtmp_send_amf(s, &h, out3_elts, - sizeof(out3_elts) / sizeof(out3_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - if (ngx_rtmp_send_amf(s, &h, out4_elts, - sizeof(out4_elts) / sizeof(out4_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - - return NGX_OK; -} - - -static ngx_int_t -ngx_rtmp_cmd_fcsubscribe_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) -{ - static ngx_rtmp_fcsubscribe_t v; - - static ngx_rtmp_amf_elt_t in_elts[] = { - - /* transaction is always 0 */ - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - &v.name, sizeof(v.name) }, - - }; - - ngx_memzero(&v, sizeof(v)); - - /* parse input */ - if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - - return ngx_rtmp_fcsubscribe - ? ngx_rtmp_fcsubscribe(s, &v) - : NGX_OK; -} - - -static ngx_int_t -ngx_rtmp_cmd_fcsubscribe(ngx_rtmp_session_t *s, ngx_rtmp_fcsubscribe_t *v) -{ - ngx_rtmp_header_t h; - - static double trans; - - static ngx_rtmp_amf_elt_t out_inf[] = { - - { NGX_RTMP_AMF_STRING, - ngx_string("code"), - "NetStream.Play.Start", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("level"), - "status", 0 }, - - { NGX_RTMP_AMF_STRING, - ngx_string("description"), - "Started playing.", 0 }, - }; - - static ngx_rtmp_amf_elt_t out_elts[] = { - - { NGX_RTMP_AMF_STRING, - ngx_null_string, - "onFCSubscribe", 0 }, - - { NGX_RTMP_AMF_NUMBER, - ngx_null_string, - &trans, 0 }, - - { NGX_RTMP_AMF_NULL, - ngx_null_string, - NULL, 0 }, - - { NGX_RTMP_AMF_OBJECT, - ngx_null_string, - out_inf, - sizeof(out_inf) }, - }; - - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "fcsubscribe: name='%s'", v->name); - - /* send onFCSubscribe reply */ - memset(&h, 0, sizeof(h)); - h.type = NGX_RTMP_MSG_AMF_CMD; - h.csid = NGX_RTMP_CSID_AMF; - h.msid = NGX_RTMP_MSID; - - if (ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) - { - return NGX_ERROR; - } - return NGX_OK; } @@ -983,7 +554,6 @@ ngx_rtmp_cmd_pause_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_memzero(&v, sizeof(v)); - /* parse input */ if (ngx_rtmp_receive_amf(s, in, in_elts, sizeof(in_elts) / sizeof(in_elts[0]))) { @@ -991,42 +561,27 @@ ngx_rtmp_cmd_pause_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "cmd: pause pause=%i position=%i", - (ngx_int_t)v.pause, (ngx_int_t)v.position); + "pause: pause=%i position=%i", + (ngx_int_t) v.pause, (ngx_int_t) v.position); - return ngx_rtmp_pause - ? ngx_rtmp_pause(s, &v) - : NGX_OK; + return ngx_rtmp_pause(s, &v); } static ngx_int_t ngx_rtmp_cmd_pause(ngx_rtmp_session_t *s, ngx_rtmp_pause_t *v) { - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "pause: state='%i' position=%i", - v->pause, (ngx_int_t) v->position); - - if (v->pause) { - return ngx_rtmp_send_status(s, "NetStream.Pause.Notify", "status", - "Paused"); - } else { - return ngx_rtmp_send_status(s, "NetStream.Unpause.Notify", "status", - "Unpaused"); - } + return NGX_OK; } static ngx_int_t ngx_rtmp_cmd_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_chain_t *in) + ngx_chain_t *in) { - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "disconnect"); + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "disconnect"); - return ngx_rtmp_delete_stream - ? ngx_rtmp_delete_stream(s, NULL) - : NGX_OK; + return ngx_rtmp_delete_stream(s, NULL); } @@ -1054,49 +609,33 @@ ngx_rtmp_cmd_seek_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_memzero(&v, sizeof(v)); - /* parse input */ if (ngx_rtmp_receive_amf(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) + sizeof(in_elts) / sizeof(in_elts[0]))) { return NGX_ERROR; } - return ngx_rtmp_seek - ? ngx_rtmp_seek(s, &v) - : NGX_OK; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "seek: offset=%i", (ngx_int_t) v.offset); + + return ngx_rtmp_seek(s, &v); } static ngx_int_t ngx_rtmp_cmd_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v) { - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "seek: offset=%i", (ngx_int_t) v->offset); - - return (ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID) != NGX_OK - || ngx_rtmp_send_user_stream_begin(s, NGX_RTMP_MSID) != NGX_OK - || ngx_rtmp_send_status(s, "NetStream.Seek.Notify", "status", - "Seeking")) - ? NGX_ERROR - : NGX_OK; + return NGX_OK; } static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = { - { ngx_string("connect"), ngx_rtmp_cmd_connect_init }, { ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init }, { ngx_string("closeStream"), ngx_rtmp_cmd_close_stream_init }, { ngx_string("deleteStream"), ngx_rtmp_cmd_delete_stream_init }, - { ngx_string("publish"), ngx_rtmp_cmd_publish_init }, - { ngx_string("fcpublish"), ngx_rtmp_cmd_fcpublish_init }, - /*{ ngx_string("fcunpublish"), ngx_rtmp_cmd_fcunpublish_init },*/ - { ngx_string("play"), ngx_rtmp_cmd_play_init }, - { ngx_string("fcsubscribe"), ngx_rtmp_cmd_fcsubscribe_init }, - /*{ ngx_string("fcunsubscribe"), ngx_rtmp_cmd_fcunsubscribe_init },*/ - { ngx_string("seek"), ngx_rtmp_cmd_seek_init }, { ngx_string("pause"), ngx_rtmp_cmd_pause_init }, { ngx_string("pauseraw"), ngx_rtmp_cmd_pause_init }, @@ -1116,35 +655,37 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf) /* redirect disconnects to deleteStream * to free client modules from registering * disconnect callback */ - h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]); - *h = ngx_rtmp_cmd_disconnect; - /* register AMF callbacks */ - ncalls = sizeof(ngx_rtmp_cmd_map) / sizeof(ngx_rtmp_cmd_map[0]); - ch = ngx_array_push_n(&cmcf->amf, ncalls); + h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]); if (h == NULL) { return NGX_ERROR; } + *h = ngx_rtmp_cmd_disconnect; + + /* register AMF callbacks */ + + ncalls = sizeof(ngx_rtmp_cmd_map) / sizeof(ngx_rtmp_cmd_map[0]); + + ch = ngx_array_push_n(&cmcf->amf, ncalls); + if (ch == NULL) { + return NGX_ERROR; + } + bh = ngx_rtmp_cmd_map; + for(n = 0; n < ncalls; ++n, ++ch, ++bh) { *ch = *bh; } /* set initial handlers */ + ngx_rtmp_connect = ngx_rtmp_cmd_connect; ngx_rtmp_create_stream = ngx_rtmp_cmd_create_stream; ngx_rtmp_close_stream = ngx_rtmp_cmd_close_stream; ngx_rtmp_delete_stream = ngx_rtmp_cmd_delete_stream; - ngx_rtmp_publish = ngx_rtmp_cmd_publish; - ngx_rtmp_fcpublish = ngx_rtmp_cmd_fcpublish; - /*ngx_rtmp_fcunpublish = ngx_rtmp_cmd_fcunpublish;*/ - ngx_rtmp_play = ngx_rtmp_cmd_play; - ngx_rtmp_fcsubscribe = ngx_rtmp_cmd_fcsubscribe; - /*ngx_rtmp_fcunsubscribe = ngx_rtmp_cmd_fcunsubsrcibe;*/ - ngx_rtmp_seek = ngx_rtmp_cmd_seek; ngx_rtmp_pause = ngx_rtmp_cmd_pause; diff --git a/ngx_rtmp_cmd_module.h b/ngx_rtmp_cmd_module.h index 61f5af5..724418d 100644 --- a/ngx_rtmp_cmd_module.h +++ b/ngx_rtmp_cmd_module.h @@ -58,16 +58,6 @@ typedef struct { } ngx_rtmp_publish_t; -typedef struct { - u_char name[NGX_RTMP_MAX_NAME]; -} ngx_rtmp_fcpublish_t; - - -typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunpublish_t; -typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcsubscribe_t; -typedef ngx_rtmp_fcpublish_t ngx_rtmp_fcunsubscribe_t; - - typedef struct { u_char name[NGX_RTMP_MAX_NAME]; u_char args[NGX_RTMP_MAX_ARGS]; @@ -100,17 +90,9 @@ typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s, typedef ngx_int_t (*ngx_rtmp_publish_pt)(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v); -typedef ngx_int_t (*ngx_rtmp_fcpublish_pt)(ngx_rtmp_session_t *s, - ngx_rtmp_fcpublish_t *v); -typedef ngx_int_t (*ngx_rtmp_fcunpublish_pt)(ngx_rtmp_session_t *s, - ngx_rtmp_fcunpublish_t *v); typedef ngx_int_t (*ngx_rtmp_play_pt)(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v); -typedef ngx_int_t (*ngx_rtmp_fcsubscribe_pt)(ngx_rtmp_session_t *s, - ngx_rtmp_fcsubscribe_t *v); -typedef ngx_int_t (*ngx_rtmp_fcunsubscribe_pt)(ngx_rtmp_session_t *s, - ngx_rtmp_fcunsubscribe_t *v); typedef ngx_int_t (*ngx_rtmp_seek_pt)(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v); @@ -122,15 +104,8 @@ extern ngx_rtmp_connect_pt ngx_rtmp_connect; extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; extern ngx_rtmp_close_stream_pt ngx_rtmp_close_stream; extern ngx_rtmp_delete_stream_pt ngx_rtmp_delete_stream; - extern ngx_rtmp_publish_pt ngx_rtmp_publish; -extern ngx_rtmp_fcpublish_pt ngx_rtmp_fcpublish; -extern ngx_rtmp_fcunpublish_pt ngx_rtmp_fcunpublish; - extern ngx_rtmp_play_pt ngx_rtmp_play; -extern ngx_rtmp_fcsubscribe_pt ngx_rtmp_fcsubscribe; -extern ngx_rtmp_fcunsubscribe_pt ngx_rtmp_fcunsubscribe; - extern ngx_rtmp_seek_pt ngx_rtmp_seek; extern ngx_rtmp_pause_pt ngx_rtmp_pause; diff --git a/ngx_rtmp_control_module.c b/ngx_rtmp_control_module.c index ff7bcd1..f76639e 100644 --- a/ngx_rtmp_control_module.c +++ b/ngx_rtmp_control_module.c @@ -252,7 +252,7 @@ ngx_rtmp_control_record(ngx_http_request_t *r, ngx_str_t *method) /* find publisher context */ for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { - if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) { + if (lctx->publishing) { break; } } @@ -358,7 +358,7 @@ ngx_rtmp_control_drop(ngx_http_request_t *r, ngx_str_t *method) ngx_strncmp(method->data, "publisher", method->len) == 0) { for (lctx = live.ls->ctx; lctx; lctx = lctx->next) { - if (lctx->flags & NGX_RTMP_LIVE_PUBLISHING) { + if (lctx->publishing) { ngx_rtmp_finalize_session(lctx->session); ++ndropped; break; diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index a5f9d5a..bbd759d 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -174,9 +174,7 @@ ngx_rtmp_ping(ngx_event_t *pev) ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "ping: schedule %Mms", cscf->ping_timeout); - if (ngx_rtmp_send_user_ping_request(s, (uint32_t)ngx_current_msec) - != NGX_OK) - { + if (ngx_rtmp_send_ping_request(s, (uint32_t)ngx_current_msec) != NGX_OK) { ngx_rtmp_finalize_session(s); return; } diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 29cd89b..08c04a1 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -198,8 +198,138 @@ ngx_rtmp_live_get_stream(ngx_rtmp_session_t *s, u_char *name, int create) static void -ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, - ngx_uint_t flags) +ngx_rtmp_live_set_status(ngx_rtmp_session_t *s, ngx_chain_t *control, + ngx_chain_t **status, size_t nstatus, + unsigned active) +{ + ngx_rtmp_live_ctx_t *ctx, *pctx; + ngx_chain_t **cl; + size_t n; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: set active=%ui", active); + + if (ctx->active == active) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: unchanged active=%ui", active); + return; + } + + ctx->active = active; + + if (ctx->publishing) { + + /* publisher */ + + ctx->stream->active = active; + + for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) { + if (pctx->publishing == 0) { + ngx_rtmp_live_set_status(pctx->session, control, status, + nstatus, active); + } + } + + return; + } + + /* subscriber */ + + if (control && ngx_rtmp_send_message(s, control, 0) != NGX_OK) { + ngx_rtmp_finalize_session(s); + return; + } + + if (!ctx->silent) { + cl = status; + + for (n = 0; n < nstatus; ++n, ++cl) { + if (*cl && ngx_rtmp_send_message(s, *cl, 0) != NGX_OK) { + ngx_rtmp_finalize_session(s); + return; + } + } + } + + ctx->cs[0].active = 0; + ctx->cs[0].dropped = 0; + + ctx->cs[1].active = 0; + ctx->cs[1].dropped = 0; +} + + +/*TODO: intercept stream begin/eof for publishers & call these functions */ + + +static void +ngx_rtmp_live_stream_begin(ngx_rtmp_session_t *s) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_chain_t *control; + ngx_chain_t *status[3]; + size_t n, nstatus; + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID); + + nstatus = 0; + + status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start", + "status", "Start live"); + status[nstatus++] = ngx_rtmp_create_sample_access(s); + /*status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.PublishNotify", + "status", "Start publishing"); +*/ + ngx_rtmp_live_set_status(s, control, status, nstatus, 1); + + if (control) { + ngx_rtmp_free_shared_chain(cscf, control); + } + + for (n = 0; n < nstatus; ++n) { + ngx_rtmp_free_shared_chain(cscf, status[n]); + } +} + + +static void +ngx_rtmp_live_stream_eof(ngx_rtmp_session_t *s) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_chain_t *control; + ngx_chain_t *status[3]; + size_t n, nstatus; + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + control = ngx_rtmp_create_stream_eof(s, NGX_RTMP_MSID); + + nstatus = 0; + + status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Stop", + "status", "Stop live"); + /*status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.UnpublishNotify", + "status", "Start publishing"); +*/ + + ngx_rtmp_live_set_status(s, control, status, nstatus, 0); + + if (control) { + ngx_rtmp_free_shared_chain(cscf, control); + } + + for (n = 0; n < nstatus; ++n) { + ngx_rtmp_free_shared_chain(cscf, status[n]); + } +} + + +static void +ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, unsigned publisher) { ngx_rtmp_live_ctx_t *ctx; ngx_rtmp_live_stream_t **stream; @@ -212,8 +342,8 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); if (ctx && ctx->stream) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, - 0, "live: already joined"); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "live: already joined"); return; } @@ -223,26 +353,31 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, } ngx_memzero(ctx, sizeof(*ctx)); + ctx->session = s; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: join '%s'", name); + "live: join '%s'", name); stream = ngx_rtmp_live_get_stream(s, name, 1); if (stream == NULL) { return; } - if (flags & NGX_RTMP_LIVE_PUBLISHING) { - if ((*stream)->flags & NGX_RTMP_LIVE_PUBLISHING) { + + if (publisher) { + if ((*stream)->publishing) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, - "live: already publishing"); + "live: already publishing"); return; } - (*stream)->flags |= NGX_RTMP_LIVE_PUBLISHING; + + (*stream)->publishing = 1; } + ctx->stream = *stream; - ctx->flags = flags; + ctx->publishing = publisher; ctx->next = (*stream)->ctx; + (*stream)->ctx = ctx; if (lacf->buflen) { @@ -251,93 +386,10 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name, ctx->cs[0].csid = NGX_RTMP_MSG_AUDIO; ctx->cs[1].csid = NGX_RTMP_MSG_VIDEO; -} - -static ngx_int_t -ngx_rtmp_live_sync_streams(ngx_rtmp_session_t *s) -{ - ngx_rtmp_core_srv_conf_t *cscf; - ngx_rtmp_live_app_conf_t *lacf; - ngx_rtmp_live_ctx_t *ctx, *pctx; - ngx_rtmp_session_t *ss; - ngx_rtmp_header_t ch, lh; - ngx_chain_t *pkt; - ngx_uint_t csidx; - - /* Stream synchronization: - * - * - both streams active - * advance lagging stream - * - * - one stream actvie - * add absolute frame for inactive stream - * (the active stream became active - * during current publisher session) - * - * - none active - * quit - */ - - lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); - - if (lacf->interleave) { - return NGX_OK; + if (ctx->publishing || ctx->stream->active) { + ngx_rtmp_live_stream_begin(s); } - - ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); - - if (ctx->cs[0].active == 0 && ctx->cs[1].active == 0) { - return NGX_OK; - } - - if (ctx->cs[0].active == 0) { - csidx = 0; - } else if (ctx->cs[1].active == 0) { - csidx = 1; - } else { - csidx = (ctx->cs[0].timestamp > ctx->cs[1].timestamp); - } - - ngx_memzero(&ch, sizeof(ch)); - - ch.msid = NGX_RTMP_MSID; - ch.type = csidx ? NGX_RTMP_MSG_VIDEO : NGX_RTMP_MSG_AUDIO; - ch.csid = ctx->cs[csidx].csid; - ch.timestamp = ctx->cs[1 - csidx].timestamp; - - lh = ch; - lh.timestamp = ctx->cs[csidx].timestamp; - - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - pkt = ngx_rtmp_alloc_shared_buf(cscf); - if (pkt == NULL) { - return NGX_ERROR; - } - - ngx_rtmp_prepare_message(s, &ch, ctx->cs[csidx].active ? &lh : NULL, pkt); - - for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) { - if (pctx->cs[1 - csidx].active == 0) { - continue; - } - - ss = pctx->session; - - if (ngx_rtmp_send_message(ss, pkt, 0) != NGX_OK) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, - "live: sync failed"); - ngx_rtmp_finalize_session(ss); - } - - pctx->cs[csidx].timestamp += ch.timestamp; - pctx->cs[csidx].active = 1; - } - - ngx_rtmp_free_shared_chain(cscf, pkt); - - return NGX_OK; } @@ -360,17 +412,15 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) if (ctx->stream == NULL) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: not joined "); + "live: not joined"); goto next; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "live: leave '%s'", ctx->stream->name); - if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING - && ctx->flags & NGX_RTMP_LIVE_PUBLISHING) - { - ctx->stream->flags &= ~NGX_RTMP_LIVE_PUBLISHING; + if (ctx->stream->publishing && ctx->publishing) { + ctx->stream->publishing = 0; } for (cctx = &ctx->stream->ctx; *cctx; cctx = &(*cctx)->next) { @@ -380,9 +430,8 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) } } - if (ngx_rtmp_live_sync_streams(s) != NGX_OK) { - ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, - "live: sync error"); + if (ctx->publishing || ctx->stream->active) { + ngx_rtmp_live_stream_eof(s); } if (ctx->stream->ctx) { @@ -391,11 +440,12 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: delete empty stream '%s'", ctx->stream->name); + "live: delete empty stream '%s'", + ctx->stream->name); stream = ngx_rtmp_live_get_stream(s, ctx->stream->name, 0); if (stream == NULL) { - return NGX_ERROR; + goto next; } *stream = (*stream)->next; @@ -426,7 +476,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_uint_t meta_version; ngx_uint_t csidx; uint32_t delta; - ngx_rtmp_live_chunk_stream_t *cs, *acs; + ngx_rtmp_live_chunk_stream_t *cs; #ifdef NGX_DEBUG const char *type_s; @@ -447,7 +497,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return NGX_OK; } - if ((ctx->flags & NGX_RTMP_LIVE_PUBLISHING) == 0) { + if (ctx->publishing == 0) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "live: %s from non-publisher", type_s); return NGX_OK; @@ -472,7 +522,6 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO); cs = &ctx->cs[csidx]; - acs = &ctx->cs[1 - csidx]; ngx_memzero(&ch, sizeof(ch)); @@ -483,11 +532,8 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, lh = ch; - if (cs->active || cs->secondary) { + if (cs->active) { lh.timestamp = cs->timestamp; - } else if (!acs->active) { - acs->secondary = 1; - acs->timestamp = h->timestamp; } cs->active = 1; @@ -567,8 +613,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0, "live: sync %s dropped=%uD", type_s, cs->dropped); - /*TODO send empty delta packet */ - + cs->active = 0; cs->dropped = 0; } @@ -675,6 +720,7 @@ static ngx_int_t ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) { ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_live_ctx_t *ctx; lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); @@ -683,11 +729,24 @@ ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) } ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "live: publish: name='%s' type='%s'", - v->name, v->type); + "live: publish: name='%s' type='%s'", + v->name, v->type); /* join stream as publisher */ - ngx_rtmp_live_join(s, v->name, NGX_RTMP_LIVE_PUBLISHING); + + ngx_rtmp_live_join(s, v->name, 1); + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + if (ctx == NULL || !ctx->publishing) { + goto next; + } + + ctx->silent = v->silent; + + if (!ctx->silent) { + ngx_rtmp_send_status(s, "NetStream.Publish.Start", + "status", "Start publishing"); + } next: return next_publish(s, v); @@ -698,6 +757,7 @@ static ngx_int_t ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) { ngx_rtmp_live_app_conf_t *lacf; + ngx_rtmp_live_ctx_t *ctx; lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module); @@ -710,9 +770,17 @@ ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) v->name, (uint32_t)v->start, (uint32_t)v->duration, (uint32_t)v->reset); - /* join stream as player */ + /* join stream as subscriber */ + ngx_rtmp_live_join(s, v->name, 0); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module); + if (ctx == NULL) { + goto next; + } + + ctx->silent = v->silent; + next: return next_play(s, v); } diff --git a/ngx_rtmp_live_module.h b/ngx_rtmp_live_module.h index 197452c..f8352ad 100644 --- a/ngx_rtmp_live_module.h +++ b/ngx_rtmp_live_module.h @@ -13,17 +13,12 @@ #include "ngx_rtmp_streams.h" -/* session flags */ -#define NGX_RTMP_LIVE_PUBLISHING 0x01 - - typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t; typedef struct ngx_rtmp_live_stream_s ngx_rtmp_live_stream_t; typedef struct { unsigned active:1; - unsigned secondary:1; uint32_t timestamp; uint32_t csid; uint32_t dropped; @@ -34,10 +29,12 @@ struct ngx_rtmp_live_ctx_s { ngx_rtmp_session_t *session; ngx_rtmp_live_stream_t *stream; ngx_rtmp_live_ctx_t *next; - ngx_uint_t flags; ngx_uint_t ndropped; ngx_rtmp_live_chunk_stream_t cs[2]; ngx_uint_t meta_version; + unsigned active:1; + unsigned publishing:1; + unsigned silent:1; }; @@ -45,10 +42,11 @@ struct ngx_rtmp_live_stream_s { u_char name[NGX_RTMP_MAX_NAME]; ngx_rtmp_live_stream_t *next; ngx_rtmp_live_ctx_t *ctx; - ngx_uint_t flags; ngx_rtmp_bandwidth_t bw_in; ngx_rtmp_bandwidth_t bw_out; ngx_msec_t epoch; + unsigned active:1; + unsigned publishing:1; }; diff --git a/ngx_rtmp_play_module.c b/ngx_rtmp_play_module.c index 8b2961a..8e7573e 100644 --- a/ngx_rtmp_play_module.c +++ b/ngx_rtmp_play_module.c @@ -188,7 +188,7 @@ ngx_rtmp_play_send(ngx_event_t *e) ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "play: send done"); - ngx_rtmp_send_user_stream_eof(s, NGX_RTMP_MSID); + ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID); ngx_rtmp_send_play_status(s, "NetStream.Play.Complete", "status", ts, 0); @@ -349,6 +349,11 @@ ngx_rtmp_play_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v) if (ctx->file.fd != NGX_INVALID_FILE) { ngx_close_file(ctx->file.fd); ctx->file.fd = NGX_INVALID_FILE; + + ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID); + + ngx_rtmp_send_status(s, "NetStream.Play.Stop", "status", + "Stop video on demand"); } next: @@ -366,8 +371,22 @@ ngx_rtmp_play_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v) goto next; } + if (ngx_rtmp_send_stream_eof(s, NGX_RTMP_MSID) != NGX_OK) { + return NGX_ERROR; + } + ngx_rtmp_play_do_seek(s, v->offset); + if (ngx_rtmp_send_status(s, "NetStream.Seek.Notify", "status", "Seeking") + != NGX_OK) + { + return NGX_ERROR; + } + + if (ngx_rtmp_send_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) { + return NGX_ERROR; + } + next: return next_seek(s, v); } @@ -389,8 +408,23 @@ ngx_rtmp_play_pause(ngx_rtmp_session_t *s, ngx_rtmp_pause_t *v) (ngx_int_t) v->pause, v->position); if (v->pause) { + if (ngx_rtmp_send_status(s, "NetStream.Pause.Notify", "status", + "Paused video on demand") + != NGX_OK) + { + return NGX_ERROR; + } + ngx_rtmp_play_do_stop(s); + } else { + if (ngx_rtmp_send_status(s, "NetStream.Unpause.Notify", "status", + "Unpaused video on demand") + != NGX_OK) + { + return NGX_ERROR; + } + ngx_rtmp_play_do_start(s); /*TODO: v->position? */ } @@ -563,12 +597,23 @@ ngx_rtmp_play_open(ngx_rtmp_session_t *s, double start) return NGX_ERROR; } + if (ngx_rtmp_send_stream_begin(s, NGX_RTMP_MSID) != NGX_OK) { + return NGX_ERROR; + } + + if (ngx_rtmp_send_status(s, "NetStream.Play.Start", "status", + "Start video on demand") + != NGX_OK) + { + return NGX_ERROR; + } + e = &ctx->send_evt; e->data = s; e->handler = ngx_rtmp_play_send; e->log = s->connection->log; - ngx_rtmp_send_user_recorded(s, 1); + ngx_rtmp_send_recorded(s, 1); if (ngx_rtmp_play_do_init(s) != NGX_OK) { return NGX_ERROR; diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index 23c4ebd..db26ae9 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -113,10 +113,14 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, switch(evt) { case NGX_RTMP_USER_STREAM_BEGIN: /* use =val as stream id which started */ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "stream begin msid=%uD", val); break; case NGX_RTMP_USER_STREAM_EOF: /* use =val as stream id which is over */ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "stream eof msid=%uD", val); break; case NGX_RTMP_USER_STREAM_DRY: @@ -144,7 +148,7 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, break; case NGX_RTMP_USER_PING_REQUEST: - ngx_rtmp_send_user_ping_response(s, val); + ngx_rtmp_send_ping_response(s, val); break; case NGX_RTMP_USER_PING_RESPONSE: diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 405bac5..6f6073d 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -952,7 +952,7 @@ ngx_rtmp_relay_send_play(ngx_rtmp_session_t *s) return ngx_rtmp_send_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK - || ngx_rtmp_send_user_set_buflen(s, NGX_RTMP_RELAY_MSID, + || ngx_rtmp_send_set_buflen(s, NGX_RTMP_RELAY_MSID, racf->buflen) != NGX_OK ? NGX_ERROR : NGX_OK; diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index 261c405..c9ffc0b 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -13,7 +13,6 @@ ngx_chain_t *__l; \ ngx_buf_t *__b; \ ngx_rtmp_core_srv_conf_t *__cscf; \ - ngx_int_t rc; \ \ __cscf = ngx_rtmp_get_module_srv_conf( \ s, ngx_rtmp_core_module); \ @@ -22,7 +21,7 @@ __h.csid = 2; \ __l = ngx_rtmp_alloc_shared_buf(__cscf); \ if (__l == NULL) { \ - return NGX_ERROR; \ + return NULL; \ } \ __b = __l->buf; @@ -42,17 +41,36 @@ #define NGX_RTMP_USER_END(s) \ ngx_rtmp_prepare_message(s, &__h, NULL, __l); \ - rc = ngx_rtmp_send_message(s, __l, 0); \ - ngx_rtmp_free_shared_chain(__cscf, __l); \ + return __l; + + +static ngx_int_t +ngx_rtmp_send_shared_packet(ngx_rtmp_session_t *s, ngx_chain_t *cl) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_int_t rc; + + if (cl == NULL) { + return NGX_ERROR; + } + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + rc = ngx_rtmp_send_message(s, cl, 0); + + ngx_rtmp_free_shared_chain(cscf, cl); + return rc; +} /* Protocol control messages */ -ngx_int_t -ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) + +ngx_chain_t * +ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "send chunk_size=%uD", chunk_size); + "chunk_size=%uD", chunk_size); NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); @@ -63,8 +81,19 @@ ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) ngx_int_t -ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid) +ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_chunk_size(s, chunk_size)); +} + + +ngx_chain_t * +ngx_rtmp_create_abort(ngx_rtmp_session_t *s, uint32_t csid) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: abort csid=%uD", csid); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); NGX_RTMP_USER_OUT4(csid); @@ -74,10 +103,18 @@ ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid) ngx_int_t -ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq) +ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_abort(s, csid)); +} + + +ngx_chain_t * +ngx_rtmp_create_ack(ngx_rtmp_session_t *s, uint32_t seq) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "send ack seq=%uD", seq); + "create: ack seq=%uD", seq); NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK); @@ -88,10 +125,18 @@ ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq) ngx_int_t -ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) +ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_ack(s, seq)); +} + + +ngx_chain_t * +ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "send ack_size=%uD", ack_size); + "create: ack_size=%uD", ack_size); NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE); @@ -102,12 +147,20 @@ ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) ngx_int_t -ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, - uint8_t limit_type) +ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_ack_size(s, ack_size)); +} + + +ngx_chain_t * +ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, + uint8_t limit_type) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "send bandwidth ack_size=%uD limit=%d", - ack_size, (int)limit_type); + "create: bandwidth ack_size=%uD limit=%d", + ack_size, (int)limit_type); NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH); @@ -118,10 +171,23 @@ ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, } -/* User control messages */ ngx_int_t -ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) +ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, + uint8_t limit_type) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_bandwidth(s, ack_size, limit_type)); +} + + +/* User control messages */ + +ngx_chain_t * +ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: stream_begin msid=%uD", msid); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN); NGX_RTMP_USER_OUT4(msid); @@ -131,8 +197,19 @@ ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) ngx_int_t -ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) +ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_stream_begin(s, msid)); +} + + +ngx_chain_t * +ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: stream_end msid=%uD", msid); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF); NGX_RTMP_USER_OUT4(msid); @@ -142,8 +219,19 @@ ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) ngx_int_t -ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) +ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_stream_eof(s, msid)); +} + + +ngx_chain_t * +ngx_rtmp_create_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: stream_dry msid=%uD", msid); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_DRY); NGX_RTMP_USER_OUT4(msid); @@ -153,9 +241,21 @@ ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) ngx_int_t -ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, - uint32_t buflen_msec) +ngx_rtmp_send_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_stream_dry(s, msid)); +} + + +ngx_chain_t * +ngx_rtmp_create_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, + uint32_t buflen_msec) +{ + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: set_buflen msid=%uD buflen=%uD", + msid, buflen_msec); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_SET_BUFLEN); NGX_RTMP_USER_OUT4(msid); @@ -166,8 +266,20 @@ ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, ngx_int_t -ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, uint32_t msid) +ngx_rtmp_send_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, + uint32_t buflen_msec) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_set_buflen(s, msid, buflen_msec)); +} + + +ngx_chain_t * +ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: recorded msid=%uD", msid); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED); NGX_RTMP_USER_OUT4(msid); @@ -177,8 +289,19 @@ ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, uint32_t msid) ngx_int_t -ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) +ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, uint32_t msid) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_recorded(s, msid)); +} + + +ngx_chain_t * +ngx_rtmp_create_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: ping_request timestamp=%uD", timestamp); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_REQUEST); NGX_RTMP_USER_OUT4(timestamp); @@ -188,8 +311,19 @@ ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) ngx_int_t -ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) +ngx_rtmp_send_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) { + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_ping_request(s, timestamp)); +} + + +ngx_chain_t * +ngx_rtmp_create_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) +{ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: ping_response timestamp=%uD", timestamp); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_RESPONSE); NGX_RTMP_USER_OUT4(timestamp); @@ -199,20 +333,10 @@ ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) ngx_int_t -ngx_rtmp_send_user_unknown(ngx_rtmp_session_t *s, uint32_t timestamp) +ngx_rtmp_send_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) { - static uint32_t zero; - static uint32_t one = 1; - uint32_t val; - - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_UNKNOWN); - - NGX_RTMP_USER_OUT4(zero); - NGX_RTMP_USER_OUT4(one); - val = timestamp & 0x7fffffff; - NGX_RTMP_USER_OUT4(val); - - NGX_RTMP_USER_END(s); + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_ping_response(s, timestamp)); } @@ -228,8 +352,8 @@ ngx_rtmp_alloc_amf_buf(void *arg) /* NOTE: this function does not free shared bufs on error */ ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s, - ngx_chain_t **first, ngx_chain_t **last, - ngx_rtmp_amf_elt_t *elts, size_t nelts) + ngx_chain_t **first, ngx_chain_t **last, + ngx_rtmp_amf_elt_t *elts, size_t nelts) { ngx_rtmp_amf_ctx_t act; ngx_rtmp_core_srv_conf_t *cscf; @@ -264,35 +388,48 @@ ngx_rtmp_append_amf(ngx_rtmp_session_t *s, } -ngx_int_t -ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, - ngx_rtmp_amf_elt_t *elts, size_t nelts) +ngx_chain_t * +ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_rtmp_amf_elt_t *elts, size_t nelts) { ngx_chain_t *first; ngx_int_t rc; ngx_rtmp_core_srv_conf_t *cscf; + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: amf nelts=%ui", nelts); + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); first = NULL; + rc = ngx_rtmp_append_amf(s, &first, NULL, elts, nelts); - if (rc != NGX_OK || first == NULL) { - goto done; + + if (rc != NGX_OK && first) { + ngx_rtmp_free_shared_chain(cscf, first); + first = NULL; } - ngx_rtmp_prepare_message(s, h, NULL, first); + if (first) { + ngx_rtmp_prepare_message(s, h, NULL, first); + } - rc = ngx_rtmp_send_message(s, first, 0); - -done: - ngx_rtmp_free_shared_chain(cscf, first); - - return rc; + return first; } ngx_int_t -ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) +ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_rtmp_amf_elt_t *elts, size_t nelts) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_amf(s, h, elts, nelts)); +} + + +ngx_chain_t * +ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, char* level, + char *desc) { ngx_rtmp_header_t h; static double trans; @@ -332,6 +469,9 @@ ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) sizeof(out_inf) }, }; + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: status code='%s' level='%s' desc='%s'", + code, level, desc); out_inf[0].data = code; out_inf[1].data = level; @@ -343,14 +483,22 @@ ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) h.csid = NGX_RTMP_CSID_AMF; h.msid = NGX_RTMP_MSID; - return ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])); + return ngx_rtmp_create_amf(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); } ngx_int_t -ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, - ngx_uint_t duration, ngx_uint_t bytes) +ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_status(s, code, level, desc)); +} + + +ngx_chain_t * +ngx_rtmp_create_play_status(ngx_rtmp_session_t *s, char *code, char* level, + ngx_uint_t duration, ngx_uint_t bytes) { ngx_rtmp_header_t h; static double dduration; @@ -387,6 +535,10 @@ ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, sizeof(out_inf) }, }; + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "create: play_status code='%s' level='%s' " + "duration=%ui bytes=%ui", + code, level, duration, bytes); out_inf[0].data = code; out_inf[1].data = level; @@ -401,6 +553,56 @@ ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, h.msid = NGX_RTMP_MSID; h.timestamp = duration; - return ngx_rtmp_send_amf(s, &h, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])); + return ngx_rtmp_create_amf(s, &h, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])); +} + + +ngx_int_t +ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, + ngx_uint_t duration, ngx_uint_t bytes) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_play_status(s, code, level, duration, bytes)); +} + + +ngx_chain_t * +ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s) +{ + ngx_rtmp_header_t h; + + static int access = 1; + + static ngx_rtmp_amf_elt_t access_elts[] = { + + { NGX_RTMP_AMF_STRING, + ngx_null_string, + "|RtmpSampleAccess", 0 }, + + { NGX_RTMP_AMF_BOOLEAN, + ngx_null_string, + &access, 0 }, + + { NGX_RTMP_AMF_BOOLEAN, + ngx_null_string, + &access, 0 }, + }; + + memset(&h, 0, sizeof(h)); + + h.type = NGX_RTMP_MSG_AMF_META; + h.csid = NGX_RTMP_CSID_AMF; + h.msid = NGX_RTMP_MSID; + + return ngx_rtmp_create_amf(s, &h, access_elts, + sizeof(access_elts) / sizeof(access_elts[0])); +} + + +ngx_int_t +ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s) +{ + return ngx_rtmp_send_shared_packet(s, + ngx_rtmp_create_sample_access(s)); } diff --git a/ngx_rtmp_stat_module.c b/ngx_rtmp_stat_module.c index a6e5892..0c633e5 100644 --- a/ngx_rtmp_stat_module.c +++ b/ngx_rtmp_stat_module.c @@ -335,13 +335,17 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll, NGX_RTMP_STAT_L(""); } - if (ctx->flags & NGX_RTMP_LIVE_PUBLISHING) { + if (ctx->publishing) { NGX_RTMP_STAT_L(""); } + if (ctx->active) { + NGX_RTMP_STAT_L(""); + } + NGX_RTMP_STAT_L("\r\n"); } - if (ctx->flags & NGX_RTMP_LIVE_PUBLISHING) { + if (ctx->publishing) { publishing = 1; codec = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module); }