服务器启动

ZLMediaKit所有服务的启动都是在server/main.cpp中启动,下面是启动webrtc的源码,其主要执行以下:

创建UDP服务设置socket连接时的回调,主要为了连接迁移,将会话绑定到socket中启动UDP

#if defined(ENABLE_WEBRTC)

//webrtc udp服务器

auto rtcSrv = std::make_shared();

rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {

if (!buf) {

return Socket::createSocket(poller, false);

}

auto new_poller = WebRtcSession::queryPoller(buf);

if (!new_poller) {

//该数据对应的webrtc对象未找到,丢弃之

return Socket::Ptr();

}

// 创建socket对象

return Socket::createSocket(new_poller, false);

});

uint16_t rtcPort = mINI::Instance()[RTC::kPort];

#endif//defined(ENABLE_WEBRTC)

...

#if defined(ENABLE_WEBRTC)

//webrtc udp服务器

if (rtcPort) { rtcSrv->start(rtcPort); }

#endif//defined(ENABLE_WEBRTC)

UDP收包

UdpServer::createSession我们可以在这里打个断点,接收到UDP包时会调到这边

toolkit::UdpServer::createSession UdpServer.cpp:216

toolkit::UdpServer::getOrCreateSession UdpServer.cpp:210

toolkit::UdpServer::onRead_l UdpServer.cpp:145

toolkit::UdpServer::onRead UdpServer.cpp:129

toolkit::UdpServer::::operator()(const toolkit::Buffer::Ptr &, sockaddr *, int) const UdpServer.cpp:49

std::_Function_handler &, sockaddr *, int), >::_M_invoke(const std::_Any_data &, const std::shared_ptr &, sockaddr *&&, int &&) std_function.h:300

std::function &, sockaddr *, int)>::operator()(const std::shared_ptr &, sockaddr *, int) const std_function.h:688

toolkit::Socket::onRead Socket.cpp:318

toolkit::Socket::::operator()(int) const Socket.cpp:259

std::_Function_handler >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300

std::function::operator()(int) const std_function.h:688

toolkit::EventPoller::runLoop EventPoller.cpp:304

std::__invoke_impl invoke.h:73

std::__invoke invoke.h:95

std::thread::_Invoker >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244

std::thread::_Invoker >::operator() thread:251

std::thread::_State_impl > >::_M_run thread:195

0x00007ffff7b0ade4

start_thread 0x00007ffff7c1f609

clone 0x00007ffff77f8293

从堆栈我们可以分析出UDP包接收流程如下:

toolkit::EventPoller::runLoop EventPoller.cpp:304:进行事件轮询toolkit::Socket::::operator()(int) const Socket.cpp:259:触发读事件toolkit::UdpServer::onRead_l UdpServer.cpp:145: 接收到UDP的处理

void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {

// udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数

bool is_new = false;

if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {

if (session->getPoller()->isCurrentThread()) {

//当前线程收到数据,直接处理数据

emitSessionRecv(session, buf);

} else {

//数据漂移到其他线程,需要先切换线程

WarnL << "udp packet incoming from other thread";

std::weak_ptr weak_session = session;

//由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下

auto cacheable_buf = std::make_shared(buf->toString());

session->async([weak_session, cacheable_buf]() {

if (auto strong_session = weak_session.lock()) {

emitSessionRecv(strong_session, cacheable_buf);

}

});

}

#if !defined(NDEBUG)

if (!is_new) {

TraceL << "udp packet incoming from " << (is_server_fd ? "server fd" : "other peer fd");

}

#endif

}

}

判断当前会话是否存在,不存在则创建

const Session::Ptr &UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {

{

//减小临界区

std::lock_guard lock(*_session_mutex);

auto it = _session_map->find(id);

if (it != _session_map->end()) {

return it->second->session();

}

}

is_new = true;

return createSession(id, buf, addr, addr_len);

}

创建会话

const Session::Ptr &UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {

auto socket = createSocket(_poller, buf, addr, addr_len);

if (!socket) {

//创建socket失败,本次onRead事件收到的数据直接丢弃

return s_null_session;

}

auto addr_str = string((char *) addr, addr_len);

std::weak_ptr weak_self = std::dynamic_pointer_cast(shared_from_this());

auto session_creator = [this, weak_self, socket, addr_str, id]() -> const Session::Ptr & {

auto server = weak_self.lock();

if (!server) {

return s_null_session;

}

//如果已经创建该客户端对应的UdpSession类,那么直接返回

lock_guard lck(*_session_mutex);

auto it = _session_map->find(id);

if (it != _session_map->end()) {

return it->second->session();

}

socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());

socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());

//在connect peer后再取消绑定关系, 避免在 server 的 socket 或其他cloned server中收到后续数据包.

SockUtil::dissolveUdpSock(_socket->rawFD());

auto helper = _session_alloc(server, socket);

auto session = helper->session();

// 把本服务器的配置传递给 Session

session->attachServer(*this);

std::weak_ptr weak_session = session;

socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {

auto strong_self = weak_self.lock();

if (!strong_self) {

return;

}

//快速判断是否为本会话的的数据, 通常应该成立

if (id == makeSockId(addr, addr_len)) {

if (auto strong_session = weak_session.lock()) {

emitSessionRecv(strong_session, buf);

}

return;

}

//收到非本peer fd的数据,让server去派发此数据到合适的session对象

strong_self->onRead_l(false, id, buf, addr, addr_len);

});

socket->setOnErr([weak_self, weak_session, id](const SockException &err) {

// 在本函数作用域结束时移除会话对象

// 目的是确保移除会话前执行其 onError 函数

// 同时避免其 onError 函数抛异常时没有移除会话对象

onceToken token(nullptr, [&]() {

// 移除掉会话

auto strong_self = weak_self.lock();

if (!strong_self) {

return;

}

//从共享map中移除本session对象

lock_guard lck(*strong_self->_session_mutex);

strong_self->_session_map->erase(id);

});

// 获取会话强应用

if (auto strong_session = weak_session.lock()) {

// 触发 onError 事件回调

strong_session->onError(err);

}

});

auto pr = _session_map->emplace(id, std::move(helper));

assert(pr.second);

return pr.first->second->session();

};

if (socket->getPoller()->isCurrentThread()) {

//该socket分配在本线程,直接创建session对象,并处理数据

return session_creator();

}

//该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据

auto cacheable_buf = std::make_shared(buf->toString());

socket->getPoller()->async([session_creator, cacheable_buf]() {

//在该socket所在线程创建session对象

auto session = session_creator();

if (session) {

//该数据不能丢弃,给session对象消费

emitSessionRecv(session, cacheable_buf);

}

});

return s_null_session;

}

Webrtc数据处理

我们在WebRtcTransport::inputSockData打个断点,此处是webrtc报文的处理的地方:

WebRtcTransport::inputSockData WebRtcTransport.cpp:281

WebRtcSession::onRecv WebRtcSession.cpp:70

toolkit::emitSessionRecv UdpServer.cpp:134

toolkit::UdpServer::::operator()(void) const UdpServer.cpp:304

std::_Function_handler >::_M_invoke(const std::_Any_data &) std_function.h:300

std::function::operator()() const std_function.h:688

toolkit::TaskCancelableImp::operator()() const TaskExecutor.h:111

::operator()(const std::shared_ptr > &) const EventPoller.cpp:237

toolkit::List > >::for_each< >( &&) List.h:203

toolkit::EventPoller::onPipeEvent EventPoller.cpp:235

toolkit::EventPoller::::operator()(int) const EventPoller.cpp:67

std::_Function_handler >::_M_invoke(const std::_Any_data &, int &&) std_function.h:300

std::function::operator()(int) const std_function.h:688

toolkit::EventPoller::runLoop EventPoller.cpp:304

std::__invoke_impl invoke.h:73

std::__invoke invoke.h:95

std::thread::_Invoker >::_M_invoke<0ul, 1ul, 2ul, 3ul> thread:244

std::thread::_Invoker >::operator() thread:251

std::thread::_State_impl > >::_M_run thread:195

0x00007ffff7b0ade4

start_thread 0x00007ffff7c1f609

clone 0x00007ffff77f8293

此处会对报文进行分析,判断报文属于那种类型并对其及逆行处理,报文类型主要有:

Stun:处理Dtls:处理DTLS握手RTP:音视频RTCP:会话控制

void WebRtcTransport::inputSockData(char *buf, int len, RTC::TransportTuple *tuple) {

// 校验是否是stun

if (RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {

std::unique_ptr packet(RTC::StunPacket::Parse((const uint8_t *) buf, len));

if (!packet) {

WarnL << "parse stun error" << std::endl;

return;

}

_ice_server->ProcessStunPacket(packet.get(), tuple);

return;

}

// 校验是否是DTLS

if (is_dtls(buf)) {

_dtls_transport->ProcessDtlsData((uint8_t *) buf, len);

return;

}

// 校验是否是RTP

if (is_rtp(buf)) {

if (!_srtp_session_recv) {

WarnL << "received rtp packet when dtls not completed from:" << getPeerAddress(tuple);

return;

}

// SRTP解密

if (_srtp_session_recv->DecryptSrtp((uint8_t *) buf, &len)) {

// 处理RTP报文

onRtp(buf, len, _ticker.createdTime());

}

return;

}

// 校验是否是RTCP

if (is_rtcp(buf)) {

if (!_srtp_session_recv) {

WarnL << "received rtcp packet when dtls not completed from:" << getPeerAddress(tuple);

return;

}

// SRTP解密

if (_srtp_session_recv->DecryptSrtcp((uint8_t *) buf, &len)) {

// 处理RTCP报文

onRtcp(buf, len);

}

return;

}

}

推荐一个零声学院免费公开课程,个人觉得老师讲得不错, 分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis, fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker, TCP/IP,协程,DPDK等技术内容,点击立即学习]

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: