diff --git a/README b/README new file mode 100644 index 0000000..afeca25 --- /dev/null +++ b/README @@ -0,0 +1 @@ +This is an early version of module diff --git a/TODO b/TODO index 8e58a66..6350959 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,4 @@ -~ Implement modules support. - Move AMF0 handlers to modules. - Move broadcast to module. +- implement chain-reuse for output - remove macros hell from ngx_rtmp_send.c diff --git a/ngx_rtmp.c b/ngx_rtmp.c index 845fc2b..1a1d256 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -300,7 +300,6 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) NGX_RTMP_MSG_CHUNK_SIZE, NGX_RTMP_MSG_ABORT, NGX_RTMP_MSG_ACK, - NGX_RTMP_MSG_USER, NGX_RTMP_MSG_ACK_SIZE, NGX_RTMP_MSG_BANDWIDTH }; diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 21f8136..6d820f8 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -151,12 +151,13 @@ typedef struct { typedef struct ngx_rtmp_stream_t { ngx_rtmp_header_t hdr; + uint32_t len; /* current fragment length */ ngx_chain_t *in; } ngx_rtmp_stream_t; typedef struct ngx_rtmp_session_s { - uint32_t signature; /* "RTMP" */ + uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */ ngx_connection_t *connection; diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index 0f1a63d..e485e8e 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -19,6 +19,8 @@ static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in); static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, + double in_trans, ngx_chain_t *in); static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in); @@ -33,8 +35,10 @@ static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = { { ngx_string("connect"), ngx_rtmp_broadcast_connect }, { ngx_string("createStream"), ngx_rtmp_broadcast_create_stream }, { ngx_string("publish"), ngx_rtmp_broadcast_publish }, + { ngx_string("play"), ngx_rtmp_broadcast_play }, { ngx_string("releaseStream"), ngx_rtmp_broadcast_ok }, { ngx_string("FCPublish"), ngx_rtmp_broadcast_ok }, + { ngx_string("FCSubscribe"), ngx_rtmp_broadcast_ok }, }; @@ -179,7 +183,7 @@ ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream, } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "join broadcast stream '%V'", &stream); + "join broadcast stream '%V'", stream); ctx->stream = *stream; hctx = ngx_rtmp_broadcast_get_head(s); @@ -233,7 +237,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); - if (ctx == NULL || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) { + if (ctx == NULL + || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) + { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "received audio/video from non-publisher"); return NGX_ERROR; @@ -288,7 +294,7 @@ done: && !ngx_strncmp(cctx->stream.data, ctx->stream.data, ctx->stream.len)) { - if (ngx_rtmp_send_message(s, out) != NGX_OK) { + if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) { return NGX_ERROR; } ++nsubs; @@ -418,7 +424,7 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans, }; static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, @@ -452,6 +458,61 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans, } +static ngx_int_t +ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, double in_trans, + ngx_chain_t *in) +{ + static double trans; + static u_char pub_name[1024]; + static u_char pub_type[1024]; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t in_elts[] = { + { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, + { NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) }, + { NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + }; + + + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "publish() called; pubName='%s' pubType='%s'", + pub_name, pub_type); + + ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER); + + trans = in_trans; + ngx_str_set(&out_inf[0], "NetStream.Play.Start"); + ngx_str_set(&out_inf[1], "status"); + ngx_str_set(&out_inf[2], "Started playing."); + + if (ngx_rtmp_send_amf0(s, 3, 0, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in) { diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 1e5bd09..98b9f9a 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -31,7 +31,7 @@ ngx_rtmp_packet_type(uint8_t type) { "chunk_size", "abort", "ack", - "ctl", + "user", "ack_size", "bandwidth", "edge", @@ -227,9 +227,6 @@ ngx_rtmp_init_session(ngx_connection_t *c) b->end = b->start + size; b->temporary = 1; - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP handshake done"); - c->write->handler = ngx_rtmp_handshake_send; c->read->handler = ngx_rtmp_handshake_recv; @@ -310,6 +307,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP handshake done"); + ngx_rtmp_recv(rev); } @@ -383,19 +383,20 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; - u_char *p, *pp; - uint32_t timestamp; - size_t size; ngx_rtmp_header_t *h; - uint8_t fmt; - uint32_t csid; ngx_rtmp_stream_t *st, *st0; ngx_chain_t *in, *head; ngx_buf_t *b; + u_char *p, *pp, *old_pos; + size_t size, fsize, old_size; + uint8_t fmt; + uint32_t csid, timestamp; c = rev->data; s = c->data; b = NULL; + old_pos = NULL; + old_size = 0; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); if (c->destroyed) { @@ -406,7 +407,9 @@ ngx_rtmp_recv(ngx_event_t *rev) st = &s->in_streams[s->in_csid]; + /* allocate new buffer */ if (st->in == NULL) { + if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL || (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL) { @@ -417,53 +420,64 @@ ngx_rtmp_recv(ngx_event_t *rev) } st->in->next = NULL; - + b = st->in->buf; size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER; - st->in->buf->start = ngx_palloc(s->in_pool, size); - if (st->in->buf->start == NULL) { + + b->start = b->last = b->pos = ngx_palloc(s->in_pool, size); + if (b->start == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "buf alloc failed"); ngx_rtmp_close_connection(c); return; } - st->in->buf->end = st->in->buf->start + size; - st->in->buf->flush = 1; + b->end = b->start + size; } h = &st->hdr; in = st->in; - - /* anything remained from last iteration? */ - if (b != NULL && b->recycled && b->pos < b->last) { - st->in->buf->pos = st->in->buf->start; - st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos, - b->last - b->pos); - b->recycled = 0; - st->in->buf->flush = 0; - } - b = in->buf; - if (b->flush) { - b->pos = b->last = b->start; - b->flush = 0; - } + if (old_size) { - n = c->recv(c, b->last, b->end - b->last); - - if (n == NGX_ERROR || n == 0) { - ngx_rtmp_close_connection(c); - return; - } - - if (n == NGX_AGAIN) { - if (ngx_handle_read_event(c->read, 0) != NGX_OK) { - ngx_rtmp_close_connection(c); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "reusing formerly read data: %d", old_size); +#if 0 + /* DEBUG! */ + { + size_t i; + for(i = 0; i < 16 && i < old_size; ++i) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reuse %d", (int)*(old_pos + i)); + } } - return; +#endif + b->pos = b->start; + b->last = ngx_movemem(b->pos, old_pos, old_size); + + } else { + + if (old_pos) { + b->pos = b->last = b->start; + } + + n = c->recv(c, b->last, b->end - b->last); + + if (n == NGX_ERROR || n == 0) { + ngx_rtmp_close_connection(c); + return; + } + + if (n == NGX_AGAIN) { + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_rtmp_close_connection(c); + } + return; + } + + b->last += n; } - b->last += n; + old_pos = NULL; + old_size = 0; /* parse headers */ if (b->pos == b->start) { @@ -576,38 +590,43 @@ ngx_rtmp_recv(ngx_event_t *rev) } } - ngx_log_debug5(NGX_LOG_DEBUG_RTMP, c->log, 0, + ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP mheader %s (%d) " - "timestamp=%D mlen=%D msid=%D", + "timestamp=%D mlen=%D len=%D msid=%D", ngx_rtmp_packet_type(h->type), (int)h->type, - h->timestamp, h->mlen, h->msid); + h->timestamp, h->mlen, st->len, h->msid); /* header done */ b->pos = p; } size = b->last - b->pos; + fsize = h->mlen - st->len; - if (size < ngx_min(h->mlen, s->in_chunk_size)) + if (size < ngx_min(fsize, s->in_chunk_size)) continue; /* buffer is ready */ - b->flush = 1; - if (h->mlen > s->in_chunk_size) { + if (fsize > s->in_chunk_size) { /* collect fragmented chunks */ - h->mlen -= s->in_chunk_size; - b->pos += s->in_chunk_size; + st->len += s->in_chunk_size; + + old_pos = b->pos + s->in_chunk_size; + old_size = size - s->in_chunk_size; } else { /* handle! */ head = st->in->next; st->in->next = NULL; + old_pos = b->pos + fsize; + old_size = size - fsize; + st->len = 0; + if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) { ngx_rtmp_close_connection(c); return; } - b->pos += h->mlen; /* add used bufs to stream #0 */ st0 = &s->in_streams[0]; @@ -617,7 +636,6 @@ ngx_rtmp_recv(ngx_event_t *rev) } s->in_csid = 0; - b->recycled = 1; } } @@ -658,7 +676,7 @@ ngx_rtmp_send(ngx_event_t *wev) ngx_del_timer(wev); } - while(s->out) { + while (s->out) { out = c->send_chain(c, s->out, 0); if (out == NGX_CHAIN_ERROR) { @@ -677,7 +695,7 @@ ngx_rtmp_send(ngx_event_t *wev) return; } - while(s->out) { + while (s->out) { l = s->out; if (l->buf->pos < l->buf->last) { @@ -685,9 +703,12 @@ ngx_rtmp_send(ngx_event_t *wev) } s->out = s->out->next; + l->next = NULL; /* anyone still using this buffer? */ if (ngx_rtmp_buf_release(l->buf)) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "keeping shared buffer"); continue; } @@ -777,7 +798,6 @@ ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s, cl->next = cscf->out_free; cscf->out_free = out; - return NGX_OK; } @@ -791,12 +811,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_int_t hsize, thsize, nbufs; uint32_t mlen, timestamp, ext_timestamp; static uint8_t hdrsize[] = { 12, 8, 4, 1 }; + u_char th[3]; /* detect packet size */ mlen = 0; nbufs = 0; for(l = out; l; l = l->next) { - mlen += (out->buf->last - l->buf->pos); + mlen += (l->buf->last - l->buf->pos); ++nbufs; } @@ -842,7 +863,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, *p++ = (uint8_t)((h->csid - 64) >> 8); } + /* create fmt3 header for successive fragments */ thsize = p - out->buf->pos; + ngx_memcpy(th, out->buf->pos, thsize); + th[0] |= 0xc0; /* message header */ if (fmt <= 2) { @@ -875,12 +899,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, *p++ = pp[0]; } - /* use the smallest fmt (3) for - * trailing fragments */ - p = out->buf->pos; + /* append headers to successive fragments */ for(out = out->next; out; out = out->next) { out->buf->pos -= thsize; - ngx_memcpy(out->buf->pos, p, thsize); + ngx_memcpy(out->buf->pos, th, thsize); } } @@ -955,12 +977,20 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, evhs = &cmcf->events[h->type]; evh = evhs->elts; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "nhandlers: %d", evhs->nelts); + for(n = 0; n < evhs->nelts; ++n, ++evh) { if (!evh) { continue; } + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "calling handler %d", n); if ((*evh)(s, h, in) != NGX_OK) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "handler %d failed", n); return NGX_ERROR; } } diff --git a/test/dump.sh b/test/dump.sh new file mode 100755 index 0000000..3b3db71 --- /dev/null +++ b/test/dump.sh @@ -0,0 +1 @@ +rtmpdump -v -r "rtmp://localhost/helo/pd" diff --git a/test/ffstream.sh b/test/ffstream.sh index 146219c..8e19e34 100755 --- a/test/ffstream.sh +++ b/test/ffstream.sh @@ -1 +1 @@ -ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/helo +ffmpeg -loglevel verbose -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -an -f flv rtmp://localhost/helo