/* * Copyright (C) Roman Arutyunyan */ #include #include #include "ngx_rtmp.h" #include "ngx_rtmp_amf.h" #include "ngx_rtmp_streams.h" #define NGX_RTMP_USER_START(s, tp) \ ngx_rtmp_header_t __h; \ ngx_chain_t *__l; \ ngx_buf_t *__b; \ ngx_rtmp_core_srv_conf_t *__cscf; \ \ __cscf = ngx_rtmp_get_module_srv_conf( \ s, ngx_rtmp_core_module); \ memset(&__h, 0, sizeof(__h)); \ __h.type = tp; \ __h.csid = 2; \ __l = ngx_rtmp_alloc_shared_buf(__cscf); \ if (__l == NULL) { \ return NULL; \ } \ __b = __l->buf; #define NGX_RTMP_UCTL_START(s, type, utype) \ NGX_RTMP_USER_START(s, type); \ *(__b->last++) = (u_char)((utype) >> 8); \ *(__b->last++) = (u_char)(utype); #define NGX_RTMP_USER_OUT1(v) \ *(__b->last++) = ((u_char*)&v)[0]; #define NGX_RTMP_USER_OUT4(v) \ *(__b->last++) = ((u_char*)&v)[3]; \ *(__b->last++) = ((u_char*)&v)[2]; \ *(__b->last++) = ((u_char*)&v)[1]; \ *(__b->last++) = ((u_char*)&v)[0]; #define NGX_RTMP_USER_END(s) \ ngx_rtmp_prepare_message(s, &__h, NULL, __l); \ return __l; static ngx_int_t ngx_rtmp_send_shared_packet(ngx_rtmp_session_t *s, ngx_chain_t *cl) { ngx_rtmp_core_srv_conf_t *cscf; ngx_int_t rc; if (cl == NULL) { return NGX_ERROR; } cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); rc = ngx_rtmp_send_message(s, cl, 0); ngx_rtmp_free_shared_chain(cscf, cl); return rc; } /* Protocol control messages */ ngx_chain_t * ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "chunk_size=%uD", chunk_size); { NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); NGX_RTMP_USER_OUT4(chunk_size); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_chunk_size(s, chunk_size)); } ngx_chain_t * ngx_rtmp_create_abort(ngx_rtmp_session_t *s, uint32_t csid) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: abort csid=%uD", csid); { NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); NGX_RTMP_USER_OUT4(csid); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_abort(s, csid)); } ngx_chain_t * ngx_rtmp_create_ack(ngx_rtmp_session_t *s, uint32_t seq) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: ack seq=%uD", seq); { NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK); NGX_RTMP_USER_OUT4(seq); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_ack(s, seq)); } ngx_chain_t * ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: ack_size=%uD", ack_size); { NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE); NGX_RTMP_USER_OUT4(ack_size); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_ack_size(s, ack_size)); } ngx_chain_t * ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, uint8_t limit_type) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: bandwidth ack_size=%uD limit=%d", ack_size, (int)limit_type); { NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH); NGX_RTMP_USER_OUT4(ack_size); NGX_RTMP_USER_OUT1(limit_type); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, uint8_t limit_type) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_bandwidth(s, ack_size, limit_type)); } /* User control messages */ ngx_chain_t * ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: stream_begin msid=%uD", msid); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN); NGX_RTMP_USER_OUT4(msid); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_stream_begin(s, msid)); } ngx_chain_t * ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: stream_end msid=%uD", msid); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF); NGX_RTMP_USER_OUT4(msid); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_stream_eof(s, msid)); } ngx_chain_t * ngx_rtmp_create_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: stream_dry msid=%uD", msid); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_DRY); NGX_RTMP_USER_OUT4(msid); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_stream_dry(s, msid)); } ngx_chain_t * ngx_rtmp_create_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, uint32_t buflen_msec) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: set_buflen msid=%uD buflen=%uD", msid, buflen_msec); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_SET_BUFLEN); NGX_RTMP_USER_OUT4(msid); NGX_RTMP_USER_OUT4(buflen_msec); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, uint32_t buflen_msec) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_set_buflen(s, msid, buflen_msec)); } ngx_chain_t * ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: recorded msid=%uD", msid); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED); NGX_RTMP_USER_OUT4(msid); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, uint32_t msid) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_recorded(s, msid)); } ngx_chain_t * ngx_rtmp_create_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: ping_request timestamp=%uD", timestamp); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_REQUEST); NGX_RTMP_USER_OUT4(timestamp); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_ping_request(s, timestamp)); } ngx_chain_t * ngx_rtmp_create_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: ping_response timestamp=%uD", timestamp); { NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_RESPONSE); NGX_RTMP_USER_OUT4(timestamp); NGX_RTMP_USER_END(s); } } ngx_int_t ngx_rtmp_send_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_ping_response(s, timestamp)); } static ngx_chain_t * ngx_rtmp_alloc_amf_buf(void *arg) { return ngx_rtmp_alloc_shared_buf((ngx_rtmp_core_srv_conf_t *)arg); } /* AMF sender */ /* NOTE: this function does not free shared bufs on error */ ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s, ngx_chain_t **first, ngx_chain_t **last, ngx_rtmp_amf_elt_t *elts, size_t nelts) { ngx_rtmp_amf_ctx_t act; ngx_rtmp_core_srv_conf_t *cscf; ngx_int_t rc; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); memset(&act, 0, sizeof(act)); act.arg = cscf; act.alloc = ngx_rtmp_alloc_amf_buf; act.log = s->connection->log; if (first) { act.first = *first; } if (last) { act.link = *last; } rc = ngx_rtmp_amf_write(&act, elts, nelts); if (first) { *first = act.first; } if (last) { *last = act.link; } return rc; } ngx_chain_t * ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_amf_elt_t *elts, size_t nelts) { ngx_chain_t *first; ngx_int_t rc; ngx_rtmp_core_srv_conf_t *cscf; ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: amf nelts=%ui", nelts); cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); first = NULL; rc = ngx_rtmp_append_amf(s, &first, NULL, elts, nelts); if (rc != NGX_OK && first) { ngx_rtmp_free_shared_chain(cscf, first); first = NULL; } if (first) { ngx_rtmp_prepare_message(s, h, NULL, first); } return first; } ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_rtmp_amf_elt_t *elts, size_t nelts) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_amf(s, h, elts, nelts)); } ngx_chain_t * ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) { ngx_rtmp_header_t h; static double trans; static ngx_rtmp_amf_elt_t out_inf[] = { { NGX_RTMP_AMF_STRING, ngx_string("level"), NULL, 0 }, { NGX_RTMP_AMF_STRING, ngx_string("code"), NULL, 0 }, { NGX_RTMP_AMF_STRING, ngx_string("description"), NULL, 0 }, }; static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "onStatus", 0 }, { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 }, { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) }, }; ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: status code='%s' level='%s' desc='%s'", code, level, desc); out_inf[0].data = level; out_inf[1].data = code; out_inf[2].data = desc; memset(&h, 0, sizeof(h)); h.type = NGX_RTMP_MSG_AMF_CMD; h.csid = NGX_RTMP_CSID_AMF; h.msid = NGX_RTMP_MSID; return ngx_rtmp_create_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])); } ngx_int_t ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_status(s, code, level, desc)); } ngx_chain_t * ngx_rtmp_create_play_status(ngx_rtmp_session_t *s, char *code, char* level, ngx_uint_t duration, ngx_uint_t bytes) { ngx_rtmp_header_t h; static double dduration; static double dbytes; static ngx_rtmp_amf_elt_t out_inf[] = { { NGX_RTMP_AMF_STRING, ngx_string("code"), NULL, 0 }, { NGX_RTMP_AMF_STRING, ngx_string("level"), NULL, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("duration"), &dduration, 0 }, { NGX_RTMP_AMF_NUMBER, ngx_string("bytes"), &dbytes, 0 }, }; static ngx_rtmp_amf_elt_t out_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "onPlayStatus", 0 }, { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) }, }; ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "create: play_status code='%s' level='%s' " "duration=%ui bytes=%ui", code, level, duration, bytes); out_inf[0].data = code; out_inf[1].data = level; dduration = duration; dbytes = bytes; memset(&h, 0, sizeof(h)); h.type = NGX_RTMP_MSG_AMF_META; h.csid = NGX_RTMP_CSID_AMF; h.msid = NGX_RTMP_MSID; h.timestamp = duration; return ngx_rtmp_create_amf(s, &h, out_elts, sizeof(out_elts) / sizeof(out_elts[0])); } ngx_int_t ngx_rtmp_send_play_status(ngx_rtmp_session_t *s, char *code, char* level, ngx_uint_t duration, ngx_uint_t bytes) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_play_status(s, code, level, duration, bytes)); } ngx_chain_t * ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s) { ngx_rtmp_header_t h; static int access = 1; static ngx_rtmp_amf_elt_t access_elts[] = { { NGX_RTMP_AMF_STRING, ngx_null_string, "|RtmpSampleAccess", 0 }, { NGX_RTMP_AMF_BOOLEAN, ngx_null_string, &access, 0 }, { NGX_RTMP_AMF_BOOLEAN, ngx_null_string, &access, 0 }, }; memset(&h, 0, sizeof(h)); h.type = NGX_RTMP_MSG_AMF_META; h.csid = NGX_RTMP_CSID_AMF; h.msid = NGX_RTMP_MSID; return ngx_rtmp_create_amf(s, &h, access_elts, sizeof(access_elts) / sizeof(access_elts[0])); } ngx_int_t ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s) { return ngx_rtmp_send_shared_packet(s, ngx_rtmp_create_sample_access(s)); }