CS 144 Lab Four Wrap-Up: A Walkthrough of the Full Network Interaction Flow

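Introduction
Tun/Tap overview
The tcp_ipv4.cc file
Initializing the configuration
The cs144 fd class family
The adapter family: reading and writing data on top of the custom fd classes
The custom socket classes
The custom EventLoop
The TCPSpongeSocket class template in detail
listen_and_accept
_tcp_main
_initialize_TCP: setting up the TCP connection and the event loop
_tcp_loop: running the TCP event loop
connect
bidirectional_stream_copy
TCPSpongeSocket::wait_until_closed
A channel that ties the main thread and the worker thread together
Summary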

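Course videos: [Computer Networking] Stanford CS144

As the wrap-up for Lab Four, this post walks through what the overall network interaction flow looks like.

Introduction

We start from tcp_ipv4.cc and use it to explore how cs144 implements the whole protocol stack.

First, the tun.sh script in the project root uses the ip tuntap command to create a virtual Tun/Tap network device. Unlike an ordinary network interface, such a device has no physical hardware behind it; it exists purely in software inside the kernel. The goal is presumably to let the lab exchange packets as if it were attached to a real network.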

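Tun/Tap overview

For an introduction to Tun/Tap, see:

TUN and TAP virtual devices
Official Linux kernel documentation: Tun/Tap driver description

TUN/TAP lets a user-space program receive and transmit packets.

It can be thought of as a simple point-to-point or Ethernet device that, instead of receiving packets from a physical medium, receives them from a user-space program, and instead of sending packets over a physical medium, writes them to the user-space program.

To use the driver, a program must open /dev/net/tun and issue the corresponding ioctl() to register a network device with the kernel. The device shows up as tunXX or tapXX, depending on the options chosen. When the program closes the file descriptor, the network device and all of its routes disappear.

Depending on the device type, the user-space program reads and writes either IP packets (tun) or Ethernet frames (tap); which one is used depends on the flags passed to ioctl().

TUN is a virtual network device that emulates a layer-3 device, so it handles network-layer packets, that is, IP packets. Because it only goes down to the IP layer, it has no MAC address and cannot be bridged with a physical NIC, but it can still communicate with a physical NIC through layer-3 routing. TAP emulates a layer-2 device and goes one level deeper than TUN: it handles data-link-layer frames, has a MAC address, can be bridged with a physical NIC, supports MAC-layer broadcast, and can also be assigned an IP address.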
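The open-then-ioctl sequence described above can be sketched as follows. This is a minimal sketch, not the cs144 TunFD code: make_tun_request and open_tun are hypothetical helper names, and it assumes a Linux system where the named device already exists (or the process has the privileges to create it).

```cpp
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <stdexcept>
#include <string>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <linux/if.h>
#include <linux/if_tun.h>

// Build the request passed to ioctl(TUNSETIFF).
// IFF_TUN selects a layer-3 device (IFF_TAP would select a layer-2 one);
// IFF_NO_PI asks the kernel not to prepend its packet-information header.
ifreq make_tun_request(const std::string &devname) {
    assert(devname.size() < IFNAMSIZ);
    ifreq req{};
    req.ifr_flags = IFF_TUN | IFF_NO_PI;
    std::strncpy(req.ifr_name, devname.c_str(), IFNAMSIZ - 1);
    return req;
}

// Open /dev/net/tun and attach the descriptor to the named device.
// Hypothetical helper: returns the raw fd, throws on failure.
int open_tun(const std::string &devname) {
    const int fd = ::open("/dev/net/tun", O_RDWR);
    if (fd < 0) {
        throw std::runtime_error("open /dev/net/tun failed");
    }
    ifreq req = make_tun_request(devname);
    if (::ioctl(fd, TUNSETIFF, &req) < 0) {
        ::close(fd);
        throw std::runtime_error("ioctl TUNSETIFF failed");
    }
    return fd;
}
```

After open_tun("tun144") succeeds, each read() on the returned descriptor yields one IP datagram and each write() injects one; closing the descriptor detaches the program from the device.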

The tcp_ipv4.cc file

Once the Tun/Tap network device has been created, we step into the main function of tcp_ipv4.cc:

int main(int argc, char **argv) {

try {

// Argument count check: argv[0] is the program name; the host and port we must pass come after it

if (argc < 3) {

show_usage(argv[0], "ERROR: required arguments are missing.");

return EXIT_FAILURE;

}

// Parse the arguments into a TCPConfig, an FdAdapterConfig, the mode (server or client), and the tun device to use

auto [c_fsm, c_filt, listen, tun_dev_name] = get_config(argc, argv);

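// Use Tun/Tap as a virtual NIC; the TUN device operates at the IP layer

// TunFD is the file descriptor of the tun device

// TCPOverIPv4OverTunFdAdapter wraps reading IPv4 datagrams from, and writing them to, the tun device

// LossyTCPOverIPv4OverTunFdAdapter decorates the former and randomly drops datagrams on read and write according to the configured loss rates

// LossyTCPOverIPv4SpongeSocket exposes a standard socket-like interface to the layer above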

LossyTCPOverIPv4SpongeSocket tcp_socket(LossyTCPOverIPv4OverTunFdAdapter(

TCPOverIPv4OverTunFdAdapter(TunFD(tun_dev_name == nullptr ? TUN_DFLT : tun_dev_name))));

// In server mode, listen for an incoming connection on the given port

if (listen) {

tcp_socket.listen_and_accept(c_fsm, c_filt);

} else {

// In client mode, actively connect to the given server

tcp_socket.connect(c_fsm, c_filt);

}

// Keyboard input is written to the socket; data readable from the socket is printed to the screen

bidirectional_stream_copy(tcp_socket);

// Block until the _tcp_thread thread has finished

tcp_socket.wait_until_closed();

} catch (const exception &e) {

cerr << "Exception: " << e.what() << endl;

return EXIT_FAILURE;

}

return EXIT_SUCCESS;

}

Initializing the configuration

The annotated source of get_config is given below for readers who want the details:

//! Config for TCP sender and receiver

class TCPConfig {

public:

// Default capacity of the sender and receiver buffers, i.e. the maximum number of bytes each can hold at any one time

static constexpr size_t DEFAULT_CAPACITY = 64000; //!< Default capacity

// Upper bound on the payload size of a TCP segment

static constexpr size_t MAX_PAYLOAD_SIZE = 1000; //!< Conservative max payload size for real Internet

// Default retransmission timeout, in milliseconds.

// After sending data the TCP sender expects an acknowledgment (ACK) within this timeout; if none arrives in time, it retransmits the data

static constexpr uint16_t TIMEOUT_DFLT = 1000; //!< Default re-transmit timeout is 1 second

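// Maximum number of retransmissions before giving up. If the sender still has no ACK after this many attempts, it treats the connection as broken and acts accordingly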

static constexpr unsigned MAX_RETX_ATTEMPTS = 8; //!< Maximum re-transmit attempts before giving up

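// Initial value of the retransmission timeout, in milliseconds: how long the sender waits for an ACK before retransmitting

// The timeout grows dynamically (it is backed off after each retransmission), so when the retransmission counter is reset the timeout must be restored to this initial value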

uint16_t rt_timeout = TIMEOUT_DFLT; //!< Initial value of the retransmission timeout, in milliseconds

// Receive and send buffer sizes

size_t recv_capacity = DEFAULT_CAPACITY; //!< Receive capacity, in bytes

size_t send_capacity = DEFAULT_CAPACITY; //!< Sender capacity, in bytes

// Initial sequence number; if unset, a random value is used

std::optional<WrappingInt32> fixed_isn{};

};

//! Config for classes derived from FdAdapter

class FdAdapterConfig {

public:

// Source IP address and port

Address source{"0", 0}; //!< Source address and port

// Destination IP address and port

Address destination{"0", 0}; //!< Destination address and port

// Downlink loss rate: the probability that a datagram read by this endpoint (incoming) is dropped

uint16_t loss_rate_dn = 0; //!< Downlink loss rate (for LossyFdAdapter)

// Uplink loss rate: the probability that a datagram written by this endpoint (outgoing) is dropped

uint16_t loss_rate_up = 0; //!< Uplink loss rate (for LossyFdAdapter)

};

static tuple<TCPConfig, FdAdapterConfig, bool, char *> get_config(int argc, char **argv) {

TCPConfig c_fsm{};

FdAdapterConfig c_filt{};

char *tundev = nullptr;

int curr = 1;

bool listen = false;

// If no source address or port is given, fall back to the default local IP address and a random port

string source_address = LOCAL_ADDRESS_DFLT;

string source_port = to_string(uint16_t(random_device()()));

// Parse the options; the last two arguments are reserved for host and port

while (argc - curr > 2) {

// -l enables server-side listen mode

if (strncmp("-l", argv[curr], 3) == 0) {

listen = true;

curr += 1;

} else if (strncmp("-a", argv[curr], 3) == 0) {

// -a sets our own (source) IP address

check_argc(argc, argv, curr, "ERROR: -a requires one argument.");

source_address = argv[curr + 1];

curr += 2;

} else if (strncmp("-s", argv[curr], 3) == 0) {

// -s sets our own (source) port

check_argc(argc, argv, curr, "ERROR: -s requires one argument.");

source_port = argv[curr + 1];

curr += 2;

} else if (strncmp("-w", argv[curr], 3) == 0) {

// -w sets our receive window size

check_argc(argc, argv, curr, "ERROR: -w requires one argument.");

c_fsm.recv_capacity = strtol(argv[curr + 1], nullptr, 0);

curr += 2;

} else if (strncmp("-t", argv[curr], 3) == 0) {

// -t sets the retransmission timeout (RTO)

check_argc(argc, argv, curr, "ERROR: -t requires one argument.");

c_fsm.rt_timeout = strtol(argv[curr + 1], nullptr, 0);

curr += 2;

} else if (strncmp("-d", argv[curr], 3) == 0) {

// -d selects the tun device (the virtual NIC) to attach to

check_argc(argc, argv, curr, "ERROR: -d requires one argument.");

tundev = argv[curr + 1];

curr += 2;

} else if (strncmp("-Lu", argv[curr], 3) == 0) {

// -Lu sets the uplink loss rate, i.e. the probability that an outgoing datagram is dropped

check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");

float lossrate = strtof(argv[curr + 1], nullptr);

using LossRateUpT = decltype(c_filt.loss_rate_up);

c_filt.loss_rate_up =

static_cast<LossRateUpT>(static_cast<float>(numeric_limits<LossRateUpT>::max()) * lossrate);

curr += 2;

} else if (strncmp("-Ld", argv[curr], 3) == 0) {

// -Ld sets the downlink loss rate, i.e. the probability that an incoming datagram is dropped

check_argc(argc, argv, curr, "ERROR: -Ld requires one argument.");

float lossrate = strtof(argv[curr + 1], nullptr);

using LossRateDnT = decltype(c_filt.loss_rate_dn);

c_filt.loss_rate_dn =

static_cast<LossRateDnT>(static_cast<float>(numeric_limits<LossRateDnT>::max()) * lossrate);

curr += 2;

} else if (strncmp("-h", argv[curr], 3) == 0) {

// -h prints the usage message

show_usage(argv[0], nullptr);

exit(0);

} else {

show_usage(argv[0], string("ERROR: unrecognized option " + string(argv[curr])).c_str());

exit(1);

}

}

// parse positional command-line arguments

// Was server-side listen mode requested?

if (listen) {

// Server mode --> take the listening port from the arguments

// Set the filter's source address to "0" (listen on the addresses of all local interfaces)

c_filt.source = {"0", argv[curr + 1]};

if (c_filt.source.port() == 0) {

show_usage(argv[0], "ERROR: listen port cannot be zero in server mode.");

exit(1);

}

} else {

// Client mode: the destination IP address and port come from the last two arguments

c_filt.destination = {argv[curr], argv[curr + 1]};

// The client's own source IP address and port can be overridden with -a and -s

c_filt.source = {source_address, source_port};

}

return make_tuple(c_fsm, c_filt, listen, tundev);

}
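The -Lu and -Ld branches above turn a probability between 0 and 1 into a 16-bit threshold. The arithmetic can be checked in isolation with a small sketch; loss_rate_to_threshold is a hypothetical name used only for this example, and the body mirrors the two static_casts in get_config.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Scale a loss probability in [0, 1] to a threshold in [0, 65535].
// The fractional part is truncated by the final cast, as in get_config.
uint16_t loss_rate_to_threshold(float lossrate) {
    assert(lossrate >= 0.0f && lossrate <= 1.0f);
    return static_cast<uint16_t>(static_cast<float>(std::numeric_limits<uint16_t>::max()) * lossrate);
}
```

A rate of 0 maps to 0 (never drop), 1 maps to 65535, and 0.5 maps to 32767, since 65535 * 0.5 = 32767.5 is truncated. The adapter later compares a random 16-bit value against this threshold.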

The cs144 fd class family

main constructs a TCPOverIPv4OverTunFdAdapter. The TunFD it wraps is a file descriptor attached to the TUN device, not a socket:

For a concrete use of TunFD, see app/tun.cc:

int main() {

try {

TunFD tun("tun144");

while (true) {

auto buffer = tun.read();

cout << "\n\n***\n*** Got packet:\n***\n";

hexdump(buffer.data(), buffer.size());

IPv4Datagram ip_dgram;

cout << "attempting to parse as ipv4 datagram... ";

if (ip_dgram.parse(move(buffer)) != ParseResult::NoError) {

cout << "failed.\n";

continue;

}

cout << "success! totlen=" << ip_dgram.header().len << ", IPv4 header contents:\n";

cout << ip_dgram.header().to_string();

if (ip_dgram.header().proto != IPv4Header::PROTO_TCP) {

cout << "\nNot TCP, skipping.\n";

continue;

}

cout << "\nAttempting to parse as a TCP segment... ";

TCPSegment tcp_seg;

if (tcp_seg.parse(ip_dgram.payload(), ip_dgram.header().pseudo_cksum()) != ParseResult::NoError) {

cout << "failed.\n";

continue;

}

cout << "success! payload len=" << tcp_seg.payload().size() << ", TCP header contents:\n";

cout << tcp_seg.header().to_string() << endl;

}

} catch (const exception &e) {

cout << "Exception: " << e.what() << endl;

return EXIT_FAILURE;

}

return EXIT_SUCCESS;

}

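The adapter family: reading and writing data on top of the custom fd classes

TCPOverIPv4OverTunFdAdapter is a wrapper at the IP layer. When a TCP segment is written through the adapter, it is automatically wrapped in an IP datagram and sent to the network device. Reading works the same way in reverse: the IP header is stripped and the TCP segment carried inside is returned: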

// A FD adapter for IPv4 datagrams read from and written to a TUN device

class TCPOverIPv4OverTunFdAdapter : public TCPOverIPv4Adapter {

private:

TunFD _tun;

public:

//! Construct from a TunFD

explicit TCPOverIPv4OverTunFdAdapter(TunFD &&tun) : _tun(std::move(tun)) {}

//! Attempts to read and parse an IPv4 datagram containing a TCP segment related to the current connection

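// Parse the bytes returned by the tun device as an IP datagram (a TUN device delivers IP datagrams, not Ethernet frames)

std::optional<TCPSegment> read() {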

InternetDatagram ip_dgram;

if (ip_dgram.parse(_tun.read()) != ParseResult::NoError) {

return {};

}

// Strip the IP header and return the TCP segment

return unwrap_tcp_in_ip(ip_dgram);

}

//! Creates an IPv4 datagram from a TCP segment and writes it to the TUN device

// Add an IP header to the TCP segment and write the result to the tun device

void write(TCPSegment &seg) { _tun.write(wrap_tcp_in_ip(seg).serialize()); }

//! Access the underlying TUN device

operator TunFD &() { return _tun; }

//! Access the underlying TUN device

operator const TunFD &() const { return _tun; }

};
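To make the wrap/unwrap idea concrete, here is a deliberately simplified sketch that works on raw byte strings. wrap_in_ipv4 and unwrap_from_ipv4 are hypothetical names, not the cs144 wrap_tcp_in_ip / unwrap_tcp_in_ip functions; the sketch assumes a 20-byte header without options and leaves the addresses and checksum as zero, whereas the real adapter fills them in and also checks that the segment belongs to the current connection.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

constexpr size_t kIPv4HeaderLen = 20;  // header without options
constexpr uint8_t kProtoTCP = 6;       // IANA protocol number for TCP

// Prepend a minimal IPv4 header to an already-serialized TCP segment.
std::string wrap_in_ipv4(const std::string &tcp_bytes, uint8_t proto = kProtoTCP) {
    assert(kIPv4HeaderLen + tcp_bytes.size() <= 65535);
    std::string dgram(kIPv4HeaderLen, '\0');
    const auto total = static_cast<uint16_t>(kIPv4HeaderLen + tcp_bytes.size());
    dgram[0] = 0x45;                             // version 4, header length 5 * 4 bytes
    dgram[2] = static_cast<char>(total >> 8);    // total length, big-endian
    dgram[3] = static_cast<char>(total & 0xff);
    dgram[8] = 64;                               // TTL
    dgram[9] = static_cast<char>(proto);         // payload protocol
    return dgram + tcp_bytes;
}

// Return the TCP bytes carried by the datagram, or nothing if it is
// malformed or does not carry TCP.
std::optional<std::string> unwrap_from_ipv4(const std::string &dgram) {
    if (dgram.size() < kIPv4HeaderLen || dgram[0] != 0x45) {
        return std::nullopt;
    }
    if (static_cast<uint8_t>(dgram[9]) != kProtoTCP) {
        return std::nullopt;
    }
    const size_t total = (static_cast<size_t>(static_cast<uint8_t>(dgram[2])) << 8) |
                         static_cast<uint8_t>(dgram[3]);
    if (total != dgram.size()) {
        return std::nullopt;
    }
    return dgram.substr(kIPv4HeaderLen);
}
```

Returning an empty optional for anything that is not a well-formed TCP-in-IPv4 datagram mirrors the read() method above, which returns {} when parsing fails.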

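LossyTCPOverIPv4OverTunFdAdapter is an instantiation of the class template LossyFdAdapter. The template uses the decorator pattern to extend the adapter it holds: on every read and write it consults the configured loss rate to decide whether to drop the datagram:

template <typename AdapterT>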

class LossyFdAdapter {

private:

//! Fast RNG used by _should_drop()

std::mt19937 _rand{get_random_generator()};

//! The underlying FD adapter

AdapterT _adapter;

...

bool _should_drop(bool uplink) {

const auto &cfg = _adapter.config();

const uint16_t loss = uplink ? cfg.loss_rate_up : cfg.loss_rate_dn;

return loss != 0 && uint16_t(_rand()) < loss;

}

//! \brief Read from the underlying AdapterT instance, potentially dropping the read datagram

//! \returns std::optional that is empty if the segment was dropped or if

//! the underlying AdapterT returned an empty value

std::optional<TCPSegment> read() {

auto ret = _adapter.read();

if (_should_drop(false)) {

return {};

}

return ret;

}

//! \brief Write to the underlying AdapterT instance, potentially dropping the datagram to be written

//! \param[in] seg is the packet to either write or drop

void write(TCPSegment &seg) {

if (_should_drop(true)) {

return;

}

return _adapter.write(seg);

}

...

};
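The decorator idea can be shown without any networking. The sketch below is an illustration under stated assumptions, not the cs144 class: RecordingAdapter and LossyAdapter are hypothetical names, segments are plain strings, and the random source is injected as a callable so that the behavior is deterministic.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a real adapter: it just records what was written.
struct RecordingAdapter {
    std::vector<std::string> written;
    void write(const std::string &seg) { written.push_back(seg); }
};

// Decorator: same write() interface as the wrapped adapter,
// but a segment may be silently dropped before it reaches it.
template <typename AdapterT>
class LossyAdapter {
    AdapterT _adapter;
    uint16_t _loss;                   // threshold in [0, 65535]; 0 means never drop
    std::function<uint16_t()> _rand;  // injected random source (assumption of this sketch)

  public:
    LossyAdapter(AdapterT adapter, uint16_t loss, std::function<uint16_t()> rand)
        : _adapter(std::move(adapter)), _loss(loss), _rand(std::move(rand)) {}

    // Same test as _should_drop(): drop when the random value falls below the threshold.
    bool should_drop() { return _loss != 0 && _rand() < _loss; }

    void write(const std::string &seg) {
        if (should_drop()) {
            return;
        }
        _adapter.write(seg);
    }

    const AdapterT &inner() const { return _adapter; }
};
```

Because the wrapper exposes the same interface as the adapter it holds, code written against an adapter (here, anything that calls write()) does not need to know whether loss simulation is switched on.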

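The custom socket classes

cs144 wraps sockets in a small inheritance hierarchy rooted at FileDescriptor: Socket derives from FileDescriptor, and LocalStreamSocket (the base class of TCPSpongeSocket) derives from Socket.

The socket read/write interfaces all live in the top-level FileDescriptor base class: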

//! \param[in] limit is the maximum number of bytes to read; fewer bytes may be returned

//! \param[out] str is the string to be read

// Read from the device or file behind the fd using the read system call

void FileDescriptor::read(std::string &str, const size_t limit) {

constexpr size_t BUFFER_SIZE = 1024 * 1024; // maximum size of a read

const size_t size_to_read = min(BUFFER_SIZE, limit);

str.resize(size_to_read);

ssize_t bytes_read = SystemCall("read", ::read(fd_num(), str.data(), size_to_read));

if (limit > 0 && bytes_read == 0) {

_internal_fd->_eof = true;

}

if (bytes_read > static_cast<ssize_t>(size_to_read)) {

throw runtime_error("read() read more than requested");

}

str.resize(bytes_read);

register_read();

}

// Write to the device or file behind the fd using the writev system call

size_t FileDescriptor::write(BufferViewList buffer, const bool write_all) {

size_t total_bytes_written = 0;

do {

auto iovecs = buffer.as_iovecs();

const ssize_t bytes_written = SystemCall("writev", ::writev(fd_num(), iovecs.data(), iovecs.size()));

if (bytes_written == 0 and buffer.size() != 0) {

throw runtime_error("write returned 0 given non-empty input buffer");

}

if (bytes_written > ssize_t(buffer.size())) {

throw runtime_error("write wrote more than length of input buffer");

}

register_write();

buffer.remove_prefix(bytes_written);

total_bytes_written += bytes_written;

} while (write_all and buffer.size());

return total_bytes_written;

}

The custom EventLoop

cs144 wraps the poll() I/O multiplexing call provided by Linux to build a simple event loop, EventLoop:

//! Waits for events on file descriptors and executes corresponding callbacks.

class EventLoop {

public:

// Whether we are interested in read events or write events on the fd

enum class Direction : short {

In = POLLIN,

Out = POLLOUT

};

private:

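using CallbackT = std::function<void(void)>;

using InterestT = std::function<bool(void)>;

// The inner class Rule records which fd, and which kind of event on it, the user is interested in

// It also holds the callbacks to run when the event fires and when the rule is canceled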

class Rule {

public:

FileDescriptor fd;

Direction direction;

// Called when the event of interest occurs

CallbackT callback;

// Its return value decides whether this fd should currently be polled

InterestT interest;

// Called when the fd is closed, reaches EOF, or hangs up

CallbackT cancel;

// Depending on direction, returns how many times the fd has been read or written so far

unsigned int service_count() const;

};

// The set of rules registered by the user

std::list<Rule> _rules{};

public:

// Result of waiting for an event

enum class Result {

Success, // At least one Rule was triggered.

Timeout, // No rules were triggered before timeout.

Exit // All rules have been canceled or were uninterested; make no further calls to EventLoop::wait_next_event.

};

// Register a rule for an event the user is interested in

void add_rule(

const FileDescriptor &fd,

const Direction direction,

const CallbackT &callback,

const InterestT &interest = [] { return true; },

const CallbackT &cancel = [] {});

// Wait for the next event of interest; the argument is the timeout in milliseconds

Result wait_next_event(const int timeout_ms);

};

add_rule: registers an event of interest

void EventLoop::add_rule(const FileDescriptor &fd,

const Direction direction,

const CallbackT &callback,

const InterestT &interest,

const CallbackT &cancel) {

_rules.push_back({fd.duplicate(), direction, callback, interest, cancel});

}

service_count: how many times the fd has been read or written so far

unsigned int EventLoop::Rule::service_count() const {

return direction == Direction::In ? fd.read_count() : fd.write_count();

}

wait_next_event: waits for the next event of interest to occur

EventLoop::Result EventLoop::wait_next_event(const int timeout_ms) {

vector<pollfd> pollfds{};

pollfds.reserve(_rules.size());

bool something_to_poll = false;

// set up the pollfd for each rule

// Walk over every Rule

for (auto it = _rules.cbegin(); it != _rules.cend();) { // NOTE: it gets erased or incremented in loop body

const auto &this_rule = *it;

// If this rule wants to read from the fd but the fd has already reached EOF, call the rule's cancel callback

// and remove the rule from the rule list

if (this_rule.direction == Direction::In && this_rule.fd.eof()) {

// no more reading on this rule, it's reached eof

this_rule.cancel();

it = _rules.erase(it);

continue;

}

// If the fd has been closed, handle it the same way

if (this_rule.fd.closed()) {

this_rule.cancel();

it = _rules.erase(it);

continue;

}

// Ask the rule whether it is currently interested; if so, add it to pollfds for the polling phase below

if (this_rule.interest()) {

// A pollfd has three fields: the fd to poll, the events requested (readable or writable), and the events that actually occurred

pollfds.push_back({this_rule.fd.fd_num(), static_cast<short>(this_rule.direction), 0});

something_to_poll = true;

} else {

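// To keep pollfds and _rules in one-to-one correspondence, a pollfd is still pushed for an uninterested rule

// Its requested events are 0, so it acts as a placeholder; poll still reports errors and hangups for it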

pollfds.push_back({this_rule.fd.fd_num(), 0, 0}); // placeholder --- we still want errors

}

++it;

}

// quit if there is nothing left to poll --- no rule is currently interested

if (not something_to_poll) {

return Result::Exit;

}

// call poll -- wait until one of the fds satisfies one of the rules (writeable/readable)

try {

// Call poll on every pollfd in pollfds

// The last argument is the maximum time to wait if no event of interest occurs

if (0 == SystemCall("poll", ::poll(pollfds.data(), pollfds.size(), timeout_ms))) {

return Result::Timeout;

}

} catch (unix_error const &e) {

if (e.code().value() == EINTR) {

return Result::Exit;

}

}

// go through the poll results

// Walk over the poll results; indices in _rules and pollfds correspond one-to-one

for (auto [it, idx] = make_pair(_rules.begin(), size_t(0)); it != _rules.end(); ++idx) {

const auto &this_pollfd = pollfds[idx];

// revents holds the events that actually occurred; check whether an error was reported

const auto poll_error = static_cast<bool>(this_pollfd.revents & (POLLERR | POLLNVAL));

if (poll_error) {

throw runtime_error("EventLoop: error on polled file descriptor");

}

const auto &this_rule = *it;

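// Did any of the requested events occur?

const auto poll_ready = static_cast<bool>(this_pollfd.revents & this_pollfd.events);

// POLLHUP is set when the descriptor is closed or the peer closes the connection

const auto poll_hup = static_cast<bool>(this_pollfd.revents & POLLHUP);

// If the descriptor has hung up and nothing else is ready, remove the rule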

if (poll_hup && this_pollfd.events && !poll_ready) {

// if we asked for the status, and the _only_ condition was a hangup, this FD is defunct:

// - if it was POLLIN and nothing is readable, no more will ever be readable

// - if it was POLLOUT, it will not be writable again

this_rule.cancel();

it = _rules.erase(it);

continue;

}

// If an event of interest occurred

if (poll_ready) {

// we only want to call callback if revents includes the event we asked for

const auto count_before = this_rule.service_count();

// Invoke the rule's callback

this_rule.callback();

// only check for busy wait if we're not canceling or exiting

if (count_before == this_rule.service_count() and this_rule.interest()) {

throw runtime_error(

"EventLoop: busy wait detected: callback did not read/write fd and is still interested");

}

}

++it; // if we got here, it means we didn't call _rules.erase()

}

return Result::Success;

}
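The core of wait_next_event (build one pollfd per rule, call poll, dispatch callbacks by index) fits in a few lines once the bookkeeping is removed. The following is a stripped-down sketch with hypothetical names (MiniRule, MiniResult, mini_wait_next_event); it works on raw fds and omits the cancel callbacks, EOF tracking, hangup handling, and busy-wait detection of the real class.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <poll.h>
#include <string>
#include <unistd.h>
#include <vector>

// One rule: watch `fd` for `events` (POLLIN or POLLOUT) and run `callback` when ready.
struct MiniRule {
    int fd;
    short events;
    std::function<void()> callback;
    std::function<bool()> interest;
};

enum class MiniResult { Success, Timeout, Exit };

MiniResult mini_wait_next_event(std::vector<MiniRule> &rules, int timeout_ms) {
    std::vector<pollfd> pollfds;
    pollfds.reserve(rules.size());
    bool something_to_poll = false;
    for (const auto &rule : rules) {
        const bool interested = rule.interest();
        // Uninterested rules still get a slot so that indices stay aligned with `rules`.
        pollfds.push_back({rule.fd, static_cast<short>(interested ? rule.events : 0), 0});
        something_to_poll = something_to_poll || interested;
    }
    if (!something_to_poll) {
        return MiniResult::Exit;
    }

    const int ready = ::poll(pollfds.data(), pollfds.size(), timeout_ms);
    if (ready < 0) {
        return MiniResult::Exit;  // simplification: treat any poll failure as "stop"
    }
    if (ready == 0) {
        return MiniResult::Timeout;
    }

    // Dispatch: run the callback of every rule whose requested event occurred.
    for (size_t i = 0; i < rules.size(); ++i) {
        if (pollfds[i].revents & pollfds[i].events) {
            rules[i].callback();
        }
    }
    return MiniResult::Success;
}
```

With a pipe and a single POLLIN rule, the function returns Timeout while the pipe is empty, Success once a byte has been written and the callback has consumed it, and Exit as soon as the rule's interest function returns false.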

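The TCPSpongeSocket class template in detail

TCPSpongeSocket is a class template, and a number of concrete types are instantiated from it:

using TCPOverUDPSpongeSocket = TCPSpongeSocket<TCPOverUDPSocketAdapter>;

using TCPOverIPv4SpongeSocket = TCPSpongeSocket<TCPOverIPv4OverTunFdAdapter>;

using TCPOverIPv4OverEthernetSpongeSocket = TCPSpongeSocket<TCPOverIPv4OverEthernetAdapter>;

using LossyTCPOverUDPSpongeSocket = TCPSpongeSocket<LossyTCPOverUDPSocketAdapter>;

using LossyTCPOverIPv4SpongeSocket = TCPSpongeSocket<LossyTCPOverIPv4OverTunFdAdapter>;

The important members of TCPSpongeSocket are shown below: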

//! Multithreaded wrapper around TCPConnection that approximates the Unix sockets API

template <typename AdaptT>

class TCPSpongeSocket : public LocalStreamSocket {

private:

//! Stream socket for reads and writes between owner and TCP thread

LocalStreamSocket _thread_data;

protected:

//! Adapter to underlying datagram socket (e.g., UDP or IP)

AdaptT _datagram_adapter;

private:

//! Set up the TCPConnection and the event loop

void _initialize_TCP(const TCPConfig &config);

//! TCP state machine -- the TCPConnection implemented in Lab Four

std::optional<TCPConnection> _tcp{};

//! eventloop that handles all the events (new inbound datagram, new outbound bytes, new inbound bytes)

// Event loop, in the same spirit as select/epoll-style multiplexing (built on poll here)

EventLoop _eventloop{};

//! Process events while specified condition is true

void _tcp_loop(const std::function<bool()> &condition);

//! Main loop of TCPConnection thread

void _tcp_main();

//! Handle to the TCPConnection thread; owner thread calls join() in the destructor

std::thread _tcp_thread{};

//! Construct LocalStreamSocket fds from socket pair, initialize eventloop

TCPSpongeSocket(std::pair<FileDescriptor, FileDescriptor> data_socket_pair, AdaptT &&datagram_interface);

std::atomic_bool _abort{false}; //!< Flag used by the owner to force the TCPConnection thread to shut down

bool _inbound_shutdown{false}; //!< Has TCPSpongeSocket shut down the incoming data to the owner?

bool _outbound_shutdown{false}; //!< Has the owner shut down the outbound data to the TCP connection?

bool _fully_acked{false}; //!< Has the outbound data been fully acknowledged by the peer?

...

listen_and_accept

Let us first look at how TCPSpongeSocket implements listen_and_accept, which the server calls to listen on a port:

//! \param[in] c_tcp is the TCPConfig for the TCPConnection

//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::listen_and_accept(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {

if (_tcp) {

throw runtime_error("listen_and_accept() with TCPConnection already initialized");

}

// Set up the TCP connection and the event loop

_initialize_TCP(c_tcp);

_datagram_adapter.config_mut() = c_ad;

_datagram_adapter.set_listening(true);

cerr << "DEBUG: Listening for incoming connection...\n";

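// Run the TCP event loop; the lambda passed in is the condition whose return value decides whether the loop continues

// This run of the loop only establishes the connection and exits once the three-way handshake is done (the event loop function is explained below)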

_tcp_loop([&] {

const auto s = _tcp->state();

return (s == TCPState::State::LISTEN or s == TCPState::State::SYN_RCVD or s == TCPState::State::SYN_SENT);

});

cerr << "New connection from " << _datagram_adapter.config().destination.to_string() << ".\n";

// _tcp_thread handles all further data transfer on this TCP connection; the thread starts running here

_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);

}

_tcp_main

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::_tcp_main() {

try {

if (not _tcp.has_value()) {

throw runtime_error("no TCP");

}

// Run the TCP event loop until the TCP connection is torn down

_tcp_loop([] { return true; });

// Shut down this socket

shutdown(SHUT_RDWR);

if (not _tcp.value().active()) {

cerr << "DEBUG: TCP connection finished "

<< (_tcp.value().state() == TCPState::State::RESET ? "uncleanly" : "cleanly.\n");

}

// Destroy the TCPConnection held in the optional

_tcp.reset();

} catch (const exception &e) {

cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";

throw e;

}

}

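_initialize_TCP: setting up the TCP connection and the event loop

_initialize_TCP sets up the TCP connection and registers the event-loop rules:

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::_initialize_TCP(const TCPConfig &config) {

// Construct the TCPConnection in place from the TCPConfig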

_tcp.emplace(config);

// Set up the event loop

// There are four possible events to handle:

//

// 1) Incoming datagram received (needs to be given to

// TCPConnection::segment_received method)

//

// 2) Outbound bytes received from local application via a write()

// call (needs to be read from the local stream socket and

// given to TCPConnection::data_written method)

//

// 3) Incoming bytes reassembled by the TCPConnection

// (needs to be read from the inbound_stream and written

// to the local stream socket back to the application)

//

// 4) Outbound segment generated by TCP (needs to be

// given to underlying datagram socket)

// rule 1: read from filtered packet stream and dump into TCPConnection

// Watch for datagrams arriving from the network

_eventloop.add_rule(

// The fd being watched is, underneath, the tun device

_datagram_adapter,

Direction::In,

// Callback invoked when the event of interest occurs

[&] {

// Read from the tun device

auto seg = _datagram_adapter.read();

// Hand the segment to the TCPConnection

if (seg) {

_tcp->segment_received(move(seg.value()));

}

// debugging output:

if (_thread_data.eof() and _tcp.value().bytes_in_flight() == 0 and not _fully_acked) {

cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()

<< " has been fully acknowledged.\n";

_fully_acked = true;

}

},

// Keep polling this rule for as long as the TCP connection is active

[&] { return _tcp->active(); });

// rule 2: read from pipe into outbound buffer

// Watch for data the application wants to send

_eventloop.add_rule(

// Watch _thread_data, the data channel between the application and the protocol stack

_thread_data,

Direction::In,

[&] {

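// The application has written into the channel, so _thread_data is readable and there is data to send

// Read at most as many bytes as the outbound stream of the TCPConnection can still accept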

const auto data = _thread_data.read(_tcp->remaining_outbound_capacity());

const auto len = data.size();

// Pass the data to TCPConnection::write

const auto amount_written = _tcp->write(move(data));

if (amount_written != len) {

throw runtime_error("TCPConnection::write() accepted less than advertised length");

}

// If the application has closed its end of the channel, the outbound TCP stream can be ended as well

if (_thread_data.eof()) {

_tcp->end_input_stream();

// The outbound direction is now shut down

_outbound_shutdown = true;

// debugging output:

cerr << "DEBUG: Outbound stream to " << _datagram_adapter.config().destination.to_string()

<< " finished (" << _tcp.value().bytes_in_flight() << " byte"

<< (_tcp.value().bytes_in_flight() == 1 ? "" : "s") << " still in flight).\n";

}

},

// Keep polling this rule while the connection is active, the outbound direction is not shut down, and the outbound stream still has free capacity

[&] { return (_tcp->active()) and (not _outbound_shutdown) and (_tcp->remaining_outbound_capacity() > 0); },

// Cancel callback, invoked when the rule is canceled (the fd is closed, at EOF, or hung up)

[&] {

_tcp->end_input_stream();

_outbound_shutdown = true;

});

// rule 3: read from inbound buffer into pipe

// If reassembled in-order bytes are waiting and the _thread_data channel is still open, write them into _thread_data

_eventloop.add_rule(

// Watch _thread_data

_thread_data,

// Interested in writability

Direction::Out,

[&] {

// Get the receiver's inbound byte stream

ByteStream &inbound = _tcp->inbound_stream();

// Write from the inbound_stream into

// the pipe, handling the possibility of a partial

// write (i.e., only pop what was actually written).

// Peek at the in-order bytes that have arrived, up to 65536 bytes at a time

const size_t amount_to_write = min(size_t(65536), inbound.buffer_size());

const std::string buffer = inbound.peek_output(amount_to_write);

// Write those bytes into the _thread_data channel (a partial write is possible)

const auto bytes_written = _thread_data.write(move(buffer), false);

// Bytes that were actually written to the channel can now be popped from the stream

inbound.pop_output(bytes_written);

// If the inbound stream has ended (the peer sent FIN) or hit an error, shut down the write side of _thread_data

if (inbound.eof() or inbound.error()) {

_thread_data.shutdown(SHUT_WR);

_inbound_shutdown = true;

// debugging output:

cerr << "DEBUG: Inbound stream from " << _datagram_adapter.config().destination.to_string()

<< " finished " << (inbound.error() ? "with an error/reset.\n" : "cleanly.\n");

// TIME_WAIT here means this end closed first (active close) and is now lingering at the end of the four-way close

if (_tcp.value().state() == TCPState::State::TIME_WAIT) {

cerr << "DEBUG: Waiting for lingering segments (e.g. retransmissions of FIN) from peer...\n";

}

}

},

// Keep polling this rule while in-order bytes remain unread, or while the stream has ended (EOF or error) but the channel has not been shut down yet

[&] {

return (not _tcp->inbound_stream().buffer_empty()) or

((_tcp->inbound_stream().eof() or _tcp->inbound_stream().error()) and not _inbound_shutdown);

});

// rule 4: read outbound segments from TCPConnection and send as datagrams

// If the TCPConnection has segments to send and _datagram_adapter is writable, send them

_eventloop.add_rule(

_datagram_adapter,

Direction::Out,

[&] {

// A non-empty segments_out queue means there are segments waiting to be transmitted

while (not _tcp->segments_out().empty()) {

// Write the front segment to the adapter, which performs the actual transmission

_datagram_adapter.write(_tcp->segments_out().front());

_tcp->segments_out().pop();

}

},

// Keep polling this rule for as long as segments_out is not empty

[&] { return not _tcp->segments_out().empty(); });

}

_tcp_loop: running the TCP event loop

_tcp_loop runs the TCP event loop:

//! \param[in] condition is a function returning true if loop should continue

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::_tcp_loop(const function<bool()> &condition) {

auto base_time = timestamp_ms();

// The loop stops when condition() returns false

while (condition()) {

// Wait for the next rule to fire, returning on timeout; the timeout is 10 milliseconds

auto ret = _eventloop.wait_next_event(TCP_TICK_MS);

// Exit means no rule is left to poll (all were canceled or uninterested); _abort means the owner is forcing a shutdown

if (ret == EventLoop::Result::Exit or _abort) {

break;

}

// If the TCP connection is still active

if (_tcp.value().active()) {

// Call TCPConnection::tick on every iteration, i.e. at least once every 10 milliseconds

const auto next_time = timestamp_ms();

// Argument: how much time has passed since the previous call

_tcp.value().tick(next_time - base_time);

// Only the tick function of TCPOverIPv4OverEthernetAdapter does real work (covered in Lab Five)

// For all other adapters it is an empty implementation

_datagram_adapter.tick(next_time - base_time);

base_time = next_time;

}

}

}
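The timing logic of _tcp_loop is easy to get wrong, because tick must receive the time elapsed since the previous call rather than an absolute timestamp. The sketch below isolates that logic; FakeConnection and tick_loop are hypothetical names, and the clock is passed in as a callable so that the example does not depend on real time. The real loop additionally blocks in wait_next_event on every iteration.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Stand-in for TCPConnection: counts calls and accumulates the reported time.
struct FakeConnection {
    size_t elapsed_ms = 0;
    int ticks = 0;
    void tick(size_t ms_since_last_tick) {
        elapsed_ms += ms_since_last_tick;
        ++ticks;
    }
};

// Same shape as _tcp_loop: remember a base time, and on each iteration
// report only the delta since the previous iteration.
void tick_loop(FakeConnection &conn,
               const std::function<size_t()> &now_ms,
               const std::function<bool()> &condition) {
    size_t base_time = now_ms();
    while (condition()) {
        // The real loop waits here for an event or a TCP_TICK_MS timeout.
        const size_t next_time = now_ms();
        conn.tick(next_time - base_time);
        base_time = next_time;
    }
}
```

With a fake clock that advances 10 ms per reading and a condition that holds for three iterations, the connection receives three ticks of 10 ms each, 30 ms in total.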

connect

//! \param[in] c_tcp is the TCPConfig for the TCPConnection

//! \param[in] c_ad is the FdAdapterConfig for the FdAdapter

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::connect(const TCPConfig &c_tcp, const FdAdapterConfig &c_ad) {

if (_tcp) {

throw runtime_error("connect() with TCPConnection already initialized");

}

// Set up the TCP connection and the event loop

_initialize_TCP(c_tcp);

_datagram_adapter.config_mut() = c_ad;

cerr << "DEBUG: Connecting to " << c_ad.destination.to_string() << "...\n";

// Start the three-way handshake: the client sends a SYN first

_tcp->connect();

const TCPState expected_state = TCPState::State::SYN_SENT;

if (_tcp->state() != expected_state) {

throw runtime_error("After TCPConnection::connect(), state was " + _tcp->state().name() + " but expected " +

expected_state.name());

}

// Run the event loop until the three-way handshake has completed

_tcp_loop([&] { return _tcp->state() == TCPState::State::SYN_SENT; });

cerr << "Successfully connected to " << c_ad.destination.to_string() << ".\n";

// Start a separate thread to handle all further data transfer

_tcp_thread = thread(&TCPSpongeSocket::_tcp_main, this);

}

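bidirectional_stream_copy

For both server and client, once the three-way handshake completes a new thread is created whose only job is to run the eventloop inside LossyTCPOverIPv4SpongeSocket. The main thread runs a second eventloop of its own and allocates two buffers: one for data typed by the user and one for data about to be printed to the screen. When the user types on stdin, the poll event registered in the main thread's eventloop fires and the data is written into the local outbound buffer. When the socket is writable, the buffered data is written into it, handed over to the TCP thread, and finally sent to the remote peer through TCPOverIPv4OverTunFdAdapter.

webget talks to real servers on the same principle: IP datagrams are written to the tun virtual network device, which injects them into the OS network stack as if they had arrived over a real link.

// Copy data in both directions between stdin/stdout and a custom socket object

// stdin --> socket --> stdout

// Keyboard input is written to the socket; data readable from the socket is printed to the screen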

void bidirectional_stream_copy(Socket &socket) {

constexpr size_t max_copy_length = 65536;

constexpr size_t buffer_size = 1048576;

EventLoop _eventloop{};

FileDescriptor _input{STDIN_FILENO};

FileDescriptor _output{STDOUT_FILENO};

ByteStream _outbound{buffer_size};

ByteStream _inbound{buffer_size};

bool _outbound_shutdown{false};

bool _inbound_shutdown{false};

socket.set_blocking(false);

_input.set_blocking(false);

_output.set_blocking(false);

// rule 1: read from stdin into outbound byte stream

// When stdin is readable, write the data into the _outbound stream

_eventloop.add_rule(

_input,

Direction::In,

[&] {

_outbound.write(_input.read(_outbound.remaining_capacity()));

if (_input.eof()) {

_outbound.end_input();

}

},

[&] { return (not _outbound.error()) and (_outbound.remaining_capacity() > 0) and (not _inbound.error()); },

[&] { _outbound.end_input(); });

// rule 2: read from outbound byte stream into socket

// When the socket is writable, write data from the _outbound stream into the socket

_eventloop.add_rule(

socket,

Direction::Out,

[&] {

const size_t bytes_to_write = min(max_copy_length, _outbound.buffer_size());

const size_t bytes_written = socket.write(_outbound.peek_output(bytes_to_write), false);

_outbound.pop_output(bytes_written);

if (_outbound.eof()) {

socket.shutdown(SHUT_WR);

_outbound_shutdown = true;

}

},

[&] { return (not _outbound.buffer_empty()) or (_outbound.eof() and not _outbound_shutdown); },

[&] { _outbound.end_input(); });

// rule 3: read from socket into inbound byte stream

// When the socket is readable, read the data and write it into the _inbound stream

_eventloop.add_rule(

socket,

Direction::In,

[&] {

_inbound.write(socket.read(_inbound.remaining_capacity()));

if (socket.eof()) {

_inbound.end_input();

}

},

[&] { return (not _inbound.error()) and (_inbound.remaining_capacity() > 0) and (not _outbound.error()); },

[&] { _inbound.end_input(); });

// rule 4: read from inbound byte stream into stdout

// When stdout is writable, read data out of _inbound and write it to stdout

_eventloop.add_rule(

_output,

Direction::Out,

[&] {

const size_t bytes_to_write = min(max_copy_length, _inbound.buffer_size());

const size_t bytes_written = _output.write(_inbound.peek_output(bytes_to_write), false);

_inbound.pop_output(bytes_written);

if (_inbound.eof()) {

_output.close();

_inbound_shutdown = true;

}

},

[&] { return (not _inbound.buffer_empty()) or (_inbound.eof() and not _inbound_shutdown); },

[&] { _inbound.end_input(); });

// loop until completion -- each iteration blocks until the next event occurs

while (true) {

if (EventLoop::Result::Exit == _eventloop.wait_next_event(-1)) {

return;

}

}

}

TCPSpongeSocket::wait_until_closed

wait_until_closed blocks until the _tcp_thread thread has finished:

template <typename AdaptT>

void TCPSpongeSocket<AdaptT>::wait_until_closed() {

// Shut down this socket

shutdown(SHUT_RDWR);

// Block until the _tcp_thread thread has finished

if (_tcp_thread.joinable()) {

cerr << "DEBUG: Waiting for clean shutdown... ";

_tcp_thread.join();

cerr << "done.\n";

}

}

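A channel that ties the main thread and the worker thread together

First, let us look at the constructors and destructor of TCPSpongeSocket:

// The socketpair system call creates a pair of connected sockets on the local machine.

// The pair is used much like network sockets, but the data never passes through the network protocol stack; it is copied directly inside the kernel, which makes it more efficient.

static inline pair<FileDescriptor, FileDescriptor> socket_pair_helper(const int type) {

int fds[2];

// socketpair returns two connected sockets. Both ends are bidirectional: each one can be read from and written to.

// Anything written to one end can be read from the other end, and vice versa.

SystemCall("socketpair", ::socketpair(AF_UNIX, type, 0, static_cast<int *>(fds)));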

return {FileDescriptor(fds[0]), FileDescriptor(fds[1])};

}

//! \param[in] datagram_interface is the underlying interface (e.g. to UDP, IP, or Ethernet)

template <typename AdaptT>

TCPSpongeSocket<AdaptT>::TCPSpongeSocket(AdaptT &&datagram_interface)

: TCPSpongeSocket(socket_pair_helper(SOCK_STREAM), move(datagram_interface)) {}

template <typename AdaptT>

TCPSpongeSocket<AdaptT>::TCPSpongeSocket(pair<FileDescriptor, FileDescriptor> data_socket_pair,

AdaptT &&datagram_interface)

// The main thread holds one end of the channel

: LocalStreamSocket(move(data_socket_pair.first))

// The worker thread holds the other end

, _thread_data(move(data_socket_pair.second))

, _datagram_adapter(move(datagram_interface)) {

_thread_data.set_blocking(false);

}

template <typename AdaptT>

TCPSpongeSocket<AdaptT>::~TCPSpongeSocket() {

try {

if (_tcp_thread.joinable()) {

cerr << "Warning: unclean shutdown of TCPSpongeSocket\n";

// force the other side to exit

_abort.store(true);

_tcp_thread.join();

}

} catch (const exception &e) {

cerr << "Exception destructing TCPSpongeSocket: " << e.what() << endl;

}

}

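The main thread and the worker thread communicate locally through the pair of connected sockets created by the socketpair system call.

When a keyboard input event occurs in the main thread, the typed data travels through the bidirectional channel created by socketpair to the worker thread, which finally sends it out through the tun device. Two eventloops cooperate to make this happen.

In the other direction, when the tun device receives a network packet, the packet is handed to the TCP stack. Once the TCP stack has processed it and finds that the _thread_data channel is writable, it puts the reassembled data into the channel. The Socket in the main thread sees that data has arrived and writes it into the _inbound stream; when stdout is writable, the received data is finally printed to the screen.

This path also relies on the two eventloops working together, so it is worth taking the time to understand it well.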
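The channel itself can be tried out in isolation. The sketch below uses hypothetical helper names (make_socket_pair, send_all, recv_some) and, to stay small, drives both ends from a single thread; in TCPSpongeSocket the two ends are owned by the main thread and the TCP thread respectively.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <unistd.h>
#include <utility>

// Create a connected pair of AF_UNIX stream sockets; both ends are bidirectional.
std::pair<int, int> make_socket_pair() {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
        throw std::runtime_error("socketpair failed");
    }
    return {fds[0], fds[1]};
}

// Write the whole string, looping over partial writes.
void send_all(int fd, const std::string &data) {
    size_t sent = 0;
    while (sent < data.size()) {
        const ssize_t n = ::write(fd, data.data() + sent, data.size() - sent);
        if (n <= 0) {
            throw std::runtime_error("write failed");
        }
        sent += static_cast<size_t>(n);
    }
}

// Read whatever is available (up to 256 bytes); an empty string means EOF.
std::string recv_some(int fd) {
    char buf[256];
    const ssize_t n = ::read(fd, buf, sizeof(buf));
    if (n < 0) {
        throw std::runtime_error("read failed");
    }
    return std::string(buf, static_cast<size_t>(n));
}
```

Data written to the first end can be read from the second and the other way round, and closing one end makes the next read on the other end return EOF. That EOF is what the event loops above observe through eof() when one side shuts the channel down.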

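Summary

This is my own understanding of the overall flow of tcp_ipv4.cc, the test program for cs144 Lab Four. It may contain mistakes, and corrections in the comments are welcome. Space is limited, so not all of the source could be quoted and explained here; it helps to read this post side by side with the cs144 Lab Four source code.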
