Stanford CS144 Lab
于2023年2月6日2023年2月6日由Sukuna发布
Lab0.Warm Up
1 Networking by hand
这一个部分主要是体验一些基本的应用层协议,主要是HTTP协议和SMTP协议.
Both of these tasks rely on a networking abstraction called a reliable bidirectional in-order byte stream: you’ll type a sequence of bytes into the terminal, and the same sequence of bytes will eventually be delivered, in the same order, to a program running on another computer (a server). The server responds with its own sequence of bytes, delivered back to your terminal.
在实验资料中给出的是这么一段话,这句话的意思就是所有的应用层协议都是由底层支撑的,这个底层可以理解成可靠的二进制比特流的传输,一方应用程序会产生比特流投入到传输通道中,另一方的应用程序会从传输通道中获取到比特流信息.这个传输通道就是Socket,套接字.
1.1 获取一个网页.
我们跟随着实验的步骤来进行执行.
1)首先用浏览器打开网页http://cs144.keithw.org/hello.
发现内容就是简短的一行字 Hello,CS144.
2) 用telnet进行连接,输入telnet cs144.keithw.org http
这一步的做法就是在你的电脑和另一台电脑(域名为cs144.keithw.org)进行一个套接字连接,(原文:a reliable byte stream between your computer and anothercomputer),这个时候你的电脑和cs144.keithw.org(获取这个域名的IP需要通过DNS服务器进行)就连接上了.
3)输入GET /hello HTTP/1.1
这个是HTTP协议的应用,HTTP协议的命令有GET和POST两种,其中POST一般就是投放一个表单到服务器上,GET就是从服务器获取资源.其中资源的名称是可以指定的,uri就是指定资源, 一般uri的组成就是 服务器的域名+资源在服务器的根目录的对应的文件位置.比如说hust.edu.cn/1.jpg,就是在hust.edu.cn这个服务器根目录下的1.jpg这个文件.
这个命令就是用HTTP/1.1的方式获取服务器的资源
4) 输入 Host: cs144.keithw.org
这个指定host的地址,因为一个uri的构成就是域名+文件位置.
上面几个做法就是利用telnet软件制作一个HTTP请求报文.请求报文分成头部和请求本身
这个时候就会返回一个HTTP响应报文:
HTTP/1.1 200 OK Date: Thu, 03 Mar 2022 07:53:25 GMT Server: Apache Last-Modified: Thu, 13 Dec 2018 15:45:29 GMT ETag: "e-57ce93446cb64" Accept-Ranges: bytes Content-Length: 14 Content-Type: text/plain
Hello, CS144!
其中分成第一行,首部和报文内容,第一行表示使用的HTTP版本,状态码和短语,我们可以利用状态码和短语来判断服务器是否正常响应了我们的请求:
首部就蕴含了一些控制信息,比如说报文长度和报文类型等.最后的就是报文内容,就是主体部分.
5) 获得sunsetid
如法炮制:构造请求报文和响应报文即可.
1.2 给自己发送邮件
1) 首先连接到服务器,这里要使用SMTP协议进行连接.
2) 给服务器打招呼
3) 登陆
注意这里是BASE64编码的,第一行语句表示Username:,就输入你的邮箱地址,第二行语句表示授权码,把授权码转化成BASE64编码的即可.
4) 写邮件
MAIL 命令,发件人
RCPT 命令,收件人
DATA表示开始书写新建内容
.表示书写结束
1.3 监听服务器
1) 使用netcat -v -l -p 9090创建一个监听端口.
2) 连接本机创建的监听端口:telnet localhost 9090
这个时候客户端和服务器连接上了.
HTTP的连接本质上是C/S模型:客户-服务器模型,在上文我们提到了管道的观念,其实服务器一直在一个管道上监听信息,一旦监听到了信息就对应地往管道里面投放信息.
总的来说,就是客户端向服务器提出申请请求,服务器一直在监听客户端的申请请求,一有申请请求就立刻建立TCP连接.
2 自制Socket
This feature is known as a stream socket. To your program and to the Web server, the socket looks like an ordinary file descriptor (similar to a file on disk, or to the stdin or stdout I/O streams). When two stream sockets are connected, any bytes written to one socket will eventually come out in the same order from the other socket on the other computer.
Socket在Linux操作系统中本质上就是一个文件,一旦两个Socket相互连接,应用程序会往一个Socket递交数据,另外一个Socket就会原封不动地把数据传递过来.连接的方式在运输层有讲,客户端的一个网络端口创建一个Socket,往服务器的一个网络端口发送请求,这是第一次握手,接着服务器的网络端口传输ACK给客户端,这是第二次握手,接着客户端会传输一个最后的请求,这个叫三次握手.三次握手后,连接就完成了,这个时候两个Socket(可以理解成网络端口?)相互链接了.
需要注意的是,在应用层我们一般是注重逻辑通信,Socket是一个逻辑概念,应用程序把数据投给一个叫做Socket的东西,你可以理解成逻辑通信的一端,但是具体Socket往下是怎么做的不是应用程序需要关注的.
这个实验就需要我们模拟一个Socket应用,与一个服务器的端口建立连接.然后获取网页.
void get_URL(const string &host, const string &path) {
// Your code here.// You will need to connect to the "http" service on // the computer whose name is in the "host" string, // then request the URL path given in the "path" string. // Then you'll need to print out everything the server sends back, // (not just one call to rearequestd() -- everything) until you reach // the "eof" (end of file). // create a TCPSocket TCPSocket client_socket; // connect with host. host is a parameter. client_socket.connect(Address(host, "http")); // send a request Message. the request is made of 2 sentences. string request = "GET "+path+" HTTP/1.1\r\n"+"Host: "+host+"\r\nConnection: close\r\n\r\n"; client_socket.write(request); // get the Message while(!client_socket.eof()){ string reply = client_socket.read(); cout<<reply; } cerr << "Function called: get_URL(" << host << ", " << path << ").\n"; cerr << "Warning: get_URL() has not been implemented yet.\n";
}
这个时候先创建一个TCPSocket,首先先进行连接,然后像之前一样创建request,接着这个Socket就可以把request写进去.然后服务器会返回数据,这个数据是读取到Socket的,读数据一直读到EOF即可.
由于这个实验是面向初学者的,具体Socket怎么读怎么写我们没有考虑,我们只用调用教授已经写好的写,读操作.
3 缓冲区队列
要求实现一个有序字节流类(in-order byte stream),使之支持读写、容量控制。这个字节流类似于一个带容量的队列,从一头读,从另一头写。当流中的数据达到容量上限时,便无法再写入新的数据。特别的,写操作被分为了peek和pop两步。peek为从头部开始读取指定数量的字节,pop为弹出指定数量的字节。
总的来说就是做一个桶,可以从下方获得内容,也可以从上方添加内容,当桶满的时候就不可以添加东西了
ByteStream具有一定的容量,最大允许存储该容量大小的数据;在读取端读出一部分数据后,它会释放掉已经被读出的内容,以腾出空间继续让写端写入数据。
这个实验为我们后期实现TCP协议有着帮助.
上面的是缓冲区队列的一些声明,对于读写两方,操作是不同的.
有个小提示,如果C++的构造函数可以使用像这样的方法进行初始化的
class baba (const int abab) _abab(abab){
}
这个本质上就是数据结构题,完成缓冲区队列罢了.
Lab1.stitching substrings into a byte stream
该lab要求我们实现一个流重组类,可以将Sender发来的带索引号的字节碎片重组成有序的字节写入到byte_stream。接收端从发送端读取数据,调用流重组器,流重组器对数据进行排序,排序好后写入byte_stream。
流重组器也是有capasity的,也就是说流重组器也有一定的容量,一次性处理太多的信息会导致流重组器不能够正常地工作.同样的我们把流处理器当成一个双端队列即可.
private类中还有一个ByteStream类型的变量,所有的内容都输出给ByteStream,还有一个容量变量.其中ByteStream中的bytes_read返回ByteStream处理了多少元素.
因为重组类的函数中,支持的index是first unread=_output.bytes_read()(已经读取的元素)到first unacceptable的这一块区域,我们要保证输入的字节编号是在这个区域里面的.
在重组器类的函数中,push_substring函数完成重组工作。其参数有data(string),index(size_t),eof(bool),data是待排序的数据,index是数据首元素的序号,eof用于判断是否结束。结束后将不再有数据输入。
在重组的过程中,我们会遇到重复、丢包、重叠、乱序的现象。为了使乱序的数据找到有序的位置,我使用’\0’维护重组器中数据的相对序号,例如,第一次data为make,index为0,第二次data为great,index为13,而处于两组数据中间的数据未知,我们就用’\0’代替,即make\0\0\0\0\0\0\0\0\0great。这样就维护了已经进入重组器的数据的有序。当然,写入的data中也有可能含有\0,这是,我们就需要一个bool双端队列,来记录相应位置的数据是否有序,在上述例子中,队列的bool值为111100000000011111。
所以说我们在数据结构中添加几项,一个是_unassembled_byte,是一个std::deque<char>,暂时存储还乱序的字符串,_check_byte是std::deque<bool>,这个元素与_unassembled_byte一一对应,当un[i]存储着还没有发送的字符的时候,ch[I]=true,否则为false,还有一个_lens_un,这个记录乱序的字符的长度.
程序的总体结构:
发送端的数据->流重组器(重组成有序的数据)->Bytestream(在Lab0就做好的队列)->TCP接收端.
流重组器需要做的是,把所有有序的数据写入到接收端.
其中字符的编号是从1一直往后延伸的,因为队列的首和尾都可以记录.TCP的发送端发送的数据也是(字符号、字符串)字符的编号一直往后延伸.
这个时候我们回忆一下对应数据的表示:
output.bytes_read()
:接收端从ByteStream获得的字符数量.
output.bytes_write()
:流重组器写入ByteStream的字符数量-1.而且是流重组器的有效数据中index最小的序号
_lens_un
指的还在流重组器里面的数据的长度.
其中:output.bytes_read()+_capacity
是ByteStream可以接受的范围,output.bytes_write()+_lens_un
是流重组器的有效数据中index最大的序号.
1.我们判断输入序号是否大于内存与已读取数据之和,也就是说,该数据是否属于unacceptable中的数据,如果是这样的数据,我们没有足够的内存写入,因为写入这样的数据需要添加\0,从而超过capasity的大小。代码如下:
if(index>_output.bytes_read()+_capacity){
return;
}
2.字符串部分在区域内,但是部分在区域外,那就把区域外的内容舍弃,只读取区域内的内容.
我们需要判断data中最后一个数据的序号是否大于内存与已读取数据之和,如果大于,我们就要将能写入的部分写入,也就是按data的顺序尽可能地写入数据而不超过capasity,在写入的过程中,我们也会遇到两种情况,一种是序号index大于此时已经在流重组器的最后一个数据的序号,在这种情况下我们要在流重组器最后一个序号与index之间填入’\0′,同时将相应的bool双端队列(_check_byte)设置为false,做完这些工作后,才开始写入新的数据。另一种情况是index的小于或者等于流重组器最后一个数据的序号,我们需要弹出冲突的数据,举个例子就是,index序号为5,此时流重组器中的数据为stanford,我们就要从序号5的数据也就是o开始弹出,变成stanf,再写入data中的数据。代码如下:
if(index+data.length()>_capacity+_output.bytes_read()){
for(size_t i=_lens_un+_output.bytes_written();i<_capacity+_output.bytes_read();i++){
if(i<index){
_unassembled_byte.push_back('\0');
_check_byte.push_back(false);}else{ _unassembled_byte.push_back(data[i-index]); _check_byte.push_back(true); } _lens_un++; }
}
3.我们要判断index是否等于已经写入byte_stream(_output)中的数据,如果是的,我们就直接将data中的数据写入byte_stream,然后在重组器中弹出data.length()个数据,值得注意的是,当重组器中的数据个数小于data.length(),我们就全部弹出。但是后面的数据会被当成无效数据而不进行处理,代码如下:
if(index==_output.bytes_written()){
//直接写
_output.write(data);
size_t temp_len=std::min(_lens_un,data.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
}
4.我们要判断index是否大于流重组器中的最后一个数据的序号和写入byte_stream中的数据个数之和,如果大于,我们就可以参考1的处理,代码如下:
if(index>_output.bytes_written()+_lens_un){
for(size_t i=_output.bytes_written()+_lens_un;i<index;i++){
_unassembled_byte.push_back('\0');
_check_byte.push_back(false);
_lens_un++;
}
[原来的data][空][新的data]
for(char i : data){
_unassembled_byte.push_back(i);
_lens_un++;
_check_byte.push_back(true);
}
}
5.我们要判断data中的数据是否已经被写入byte_stream,这个说法有些不准确,准确的说是相应序号的数据被写入,如果data中的所有数据都被写入了byte_stream,我们就直接返回,如果只是部分被写入,我们就将data中未被写入的部分写入。代码如下:
if(index<_output.bytes_written()){
if(_output.bytes_written()>index+data.length()){
return;
}
//[已经写入Byte_stream的][bytes_written()][新传来的data在bytes_written()之后的,入队][原来在_output.bytes_written()+_lens_un之后的data]
//还是要写,一直写到data最后.
std::string data_cut(data.begin()+_output.bytes_written()-index,data.end());
_output.write(data_cut);
size_t temp_len=std::min(_lens_un,data_cut.length());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
6.不是任何情况:首先我们知道要把_output.bytes_written()~index这一部分的内容保存好,然后再把data加入进去即可
//在中间插入元素
//先弹出一部分数据保存到栈中
std::stack<char> temp;
std::stack<bool> temp_check;
for(size_t i=0;i<index-_output.bytes_written();i++){
temp.push(_unassembled_byte.at(i));
temp_check.push(_check_byte.at(i));
}
[原data,入队][index][新传来的data,入队][原来在_output.bytes_written()+_lens_un之后的data]
//这里是看数据的最后一个index有没有达到_output.bytes_written()+_lens_un,达到的话后面的内容要保留,没达到就全部删除即可
size_t temp_len=std::min(_lens_un,data.length()+index-_output.bytes_written());
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+temp_len);
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+temp_len);
_lens_un-=temp_len;
for(int i=data.length()-1;i>=0;i--){
_unassembled_byte.push_front(data[i]);
_check_byte.push_front(true);
_lens_un++;
}
while(!temp.empty()){
_unassembled_byte.push_front(temp.top());
_check_byte.push_front(temp_check.top());
_lens_un++;
temp.pop();
temp_check.pop();
}
7.输入字符串到ByteStream中:
size_t i=0;
while(i<_lens_un){
if(!_check_byte.at(i)){
break;
}
i++;
}
std::string n(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_output.write(n);
_unassembled_byte.erase(_unassembled_byte.begin(),_unassembled_byte.begin()+i);
_lens_un-=i;
_check_byte.erase(_check_byte.begin(),_check_byte.begin()+i);
if(eof) input_end_index=index+data.length();
if(input_end_index==_output.bytes_written()) _output.end_input();
Lab2.TCP Reciever
绝对序号和相对序号的转换:
在实践中,一个分组的序号承载在分组首部的一个固定长度的字段中。如果分组序号字段的比特数是k,则该序号范围是。 在一个有限的序号范围内,所有涉及序号的运算必须使用模运算。(即序号空间可被看作是一个长度为 的环,其中序号紧挨着0)。上面论述的序号是相对序号(相对序号的开始值是),还有一种不模的运算就是绝对序号.
这个时候我们需要完成两个函数:
1.wrap(绝对序号转化为相对序号)
WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
DUMMY_CODE(n, isn);
WrappingInt32 res(n+isn.raw_value());
return res;
}
这个函数调用了WrappingInt32
类的构造函数,构造函数获得一个int类型的数(uint_64等类型)然后取模之后获得32位的整形数,存放到raw_value成员中.
2.unwrap(相对序号转绝对序号)
uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
DUMMY_CODE(n, isn, checkpoint);
uint64_t temp=n.raw_value()-isn.raw_value();
if(checkpoint==0){
return temp;
}
uint32_t div=checkpoint/(1ul<<32);
uint32_t res=checkpoint%(1ul<<32);
if (res<=temp) {
temp=(checkpoint-temp-(div-1)*(1ul<<32))<(temp+div*(1ul<<32)-checkpoint)?temp+(div-1)*(1ul<<32):temp+div*(1ul<<32);
}else{
temp=(checkpoint-temp-div*(1ul<<32))<(temp+(div+1)*(1ul<<32)-checkpoint)?temp+div*(1ul<<32):temp+(div+1)*(1ul<<32);
}
return temp;
}
给定checkpoint,找到最靠近checkpoint的那个temp,返回即可.
Implementing the TCP receiver
首先我们看一看TCP报文包的定义:主要是由首部和其中的元素组成:其中可以调用serialize和parse方法转化,
class TCPSegment {
private:
TCPHeader _header{};
Buffer _payload{};public:
//! \brief Parse the segment from a string
ParseResult parse(const Buffer buffer, const uint32_t datagram_layer_checksum = 0);//! \brief Serialize the segment to a string BufferList serialize(const uint32_t datagram_layer_checksum = 0) const; //! \name Accessors //!@{ const TCPHeader &header() const { return _header; } TCPHeader &header() { return _header; } const Buffer &payload() const { return _payload; } Buffer &payload() { return _payload; } //!@} //! \brief Segment's length in sequence space //! \note Equal to payload length plus one byte if SYN is set, plus one byte if FIN is set size_t length_in_sequence_space() const;
};
接着我们来看一看TCP首部:首部的元素主要是:
- 序号:seqno,占32位,用来标识从发送端到接收端的字节流;
- 确认号:ackno,占32位,只有ACK标志位为1时,确认号才有效,ackno=seqno+1;
- 标志位:
- SYN:发起一个连接;
- FIN:释放一个连接;
- ACK:确认序号有效。
struct TCPHeader {
static constexpr size_t LENGTH = 20; //!< [TCP](\ref rfc::rfc793) header length, not including options//! \struct TCPHeader //! ~~~{.txt} //! 0 1 2 3 //! 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Source Port | Destination Port | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Sequence Number | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Acknowledgment Number | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Data | |U|A|P|R|S|F| | //! | Offset| Reserved |R|C|S|S|Y|I| Window | //! | | |G|K|H|T|N|N| | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Checksum | Urgent Pointer | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | Options | Padding | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! | data | //! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //! ~~~ //! \name TCP Header fields //!@{ uint16_t sport = 0; //!< source port uint16_t dport = 0; //!< destination port WrappingInt32 seqno{0}; //!< sequence number WrappingInt32 ackno{0}; //!< ack number uint8_t doff = LENGTH / 4; //!< data offset bool urg = false; //!< urgent flag bool ack = false; //!< ack flag bool psh = false; //!< push flag bool rst = false; //!< rst flag bool syn = false; //!< syn flag bool fin = false; //!< fin flag uint16_t win = 0; //!< window size uint16_t cksum = 0; //!< checksum uint16_t uptr = 0; //!< urgent pointer
}
接着看一看TCP receiver的数据结构定义:
#ifndef SPONGE_LIBSPONGE_TCP_RECEIVER_HH
#define SPONGE_LIBSPONGE_TCP_RECEIVER_HH#include "byte_stream.hh"
#include "stream_reassembler.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"#include <optional>
//! \brief The "receiver" part of a TCP implementation.
//! Receives and reassembles segments into a ByteStream, and computes
//! the acknowledgment number and window size to advertise back to the
//! remote TCPSender.
//接收重组segments为 ByteStream,并计算确认号和窗口大小以通告回远程 TCPSender。
class TCPReceiver {
//! Our data structure for re-assembling bytes.
//我们用于重新组装字节的数据结构。
StreamReassembler _reassembler;//! The maximum number of bytes we'll store. //容量大小 size_t _capacity; WrappingInt32 ISN; bool syn_flag;
public:
//! \brief Construct a TCP receiver //! //! \param capacity the maximum number of bytes that the receiver will //! store in its buffers at any give time. //构造函数,构造一个 TCP 接收器,容量接收器在任何给定时间将存储在其缓冲区中的最大字节数。 TCPReceiver(const size_t capacity) : _reassembler(capacity), _capacity(capacity),ISN(0) ,syn_flag(0){} //! \name Accessors to provide feedback to the remote TCPSender //!@{ //! \brief The ackno that should be sent to the peer //! \returns empty if no SYN has been received //! //! This is the beginning of the receiver's window, or in other words, the sequence number //! of the first byte in the stream that the receiver hasn't received. // 如果没有收到 SYN,则应发送给对等方的 ackno 为空 //这是接收器窗口的开始,否则,接收器未接收到的流中第一个字节的序列号。 std::optional<WrappingInt32> ackno() const; //! \brief The window size that should be sent to the peer //! //! Operationally: the capacity minus the number of bytes that the //! TCPReceiver is holding in its byte stream (those that have been //! reassembled, but not consumed). //! //! Formally: the difference between (a) the sequence number of //! the first byte that falls after the window (and will not be //! accepted by the receiver) and (b) the sequence number of the //! beginning of the window (the ackno). size_t window_size() const; //!@} //! \brief number of bytes stored but not yet reassembled size_t unassembled_bytes() const { return _reassembler.unassembled_bytes(); } //! \brief handle an inbound segment void segment_received(const TCPSegment &seg); //! \name "Output" interface for the reader //!@{ ByteStream &stream_out() { return _reassembler.stream_out(); } const ByteStream &stream_out() const { return _reassembler.stream_out(); } bool recv_fin() const; //!@}
};
我们知道TCP需要接受一个叫做segment类型的数据,然后存储起来,送入到Lab1已经实现好的reassemble_stream中.并返回适合的ACK.
对于接受的数据:分成两种可能,一种是第一个序列,另外的就是普通的数据
void TCPReceiver::segment_received(const TCPSegment &seg) {
DUMMY_CODE(seg);
//代表第一个传过来的seg
if(seg.header().syn){
syn_flag= true;
//窗口的左端
ISN=seg.header().seqno;
} else if(!syn_flag){
return;
}
//推断数据包的序号,序号比较靠近上一个已经接收到的序号,然后塞进我们在Lab1已经写好的流重组器.
uint64_t received_lens=_reassembler.stream_out().bytes_written();
size_t index= unwrap(seg.header().seqno,ISN,received_lens);
if(!seg.header().syn){
index--;
}
//进行重组
_reassembler.push_substring(seg.payload().copy(),index,seg.header().fin);
}
ACK的返回也很简单,流重组器输入到Byte stream的个数就代表已经输入了多少个有序的序列,返回对应的ACK即可.但是对于结束的时候的ACK回应,我们还是需要分类讨论.
optional<WrappingInt32> TCPReceiver::ackno() const {
if(!syn_flag){
return std::nullopt;
}else{
//判断是否是最后一个
if(_reassembler.stream_out().input_ended()){
return ISN+_reassembler.stream_out().bytes_written()+2;
}else{
//返回的ACK的序号就是期望获得的下一个字符的数+1,流重组器的已连续写入的数据量就是最后一个有序的 //字符
return ISN+_reassembler.stream_out().bytes_written()+1;
}
}
}
Lab3 TCP Sender
这一次我们要实现TCP的发送方,这一次我把必要的注释写在代码里面了.
1.头文件:
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
uint64_t base{0};
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
//cached TCPSegment.
std::queue<TCPSegment> _segments_out_cached{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;//! outgoing stream of bytes that have not yet been sent ByteStream _stream; //nextseq numbers as the absolute TCP number. uint64_t _next_seqnum{0}; //slide windows size uint16_t _curr_window_size; //isfinished? bool _isfin; size_t _times; //ticking? bool _time_waiting; //remission times. int _consecutive_remission; // when is time out? size_t _time_out; //empty windows? bool _window_zero; //! the (absolute) sequence number for the next byte to be sent uint64_t _next_seqno{0};
public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});//! \name "Input" interface for the writer //!@{ ByteStream &stream_in() { return _stream; } const ByteStream &stream_in() const { return _stream; } //!@} //! \name Methods that can cause the TCPSender to send a segment //!@{ //! \brief A new acknowledgment was received void ack_received(const WrappingInt32 ackno, const uint16_t window_size); //! \brief Generate an empty-payload segment (useful for creating empty ACK segments) void send_empty_segment(); //! \brief create and send segments to fill as much of the window as possible void fill_window(); //! \brief Notifies the TCPSender of the passage of time void tick(const size_t ms_since_last_tick); //!@} //! \name Accessors //!@{ //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged? //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte //! (see TCPSegment::length_in_sequence_space()) size_t bytes_in_flight() const; //! \brief Number of consecutive retransmissions that have occurred in a row unsigned int consecutive_retransmissions() const; //! \brief TCPSegments that the TCPSender has enqueued for transmission. //! \note These must be dequeued and sent by the TCPConnection, //! which will need to fill in the fields that are set by the TCPReceiver //! (ackno and window size) before sending. std::queue<TCPSegment> &segments_out() { return _segments_out; } //!@} //! \name What is the next sequence number? (used for testing) //!@{ //! \brief absolute seqno for the next byte to be sent uint64_t next_seqno_absolute() const { return _next_seqno; } //! \brief relative seqno for the next byte to be sent WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); } //!@}
};
2.发送数据函数:
void TCPSender::fill_window() {
// windows is full or the programe is finished.
if(_curr_window_size==0||_isfin){
return;
}
//haven't send any bytes.
if(_next_seqno==0){
TCPSegment seg;
// the TCP transmission start from _isn.
seg.header().seqno = _isn;
seg.header().syn = true;
// the TCP first connection just send 1 bytes;
_next_seqno = 1;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//the end of the file
else if(_stream.eof()){
//set the finish flag to true;
_isfin = true;
TCPSegment seg;
seg.header().syn=false;
seg.header().fin=true;
//convert the absolute TCP number to TCP number.
seg.header().seqno = wrap(_next_seqno,_isn);
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
_segments_out.push(seg);
_segments_out_cached.push(seg);
}
//normal file
else{
//make sure the windows is not full and there's any data to convert.
while(!_stream.buffer_empty()&&_curr_window_size>0){
//decide the length of the TCP Segment.
//make sure the length of TCP segment is below the silde windows size and data length.
uint64_t lens_byte=std::min(_stream.buffer_size(),uint64_t (_curr_window_size));
lens_byte=std::min(lens_byte,TCPConfig::MAX_PAYLOAD_SIZE);
TCPSegment seg;
seg.header().seqno = wrap(_next_seqno,_isn);
seg.header().syn = false;
//get the lens_byte data to the payload.
seg.payload()=_stream.read(lens_byte);
// increase the next seq_no;
_next_seqno += lens_byte;
_curr_window_size -= lens_byte;
// get the end of the file.
if(_stream.eof()&&_curr_window_size>0){
_isfin = true;
seg.header().fin=true;
//the fin packet only send a byte.
_next_seqno++;
_curr_window_size--;
}
_segments_out.push(seg);
_segments_out_cached.push(seg);
if(_isfin){
break;
}
}
}
//start ticking...
if(!_time_waiting){
_time_out = _initial_retransmission_timeout;
_time_waiting = true;
_times = 0;
}
}
3.接受ACK:
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
DUMMY_CODE(ackno, window_size);
// get the absolute TCP number of ACK...
uint64_t acknos = unwrap(ackno,_isn,base);
//thrid connection...
//means the 0th bytes gets and desire to 1st bytes...
if(base==0&&acknos==1){
base=1;
_segments_out_cached.pop();
_consecutive_remission=0;
}
else if(acknos > _next_seqno){
return;
}
//the ack number is bigger than first cached segment...
//means the cached data gets by the reciever...
else if(!_segments_out_cached.empty() && acknos >= base + _segments_out_cached.front().length_in_sequence_space()){
//first segment in cache, and get the seqno and length of the segment...
uint64_t copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
uint64_t copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
//find the segments that acked by recevier...
//hint:if seqno+len<=ackno:means the data is acked by recevier...
while(copy_seg_len+copy_seg_seqno<=acknos){
//move the base, base is the 1st bytes that nor acked...
base += _segments_out_cached.front().length_in_sequence_space();
_segments_out_cached.pop();
if(_segments_out_cached.empty()) break;
// judge the 2nd segs...
copy_seg_seqno = unwrap(_segments_out_cached.front().header().seqno, _isn, base);
copy_seg_len = _segments_out_cached.front().length_in_sequence_space();
}
_time_out = _initial_retransmission_timeout;
_times = 0;
_consecutive_remission = 0;
}
// 3rd disconnection.
else if(acknos == _next_seqno && _isfin){
base = acknos;
_segments_out_cached.pop();
}
//the windows is empty
if(_next_seqno-base==0){
_time_waiting = false;
}
// 流量控制,发送方窗口不大于接受方窗口
else if(_next_seqno-base>=window_size){
_curr_window_size = 0;
return;
}
if(window_size==0){
_curr_window_size = 1;
_window_zero = true;
}
else{
_curr_window_size = window_size;
_window_zero = false;
_consecutive_remission = 0;
}
fill_window();
}
4. 构造函数
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, base(0)
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity)
, _curr_window_size(1)
, _isfin(false)
, _times(0)
, _time_waiting(false)
, _consecutive_remission(0)
, _time_out(0)
, _window_zero(false)
{}</code></pre></div></div><p>5.超时处理:</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
DUMMY_CODE(ms_since_last_tick);
// the times pased by
_times += ms_since_last_tick;
//timeout and non-empty cache. resend...
if(!_segments_out_cached.empty()&&_time_waiting&&_times>=_time_out){
//resend..
_segments_out.push(_segments_out_cached.front());
// increase the time out times...
if(!_window_zero){
//add remissions
_consecutive_remission++;
_time_out*=2;
_time_waiting = true;
}
_times=0;
}
}
Lab4 TCP Connnection
在这里我们需要实现一个TCP连接类,在这个TCP连接类里面,我们需要组合之前已经写好的TCP发送端和接收端的函数来进行处理.
1、接受到segment的操作:
分成两种操作,一种是正常的交互,一种是握手的操作.握手又分成主动请求链接和被动链接,在这两种模式下接受握手信息的处理是不一样的.对于正常的交互,需要交付给Sender和Reciever.因为对于TCP来说,两者是相互统一的.两个主机之间也会互相传递信息,所以说交给发送端处理ACK,交给接收端返回给上层.实际的TCP协议并不是完全的类似于GBN和SR,具体的差异就在ACK的数据是相互传递的,换句话说就是连着兼有.
void TCPConnection::segment_received(const TCPSegment &seg) {
DUMMY_CODE(seg);
// get the segment from IP level;
if(!_active){
return;
}
_time_since_last_segment_received=0;
//not get segment no send ACK.
//passive connection...
//the ackno is null and no bytes is sent
if(!_receiver.ackno().has_value()&&_sender.next_seqno_absolute()==0){
//only recieve syn...
if(!seg.header().syn) return;
//as the Service side,passive connected.
_receiver.segment_received(seg);
//it's OK to connect.
connect();
return;
}
// active connected..
// first connection...
if(_sender.next_seqno_absolute() > 0 && _sender.bytes_in_flight() == _sender.next_seqno_absolute() &&
!_receiver.ackno().has_value()){
// the length of payload is not 0
if(seg.payload().size() ){
return;
}
// if ack is no
// the twoside wants to setup the connection at the same time.
if(!seg.header().ack){
if(seg.header().syn){
_receiver.segment_received(seg);
// send empty ack to setup the connection.
_sender.send_empty_segment();
}
return;
}
// ifsyn=1,ack=1,rst=1,then shut down.
if(seg.header().rst){
_receiver.stream_out().set_error();
_sender.stream_in().set_error();
_active = false;
return;
}
}
//otherwise...
//recieve the segment
_receiver.segment_received(seg);
_sender.ack_received(seg.header().ackno,seg.header().win);
// thrid connection
if (_sender.stream_in().buffer_empty() && seg.length_in_sequence_space())
_sender.send_empty_segment();
if (seg.header().rst) {
_sender.send_empty_segment();
unclean_shutdown();
return;
}
send_sender_segments();
}
2、写seg.
将上层应用的数据写入到Bytestream中,提醒发送方发送.
size_t TCPConnection::write(const string &data) {
DUMMY_CODE(data);
// get the OS data... ready to be sent by TCP
if(data.size()==0){
return 0;
}
size_t write_size = _sender.stream_in().write(data);
_sender.fill_window();
send_sender_segments();
return write_size;
}
3、时钟(操作系统不定期调用之)
提醒Sender处理时间,看看是不是超时了.记录一下time_since_last_segment_received.
//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
DUMMY_CODE(ms_since_last_tick);
if(!_active) return;
//count
_time_since_last_segment_received += ms_since_last_tick;
// tell the sender to tick
_sender.tick(ms_since_last_tick);
if(_sender.consecutive_retransmissions()>TCPConfig::MAX_RETX_ATTEMPTS){
unclean_shutdown();
}
send_sender_segments();
}
4、真正的发送信息:读取sender中的消息缓存,然后加上ack和窗口信息信息,发送出去.
void TCPConnection::send_sender_segments (){
//travel the queue to set the ack and windows size.
while(!_sender.segments_out().empty()){
TCPSegment seg = _sender.segments_out().front();
_sender.segments_out().pop();
// the ack number is bot null
if(_receiver.ackno().has_value()){
seg.header().ack=true;
seg.header().ackno=_receiver.ackno().value();
seg.header().win=_receiver.window_size();
}
_segments_out.push(seg);
}
//every time send segment,we need to shutdown.
clean_shutdown();
}