From 60f7962b5b7bd9946640ac22bc4b5819c594dc23 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Fri, 9 Mar 2012 02:39:15 +0400 Subject: [PATCH] fixed many errors; now test goes up to successfull 'connect' parse --- config | 2 +- ngx_rtmp.h | 4 +- ngx_rtmp_amf0.c | 144 +++++++++++++++++++++++--------- ngx_rtmp_amf0.h | 11 ++- ngx_rtmp_handler.c | 199 ++++++++++++++++++++++++--------------------- test/ffstream.sh | 1 + 6 files changed, 222 insertions(+), 139 deletions(-) create mode 100755 test/ffstream.sh diff --git a/config b/config index 5c1a78c..3f1f846 100644 --- a/config +++ b/config @@ -1,6 +1,6 @@ ngx_addon_name="ngx_rtmp_module" -CORE_MODULES="$CORE_MODULES ngx_rtmp_module" +CORE_MODULES="$CORE_MODULES ngx_rtmp_module ngx_rtmp_core_module" NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp.c \ diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 43ca233..f4e1adb 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -108,10 +108,10 @@ typedef struct { typedef struct { - uint8_t channel; uint8_t type; + uint8_t channel; uint8_t hsize; - uint8_t size; + uint32_t size; uint32_t timer; uint32_t stream; } ngx_rtmp_packet_hdr_t; diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index fd52f51..d63253f 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -3,25 +3,66 @@ */ #include "ngx_rtmp_amf0.h" +#include "ngx_rtmp.h" #include +#define NGX_RTMP_AMF0_SV(x, y) \ + (x) ^= (y); (y) ^= (x); (x) ^= (y) + +#define NGX_RTMP_AMF0_SB(x) \ + NGX_RTMP_AMF0_SV(*(uint8_t*)(&x), *((uint8_t*)(&x) + 1)) + static ngx_int_t -ngx_rtmp_amf0_get(ngx_chain_t **l, void *p, size_t n) +ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) { - ngx_buf_t *b; + ngx_buf_t *b; size_t size; + ngx_chain_t **l; +#ifdef NGX_DEBUG + void *op = p; +#endif if (!n) return NGX_OK; - for(; *l; l = &(*l)->next) { + for(l = ctx->link; *l; l = &(*l)->next) { b = (*l)->buf; if (b->last > n + b->pos) { - if (p) + if (p) { p = ngx_cpymem(p, b->pos, n); + } b->pos += n; + +#define NGX_RTMP_AMF0_DEBUG_SIZE 16 +#ifdef NGX_DEBUG + { + u_char hstr[3 * NGX_RTMP_AMF0_DEBUG_SIZE + 1]; + u_char str[NGX_RTMP_AMF0_DEBUG_SIZE + 1]; + u_char *hp, *pp, *sp; + static u_char hex[] = "0123456789ABCDEF"; + + hp = hstr; + sp = str; + pp = op; + + while (pp < (u_char*)p + && pp - (u_char*)op < NGX_RTMP_AMF0_DEBUG_SIZE) + { + *hp++ = ' '; + *hp++ = hex[(*pp & 0xf0) >> 4]; + *hp++ = hex[*pp & 0x0f]; + *sp++ = (*pp >= 0x20 && *pp <= 0x7e) ? + *pp : (u_char)'?'; + ++pp; + } + *hp = *sp = '\0'; + + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ctx->log, 0, + "AMF0 read (%d)%s '%s'", n, hstr, str); + } +#endif return NGX_OK; } @@ -33,15 +74,22 @@ ngx_rtmp_amf0_get(ngx_chain_t **l, void *p, size_t n) n -= size; } + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, ctx->log, 0, + "AMF0 read eof (%d)", n); + return NGX_ERROR; } static ngx_int_t -ngx_rtmp_amf0_put(ngx_chain_t **l, ngx_chain_t **free, void *p, size_t n) +ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) { ngx_buf_t *b; size_t size; + ngx_chain_t **l, **free; + + l = ctx->link; + free = ctx->free; while(n) { b = (*l) ? (*l)->buf : NULL; @@ -80,7 +128,7 @@ ngx_rtmp_amf0_put(ngx_chain_t **l, ngx_chain_t **free, void *p, size_t n) static ngx_int_t -ngx_rtmp_amf0_read_object(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, +ngx_rtmp_amf0_read_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { uint8_t type; @@ -100,22 +148,22 @@ ngx_rtmp_amf0_read_object(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, char name[maxlen + 1]; /* read key */ - if (ngx_rtmp_amf0_get(l, &len, sizeof(len)) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK) return NGX_ERROR; if (!len) break; if (len <= maxlen) { - rc = ngx_rtmp_amf0_get(l, name, len); + rc = ngx_rtmp_amf0_get(ctx, name, len); name[len] = 0; } else { - rc = ngx_rtmp_amf0_get(l, name, maxlen); + rc = ngx_rtmp_amf0_get(ctx, name, maxlen); if (rc != NGX_OK) return NGX_ERROR; name[maxlen] = 0; - rc = ngx_rtmp_amf0_get(l, 0, len - maxlen); + rc = ngx_rtmp_amf0_get(ctx, 0, len - maxlen); } if (rc != NGX_OK) @@ -125,11 +173,11 @@ ngx_rtmp_amf0_read_object(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, * then we could be able to use binary search */ for(n = 0; n < nelts && strcmp(name, elts[n].name); ++n); - if (ngx_rtmp_amf0_read(l, n < nelts ? &elts[n] : NULL, 1) != NGX_OK) + if (ngx_rtmp_amf0_read(ctx, n < nelts ? &elts[n] : NULL, 1) != NGX_OK) return NGX_ERROR; } - if (ngx_rtmp_amf0_get(l, &type, 1) != NGX_OK + if (ngx_rtmp_amf0_get(ctx, &type, 1) != NGX_OK || type != NGX_RTMP_AMF0_END) { return NGX_ERROR; @@ -141,7 +189,7 @@ ngx_rtmp_amf0_read_object(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, #define NGX_RTMP_AMF0_TILL_END_FLAG ((size_t)1 << (sizeof(size_t) * 8 - 1)) ngx_int_t -ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) +ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { void *data; uint8_t type; @@ -159,7 +207,7 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) for(n = 0; till_end || n < nelts; ++n) { - if (ngx_rtmp_amf0_get(l, &type, sizeof(type)) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, &type, sizeof(type)) != NGX_OK) return NGX_ERROR; data = (n >= nelts || elts == NULL || elts->type != type) @@ -168,36 +216,42 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) switch(type) { case NGX_RTMP_AMF0_NUMBER: - if (ngx_rtmp_amf0_get(l, data, 8) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, data, 8) != NGX_OK) { return NGX_ERROR; + } break; case NGX_RTMP_AMF0_BOOLEAN: - if (ngx_rtmp_amf0_get(l, data, 1) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, data, 1) != NGX_OK) { return NGX_ERROR; + } break; case NGX_RTMP_AMF0_STRING: - if (ngx_rtmp_amf0_get(l, &len, sizeof(len)) != NGX_OK) + if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK) { return NGX_ERROR; + } + + NGX_RTMP_AMF0_SB(len); if (data == NULL) { - rc = ngx_rtmp_amf0_get(l, data, len); + rc = ngx_rtmp_amf0_get(ctx, data, len); } else if (elts->len <= len) { - rc = ngx_rtmp_amf0_get(l, data, elts->len - 1); + rc = ngx_rtmp_amf0_get(ctx, data, elts->len - 1); if (rc != NGX_OK) return NGX_ERROR; ((char*)data)[elts->len - 1] = 0; - rc = ngx_rtmp_amf0_get(l, NULL, len - elts->len + 1); + rc = ngx_rtmp_amf0_get(ctx, NULL, len - elts->len + 1); } else { - rc = ngx_rtmp_amf0_get(l, data, len); + rc = ngx_rtmp_amf0_get(ctx, data, len); ((char*)data)[len] = 0; } - if (rc != NGX_OK) + if (rc != NGX_OK) { return NGX_ERROR; + } break; @@ -205,7 +259,7 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) break; case NGX_RTMP_AMF0_OBJECT: - if (ngx_rtmp_amf0_read_object(l, data, + if (ngx_rtmp_amf0_read_object(ctx, data, elts ? elts->len / sizeof(ngx_rtmp_amf0_elt_t) : 0 ) != NGX_OK) { @@ -214,7 +268,7 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) break; case NGX_RTMP_AMF0_ARRAY: - if (ngx_rtmp_amf0_read(l, data, + if (ngx_rtmp_amf0_read(ctx, data, elts ? (elts->len / sizeof(ngx_rtmp_amf0_elt_t)) | NGX_RTMP_AMF0_TILL_END_FLAG : 0 ) != NGX_OK) @@ -230,8 +284,9 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) return NGX_ERROR; } - if (elts) + if (elts) { ++elts; + } } return NGX_OK; @@ -239,7 +294,7 @@ ngx_rtmp_amf0_read(ngx_chain_t **l, ngx_rtmp_amf0_elt_t *elts, size_t nelts) static ngx_int_t -ngx_rtmp_amf0_write_object(ngx_chain_t **l, ngx_chain_t **free, +ngx_rtmp_amf0_write_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { uint16_t len; @@ -251,24 +306,27 @@ ngx_rtmp_amf0_write_object(ngx_chain_t **l, ngx_chain_t **free, name = elts[n].name; len = strlen(name); - if (ngx_rtmp_amf0_put(l, free, &name, len) != NGX_OK) + if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) { return NGX_ERROR; + } - if (ngx_rtmp_amf0_write(l, free, &elts[n], 1) != NGX_OK) + if (ngx_rtmp_amf0_write(ctx, &elts[n], 1) != NGX_OK) { return NGX_ERROR; + } } len = 0; - if (ngx_rtmp_amf0_put(l, free, &name, len) != NGX_OK) + if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) { return NGX_ERROR; + } return NGX_OK; } ngx_int_t -ngx_rtmp_amf0_write(ngx_chain_t **l, ngx_chain_t **free, +ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { size_t n; @@ -282,24 +340,30 @@ ngx_rtmp_amf0_write(ngx_chain_t **l, ngx_chain_t **free, data = elts[n].data; len = elts[n].len; - if (ngx_rtmp_amf0_put(l, free, &type, sizeof(type)) != NGX_OK) + if (ngx_rtmp_amf0_put(ctx, &type, sizeof(type)) != NGX_OK) return NGX_ERROR; switch(type) { case NGX_RTMP_AMF0_NUMBER: - if (ngx_rtmp_amf0_put(l, free, data, 8) != NGX_OK) + if (ngx_rtmp_amf0_put(ctx, data, 8) != NGX_OK) { return NGX_ERROR; + } break; case NGX_RTMP_AMF0_BOOLEAN: - if (ngx_rtmp_amf0_put(l, free, data, 1) != NGX_OK) + if (ngx_rtmp_amf0_put(ctx, data, 1) != NGX_OK) { return NGX_ERROR; + } break; case NGX_RTMP_AMF0_STRING: - if (ngx_rtmp_amf0_put(l, free, &len, sizeof(len)) != NGX_OK - || ngx_rtmp_amf0_put(l, free, data, len) != NGX_OK) - { + if (ngx_rtmp_amf0_put(ctx, &len, sizeof(len)) != NGX_OK) { + return NGX_ERROR; + } + + NGX_RTMP_AMF0_SB(len); + + if (ngx_rtmp_amf0_put(ctx, data, len) != NGX_OK) { return NGX_ERROR; } break; @@ -309,9 +373,9 @@ ngx_rtmp_amf0_write(ngx_chain_t **l, ngx_chain_t **free, case NGX_RTMP_AMF0_OBJECT: type = NGX_RTMP_AMF0_END; - if (ngx_rtmp_amf0_write_object(l, free, data, + if (ngx_rtmp_amf0_write_object(ctx, data, elts[n].len / sizeof(ngx_rtmp_amf0_elt_t)) != NGX_OK - || ngx_rtmp_amf0_put(l, free, &type, + || ngx_rtmp_amf0_put(ctx, &type, sizeof(type)) != NGX_OK) { return NGX_ERROR; @@ -320,9 +384,9 @@ ngx_rtmp_amf0_write(ngx_chain_t **l, ngx_chain_t **free, case NGX_RTMP_AMF0_ARRAY: type = NGX_RTMP_AMF0_END; - if (ngx_rtmp_amf0_write(l, free, data, + if (ngx_rtmp_amf0_write(ctx, data, elts[n].len / sizeof(ngx_rtmp_amf0_elt_t)) != NGX_OK - || ngx_rtmp_amf0_put(l, free, &type, + || ngx_rtmp_amf0_put(ctx, &type, sizeof(type)) != NGX_OK) { return NGX_ERROR; diff --git a/ngx_rtmp_amf0.h b/ngx_rtmp_amf0.h index fcc03a9..f392a7d 100644 --- a/ngx_rtmp_amf0.h +++ b/ngx_rtmp_amf0.h @@ -25,6 +25,13 @@ typedef struct { size_t len; } ngx_rtmp_amf0_elt_t; + +typedef struct { + ngx_chain_t **link, **free; + ngx_log_t *log; +} ngx_rtmp_amf0_ctx_t; + + /* struct { @@ -70,11 +77,11 @@ ngx_rtmp_amf0_write(l, free, elts, sizeof(elts)); */ /* reading AMF0 */ -ngx_int_t ngx_rtmp_amf0_read(ngx_chain_t **l, +ngx_int_t ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts); /* writing AMF0 */ -ngx_int_t ngx_rtmp_amf0_write(ngx_chain_t **l, ngx_chain_t **free, +ngx_int_t ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index cf7ba31..1cb2e43 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -25,6 +25,38 @@ static void ngx_rtmp_close_connection(ngx_connection_t *c); static size_t hdrsizes[] = { 12, 8, 4, 1 }; +#ifdef NGX_DEBUG +static char* +ngx_rtmp_packet_type(uint8_t type) { + static char* types[] = { + "?", + "chunk_size", + "?", + "bytes_read", + "ping", + "server_bw", + "client_bw", + "?", + "audio", + "video", + "?", + "?", + "?", + "?", + "?", + "flex", + "flex_so", + "flex_msg", + "notify", + "so", + "invoke" + }; + + return type < sizeof(types) / sizeof(types[0]) + ? types[type] + : "?"; +} +#endif void ngx_rtmp_init_connection(ngx_connection_t *c) @@ -234,10 +266,6 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) return; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "handshake data received size=%d fd=%d", - n, c->fd); - if (n > 0) { if (b->last == b->start && s->hs_stage == 0 && *b->last != '\x03') @@ -251,13 +279,10 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) } if (n == NGX_AGAIN) { - ngx_add_timer(rev, cscf->timeout); - if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_rtmp_close_session(s); } - return; } } @@ -270,12 +295,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) } /* handshake done */ - ngx_pfree(c->pool, s->buf.start); - c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; - ngx_rtmp_recv(rev); } @@ -308,9 +330,7 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) restart: while(b->pos != b->last) { - n = c->send(c, b->pos, b->last - b->pos); - if (n > 0) { b->pos += n; } @@ -320,13 +340,8 @@ restart: return; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "handshake data sent size=%d fd=%d", - n, c->fd); - if (n == NGX_AGAIN) { ngx_add_timer(c->write, cscf->timeout); - if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_rtmp_close_session(s); return; @@ -340,9 +355,7 @@ restart: } b->pos = b->last = b->start + 1; - ngx_del_event(wev, NGX_WRITE_EVENT, 0); - ngx_rtmp_handshake_recv(c->read); } @@ -355,10 +368,9 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; ngx_buf_t *b, *bb; - u_char h, *p; + u_char h, *p, *pp; ngx_chain_t *lin; - c = rev->data; s = c->data; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); @@ -378,9 +390,8 @@ ngx_rtmp_recv(ngx_event_t *rev) for(;;) { - /* find the last buf buf */ + /* find the last buf */ for(lin = s->in; lin->next; lin = lin->next); - b = lin->buf; if (b->last == b->end) { @@ -388,7 +399,6 @@ ngx_rtmp_recv(ngx_event_t *rev) return; } - /* receive data from client */ n = c->recv(c, b->last, b->end - b->last); if (n == NGX_ERROR || n == 0) { @@ -396,10 +406,6 @@ ngx_rtmp_recv(ngx_event_t *rev) return; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "data received size=%d fd=%d", - n, c->fd); - if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_rtmp_close_session(s); @@ -412,65 +418,87 @@ ngx_rtmp_recv(ngx_event_t *rev) h = *b->last; s->in_hdr.hsize = hdrsizes[(h >> 6) & 0x03]; s->in_hdr.channel = h & 0x3f; + + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP start %d hd=%d ch=%d", + (int)h, + (int)s->in_hdr.hsize, + (int)s->in_hdr.channel); } b->last += n; - if (b->last - b->pos < s->in_hdr.hsize) continue; + p = b->start + 1; /* basic header */ do { - p = b->start + 1; - if (s->in_hdr.hsize < 4) break; - /* TODO: check endians here */ - -/* + /* FIXME: is this fix really needed? if (s->in_hdr.channel == 1) { p += 2; }*/ - s->in_hdr.timer = 0; - ngx_memcpy(&s->in_hdr.timer, p, 3); - + /* timer: + * big-endian 3b -> little-endian 4b */ + pp = (u_char*)&s->in_hdr.timer; + pp[0] = p[2]; + pp[1] = p[1]; + pp[2] = p[0]; + pp[3] = 0; if (s->in_hdr.hsize < 8) break; + /* size: + * big-endian 3b -> little-endian 4b + * type: + * 1b -> 1b*/ p += 3; - s->in_hdr.size = 0; - ngx_memcpy(&s->in_hdr.size, p, 3); + pp = (u_char*)&s->in_hdr.size; + pp[0] = p[2]; + pp[1] = p[1]; + pp[2] = p[0]; + pp[3] = 0; p += 3; - ngx_memcpy(&s->in_hdr.type, p, 1); - + pp = &s->in_hdr.type; + *pp = *p; if (s->in_hdr.hsize < 12) break; + /* stream: + * little-endian 4b -> little-endian 4b */ ++p; - /* little-endian */ ngx_memcpy(&s->in_hdr.stream, p, 4); p += 4; } while(0); + ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP header %s (%d) ch=%d hd=%d " + "sz=%D tm=%D st=%D", + ngx_rtmp_packet_type(s->in_hdr.type), + (int)s->in_hdr.type, + (int)s->in_hdr.channel, + (int)s->in_hdr.hsize, + s->in_hdr.size, + s->in_hdr.timer, + s->in_hdr.stream); - if (b->last - p < (ngx_int_t)ngx_min(s->in_hdr.size, s->chunk_size)) + if (b->last < p + ngx_min(s->in_hdr.size, s->chunk_size)) continue; b->pos = p; + /* if fragmented then wait for more fragments */ if (s->in_hdr.size > s->chunk_size) { - - /* fragmented; need more fragments */ if (s->free == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "no free buffers"); ngx_rtmp_close_session(s); return; } - lin->next = s->free; s->free = s->free->next; lin = lin->next; @@ -479,7 +507,6 @@ ngx_rtmp_recv(ngx_event_t *rev) bb = lin->buf; bb->pos = bb->last = bb->start; continue; - } /* handle packet! */ @@ -487,7 +514,6 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_rtmp_close_session(s); return; } - bb = s->in->buf; bb->pos = bb->last = bb->start; @@ -549,20 +575,12 @@ ngx_rtmp_send(ngx_event_t *wev) s->out = l; } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "data sent size=? fd=%d", - c->fd); - if (l != NULL) { - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - ngx_add_timer(c->write, cscf->timeout); - if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_rtmp_close_session(s); } - return; } } @@ -598,9 +616,9 @@ ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags) return; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "join name='%V' fd=%d", - &name, c->fd); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP join '%V'", + &name); s->name = *name; ps = ngx_rtmp_get_session_head(s); @@ -621,9 +639,9 @@ ngx_rtmp_leave(ngx_rtmp_session_t *s) if (!s->name.len) return; - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "leave name='%V' fd=%d", - &s->name, c->fd); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP leave '%V'", + &s->name); ps = ngx_rtmp_get_session_head(s); @@ -756,12 +774,13 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, invoke_name, sizeof(invoke_name) }; - + ngx_rtmp_amf0_ctx_t amf_ctx; c = s->connection; - if (l == NULL) + if (l == NULL) { return NGX_ERROR; + } #ifdef NGX_DEBUG { @@ -771,16 +790,16 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, for(nch = 1, ch = l; ch->next; ch = ch->next, ++nch); ngx_log_debug8(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP packet received type=%d channel=%d hsize=%d " - "size=%d timer=%D stream=%D nbufs=%d fd=%d", + "RTMP packet %s (%d) ch=%d hd=%d " + "sz=%d tm=%D st=%D nbfs=%d", + ngx_rtmp_packet_type(h->type), (int)h->type, (int)h->channel, (int)h->hsize, (int)h->size, h->timer, h->stream, - nch, - c->fd); + nch); } #endif @@ -789,7 +808,6 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, b = l->buf; switch(h->type) { - case NGX_RTMP_PACKET_CHUNK_SIZE: if (b->last - b->pos < 4) return NGX_ERROR; @@ -863,9 +881,8 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, case NGX_RTMP_PACKET_AUDIO: case NGX_RTMP_PACKET_VIDEO: if (!(s->flags & NGX_RTMP_PUBLISHER)) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "received audio/video from non-publisher fd=%d", - c->fd); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "received audio/video from non-publisher"); return NGX_ERROR; } @@ -898,21 +915,20 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, break; case NGX_RTMP_PACKET_INVOKE: - /* amf: name, id, argv */ - - /*TODO: - * will call ngx_rtmp_join/ngx_rtmp_leave from here - * */ + amf_ctx.link = &l; + amf_ctx.free = &s->free; + amf_ctx.log = c->log; memset(invoke_name, 0, sizeof(invoke_name)); - if (ngx_rtmp_amf0_read(&l, &invoke_name_elt, 1) != NGX_OK) { + if (ngx_rtmp_amf0_read(&amf_ctx, &invoke_name_elt, 1) != NGX_OK) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP invoke failed"); return NGX_ERROR; } - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP invoke name='%s' fd=%d", - invoke_name, c->fd); - + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP invoke '%s'", + invoke_name); #define INVOKE_CALL(name) \ if (!strcasecmp(invoke_name, #name)) { \ @@ -943,16 +959,15 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, case NGX_RTMP_PACKET_FLEX: case NGX_RTMP_PACKET_FLEX_SO: case NGX_RTMP_PACKET_FLEX_MSG: - /* no support for flex */ - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "flex packets are not supported type=%d fd=%d", - (int)h->type, c->fd); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "flex packets are not supported %d", + (int)h->type); break; default: - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "unsupported packet type type=%d fd=%d", - (int)h->type, c->fd); + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "unsupported packet type %d", + (int)h->type); } return NGX_OK; @@ -963,7 +978,6 @@ void ngx_rtmp_close_session(ngx_rtmp_session_t *s) { ngx_rtmp_leave(s); - ngx_rtmp_close_connection(s->connection); } @@ -977,11 +991,8 @@ ngx_rtmp_close_connection(ngx_connection_t *c) "close connection: %d", c->fd); c->destroyed = 1; - pool = c->pool; - ngx_close_connection(c); - ngx_destroy_pool(pool); } @@ -1011,7 +1022,7 @@ ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len) return p; } - p = ngx_snprintf(buf, len, "%s, server: %V", s->addr_text); + p = ngx_snprintf(buf, len, ", server: %V", s->addr_text); len -= p - buf; buf = p; diff --git a/test/ffstream.sh b/test/ffstream.sh new file mode 100755 index 0000000..1a9a561 --- /dev/null +++ b/test/ffstream.sh @@ -0,0 +1 @@ +ffmpeg -re -i /mnt/home/rarutyunyan/Videos/the_changeup-solaris.giga.su.avi -f flv rtmp://localhost/