srs代码流程图
srs代码流程分析:
main:
1
2
3
4
5
6
7
8
9srs_error_t do_main(int argc, char** argv)
if ((err = srs_thread_initialize()) != srs_success) {
if ((err = srs_st_init()) != srs_success) {
if((r0 = st_init()) != 0){
/* We can ignore return value here */
st_set_eventsys(ST_EVENTSYS_DEFAULT); //设置用epoll或其他;
_st_eventsys 的各种操作:
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);大的代码逻辑架构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
151 srs_main_server.cpp:
main-->do_main
2 do_main主要做两件事:
1) 初始化:
// Initialize global or thread-local variables.
if ((err = srs_thread_initialize()) != srs_success) {
return srs_error_wrap(err, "thread init");
}
//非主要: 解析配置,初始化log
2)运行混合服务:
if ((err = run_directly_or_daemon()) != srs_success) {
return srs_error_wrap(err, "run");
}初始化分析:srs_thread_initialize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
351)创建多个全局变量
2)初始化st :st主要是做网络的监听等操作: stack?协议栈
// Initialize ST, which depends on pps cids.
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}
--》根据是否支持epoll/等,来初始化poll:
// Select the best event system available on the OS. In Linux this is
// epoll(). On BSD it will be kqueue.
if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {
return srs_error_new(ERROR_ST_SET_EPOLL, "st enable st failed, current is %s", st_get_eventsys_name());
}
static _st_eventsys_t _st_epoll_eventsys = {
"epoll",
ST_EVENTSYS_ALT,
_st_epoll_init, //epoll_create这里
_st_epoll_dispatch, //epoll_wait在这里
_st_epoll_pollset_add,//epoll_ctl这里
_st_epoll_pollset_del,
_st_epoll_fd_new,
_st_epoll_fd_close,
_st_epoll_fd_getlimit
};
typedef struct _st_eventsys_ops {
const char *name; /* Name of this event system */
int val; /* Type of this event system */
int (*init)(void); /* Initialization */
void (*dispatch)(void); /* Dispatch function */
int (*pollset_add)(struct pollfd *, int); /* Add descriptor set */
void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */
int (*fd_new)(int); /* New descriptor allocated */
int (*fd_close)(int); /* Descriptor closed */
int (*fd_getlimit)(void); /* Descriptor hard limit */
} _st_eventsys_t;至此准备好了选择的网络处理模型;
线程初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
if((r0 = st_init()) != 0){
return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
}
--》初始化st_io_init: 主要是做进程的rlimit等配置
--》*_st_eventsys->init)() 创建 epoll_create
-->创建idle 协程并启动:
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
。。。
/* Merge from https://github.com/michaeltalyansky/state-threads/commit/cce736426c2320ffec7c9820df49ee7a18ae638c */
volatile void * lsp = PTR_MANGLE(stack->sp);
if (_setjmp ((thread)->context))
_st_thread_main();
(thread)->context[0].__jmpbuf[8] = (long) (lsp);
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
_ST_INIT_CONTEXT(thread, stack->sp, stack->bsp, _st_thread_main);
}
执行: 以下函数,会切换上下文,这样即使启动,也能回去;
/*
* Start function for the idle thread
*/
/* ARGSUSED */
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
/* Idle vp till I/O is ready or the smallest timeout expired */
_ST_VP_IDLE();
/* Check sleep queue for expired threads */
_st_vp_check_clock();
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
/* No more threads */
exit(0);
/* NOTREACHED */
return NULL;
}下面是初始化,监听,启动各个服务的协程;run_directly_or_daemon
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
641 检测是否是docker环境,是则用
2 是否是后台使用?
3 创建子进程来运行,其他父,祖父进程稍后退出,最后只有子进程:
[2021-08-04 15:36:35.845][Warn][11265][b2cf05o9][22] SRS/4.0.146 is not stable
[2021-08-04 15:36:35.845][Trace][11265][b2cf05o9] start daemon mode...
[2021-08-04 15:36:35.845][Trace][11266][b2cf05o9] father process exit
[2021-08-04 15:36:35.845][Trace][11267][b2cf05o9] son(daemon) process running.
[2021-08-04 15:36:35.845][Trace][11265][b2cf05o9] grandpa process exit.
4 关键函数:run_hybrid_server
run_hybrid_server 分析:
1)注册各种服务: http,rtmp,rtc服务等
2)初始化:各种定时器,和遍历每个服务的初始化函数
3) _srs_hybrid->run() 遍历每个服务进行 运行;
这几个服务由他们包装:
SrsServerAdapter--》实际服务:new SrsServer();--》rtmp and http
SrtServerAdapter->...
RtcServerAdapter->...
已srsServerAdapter为例:
srs_error_t SrsServerAdapter::run()
{
srs_error_t err = srs_success;
// Initialize the whole system, set hooks to handle server level events.
if ((err = srs->initialize(NULL)) != srs_success) {
return srs_error_wrap(err, "server initialize");
}
if ((err = srs->initialize_st()) != srs_success) {
return srs_error_wrap(err, "initialize st");
}
if ((err = srs->acquire_pid_file()) != srs_success) {
return srs_error_wrap(err, "acquire pid file");
}
if ((err = srs->initialize_signal()) != srs_success) {
return srs_error_wrap(err, "initialize signal");
}
if ((err = srs->listen()) != srs_success) { //监听:监听时会一层层传进去:->SrsTcpListen->srs_tcp_listen->do_srs_tcp_listen->srs_netfd_open_socket ,貌似监听的没有加到epoll中?
return srs_error_wrap(err, "listen");
}
if ((err = srs->register_signal()) != srs_success) {
return srs_error_wrap(err, "register signal");
}
if ((err = srs->http_handle()) != srs_success) {
return srs_error_wrap(err, "http handle");
}
if ((err = srs->ingest()) != srs_success) {
return srs_error_wrap(err, "ingest");
}
if ((err = srs->start()) != srs_success) { //启动协程:-> trd_->start()->SrsSTCoroutine->start()->SrsFastCoroutine->start->_pfn_st_thread_create 创建协程和启动
return srs_error_wrap(err, "start");
}
return err;
}至此运行起来
epoll和st_thread如何结合?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
511 将st_thread相关信息等放到epoll: 当连接,accept,发送接收数据等,可能会调用如下函数:将fd设置到epoll中;
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);//同时设置这个结构,并将其加到队列中;
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
/* If we timed out, the pollq might still be on the ioq. Remove it */
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
/* Count the number of ready descriptors */
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}当epoll事件触发时:其实是通过切换协程,再判断epoll_wait:
1 | #define _ST_VP_IDLE() (*_st_eventsys->dispatch)() |
gdb调试:
调试推流:ffmpeg -re -i ./doc/source.flv -c copy -f flv -y rtmp://localhost/live/livestream
调试方式1:适用于单步,打断点,缺点,可能会因超时网络断开等需要重新开始;
1
2
3
4
5gdb ./objs/srs
set args -c ./conf/srs.conf
break run_hybrid_server
set detach-on-fork off
set follow-fork-mode child调试方式2:适用于持续运行
窗口1: ./srs -c ../conf/srs.conf
窗口2: gdbserver –attach 127.0.0.1:1234pidof srs
窗口3: gdb –command=command //指定文件名为command,无后缀名。
一个command举例
1 | file srs |
gdb调试一个场景代码流程分析:
rtmp推流,webrtc浏览器拉流播放:
1 接收sdp(以http发送,端口是1988,然后建立session,创建dtls ,并交换秘钥和握手;
接着再进行srtp的数据交换;
1 | SrsHttpCorsMux::serve_http |
2 sdp: 客户端web->向server 发起http连接,传递sdp等信息
1 | srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res) |
1 | [2021-08-11 18:22:29.490][Trace][39330][5mkn5262] RTC: session address init 172.21.0.5:60276 |
3 订阅过程:
1 | 在收到http请求后,走到srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res) |
4 srtp加密,传输;
传输数据;调用流程;
1 | srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* audio) |