mirror of
https://github.com/zotanmew/nginx-rtmp-module.git
synced 2024-05-20 01:51:07 +02:00
improved publishing & subscribing: now subscriber receives some data which looks like real video
This commit is contained in:
parent
eaef3b90b9
commit
03960eb7b4
4
TODO
4
TODO
|
@ -1,6 +1,4 @@
|
||||||
~ Implement modules support.
|
- implement chain-reuse for output
|
||||||
Move AMF0 handlers to modules.
|
|
||||||
Move broadcast to module.
|
|
||||||
|
|
||||||
- remove macros hell from ngx_rtmp_send.c
|
- remove macros hell from ngx_rtmp_send.c
|
||||||
|
|
||||||
|
|
|
@ -300,7 +300,6 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
|
||||||
NGX_RTMP_MSG_CHUNK_SIZE,
|
NGX_RTMP_MSG_CHUNK_SIZE,
|
||||||
NGX_RTMP_MSG_ABORT,
|
NGX_RTMP_MSG_ABORT,
|
||||||
NGX_RTMP_MSG_ACK,
|
NGX_RTMP_MSG_ACK,
|
||||||
NGX_RTMP_MSG_USER,
|
|
||||||
NGX_RTMP_MSG_ACK_SIZE,
|
NGX_RTMP_MSG_ACK_SIZE,
|
||||||
NGX_RTMP_MSG_BANDWIDTH
|
NGX_RTMP_MSG_BANDWIDTH
|
||||||
};
|
};
|
||||||
|
|
|
@ -151,12 +151,13 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct ngx_rtmp_stream_t {
|
typedef struct ngx_rtmp_stream_t {
|
||||||
ngx_rtmp_header_t hdr;
|
ngx_rtmp_header_t hdr;
|
||||||
|
uint32_t len; /* current fragment length */
|
||||||
ngx_chain_t *in;
|
ngx_chain_t *in;
|
||||||
} ngx_rtmp_stream_t;
|
} ngx_rtmp_stream_t;
|
||||||
|
|
||||||
|
|
||||||
typedef struct ngx_rtmp_session_s {
|
typedef struct ngx_rtmp_session_s {
|
||||||
uint32_t signature; /* "RTMP" */
|
uint32_t signature; /* "RTMP" */ /* <-- FIXME wtf */
|
||||||
|
|
||||||
ngx_connection_t *connection;
|
ngx_connection_t *connection;
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@ static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s,
|
||||||
double in_trans, ngx_chain_t *in);
|
double in_trans, ngx_chain_t *in);
|
||||||
static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s,
|
static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s,
|
||||||
double in_trans, ngx_chain_t *in);
|
double in_trans, ngx_chain_t *in);
|
||||||
|
static ngx_int_t ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s,
|
||||||
|
double in_trans, ngx_chain_t *in);
|
||||||
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
|
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
|
||||||
double in_trans, ngx_chain_t *in);
|
double in_trans, ngx_chain_t *in);
|
||||||
|
|
||||||
|
@ -33,8 +35,10 @@ static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = {
|
||||||
{ ngx_string("connect"), ngx_rtmp_broadcast_connect },
|
{ ngx_string("connect"), ngx_rtmp_broadcast_connect },
|
||||||
{ ngx_string("createStream"), ngx_rtmp_broadcast_create_stream },
|
{ ngx_string("createStream"), ngx_rtmp_broadcast_create_stream },
|
||||||
{ ngx_string("publish"), ngx_rtmp_broadcast_publish },
|
{ ngx_string("publish"), ngx_rtmp_broadcast_publish },
|
||||||
|
{ ngx_string("play"), ngx_rtmp_broadcast_play },
|
||||||
{ ngx_string("releaseStream"), ngx_rtmp_broadcast_ok },
|
{ ngx_string("releaseStream"), ngx_rtmp_broadcast_ok },
|
||||||
{ ngx_string("FCPublish"), ngx_rtmp_broadcast_ok },
|
{ ngx_string("FCPublish"), ngx_rtmp_broadcast_ok },
|
||||||
|
{ ngx_string("FCSubscribe"), ngx_rtmp_broadcast_ok },
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -179,7 +183,7 @@ ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream,
|
||||||
}
|
}
|
||||||
|
|
||||||
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
"join broadcast stream '%V'", &stream);
|
"join broadcast stream '%V'", stream);
|
||||||
|
|
||||||
ctx->stream = *stream;
|
ctx->stream = *stream;
|
||||||
hctx = ngx_rtmp_broadcast_get_head(s);
|
hctx = ngx_rtmp_broadcast_get_head(s);
|
||||||
|
@ -233,7 +237,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
|
|
||||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
|
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
|
||||||
|
|
||||||
if (ctx == NULL || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) {
|
if (ctx == NULL
|
||||||
|
|| !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER))
|
||||||
|
{
|
||||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
"received audio/video from non-publisher");
|
"received audio/video from non-publisher");
|
||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
|
@ -288,7 +294,7 @@ done:
|
||||||
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
|
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
|
||||||
ctx->stream.len))
|
ctx->stream.len))
|
||||||
{
|
{
|
||||||
if (ngx_rtmp_send_message(s, out) != NGX_OK) {
|
if (ngx_rtmp_send_message(cctx->session, out) != NGX_OK) {
|
||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
}
|
}
|
||||||
++nsubs;
|
++nsubs;
|
||||||
|
@ -418,7 +424,7 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans,
|
||||||
};
|
};
|
||||||
|
|
||||||
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
||||||
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
|
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
|
||||||
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
|
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
|
||||||
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
|
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
|
||||||
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
|
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
|
||||||
|
@ -452,6 +458,61 @@ ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static ngx_int_t
|
||||||
|
ngx_rtmp_broadcast_play(ngx_rtmp_session_t *s, double in_trans,
|
||||||
|
ngx_chain_t *in)
|
||||||
|
{
|
||||||
|
static double trans;
|
||||||
|
static u_char pub_name[1024];
|
||||||
|
static u_char pub_type[1024];
|
||||||
|
|
||||||
|
static ngx_rtmp_amf0_elt_t out_inf[] = {
|
||||||
|
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
|
||||||
|
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
|
||||||
|
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
static ngx_rtmp_amf0_elt_t in_elts[] = {
|
||||||
|
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
|
||||||
|
{ NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) },
|
||||||
|
{ NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) },
|
||||||
|
};
|
||||||
|
|
||||||
|
static ngx_rtmp_amf0_elt_t out_elts[] = {
|
||||||
|
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
|
||||||
|
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
|
||||||
|
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
|
||||||
|
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
if (ngx_rtmp_receive_amf0(s, in, in_elts,
|
||||||
|
sizeof(in_elts) / sizeof(in_elts[0])))
|
||||||
|
{
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||||
|
"publish() called; pubName='%s' pubType='%s'",
|
||||||
|
pub_name, pub_type);
|
||||||
|
|
||||||
|
ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_SUBSCRIBER);
|
||||||
|
|
||||||
|
trans = in_trans;
|
||||||
|
ngx_str_set(&out_inf[0], "NetStream.Play.Start");
|
||||||
|
ngx_str_set(&out_inf[1], "status");
|
||||||
|
ngx_str_set(&out_inf[2], "Started playing.");
|
||||||
|
|
||||||
|
if (ngx_rtmp_send_amf0(s, 3, 0, out_elts,
|
||||||
|
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
|
||||||
|
{
|
||||||
|
return NGX_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NGX_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
|
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
|
||||||
double in_trans, ngx_chain_t *in)
|
double in_trans, ngx_chain_t *in)
|
||||||
{
|
{
|
||||||
|
|
|
@ -31,7 +31,7 @@ ngx_rtmp_packet_type(uint8_t type) {
|
||||||
"chunk_size",
|
"chunk_size",
|
||||||
"abort",
|
"abort",
|
||||||
"ack",
|
"ack",
|
||||||
"ctl",
|
"user",
|
||||||
"ack_size",
|
"ack_size",
|
||||||
"bandwidth",
|
"bandwidth",
|
||||||
"edge",
|
"edge",
|
||||||
|
@ -227,9 +227,6 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
||||||
b->end = b->start + size;
|
b->end = b->start + size;
|
||||||
b->temporary = 1;
|
b->temporary = 1;
|
||||||
|
|
||||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
|
||||||
"RTMP handshake done");
|
|
||||||
|
|
||||||
c->write->handler = ngx_rtmp_handshake_send;
|
c->write->handler = ngx_rtmp_handshake_send;
|
||||||
c->read->handler = ngx_rtmp_handshake_recv;
|
c->read->handler = ngx_rtmp_handshake_recv;
|
||||||
|
|
||||||
|
@ -310,6 +307,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
||||||
c->read->handler = ngx_rtmp_recv;
|
c->read->handler = ngx_rtmp_recv;
|
||||||
c->write->handler = ngx_rtmp_send;
|
c->write->handler = ngx_rtmp_send;
|
||||||
|
|
||||||
|
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"RTMP handshake done");
|
||||||
|
|
||||||
ngx_rtmp_recv(rev);
|
ngx_rtmp_recv(rev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,19 +383,20 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
ngx_connection_t *c;
|
ngx_connection_t *c;
|
||||||
ngx_rtmp_session_t *s;
|
ngx_rtmp_session_t *s;
|
||||||
ngx_rtmp_core_srv_conf_t *cscf;
|
ngx_rtmp_core_srv_conf_t *cscf;
|
||||||
u_char *p, *pp;
|
|
||||||
uint32_t timestamp;
|
|
||||||
size_t size;
|
|
||||||
ngx_rtmp_header_t *h;
|
ngx_rtmp_header_t *h;
|
||||||
uint8_t fmt;
|
|
||||||
uint32_t csid;
|
|
||||||
ngx_rtmp_stream_t *st, *st0;
|
ngx_rtmp_stream_t *st, *st0;
|
||||||
ngx_chain_t *in, *head;
|
ngx_chain_t *in, *head;
|
||||||
ngx_buf_t *b;
|
ngx_buf_t *b;
|
||||||
|
u_char *p, *pp, *old_pos;
|
||||||
|
size_t size, fsize, old_size;
|
||||||
|
uint8_t fmt;
|
||||||
|
uint32_t csid, timestamp;
|
||||||
|
|
||||||
c = rev->data;
|
c = rev->data;
|
||||||
s = c->data;
|
s = c->data;
|
||||||
b = NULL;
|
b = NULL;
|
||||||
|
old_pos = NULL;
|
||||||
|
old_size = 0;
|
||||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||||
|
|
||||||
if (c->destroyed) {
|
if (c->destroyed) {
|
||||||
|
@ -406,7 +407,9 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
|
|
||||||
st = &s->in_streams[s->in_csid];
|
st = &s->in_streams[s->in_csid];
|
||||||
|
|
||||||
|
/* allocate new buffer */
|
||||||
if (st->in == NULL) {
|
if (st->in == NULL) {
|
||||||
|
|
||||||
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
|
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
|
||||||
|| (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
|
|| (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
|
||||||
{
|
{
|
||||||
|
@ -417,53 +420,64 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
st->in->next = NULL;
|
st->in->next = NULL;
|
||||||
|
b = st->in->buf;
|
||||||
size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
|
size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
|
||||||
st->in->buf->start = ngx_palloc(s->in_pool, size);
|
|
||||||
if (st->in->buf->start == NULL) {
|
b->start = b->last = b->pos = ngx_palloc(s->in_pool, size);
|
||||||
|
if (b->start == NULL) {
|
||||||
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
|
||||||
"buf alloc failed");
|
"buf alloc failed");
|
||||||
ngx_rtmp_close_connection(c);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
st->in->buf->end = st->in->buf->start + size;
|
b->end = b->start + size;
|
||||||
st->in->buf->flush = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
h = &st->hdr;
|
h = &st->hdr;
|
||||||
in = st->in;
|
in = st->in;
|
||||||
|
|
||||||
/* anything remained from last iteration? */
|
|
||||||
if (b != NULL && b->recycled && b->pos < b->last) {
|
|
||||||
st->in->buf->pos = st->in->buf->start;
|
|
||||||
st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos,
|
|
||||||
b->last - b->pos);
|
|
||||||
b->recycled = 0;
|
|
||||||
st->in->buf->flush = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
b = in->buf;
|
b = in->buf;
|
||||||
|
|
||||||
if (b->flush) {
|
if (old_size) {
|
||||||
b->pos = b->last = b->start;
|
|
||||||
b->flush = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
n = c->recv(c, b->last, b->end - b->last);
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"reusing formerly read data: %d", old_size);
|
||||||
if (n == NGX_ERROR || n == 0) {
|
#if 0
|
||||||
ngx_rtmp_close_connection(c);
|
/* DEBUG! */
|
||||||
return;
|
{
|
||||||
}
|
size_t i;
|
||||||
|
for(i = 0; i < 16 && i < old_size; ++i) {
|
||||||
if (n == NGX_AGAIN) {
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reuse %d", (int)*(old_pos + i));
|
||||||
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
}
|
||||||
ngx_rtmp_close_connection(c);
|
|
||||||
}
|
}
|
||||||
return;
|
#endif
|
||||||
|
b->pos = b->start;
|
||||||
|
b->last = ngx_movemem(b->pos, old_pos, old_size);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
|
||||||
|
if (old_pos) {
|
||||||
|
b->pos = b->last = b->start;
|
||||||
|
}
|
||||||
|
|
||||||
|
n = c->recv(c, b->last, b->end - b->last);
|
||||||
|
|
||||||
|
if (n == NGX_ERROR || n == 0) {
|
||||||
|
ngx_rtmp_close_connection(c);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n == NGX_AGAIN) {
|
||||||
|
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
|
||||||
|
ngx_rtmp_close_connection(c);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
b->last += n;
|
||||||
}
|
}
|
||||||
|
|
||||||
b->last += n;
|
old_pos = NULL;
|
||||||
|
old_size = 0;
|
||||||
|
|
||||||
/* parse headers */
|
/* parse headers */
|
||||||
if (b->pos == b->start) {
|
if (b->pos == b->start) {
|
||||||
|
@ -576,38 +590,43 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ngx_log_debug5(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
ngx_log_debug6(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
"RTMP mheader %s (%d) "
|
"RTMP mheader %s (%d) "
|
||||||
"timestamp=%D mlen=%D msid=%D",
|
"timestamp=%D mlen=%D len=%D msid=%D",
|
||||||
ngx_rtmp_packet_type(h->type), (int)h->type,
|
ngx_rtmp_packet_type(h->type), (int)h->type,
|
||||||
h->timestamp, h->mlen, h->msid);
|
h->timestamp, h->mlen, st->len, h->msid);
|
||||||
|
|
||||||
/* header done */
|
/* header done */
|
||||||
b->pos = p;
|
b->pos = p;
|
||||||
}
|
}
|
||||||
|
|
||||||
size = b->last - b->pos;
|
size = b->last - b->pos;
|
||||||
|
fsize = h->mlen - st->len;
|
||||||
|
|
||||||
if (size < ngx_min(h->mlen, s->in_chunk_size))
|
if (size < ngx_min(fsize, s->in_chunk_size))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
/* buffer is ready */
|
/* buffer is ready */
|
||||||
b->flush = 1;
|
|
||||||
|
|
||||||
if (h->mlen > s->in_chunk_size) {
|
if (fsize > s->in_chunk_size) {
|
||||||
/* collect fragmented chunks */
|
/* collect fragmented chunks */
|
||||||
h->mlen -= s->in_chunk_size;
|
st->len += s->in_chunk_size;
|
||||||
b->pos += s->in_chunk_size;
|
|
||||||
|
old_pos = b->pos + s->in_chunk_size;
|
||||||
|
old_size = size - s->in_chunk_size;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
/* handle! */
|
/* handle! */
|
||||||
head = st->in->next;
|
head = st->in->next;
|
||||||
st->in->next = NULL;
|
st->in->next = NULL;
|
||||||
|
old_pos = b->pos + fsize;
|
||||||
|
old_size = size - fsize;
|
||||||
|
st->len = 0;
|
||||||
|
|
||||||
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
|
if (ngx_rtmp_receive_message(s, h, head) != NGX_OK) {
|
||||||
ngx_rtmp_close_connection(c);
|
ngx_rtmp_close_connection(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
b->pos += h->mlen;
|
|
||||||
|
|
||||||
/* add used bufs to stream #0 */
|
/* add used bufs to stream #0 */
|
||||||
st0 = &s->in_streams[0];
|
st0 = &s->in_streams[0];
|
||||||
|
@ -617,7 +636,6 @@ ngx_rtmp_recv(ngx_event_t *rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
s->in_csid = 0;
|
s->in_csid = 0;
|
||||||
b->recycled = 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -658,7 +676,7 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||||
ngx_del_timer(wev);
|
ngx_del_timer(wev);
|
||||||
}
|
}
|
||||||
|
|
||||||
while(s->out) {
|
while (s->out) {
|
||||||
out = c->send_chain(c, s->out, 0);
|
out = c->send_chain(c, s->out, 0);
|
||||||
|
|
||||||
if (out == NGX_CHAIN_ERROR) {
|
if (out == NGX_CHAIN_ERROR) {
|
||||||
|
@ -677,7 +695,7 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while(s->out) {
|
while (s->out) {
|
||||||
|
|
||||||
l = s->out;
|
l = s->out;
|
||||||
if (l->buf->pos < l->buf->last) {
|
if (l->buf->pos < l->buf->last) {
|
||||||
|
@ -685,9 +703,12 @@ ngx_rtmp_send(ngx_event_t *wev)
|
||||||
}
|
}
|
||||||
|
|
||||||
s->out = s->out->next;
|
s->out = s->out->next;
|
||||||
|
l->next = NULL;
|
||||||
|
|
||||||
/* anyone still using this buffer? */
|
/* anyone still using this buffer? */
|
||||||
if (ngx_rtmp_buf_release(l->buf)) {
|
if (ngx_rtmp_buf_release(l->buf)) {
|
||||||
|
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"keeping shared buffer");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -777,7 +798,6 @@ ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s,
|
||||||
cl->next = cscf->out_free;
|
cl->next = cscf->out_free;
|
||||||
cscf->out_free = out;
|
cscf->out_free = out;
|
||||||
|
|
||||||
|
|
||||||
return NGX_OK;
|
return NGX_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -791,12 +811,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
ngx_int_t hsize, thsize, nbufs;
|
ngx_int_t hsize, thsize, nbufs;
|
||||||
uint32_t mlen, timestamp, ext_timestamp;
|
uint32_t mlen, timestamp, ext_timestamp;
|
||||||
static uint8_t hdrsize[] = { 12, 8, 4, 1 };
|
static uint8_t hdrsize[] = { 12, 8, 4, 1 };
|
||||||
|
u_char th[3];
|
||||||
|
|
||||||
/* detect packet size */
|
/* detect packet size */
|
||||||
mlen = 0;
|
mlen = 0;
|
||||||
nbufs = 0;
|
nbufs = 0;
|
||||||
for(l = out; l; l = l->next) {
|
for(l = out; l; l = l->next) {
|
||||||
mlen += (out->buf->last - l->buf->pos);
|
mlen += (l->buf->last - l->buf->pos);
|
||||||
++nbufs;
|
++nbufs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -842,7 +863,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
*p++ = (uint8_t)((h->csid - 64) >> 8);
|
*p++ = (uint8_t)((h->csid - 64) >> 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* create fmt3 header for successive fragments */
|
||||||
thsize = p - out->buf->pos;
|
thsize = p - out->buf->pos;
|
||||||
|
ngx_memcpy(th, out->buf->pos, thsize);
|
||||||
|
th[0] |= 0xc0;
|
||||||
|
|
||||||
/* message header */
|
/* message header */
|
||||||
if (fmt <= 2) {
|
if (fmt <= 2) {
|
||||||
|
@ -875,12 +899,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||||
*p++ = pp[0];
|
*p++ = pp[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
/* use the smallest fmt (3) for
|
/* append headers to successive fragments */
|
||||||
* trailing fragments */
|
|
||||||
p = out->buf->pos;
|
|
||||||
for(out = out->next; out; out = out->next) {
|
for(out = out->next; out; out = out->next) {
|
||||||
out->buf->pos -= thsize;
|
out->buf->pos -= thsize;
|
||||||
ngx_memcpy(out->buf->pos, p, thsize);
|
ngx_memcpy(out->buf->pos, th, thsize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -955,12 +977,20 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
|
||||||
|
|
||||||
evhs = &cmcf->events[h->type];
|
evhs = &cmcf->events[h->type];
|
||||||
evh = evhs->elts;
|
evh = evhs->elts;
|
||||||
|
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"nhandlers: %d", evhs->nelts);
|
||||||
|
|
||||||
for(n = 0; n < evhs->nelts; ++n, ++evh) {
|
for(n = 0; n < evhs->nelts; ++n, ++evh) {
|
||||||
if (!evh) {
|
if (!evh) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"calling handler %d", n);
|
||||||
|
|
||||||
if ((*evh)(s, h, in) != NGX_OK) {
|
if ((*evh)(s, h, in) != NGX_OK) {
|
||||||
|
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
|
||||||
|
"handler %d failed", n);
|
||||||
return NGX_ERROR;
|
return NGX_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
1
test/dump.sh
Executable file
1
test/dump.sh
Executable file
|
@ -0,0 +1 @@
|
||||||
|
rtmpdump -v -r "rtmp://localhost/helo/pd"
|
|
@ -1 +1 @@
|
||||||
ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/helo
|
ffmpeg -loglevel verbose -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -an -f flv rtmp://localhost/helo
|
||||||
|
|
Loading…
Reference in a new issue