0%

srs_code

srs代码流程图

srs代码流程分析:

  • main:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    srs_error_t do_main(int argc, char** argv)

    if ((err = srs_thread_initialize()) != srs_success) {
    if ((err = srs_st_init()) != srs_success) {
    if((r0 = st_init()) != 0){
    /* We can ignore return value here */
    st_set_eventsys(ST_EVENTSYS_DEFAULT); //设置用epoll或其他;
    _st_eventsys 的各种操作:
    _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
  • 大的代码逻辑架构:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
      1 srs_main_server.cpp:
    main-->do_main
    2 do_main主要做两件事:
    1) 初始化:
    // Initialize global or thread-local variables.
    if ((err = srs_thread_initialize()) != srs_success) {
    return srs_error_wrap(err, "thread init");
    }

    //非主要: 解析配置,初始化log

    2)运行混合服务:
    if ((err = run_directly_or_daemon()) != srs_success) {
    return srs_error_wrap(err, "run");
    }
  • 初始化分析:srs_thread_initialize

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
            1)创建多个全局变量
    2)初始化st: st(state-threads,协程库)主要负责网络事件监听与协程调度等操作;
    // Initialize ST, which depends on pps cids.
    if ((err = srs_st_init()) != srs_success) {
    return srs_error_wrap(err, "initialize st failed");
    }
    --》根据是否支持epoll/等,来初始化poll:
    // Select the best event system available on the OS. In Linux this is
    // epoll(). On BSD it will be kqueue.
    if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
    return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());
    }
    /* The epoll event-system vtable: each slot is a function pointer invoked via _st_eventsys. */
    static _st_eventsys_t _st_epoll_eventsys = {
    "epoll",
    ST_EVENTSYS_ALT,
    _st_epoll_init, // epoll_create happens here
    _st_epoll_dispatch, // epoll_wait happens here
    _st_epoll_pollset_add,// epoll_ctl happens here
    _st_epoll_pollset_del,
    _st_epoll_fd_new,
    _st_epoll_fd_close,
    _st_epoll_fd_getlimit
    };

    typedef struct _st_eventsys_ops {
    const char *name; /* Name of this event system */
    int val; /* Type of this event system */
    int (*init)(void); /* Initialization */
    void (*dispatch)(void); /* Dispatch function */
    int (*pollset_add)(struct pollfd *, int); /* Add descriptor set */
    void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */
    int (*fd_new)(int); /* New descriptor allocated */
    int (*fd_close)(int); /* Descriptor closed */
    int (*fd_getlimit)(void); /* Descriptor hard limit */
    } _st_eventsys_t;

    至此准备好了选择的网络处理模型;

  • 线程初始化:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54

    if((r0 = st_init()) != 0){
    return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
    }
    --》初始化st_io_init: 主要是做进程的rlimit等配置
    --》(*_st_eventsys->init)() 调用 epoll_create
    -->创建idle 协程并启动:
    _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
    _st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
    {
    _st_thread_t *thread;
    _st_stack_t *stack;
    。。。
    #ifndef __ia64__
    /* Merge from https://github.com/michaeltalyansky/state-threads/commit/cce736426c2320ffec7c9820df49ee7a18ae638c */
    #if defined(__arm__) && !defined(MD_USE_BUILTIN_SETJMP) && __GLIBC_MINOR__ >= 19
    volatile void * lsp = PTR_MANGLE(stack->sp);
    if (_setjmp ((thread)->context))
    _st_thread_main();
    (thread)->context[0].__jmpbuf[8] = (long) (lsp);
    #else
    _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
    #endif
    #else
    _ST_INIT_CONTEXT(thread, stack->sp, stack->bsp, _st_thread_main);
    #endif
    }

    执行: 以下函数,会切换上下文,这样即使启动,也能回去;
    /*
    * Start function for the idle thread
    */
    /* ARGSUSED */
    void *_st_idle_thread_start(void *arg)
    {
    _st_thread_t *me = _ST_CURRENT_THREAD();

    /* Loop as long as any application coroutine is still alive. */
    while (_st_active_count > 0) {
    /* Idle vp till I/O is ready or the smallest timeout expired */
    _ST_VP_IDLE();

    /* Check sleep queue for expired threads */
    _st_vp_check_clock();

    /* Yield back to the scheduler so runnable coroutines get CPU time. */
    me->state = _ST_ST_RUNNABLE;
    _ST_SWITCH_CONTEXT(me);
    }

    /* No more threads */
    exit(0);

    /* NOTREACHED */
    return NULL;
    }
  • 下面是初始化,监听,启动各个服务的协程;run_directly_or_daemon

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    1 检测是否是docker环境,是则用
    2 是否是后台使用?
    3 创建子进程来运行,其他父,祖父进程稍后退出,最后只有子进程:
    [2021-08-04 15:36:35.845][Warn][11265][b2cf05o9][22] SRS/4.0.146 is not stable
    [2021-08-04 15:36:35.845][Trace][11265][b2cf05o9] start daemon mode...
    [2021-08-04 15:36:35.845][Trace][11266][b2cf05o9] father process exit
    [2021-08-04 15:36:35.845][Trace][11267][b2cf05o9] son(daemon) process running.
    [2021-08-04 15:36:35.845][Trace][11265][b2cf05o9] grandpa process exit.
    4 关键函数:run_hybrid_server
    run_hybrid_server 分析:
    1)注册各种服务: http,rtmp,rtc服务等
    2)初始化:各种定时器,和遍历每个服务的初始化函数
    3) _srs_hybrid->run() 遍历每个服务进行 运行;

    这几个服务由他们包装:
    SrsServerAdapter--》实际服务:new SrsServer();--》rtmp and http
    SrtServerAdapter->...
    RtcServerAdapter->...

    以SrsServerAdapter为例:
    // Bring up the wrapped SrsServer: initialize, acquire pid file, install
    // signal handlers, open listeners, mount HTTP handlers, start ingesters,
    // then start the server coroutine. Returns srs_success or a wrapped error.
    srs_error_t SrsServerAdapter::run()
    {
    srs_error_t err = srs_success;

    // Initialize the whole system, set hooks to handle server level events.
    if ((err = srs->initialize(NULL)) != srs_success) {
    return srs_error_wrap(err, "server initialize");
    }

    if ((err = srs->initialize_st()) != srs_success) {
    return srs_error_wrap(err, "initialize st");
    }

    if ((err = srs->acquire_pid_file()) != srs_success) {
    return srs_error_wrap(err, "acquire pid file");
    }

    if ((err = srs->initialize_signal()) != srs_success) {
    return srs_error_wrap(err, "initialize signal");
    }

    if ((err = srs->listen()) != srs_success) { // listen is passed down layer by layer: ->SrsTcpListen->srs_tcp_listen->do_srs_tcp_listen->srs_netfd_open_socket; NOTE(review): the listening fd does not appear to be added to epoll at this point — TODO confirm (likely added later via st_poll when accepting)
    return srs_error_wrap(err, "listen");
    }

    if ((err = srs->register_signal()) != srs_success) {
    return srs_error_wrap(err, "register signal");
    }

    if ((err = srs->http_handle()) != srs_success) {
    return srs_error_wrap(err, "http handle");
    }

    if ((err = srs->ingest()) != srs_success) {
    return srs_error_wrap(err, "ingest");
    }

    if ((err = srs->start()) != srs_success) { // start the coroutine: trd_->start() -> SrsSTCoroutine::start() -> SrsFastCoroutine::start() -> _pfn_st_thread_create, which creates and starts the coroutine
    return srs_error_wrap(err, "start");
    }

    return err;
    }

    至此运行起来

  • epoll和st_thread如何结合?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
       1 将st_thread相关信息等放到epoll: 当连接,accept,发送接收数据等,可能会调用如下函数:将fd设置到epoll中;
    /* Coroutine-aware poll(): registers pds with the active event system,
     * parks the current coroutine (I/O wait), and switches context away.
     * When resumed (I/O ready, timeout, or interrupt) it returns the number
     * of ready descriptors, or -1 with errno=EINTR on interruption. */
    int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
    {
    struct pollfd *pd;
    struct pollfd *epd = pds + npds;
    _st_pollq_t pq;
    _st_thread_t *me = _ST_CURRENT_THREAD();
    int n;

    if (me->flags & _ST_FL_INTERRUPT) {
    me->flags &= ~_ST_FL_INTERRUPT;
    errno = EINTR;
    return -1;
    }

    /* Hand the descriptors to the event system (e.g. epoll_ctl ADD). */
    if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
    return -1;

    pq.pds = pds;
    pq.npds = npds;
    pq.thread = me;
    pq.on_ioq = 1;
    _ST_ADD_IOQ(pq); // fill in this pollq record and link it onto the I/O queue
    if (timeout != ST_UTIME_NO_TIMEOUT)
    _ST_ADD_SLEEPQ(me, timeout);
    me->state = _ST_ST_IO_WAIT;

    /* Yield; the dispatcher will make us runnable when events fire or we time out. */
    _ST_SWITCH_CONTEXT(me);

    n = 0;
    if (pq.on_ioq) {
    /* If we timed out, the pollq might still be on the ioq. Remove it */
    _ST_DEL_IOQ(pq);
    (*_st_eventsys->pollset_del)(pds, npds);
    } else {
    /* Count the number of ready descriptors */
    for (pd = pds; pd < epd; pd++) {
    if (pd->revents)
    n++;
    }
    }

    if (me->flags & _ST_FL_INTERRUPT) {
    me->flags &= ~_ST_FL_INTERRUPT;
    errno = EINTR;
    return -1;
    }

    return n;
    }

  • 当epoll事件触发时:其实是通过切换协程,再判断epoll_wait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/* Idle the virtual processor by invoking the event system's dispatch (e.g. _st_epoll_dispatch). */
#define _ST_VP_IDLE()                   (*_st_eventsys->dispatch)()
/*
* Start function for the idle thread
*/
/* ARGSUSED */
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();

while (_st_active_count > 0) {
/* Idle vp till I/O is ready or the smallest timeout expired */
_ST_VP_IDLE(); // loops through dispatch; when events fire, waiting coroutines are made runnable so the scheduler can run them — see below

/* Check sleep queue for expired threads */
_st_vp_check_clock();

me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}

/* No more threads */
exit(0);

/* NOTREACHED */
return NULL;
}

/* One dispatch round of the epoll event system: compute the wait timeout
 * from the sleep queue, recover from fork() if needed, epoll_wait, translate
 * epoll events back into pollfd revents, and move coroutines whose
 * descriptors fired from the I/O queue onto the run queue. */
ST_HIDDEN void _st_epoll_dispatch(void)
{
st_utime_t min_timeout;
_st_clist_t *q;
_st_pollq_t *pq;
struct pollfd *pds, *epds;
struct epoll_event ev;
int timeout, nfd, i, osfd, notify;
int events, op;
short revents;

#if defined(DEBUG) && defined(DEBUG_STATS)
++_st_stat_epoll;
#endif

if (_ST_SLEEPQ == NULL) {
timeout = -1;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout = (int) (min_timeout / 1000);

// At least wait 1ms when <1ms, to avoid epoll_wait spin loop.
if (timeout == 0) {
#if defined(DEBUG) && defined(DEBUG_STATS)
++_st_stat_epoll_zero;
#endif

if (min_timeout > 0) {
#if defined(DEBUG) && defined(DEBUG_STATS)
++_st_stat_epoll_shake;
#endif

timeout = 1;
}
}
}

if (_st_epoll_data->pid != getpid()) {
/* We probably forked, reinitialize epoll set */
close(_st_epoll_data->epfd);
_st_epoll_data->epfd = epoll_create(_st_epoll_data->fd_hint);
if (_st_epoll_data->epfd < 0) {
/* There is nothing we can do here, will retry later */
return;
}
fcntl(_st_epoll_data->epfd, F_SETFD, FD_CLOEXEC);
_st_epoll_data->pid = getpid();

/* Put all descriptors on ioq into new epoll set */
memset(_st_epoll_data->fd_data, 0, _st_epoll_data->fd_data_size * sizeof(_epoll_fd_data_t));
_st_epoll_data->evtlist_cnt = 0;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q); // recover the pollq (coroutine bookkeeping) from the list node
_st_epoll_pollset_add(pq->pds, pq->npds);
}
}

/* Check for I/O operations */
nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);

#if defined(DEBUG) && defined(DEBUG_STATS)
if (nfd <= 0) {
++_st_stat_epoll_spin;
}
#endif

if (nfd > 0) {
/* First pass: record the raw epoll events per descriptor. */
for (i = 0; i < nfd; i++) {
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
/* Also set I/O bits on error */
_ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
}
}

/* Second pass: for each waiting pollq, convert epoll bits to poll revents. */
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;

for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;

pds->revents = revents;
if (revents) {
notify = 1;
}
}
if (notify) { // some event fired for this coroutine
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);

if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE; // mark the coroutine runnable so the scheduler will pick it up
_ST_ADD_RUNQ(pq->thread);
}
}

for (i = 0; i < nfd; i++) {
/* Delete/modify descriptors that fired */
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = 0;
events = _ST_EPOLL_EVENTS(osfd);
op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = events;
ev.data.fd = osfd;
if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 && op == EPOLL_CTL_DEL) {
_st_epoll_data->evtlist_cnt--;
}
}
}

//调度主要见sched.c:
//业务逻辑处理:
举例:如http:
当epoll发现数据准备好了,切换到协程,执行对应的do_cycle,这个函数会去循环读取数据来解析;
解析结束后让出 协程,到idle,等待下一个消息;

gdb调试:

  • 调试推流:ffmpeg -re -i ./doc/source.flv -c copy -f flv -y rtmp://localhost/live/livestream

  • 调试方式1:适用于单步,打断点,缺点,可能会因超时网络断开等需要重新开始;

    1
    2
    3
    4
    5
    gdb ./objs/srs
    set args -c ./conf/srs.conf
    break run_hybrid_server
    set detach-on-fork off
    set follow-fork-mode child
  • 调试方式2:适用于持续运行
    窗口1: ./srs -c ../conf/srs.conf
    窗口2: gdbserver --attach 127.0.0.1:1234 $(pidof srs)
    窗口3: gdb --command=command //指定文件名为command,无后缀名。

一个command举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
file srs
target remote 127.0.0.1:1234
trace srs_app_rtc_dtls.cpp:561
actions
collect dtls_ctx
end
tstart
break on_connection_established
bt
continue

break on_rtp
bt
continue
break on_video
bt
continue
break process_publish_message
bt
continue
break handle_publish_message
bt
clear on_connection_established
clear on_rtp
clear on_video
clear process_publish_message
clear handle_publish_message
continue
tstatus
tstop
tfind start
while $trace_frame != -1
output $trace_file
printf ", line %d (tracepoint #%d)\n", $trace_line, $tracepoint
tfind
end

gdb调试一个场景代码流程分析:

rtmp推流,webrtc浏览器拉流播放:

1 接收sdp(以http发送,端口是1988),然后建立session,创建dtls,并交换密钥和握手;
接着再进行srtp的数据交换;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SrsHttpCorsMux::serve_http
SrsHttpServeMux::serve_http
SrsGoApiRtcPlay::serve_http
SrsRtcServer::on_udp_packet

http:
(gdb) bt
#0 SrsGoApiRtcPlay::serve_http (this=0x0, w=0x555556039170, r=0x555555ecaac8) at src/app/srs_app_rtc_api.cpp:42
#1 0x000055555569e3a3 in SrsHttpServeMux::serve_http (this=0x555555ecaa60, w=0x5555560393c0, r=0x555556006bc0) at src/protocol/srs_http_stack.cpp:713
#2 0x000055555569f1ec in SrsHttpCorsMux::serve_http (this=0x555556007c90, w=0x5555560393c0, r=0x555556006bc0) at src/protocol/srs_http_stack.cpp:861
#3 0x00005555557877bf in SrsHttpConn::process_request (this=0x555556007160, w=0x5555560393c0, r=0x555556006bc0, rid=1) at src/app/srs_app_http_conn.cpp:233
#4 0x00005555557873ea in SrsHttpConn::process_requests (this=0x555556007160, preq=0x555556039498) at src/app/srs_app_http_conn.cpp:206
#5 0x0000555555786f75 in SrsHttpConn::do_cycle (this=0x555556007160) at src/app/srs_app_http_conn.cpp:160
#6 0x000055555578694c in SrsHttpConn::cycle (this=0x555556007160) at src/app/srs_app_http_conn.cpp:105
#7 0x000055555571e1b8 in SrsFastCoroutine::cycle (this=0x555556006e20) at src/app/srs_app_st.cpp:253
#8 0x000055555571e25c in SrsFastCoroutine::pfn (arg=0x555556006e20) at src/app/srs_app_st.cpp:268
#9 0x000055555585072f in _st_thread_main () at sched.c:363
#10 0x0000555555850fe8 in st_thread_create (start=0x55555571e238 <SrsFastCoroutine::pfn(void*)>, arg=0x555556006e20, joinable=1, stk_size=65536) at sched.c:694
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

2 sdp: 客户端web->向server 发起http连接,传递sdp等信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
{
srs_error_t err = srs_success;
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcConnection* session = NULL;
if ((err = server_->create_session(&ruc, local_sdp, &session)) != srs_success) {
return srs_error_wrap(err, "create session, dtls=%u, srtp=%u, eip=%s", ruc.dtls_, ruc.srtp_, eip.c_str());
}

// TODO: FIXME: add do_create_session to error process.
SrsRtcConnection* session = new SrsRtcConnection(this, cid);
if ((err = do_create_session(ruc, local_sdp, session)) != srs_success) {//进行添加candiate,set_dtls_role,remote_sdp等等
srs_freep(session);
return srs_error_wrap(err, "create session");
}

*psession = session;

创建session,确认自己的角色后,
初始化sdp, dtls等等;
do_create_session完成的内容:
1- 创建session
2- 接收和初始化candidate
3-初始化dtls
candidate,offer,answer;
接着打洞;发送数据;

然后 on_udp_packet
on_stun -->
绑定连接,接着进行dtls握手,产生密钥;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
[2021-08-11 18:22:29.490][Trace][39330][5mkn5262] RTC: session address init 172.21.0.5:60276
[2021-08-11 18:22:29.491][Trace][39330][5mkn5262] RTC: session STUN done, waiting DTLS handshake.
[2021-08-11 18:22:29.498][Trace][39330][5mkn5262] DTLS: State Passive RECV, done=0, arq=0/0, r0=1, r1=0, len=159, cnt=22, size=146, hs=1
[2021-08-11 18:22:29.499][Trace][39330][5mkn5262] DTLS: State Passive SEND, done=0, arq=0/0, r0=-1, r1=2, len=680, cnt=22, size=82, hs=2
[2021-08-11 18:22:29.502][Trace][39330][5mkn5262] DTLS: State Passive RECV, done=0, arq=0/0, r0=1, r1=0, len=578, cnt=22, size=300, hs=11
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] DTLS: State Passive SEND, done=1, arq=0/0, r0=1, r1=0, len=554, cnt=22, size=466, hs=4
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] RTC: DTLS handshake done.
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] RTC: session pub=0, sub=1, to=30000ms connection established
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] RTC: Subscriber url=/live/livestream established
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] create consumer, no gop cache
[2021-08-11 18:22:29.503][Trace][39330][5mkn5262] RTC: start play url=/live/livestream, source_id=/, realtime=1, mw_msgs=0
[2021-08-11 18:22:30.016][Trace][39330][89983q51] Hybrid cpu=1.00%,14MB
[2021-08-11 18:22:30.016][Trace][39330][89983q51] RTC: Server conns=1
[2021-08-11 18:22:35.016][Trace][39330][89983q51] Hybrid cpu=1.00%,14MB
[2021-08-11 18:22:35.016][Trace][39330][89983q51] RTC: Server conns=1
[2021-08-11 18:22:37.033][Trace][39330][408j1397] RTMP client ip=127.0.0.1:23390, fd=11
[2021-08-11 18:22:37.035][Trace][39330][408j1397] complex handshake success
[2021-08-11 18:22:37.035][Trace][39330][408j1397] connect app, tcUrl=rtmp://localhost:1936/live, pageUrl=, swfUrl=, schema=rtmp, vhost=localhost, port=1936, app=live, args=null
[2021-08-11 18:22:37.035][Trace][39330][408j1397] protocol in.buffer=0, in.ack=0, out.ack=0, in.chunk=128, out.chunk=128
[2021-08-11 18:22:37.035][Trace][39330][408j1397] client identified, type=fmle-publish, vhost=localhost, app=live, stream=livestream, param=, duration=0ms
[2021-08-11 18:22:37.035][Trace][39330][408j1397] connected stream, tcUrl=rtmp://localhost:1936/live, pageUrl=, swfUrl=, schema=rtmp, vhost=__defaultVhost__, port=1936, app=live, stream=livestream, param=, args=null
[2021-08-11 18:22:37.035][Trace][39330][408j1397] new source, stream_url=/live/livestream
[2021-08-11 18:22:37.035][Trace][39330][408j1397] source url=/live/livestream, ip=127.0.0.1, cache=1, is_edge=0, source_id=/
[2021-08-11 18:22:37.038][Trace][39330][408j1397] RTC bridge from RTMP, discard_aac=0, discard_bframe=1, merge_nalus=0
[2021-08-11 18:22:37.038][Trace][39330][408j1397] ignore disabled exec for vhost=__defaultVhost__
[2021-08-11 18:22:37.038][Trace][39330][408j1397] http: mount flv stream for sid=/live/livestream, mount=/live/livestream.flv
[2021-08-11 18:22:37.038][Trace][39330][408j1397] start publish mr=0/350, p1stpt=20000, pnt=5000, tcp_nodelay=0
[2021-08-11 18:22:37.038][Trace][39330][408j1397] got metadata, width=768, height=320, vcodec=7, acodec=10
[2021-08-11 18:22:37.039][Trace][39330][408j1397] 46B video sh, codec(7, profile=High, level=3.2, 768x320, 0kbps, 0.0fps, 0.0s)
[2021-08-11 18:22:37.039][Trace][39330][408j1397] 4B audio sh, codec(10, profile=LC, 2channels, 0kbps, 44100HZ), flv(16bits, 2channels, 44100HZ)
[2021-08-11 18:22:37.040][Trace][39330][5mkn5262] update source_id=408j1397/408j1397
[2021-08-11 18:22:37.240][Trace][39330][89137u4l] <- RTC RECV #10, udp 19, pps 1/1, schedule 19
[2021-08-11 18:22:40.017][Trace][39330][89983q51] Hybrid cpu=1.00%,17MB, cid=6,3, timer=63,0,39, clock=0,48,1,0,0,0,0,1,0, free=1, objs=(pkt:28,raw:23,fua:4,msg:59,oth:1,buf:11)
[2021-08-11 18:22:40.017][Trace][39330][89983q51] RTC: Server conns=1
[2021-08-11 18:22:45.018][Trace][39330][89983q51] Hybrid cpu=2.00%,18MB, cid=6,3, timer=63,0,39, clock=0,48,1,0,0,0,0,1,0, free=1, objs=(pkt:28,raw:23,fua:4,msg:59,oth:1,buf:11)
[2021-08-11 18:22:45.018][Trace][39330][89983q51] RTC: Server conns=1, rpkts=(1,rtp:0,stun:1,rtcp:1), spkts=(43,rtp:43,stun:1,rtcp:0), fid=(id:0,fid:0,ffid:1,addr:1,faddr:1)
[2021-08-11 18:22:47.288][Trace][39330][89137u4l] <- RTC RECV #10, udp 18, pps 1/1, schedule 18
[2021-08-11 18:22:50.019][Trace][39330][89983q51] Hybrid cpu=2.00%,18MB, cid=6,3, timer=63,0,39, clock=0,48,1,0,0,0,0,1,0, free=1, objs=(pkt:28,raw:23,fua:4,msg:59,oth:1,buf:11)
[2021-08-11 18:22:50.019][Trace][39330][89983q51] RTC: Server conns=1, rpkts=(1,rtp:0,stun:1,rtcp:1), spkts=(43,rtp:43,stun:1,rtcp:0), fid=(id:0,fid:0,ffid:1,addr:1,faddr:1)
[2021-08-11 18:22:55.020][Trace][39330][89983q51] Hybrid cpu=2.00%,18MB, cid=1,2, timer=62,0,48, clock=0,47,1,0,0,0,0,0,0, objs=(pkt:164,raw:113,fua:50,msg:300,oth:1,buf:50)

创建session ice连接(udp) --p2p
udp连接 :rtc connection --ice连接
之后设置了各种后
offer?是通过udp

创建session后,收到客户端的udp发包,进入onStun,接着回包;完成打洞;
[2021-08-13 11:25:10.072][Trace][12488][a8j88kek] RTC: session address init 172.21.0.5:59005
[2021-08-13 11:25:10.072][Trace][12488][a8j88kek] RTC: session STUN done, waiting DTLS handshake.


接着开始dtls连接;即收到客户端的dtls握手包后处理:
SrsDtlsImpl::on_dtls
[2021-08-13 11:26:10.425][Trace][12488][a8j88kek] DTLS: State Passive RECV, done=0, arq=0/0, r0=1, r1=0, len=159, cnt=22, size=146, hs=1
[2021-08-13 11:26:10.425][Trace][12488][a8j88kek] DTLS: State Passive SEND, done=0, arq=0/0, r0=-1, r1=2, len=681, cnt=22, size=82, hs=2


接着进on_rtcp传rtcp的数据;

3 订阅过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
在收到http请求后,走到srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
会构造 req = json->to_object();
在创建会话session时传入:
SrsRtcServer::create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection** psession)
err = session->add_player(ruc, local_sdp)
create_player(req, play_sub_relations))
后初始化时,会去检测是否有这个流:
if ((err = _srs_rtc_sources->fetch_or_create(req_, &source_)) != srs_success) {
return srs_error_wrap(err, "rtc fetch source failed");
}
流和RtcSource放在这个结构:pool[stream_url] = source;
std::map<std::string, SrsRtcSource*> pool;

早在这里:rtmp推上来后,若配置了rtc就会创建了;所以在rtc创建play后能找到这个rtcsource,进而创建consumer来取包发送;
srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
{
srs_error_t err = srs_success;

SrsRequest* req = info->req;

// Check whether RTMP stream is busy.
if (!source->can_publish(info->edge)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
}

// Check whether RTC stream is busy.
#ifdef SRS_RTC
SrsRtcSource *rtc = NULL;
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost);
if (rtc_server_enabled && rtc_enabled && !info->edge) {
if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) {
return srs_error_wrap(err, "create source");
}

if (!rtc->can_publish()) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtc stream %s busy", req->get_stream_url().c_str());
}
}
#endif

// Bridge to RTC streaming.
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)

4 srtp加密,传输;
传输数据;调用流程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Transcode one RTMP audio frame (AAC) to OPUS via codec_, package each
// output frame into an RTP packet, and feed it to the RTC source.
// Ownership: each SrsRtpPacket is freed by SrsAutoFree when it goes out of scope.
srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* audio)
{
srs_error_t err = srs_success;

std::vector<SrsAudioFrame *> out_audios;
if ((err = codec_->transcode(audio, out_audios)) != srs_success) {
return srs_error_wrap(err, "recode error");
}

// Save OPUS packets in shared message.
if (out_audios.empty()) {
return err;
}

for (std::vector<SrsAudioFrame*>::iterator it = out_audios.begin(); it != out_audios.end(); ++it) {
SrsAudioFrame* out_audio = *it;

SrsRtpPacket* pkt = new SrsRtpPacket();
SrsAutoFree(SrsRtpPacket, pkt);

if ((err = package_opus(out_audio, pkt)) != srs_success) {
err = srs_error_wrap(err, "package opus");
break;
}

if ((err = source_->on_rtp(pkt)) != srs_success) {
err = srs_error_wrap(err, "consume opus");
break;
}
}

// Release the transcoded frames even on error (break above falls through here).
codec_->free_frames(out_audios);

return err;
}

// Fan out an RTP packet to every consumer (subscriber) of this RTC source,
// then forward it to the bridger if one is attached. Each consumer gets its
// own copy of the packet.
srs_error_t SrsRtcSource::on_rtp(SrsRtpPacket* pkt)
{
srs_error_t err = srs_success;

// If circuit-breaker is dying, drop packet.
if (_srs_circuit_breaker->hybrid_dying_water_level()) {
_srs_pps_aloss2->sugar += (int64_t)consumers.size();
return err;
}

for (int i = 0; i < (int)consumers.size(); i++) { // deliver to each subscriber (consumer), if any
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(pkt->copy())) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}

if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "bridger consume message");
}

return err;
}

Breakpoint 3, SrsRtcSource::on_rtp (this=0x55ee8c800030, pkt=0x55ee8c84d680) at src/app/srs_app_rtc_source.cpp:612
612 {
Breakpoint 4 at 0x55ee8a2b1a9a: on_video. (13 locations)
#0 SrsRtcSource::on_rtp (this=0x55ee8c800030, pkt=0x55ee8c84d680) at src/app/srs_app_rtc_source.cpp:612
#1 0x000055ee8a4e5eb4 in SrsRtcFromRtmpBridger::transcode (this=0x55ee8c800030, audio=0x55ee8c76d260) at src/app/srs_app_rtc_source.cpp:867
#2 0x000055ee8a4e5c6d in SrsRtcFromRtmpBridger::on_audio (this=0x55ee8c800030, msg=0x55ee8c76eb70) at src/app/srs_app_rtc_source.cpp:834
#3 0x000055ee8a3a9b18 in SrsLiveSource::on_audio_imp (this=0x55ee8c426a70, msg=0x55ee8c76eb70) at src/app/srs_app_source.cpp:2205
#4 0x000055ee8a3a96de in SrsLiveSource::on_audio (this=0x55ee8c426a70, shared_audio=0x55ee8c652fc0) at src/app/srs_app_source.cpp:2160
#5 0x000055ee8a39b751 in SrsRtmpConn::process_publish_message (this=0x55ee8c571170, source=0x55ee8c426a70, msg=0x55ee8c652fc0) at src/app/srs_app_rtmp_conn.cpp:1055
--Type <RET> for more, q to quit, c to continue without paging--
#6 0x000055ee8a39b5bd in SrsRtmpConn::handle_publish_message (this=0x55ee8c571170, source=0x55ee8c426a70, msg=0x55ee8c652fc0) at src/app/srs_app_rtmp_conn.cpp:1034
#7 0x000055ee8a45afb7 in SrsPublishRecvThread::consume (this=0x55ee8c6b6f40, msg=0x55ee8c652fc0) at src/app/srs_app_recv_thread.cpp:376
#8 0x000055ee8a459f11 in SrsRecvThread::do_cycle (this=0x55ee8c6b6f60) at src/app/srs_app_recv_thread.cpp:133
#9 0x000055ee8a459d5e in SrsRecvThread::cycle (this=0x55ee8c6b6f60) at src/app/srs_app_recv_thread.cpp:102
#10 0x000055ee8a3ce1b8 in SrsFastCoroutine::cycle (this=0x55ee8c77fbb0) at src/app/srs_app_st.cpp:253
#11 0x000055ee8a3ce25c in SrsFastCoroutine::pfn (arg=0x55ee8c77fbb0) at src/app/srs_app_st.cpp:268
#12 0x000055ee8a50072f in _st_thread_main () at sched.c:363
#13 0x000055ee8a500fe8 in st_thread_create (start=0x55ee8c3f12c0, arg=0x55ee8c3f12e0, joinable=21998, stk_size=-1942023440) at sched.c:694
Backtrace stopped: previous frame inner to this frame (corrupt stack?)

实际上,在有srsrtcsource下,才会创建rtmpbridge,并在recvThread统计调用rtmp接收后,发送给bridge,到rtcsoucre;
// Acquire the publish role for this RTMP connection: verify neither the RTMP
// nor (when RTC is enabled) the RTC stream is busy, attach an RTMP->RTC
// bridger to the live source when an RTC source exists, then start publishing.
srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
{
srs_error_t err = srs_success;

SrsRequest* req = info->req;

// Check whether RTMP stream is busy.
if (!source->can_publish(info->edge)) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
}

// Check whether RTC stream is busy.
#ifdef SRS_RTC
SrsRtcSource *rtc = NULL;
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost);
if (rtc_server_enabled && rtc_enabled && !info->edge) {
// This creates the SrsRtcSource at publish time, so a later RTC player can find it.
if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) {
return srs_error_wrap(err, "create source");
}

if (!rtc->can_publish()) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtc stream %s busy", req->get_stream_url().c_str());
}
}
#endif

// Bridge to RTC streaming.
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
if (rtc) {
SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc);
if ((err = bridger->initialize(req)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "bridger init");
}

// Ownership of bridger transfers to source.
source->set_bridger(bridger);
}
#endif

// Start publisher now.
if (info->edge) {
return source->on_edge_start_publish();
} else {
return source->on_publish();
}
}

ref:
https://github.com/ossrs/srs