refactored

This commit is contained in:
Roman Arutyunyan 2012-10-27 17:39:04 +04:00
parent 9f85961535
commit 284e521e82
3 changed files with 143 additions and 150 deletions

View file

@ -248,6 +248,9 @@ ngx_rtmp_live_join(ngx_rtmp_session_t *s, u_char *name,
if (lacf->buflen) {
s->out_buffer = 1;
}
ctx->cs[0].csid = NGX_RTMP_MSG_AUDIO;
ctx->cs[1].csid = NGX_RTMP_MSG_VIDEO;
}
@ -277,8 +280,6 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: leave '%s'", ctx->stream->name);
ctx->msg_mask = 0;
if (ctx->stream->flags & NGX_RTMP_LIVE_PUBLISHING
&& ctx->flags & NGX_RTMP_LIVE_PUBLISHING)
{
@ -313,7 +314,7 @@ ngx_rtmp_live_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
next:
return next_close_stream(s, v);
}
static ngx_int_t
ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
@ -321,116 +322,103 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
{
ngx_rtmp_live_ctx_t *ctx, *pctx;
ngx_rtmp_codec_ctx_t *codec_ctx;
ngx_chain_t *out, *peer_out, *header_out,
*pheader_out, *meta;
ngx_chain_t *out, *pkt, *header, *pheader, *meta;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_live_app_conf_t *lacf;
ngx_rtmp_session_t *ss;
ngx_rtmp_header_t ch, lh;
ngx_uint_t prio, peer_prio;
ngx_uint_t peers, dropped_peers;
size_t header_offset, last_offset;
ngx_uint_t prio;
ngx_uint_t peers;
ngx_uint_t header_version, meta_version;
ngx_int_t diff_timestamp;
uint32_t *last, timestamp;
ngx_uint_t csidx, hvidx;
uint32_t timestamp, delta;
ngx_rtmp_live_chunk_stream_t *cs;
#ifdef NGX_DEBUG
const char *type_s;
type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
#endif
lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
if (lacf == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: NULL application");
return NGX_ERROR;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
if (!lacf->live
|| in == NULL || in->buf == NULL
|| ctx == NULL || ctx->stream == NULL
|| (h->type != NGX_RTMP_MSG_VIDEO
&& h->type != NGX_RTMP_MSG_AUDIO))
if (!lacf->live ||
in == NULL || in->buf == NULL ||
ctx == NULL || ctx->stream == NULL ||
(h->type != NGX_RTMP_MSG_VIDEO && h->type != NGX_RTMP_MSG_AUDIO))
{
return NGX_OK;
}
if ((ctx->flags & NGX_RTMP_LIVE_PUBLISHING) == 0) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: received audio/video from non-publisher");
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: %s from non-publisher", type_s);
return NGX_OK;
}
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"live: av: %s timestamp=%uD timeshift=%uD",
h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
h->timestamp, h->timeshift);
"live: %s packet timestamp=%uD timeshift=%uD",
type_s, h->timestamp, h->timeshift);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);
hvidx = (h->type == NGX_RTMP_MSG_VIDEO);
timestamp = h->timestamp;
if (lacf->atc == 0) {
timestamp += h->timeshift;
}
/* prepare output header */
/* prepare output headers */
ngx_memzero(&ch, sizeof(ch));
ngx_memzero(&lh, sizeof(lh));
cs = &ctx->cs[csidx];
ch.timestamp = timestamp;
ch.msid = NGX_RTMP_MSID;
ch.csid = cs->csid;
ch.type = h->type;
lh.msid = ch.msid;
if (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO) {
last_offset = offsetof(ngx_rtmp_live_ctx_t, last_video);
} else {
last_offset = offsetof(ngx_rtmp_live_ctx_t, last_audio);
lh = ch;
if (cs->active) {
lh.timestamp = cs->timestamp;
}
last = (uint32_t *)((u_char *)ctx + last_offset);
lh.timestamp = *last;
*last = ch.timestamp;
cs->active = 1;
cs->timestamp = ch.timestamp;
if (h->type == NGX_RTMP_MSG_VIDEO) {
prio = ngx_rtmp_get_video_frame_type(in);
ch.csid = NGX_RTMP_CSID_VIDEO;
} else {
prio = 0;
ch.csid = NGX_RTMP_CSID_AUDIO;
}
prio = (h->type == NGX_RTMP_MSG_VIDEO ?
ngx_rtmp_get_video_frame_type(in) : 0);
if (lacf->interleave) {
ch.csid = NGX_RTMP_CSID_VIDEO;
}
if ((ctx->msg_mask & (1 << ch.csid)) == 0) {
lh.timestamp = ch.timestamp;
ctx->msg_mask |= (1 << ch.csid);
}
lh.csid = ch.csid;
diff_timestamp = ch.timestamp - lh.timestamp;
delta = ch.timestamp - lh.timestamp;
out = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, &lh, out);
peers = 0;
dropped_peers = 0;
codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
header_out = NULL;
pheader_out = NULL;
header_offset = 0;
header = NULL;
pheader = NULL;
header_version = 0;
meta = NULL;
meta_version = 0;
codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
if (codec_ctx) {
if (h->type == NGX_RTMP_MSG_AUDIO) {
if (codec_ctx->aac_header) {
header_out = codec_ctx->aac_header;
header_offset = offsetof(ngx_rtmp_live_ctx_t, aac_version);
header = codec_ctx->aac_header;
header_version = codec_ctx->aac_version;
}
} else {
if (codec_ctx->avc_header) {
header_out = codec_ctx->avc_header;
header_offset = offsetof(ngx_rtmp_live_ctx_t, avc_version);
header = codec_ctx->avc_header;
header_version = codec_ctx->avc_version;
}
}
@ -445,116 +433,122 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (pctx == ctx) {
continue;
}
++peers;
ss = pctx->session;
last = (uint32_t *)((u_char *)pctx + last_offset);
ch.timestamp = timestamp;
if (lacf->atc == 0) {
ch.timestamp -= (uint32_t)ss->epoch;
ch.timestamp -= (uint32_t) ss->epoch;
}
lh.timestamp = ch.timestamp - diff_timestamp;
lh.timestamp = ch.timestamp - delta;
/* send absolute frame */
if ((pctx->msg_mask & (1 << ch.csid)) == 0) {
/* packet from the past for the peer */
if (lacf->atc == 0 && timestamp < (uint32_t)ss->epoch) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: av: %s packet from the past %uD < %uD",
h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
timestamp, (uint32_t)ss->epoch);
continue;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: av: abs %s timestamp=%uD",
h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
ch.timestamp);
/* send codec header as abs frame if any */
peer_out = ngx_rtmp_append_shared_bufs(cscf, NULL,
header_out ? header_out : in);
ngx_rtmp_prepare_message(s, &ch, NULL, peer_out);
if (ngx_rtmp_send_message(ss, peer_out, prio) == NGX_OK) {
pctx->msg_mask |= (1 << ch.csid);
if (header_out) {
*(ngx_uint_t *)((u_char *)pctx + header_offset)
= header_version;
*last = ch.timestamp;
}
}
ngx_rtmp_free_shared_chain(cscf, peer_out);
continue;
}
/* send AVC/H264 header if newer header has arrived */
if (header_out && *(ngx_uint_t *)((u_char *)pctx + header_offset)
!= header_version)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sending codec header");
if (pheader_out == NULL) {
pheader_out =
ngx_rtmp_append_shared_bufs(cscf, NULL, header_out);
ngx_rtmp_prepare_message(s, &ch, &ch, pheader_out);
}
if (ngx_rtmp_send_message(ss, pheader_out, 0) == NGX_OK) {
*(ngx_uint_t *)((u_char *)pctx + header_offset)
= header_version;
}
}
cs = &pctx->cs[csidx];
/* send metadata if newer exists */
if (meta && meta_version != pctx->meta_version) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: sending metadata");
if (ngx_rtmp_send_message(ss, meta, prio) == NGX_OK) {
"live: sending metadata");
if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
pctx->meta_version = meta_version;
}
}
/* send absolute frame */
if (!cs->active) {
/* packet from the past for the peer */
if (lacf->atc == 0 && timestamp < (uint32_t) ss->epoch) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: %s packet from the past %uD < %uD",
type_s, timestamp, (uint32_t)ss->epoch);
continue;
}
if (header) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s header timestamp=%uD",
type_s, lh.timestamp);
pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
ngx_rtmp_prepare_message(s, &lh, NULL, pkt);
if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) {
cs->timestamp = lh.timestamp;
cs->active = 1;
pctx->header_versions[hvidx] = header_version;
}
ngx_rtmp_free_shared_chain(cscf, pkt);
} else {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: abs %s frame timestamp=%uD",
type_s, ch.timestamp);
pkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
ngx_rtmp_prepare_message(s, &ch, NULL, pkt);
if (ngx_rtmp_send_message(ss, pkt, prio) == NGX_OK) {
cs->timestamp = ch.timestamp;
cs->active = 1;
++peers;
}
ngx_rtmp_free_shared_chain(cscf, pkt);
continue;
}
}
/* send absolute frame to sync stream */
if (!lacf->interleave && lacf->sync &&
*last + lacf->sync < lh.timestamp)
cs->timestamp + lacf->sync < lh.timestamp)
{
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: av: sync %s: %i",
h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio",
(ngx_int_t) (lh.timestamp - *last));
"live: abs sync %s: %i",
type_s, (ngx_int_t) (lh.timestamp - cs->timestamp));
peer_out = ngx_rtmp_alloc_shared_buf(cscf);
ngx_rtmp_prepare_message(s, &lh, NULL, peer_out);
if (ngx_rtmp_send_message(ss, peer_out, 0) == NGX_OK) {
*last = lh.timestamp;
pkt = ngx_rtmp_alloc_shared_buf(cscf);
ngx_rtmp_prepare_message(s, &lh, NULL, pkt);
if (ngx_rtmp_send_message(ss, pkt, 0) == NGX_OK) {
cs->timestamp = lh.timestamp;
}
ngx_rtmp_free_shared_chain(cscf, pkt);
}
/* send codec header if newer header has arrived */
if (header && pctx->header_versions[hvidx] != header_version) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
"live: new %s header", type_s);
if (pheader == NULL) {
pheader = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
ngx_rtmp_prepare_message(s, &ch, &ch, pheader);
}
if (ngx_rtmp_send_message(ss, pheader, 0) == NGX_OK) {
pctx->header_versions[hvidx] = header_version;
}
ngx_rtmp_free_shared_chain(cscf, peer_out);
}
/* push buffered data */
peer_prio = prio;
if (ngx_rtmp_send_message(ss, out, peer_prio) != NGX_OK) {
++pctx->dropped;
++dropped_peers;
if (ngx_rtmp_send_message(ss, out, prio) == NGX_OK) {
cs->timestamp += delta;
++peers;
continue;
}
*last += diff_timestamp;
}
ngx_rtmp_free_shared_chain(cscf, out);
if (pheader_out) {
ngx_rtmp_free_shared_chain(cscf, pheader_out);
if (pheader) {
ngx_rtmp_free_shared_chain(cscf, pheader);
}
ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out,
h->mlen * (peers - dropped_peers));
ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);
return NGX_OK;
}

View file

@ -21,23 +21,22 @@ typedef struct ngx_rtmp_live_ctx_s ngx_rtmp_live_ctx_t;
typedef struct ngx_rtmp_live_stream_s ngx_rtmp_live_stream_t;
typedef struct {
unsigned active:1;
uint32_t timestamp;
uint32_t csid;
} ngx_rtmp_live_chunk_stream_t;
struct ngx_rtmp_live_ctx_s {
ngx_rtmp_session_t *session;
ngx_rtmp_live_stream_t *stream;
ngx_rtmp_live_ctx_t *next;
ngx_uint_t flags;
ngx_uint_t msg_mask;
ngx_uint_t dropped;
uint32_t csid;
uint32_t last_audio;
uint32_t last_video;
ngx_uint_t aac_version;
ngx_uint_t avc_version;
ngx_rtmp_live_chunk_stream_t cs[2];
ngx_uint_t header_versions[2];
ngx_uint_t meta_version;
/* last stream timestamps */
uint32_t last[2];
uint32_t *plast[2];
};

View file

@ -309,11 +309,11 @@ ngx_rtmp_stat_live(ngx_http_request_t *r, ngx_chain_t ***lll,
"%uz", ctx->dropped) - buf);
NGX_RTMP_STAT_L("</dropped>");
NGX_RTMP_STAT_L("<avsync>");
/*NGX_RTMP_STAT_L("<avsync>");
NGX_RTMP_STAT(buf, ngx_snprintf(buf, sizeof(buf),
"%L", (int64_t)ctx->last_audio
- ctx->last_video) - buf);
NGX_RTMP_STAT_L("</avsync>");
NGX_RTMP_STAT_L("</avsync>");*/
if (s->flashver.len) {
NGX_RTMP_STAT_L("<flashver>");