boost::beast

boost::beast是一个基于boost::asio封装的一个网络编程库,与asio注重网络底层不同,beast专注于搭建Http和WebSocket服务器框架,其完全由C++实现,支持零拷贝异步多线程协程等,性能极高,支持ssl/tls加密等,但它的缺点也来自C++,例如开箱即用的轮子少编译时间较长、更注重底层使得应用层兼容性一般、较繁杂的错误处理机制。

示例来自官方示例

同步服务器-客户端

服务器端:比asio中的示例多了几步,首先是使用socket初始化一个websocket的流对象,设置其http响应头:websocket的连接建立是通过http握手开始的:

websocket的http握手

客户端发送消息:

1
2
3
4
GET /index.html HTTP/1.1 #起始行
Host: www.example.com #HTTP 头(Header)
User-Agent: Mozilla/5.0
Accept: text/html

服务器响应:

1
2
3
HTTP/1.1 200 OK #起始行
Content-Type: text/html #HTTP 头(Header)
Content-Length: 123
所以:
1
2
3
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){
res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_Server"));
}));
其作用就是在http响应信息中增加server字段
1
Server:xxxx I_am_Server

请求结构体和回复结构体还支持许多可选字段,如field::host(请求的主机名)、field::user_agent(客户端信息)、field::accept(可接受的类型)、field::authorization(认证信息)、field::content_type (内容类型)、field::content_length(内容长度)、field::date(日期时间)等,详见http协议。

客户端接受响应,双方会切换到websocket协议,按照ws的二进制的分帧协议进行发送和传输,这部分完全由底层保证,用户无需对每帧进行操作。为了实现http流和websocket流的帧高效发送和读取,beast引入了beast::flat_buffer

beast::flat_buffer

beast::flat_buffer是一个基于连续内存的数组结构,用于socket数据的高效写入和访问,主要设计是:

  1. 动态增长内存,类似vector机制;

  2. 读写区域分离零拷贝,其数组分布类似[已消费的数据][可读的数据][可写的数据]三个指针分别管理这这三个区域,但第一个部分不可访问,对于后两部分,当数据写入或者读走,都是直接写入到或者从 asio::const_buffer读出,通过commitconsume确认指针消费移动,实际上均不涉及数据跨态传输或者拷贝,故是零拷贝行为,beast::flat_buffer例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    #include <iostream>
    #include <boost/beast/core.hpp>
    #include <boost/beast/websocket.hpp>

    using namespace std;

    namespace beast = boost::beast;
    namespace asio = boost::asio;

    int main(){
    beast::flat_buffer buffer;
    asio::mutable_buffer writeBuf = buffer.prepare(32);
    void * wptr = writeBuf.data();
    printf("%p\n", wptr); //写指针:00000205aa4e1a50

    const char* data = "hello1234";
    int len = strlen(data);
    memcpy(wptr,data, len);
    buffer.commit(len); //写指针后移

    asio::mutable_buffer readBuf = buffer.data();
    void *rptr = readBuf.data();
    printf("%p\n", rptr); //读指针:00000205aa4e1a50

    string check(static_cast<const char*>(rptr), len);
    cout << check <<endl; //hello1234

    buffer.consume(len/2); //读指针后移

    asio::mutable_buffer readBuf1 = buffer.data();
    void *rptr1 = readBuf1.data();
    string check1(static_cast<const char*>(rptr1), len/2);
    cout << check1 <<endl; //o123

    return 0;
    }

同步服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//server.cpp
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <thread>

using namespace std;
namespace asio = boost::asio;
namespace beast = boost::beast;

void doSession(asio::ip::tcp::socket socket_){
try{
//将socket套接字初始化称websocket流对象
beast::websocket::stream<asio::ip::tcp::socket> ws(std::move(socket_));

//设置响应头server字段,其中BOOST_BEAST_VERSION_STRING代表框架版本,如果不想可以不发版本
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){
res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_Server"));
}));

ws.accept();
while(true){
beast::flat_buffer buffer;
ws.read(buffer);

ws.text(ws.got_text());
ws.write(buffer.data());
}

}
catch(beast::system_error& ec){
cerr << "beast::Error: " << ec.what() <<endl;
}
catch(std::exception& e){
cerr << "std::Error: " << e.what() <<endl;
}
}

int main(){
string arg1 = "127.0.0.1";
string arg2 = "5555";

const asio::ip::address address = asio::ip::make_address(arg1);
const unsigned short port = static_cast<unsigned short>(stoi(arg2));

asio::io_context ioc(1);
asio::ip::tcp::acceptor acceptor(ioc, asio::ip::tcp::endpoint(address, port));
while(true){
asio::ip::tcp::socket socket(ioc);
acceptor.accept(socket);

std::thread(&doSession, std::move(socket)).detach();
}

return 0;
}

同步客户端

这里有两个类似的概念,分别是:

1
2
asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port);
asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint);
results_type类型是resolve后返回的集合,集合中每个对象都是endpoint和一些额外信息(server_namehost_name)resolve的作用是解析,总的而言提供了三类解析

  1. DNS解析:当第一个参数为host主机名(域名)时,resolve能够进行DNS解析并且获取到ip和端口;

  2. ip解析:解析ip到通信端点;

  3. 服务解析:如localhost,也会返回到ip和端口解析;

可见对于1和3,服务都是可能指向多ip的,而且还有可能需要区分ipv4和ipv6,所以asio的设计干脆返回一个集合,对于2未必也一定返回单个ip和端口节点,毕竟可能协议簇并不一样,记住,TCP底层使用了五元组唯一区分连接,包括源和目的ip和端口、协议。

对于单个对象endpoint主要是提供了,例如ip地址、端口等。

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//client.cpp
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <chrono>
#include <thread>

using namespace std;
namespace beast = boost::beast;
namespace asio = boost::asio;

int main(){
string host = "127.0.0.1";
string port = "5555";

try{
asio::io_context ioc(1);
asio::ip::tcp::resolver resolver(ioc);
asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port); //DNS解析获取的通信端点

beast::websocket::stream<asio::ip::tcp::socket> ws(ioc); //基于tcp socket的websocket
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ //设置握手请求头
req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING) + string("I_am_Client"));
}));

asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint); //tcp连接
host += ':' + std::to_string(ep.port());
ws.handshake(host, "/"); //发起握手请求,http请求路径为根目录
auto now = std::chrono::steady_clock::now();

while(true){
auto newTime = std::chrono::steady_clock::now();
if(std::chrono::duration_cast<std::chrono::seconds>(newTime - now).count() > 10){ //定时10s,客户端退出
break;
}
string sendMsg("Hello Server");
ws.write(asio::buffer(sendMsg));
std::this_thread::sleep_for(std::chrono::seconds(1));

beast::flat_buffer rbuffer;
ws.read(rbuffer);
cout << beast::make_printable(rbuffer.data()) << endl;
}
ws.close(beast::websocket::close_code::normal);
}catch(std::exception&e){
cout << "Error:" << e.what() <<endl;
return EXIT_FAILURE;
}
return 0;
}

异步服务器-客户端

boost库的asio设计中异步是其“第一公民”,在异步的websocket架构体现的淋漓尽致。这里所谓的异步特指的是linux环境下实现的reactor模式(在windows下ASIO可能依赖的是IOCP的proactor模式,reactor模式是一种异步事件驱动模型,这里的异步指的是线程只需要发起IO,就可以非阻塞地继续执行任务,由内核线程通过epoll等模式进行监听,当事件完成、数据就绪(设备中断触发),会来读写数据并触发注册的回调函数,****这里的读写是同步阻塞的****,是同步IO,所以也有人将rector称为非阻塞同步模型

而对proactor模型,使用的完全是异步IO,在Linux Posix平台上的aio接口模拟了这种行为,数据就绪时内核会自动填充数据到用户缓冲区,而非用户使用read/write等发起数据拷贝:

1
2
struct aiocb cb;
aio_read(&cb); // 实际读操作由内核完成
然而aio接口在Linux中只是模拟行为,目前并没有急于完整地实现proactor机制,原因只能是现阶段proactor性能上的吸引力显然不足以抵消linux内核设计哲学上让程序控制IO的思潮。

除了同步和异步的说法混淆,另一个容易混淆的点是异步和多线程,因为异步意味着非阻塞,很容易让人误解成“每一个异步操作就是开一个线程去跑IO任务”,诚然这是一开始的设计思想,但随着IO多路复用的引入,数据监听的工作是由内核线程完成的,严格来说单线程与多线程的范畴并不讨论内核线程,否则现代操作系统每个进程至少含一个内核线程和一个用户线程,就没有所谓单线程的说法了,所以linux ASIO底层使用的是IO多路复用,如epoll等,而并非粗暴地建立线程。

前置知识细节

beast::tcp_stream和asio::ip::tcp::socket

  1. 差异:主要在于层次区别websocket能够对sockettcp_stream封装,但要知道beast::tcp_stream本身是对asio::ip::tcp::socket的封装,对于无ssl加密层情况下,其对应的websocket分别是三级和二级封装,通过next_layerget_lowest_layer等接口可用访问到底层对象:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    beast::websocket::stream<asio::ip::tcp::socket> socket1; //二级封装
    beast::websocket::stream<beast::tcp_stream> socket2; //三级封装

    boost::asio::ip::tcp::socket& sl1 = socket1.next_layer();

    beast::tcp_stream& tcpStream = socket2.next_layer();
    beast::tcp_stream& tcpStream1 = beast::get_lowest_layer(socket2);

    boost::asio::ip::tcp::socket& sl2 = tcpStream.socket();

    对比asio::ip::tcp::socketbeast::tcp_stream内置了超时机制,支持expires_after/expires_at/expires_never等超时接口,更方便衔接ssl。

  2. 初始化的共同点:一个容易看晕的点,两种类型的beast::websocket均既可以使用ioc进行初始化,也可以使用std::move(sokcet)初始化

    1
    2
    3
    4
    beast::websocket::stream<beast::tcp_stream> ws;
    ws(ioc); //初始化1
    asio::ip::tcp::socket socket;
    ws(std::move(socket)); //初始化2
    从规则而言二者均可,但从通信习惯上来看:

    对于服务端,官方示例使用std::move(sokcet)初始化,因为无论同步异步,经过对endpoint的绑定、监听,服务端的acceptor执行accept后,会返回一个通信socket,这个socket常常被用于构建通信Session类用于信息收发和处理。如果没有accept,socket是否初始化都是无用的,因此习惯先得到socket,再顺手初始化一个websocket以通信:

    1
    2
    3
    asio::ip::tcp::acceptor accepetor(ioc, asio::ip::tcp::endpoint{host,port});
    accepetor.accept(asio::ip::tcp::socket socket);
    accepetor.async_accept(ioc, [](beast::error_code ec, asio::ip::tcp::socket socket){...})

    对于客户端,官方示例使用ioc进行初始化,因为client没有独立的通信类,意味着websocketresolver是需要在同一个类经ioc初始化的,只有resolve解析出endpoint,通过connect(websocket.next_layer(), endpoint)来建立TCP连接,故无需创建专门的socket用于通信

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    ///同步:
    beast::websocket::stream<asio::ip::tcp::socket> ws(ioc);
    asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port);
    asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint);

    ////异步
    void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){
    if(ec){
    cerr << "onResolve Error: " << ec.message() <<endl;
    return;
    }
    beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
    tcpStream.expires_after(std::chrono::seconds(30));
    tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this()));
    }

    void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){
    if(ec){
    cerr << "onConnect Error: " << ec.message() <<endl;
    return;
    }
    beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
    tcpStream.expires_never();
    ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client));
    ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){
    req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client"));
    }));
    host_ += ':' + std::to_string(ep.port());
    ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this()));
    }

    当然,客户端如果做成独立通信节点类,先resolve再建立socket、再初始化websocket亦可,如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    tcp::resolver resolver(ioc);
    auto const results = resolver.resolve("example.com", "5555");

    tcp::socket socket(ioc);
    boost::asio::connect(socket, results);
    websocket::stream<tcp::socket> ws(std::move(socket));

    //...握手、收发信息
    ws.handshake("example.com", "/");

async_异步接口与beast::bind_front_handler

beast::websocket实现了从连接建立到通信到关闭socket的全流程异步操作,每个异步操作都会注册一个回调函数,在官方示例中都使用了beast::bind_front_handler(&Session::func, shared_from_this())这个语法糖,shared_from_this之前已经剖析过了,在异步任务中安全地获取类指针、增加共享指针引用计数延长类的生命周期非常重要,beast::bind_front_handler(&Session::func, shared_from_this())的效果其实是代替了匿名函数捕获变量的写法,例如两种等效写法:

1
2
3
4
5
6
7
ws.async_accept(beast::bind_front_handler(&Session::onAccept, shared_from_this()));

//equal:
auto self = shared_from_this();
ws.async_accept([self,this](){
//....onAccept函数内容
});

如果大量采用匿名函数写法,函数的耦合程度和代码量会很大,不利于阅读。beast::bind_front_handler的函数原型为:

1
2
template <class Function, class... Args>
auto bind_front_handler(Function&& f, Args&&... args);
当当前的上下文已经有确定的参数了,你可以使用bind_front_handler提前绑定这些参数,其返回值就相当于一个新的函数签名,例如:
1
2
3
4
5
6
7
8
9
10
11
12
int subtract(int a, int b, int c){
int num = a - b - c;
cout << num;
return num;
}

int main(){
auto func = beast::bind_front_handler(subtract, 3, 9);
func(10); //相当于a=3, b=9, c = 10

return 0;
}
但由于beast是专门为网络库设计的,返回的func仍然是beast函数的封装,不能直接返回int函数指针,但C++ 20已经开始在标准库引入相关类似特性的函数。

boost::asio::tcp server的地址复用(SO_REUSEADDR和SO_REUSEPORT)

在官方示例中,设置了地址复用:

1
acceptor.set_option(asio::socket_base::reuse_address(true), ec);
这里牵涉到网络编程中服务侧可配置两个关键字:SO_REUSEADDRSO_REUSEPORT

  • SO_REUSEADDR地址复用。如果开启,则做了两件事情:第一件,当一个服务器绑定了0.0.0.0时,代表绑定到所有ip,那么另一个服务器一般将不能绑定到任何端口,地址复用允许绑定;第二,TCP协议的服务器退出会有一个Time_Wait时间,主要用于等待报文全部死亡,持续几十秒到几分钟不等,如果开启地址复用该地址能够马上被新的服务器bind,而且实践情况下少有被旧报文影响的。注意,只要一个server设置成SO_REUSEADDR,那么该地址就是被允许重复绑定,bind不会主动检测是否所有socket都设置了地址重用。

  • SO_REUSEPORT端口复用。如果开启,那么将允许多个socket同时bind到同一个ip和端口上,内核通过五元组区分流量:<协议,源ip,源端口,目的ip,目的端口>,此处源ip和源端口指的是是客户端的ip和端口,端口一般是系统临时分配的端口。但注意,只有连接到ip:端口的所有socket均设置了SO_REUSEPORT,该ip和端口才能被复用,否则只要一个socket没有设置,重复连接会失败。再者,SO_REUSEPORT不包含SO_REUSEADDR,如果需要TIME_WAIT时刻仍然正常连接,仍需要设置SO_REUSEADDR

ws.text(ws.got_text())与make_printable(buffer.data())

与第一直觉不同,ws.got_text()返回的不是文本信息,而是一个bool,若true代表该帧信息是文本信息,否则为二进制信息ws.text同样接受bool参数,来确定发送的是文本还是二进制,二者组合常常表示"将帧原封不动地回显回去",因此如果需要打印信息,只能操作beast::flat_buffer缓冲区,通过make_printable(buffer.data())等。

代码实现

异步服务器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//server.cpp
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio.hpp>
#include <thread>

using namespace std;
namespace asio = boost::asio;
namespace beast = boost::beast;

//相当于异步socket类,定义了其初始化
class Session : public std::enable_shared_from_this<Session>{
public:
explicit Session(asio::ip::tcp::socket&& socket): ws(std::move(socket)){}
void run(){
asio::dispatch(ws.get_executor(), beast::bind_front_handler(&Session::onRun, shared_from_this()));
}

void onRun(){
//设置服务器建议的默认超时时间,包括握手、心跳、读写超时等
ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::server));
//设置http显示头
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){
res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_async_server"));
}));
ws.async_accept(beast::bind_front_handler(&Session::onAccept, shared_from_this()));
}

void onAccept(beast::error_code ec){
if(ec){
cerr << "onAccept Error: " << ec.message() << endl;
return;
}
doRead();
}
void doRead(){
auto self(shared_from_this());
ws.async_read(buffer,[self,this](beast::error_code ec, std::size_t bytes_transferred){
boost::ignore_unused(bytes_transferred);
if(beast::websocket::error::closed == ec)
return;
if(ec){
cerr << "doRead Error: " << ec.message() <<endl;
return;
}
ws.text(ws.got_text());

///beast::flat_buffer添加标识再回显
string data = beast::buffers_to_string(buffer.data()); //flat buffer to string
cout << "Receive data: " << data << endl;
buffer.consume(data.size());
data.insert(0, "Hello Client-");
void* wptr = buffer.prepare(data.size()).data();
memcpy(wptr, reinterpret_cast<const char*>(data.data()), data.size());
buffer.commit(data.size());

ws.async_write(buffer.data(), beast::bind_front_handler(&Session::onWrite, shared_from_this()));
});
}
void onWrite(beast::error_code ec, std::size_t bytes_transferred){
if(ec){
cerr << "onWrite Error: " << ec.message() <<endl;
return;
}
buffer.consume(buffer.size());
doRead();
}

private:
beast::websocket::stream<beast::tcp_stream> ws;
beast::flat_buffer buffer;
};

class Listener: public std::enable_shared_from_this<Listener>{
public:
Listener(asio::io_context& ioc, asio::ip::tcp::endpoint endpoint):ioc(ioc), acceptor(ioc){
beast::error_code ec;
acceptor.open(endpoint.protocol(), ec);
if(ec){
cerr << "acceptor open Error: " << ec.message() <<endl;
throw std::runtime_error("Listener Open Fault");
}

acceptor.set_option(asio::socket_base::reuse_address(true), ec);
if(ec){
cerr << "acceptor set_option Error: " << ec.message() <<endl;
throw std::runtime_error("Listener set_option Fault");
}

acceptor.bind(endpoint, ec);
if(ec){
cerr << "acceptor bind Error: " << ec.message() <<endl;
throw std::runtime_error("Listener bind Fault");
}

acceptor.listen(asio::socket_base::max_listen_connections, ec);
if(ec){
cerr << "acceptor listen Error: " << ec.message() <<endl;
throw std::runtime_error("Listener listen Fault");
}
}

void listenerDoAccept(){
auto self(shared_from_this());
acceptor.async_accept(asio::make_strand(ioc), [self, this](beast::error_code ec, asio::ip::tcp::socket socket){
if(ec){
cerr << "listenerDoAccept Error: " << ec.message() << endl;
return;
}
auto sessionPtr = std::make_shared<Session>(std::move(socket));
sessionPtr->run();
listenerDoAccept();
});
}

private:
asio::io_context& ioc;
asio::ip::tcp::acceptor acceptor;
};

int main(){
string arg1 = "127.0.0.1";
string arg2 = "5555";
const int threadNum = 5;

const asio::ip::address ip = asio::ip::make_address(arg1);
const unsigned short port = static_cast<unsigned short>(stoi(arg2));

asio::io_context ioc(5);

//使用enable_shared_from_this的对象需要共享指针管理
auto lisptr = std::make_shared<Listener>(ioc, asio::ip::tcp::endpoint{ip,port});
lisptr->listenerDoAccept();

//多开线程跑run
vector<std::thread> threadPool;
for(int i=0; i<threadNum-1; i++){
threadPool.emplace_back([&ioc](){
ioc.run();
});
}
ioc.run();
return 0;
}

异步客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//client.cpp
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio.hpp>

using namespace std;
namespace beast = boost::beast;
namespace asio = boost::asio;

class Session: public enable_shared_from_this<Session>{
public:
explicit Session(asio::io_context& ioc): resolver(asio::make_strand(ioc)), ws(asio::make_strand(ioc)){}

void run(const char* host, const char* port, const char* text){
host_ = host;
text_ = text;
resolver.async_resolve(host, port, beast::bind_front_handler(&Session::onResolve, shared_from_this())); //解析协议和endpoint
}

void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){
if(ec){
cerr << "onResolve Error: " << ec.message() <<endl;
return;
}
beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
tcpStream.expires_after(std::chrono::seconds(30));
tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this())); //TCP连接
}

void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){
if(ec){
cerr << "onConnect Error: " << ec.message() <<endl;
return;
}
beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
tcpStream.expires_never();
ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client)); //设置客户端异步的连接、握手、收发信息默认超时
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ //http通信头
req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client"));
}));
host_ += ':' + std::to_string(ep.port());
ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this())); //http握手
}
void onHandShake(beast::error_code ec){
if(ec){
cerr << "onHandShake Error: " << ec.message() <<endl;
return;
}
ws.async_write(asio::buffer(text_), beast::bind_front_handler(&Session::onWrite, shared_from_this()));
}
void onWrite(beast::error_code ec, std::size_t bytes_transferred){
boost::ignore_unused(bytes_transferred);
if(ec){
cerr << "onWrite Error: " << ec.message() <<endl;
return;
}
ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this()));
}
void onRead(beast::error_code ec, std::size_t bytes_transferred){
boost::ignore_unused(bytes_transferred);
if(ec){
cerr << "onRead Error: " << ec.message() <<endl;
return;
}
ws.async_close(beast::websocket::close_code::normal, beast::bind_front_handler(&Session::onClose, shared_from_this()));
}

void onClose(beast::error_code ec){
if(ec){
cerr << "onClose Error: " << ec.message() <<endl;
return;
}
cout << "Receive: " << beast::make_printable(buffer.data()) << endl;
}

private:
asio::ip::tcp::resolver resolver;
beast::websocket::stream<beast::tcp_stream> ws;
beast::flat_buffer buffer;
// 参数要传入异步任务,只能作成员变量,保持生命周期与类指针一致
string host_;
string text_;
};

int main(){
const char* host = "127.0.0.1";
const char* port = "5555";
const char* text = "Hello Server";
asio::io_context ioc;
auto sessionPtr = std::make_shared<Session>(ioc);
sessionPtr->run(host, port, text);
ioc.run();

return 0;
}

该客户端发送一次信息即退出,不符合大部分客户端场景,加入stdin读取,并且区分异步读取发送:

循环读取发送客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//client.cpp
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio.hpp>

using namespace std;
namespace beast = boost::beast;
namespace asio = boost::asio;

class Session: public enable_shared_from_this<Session>{
public:
explicit Session(asio::io_context& ioc): resolver(asio::make_strand(ioc)), ws(asio::make_strand(ioc)), \
input_(asio::make_strand(ioc), ::dup(STDIN_FILENO)){}

void run(const char* host, const char* port, const char* text){
host_ = host;
text_ = text;
resolver.async_resolve(host, port, beast::bind_front_handler(&Session::onResolve, shared_from_this()));
}

void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){
if(ec){
cerr << "onResolve Error: " << ec.message() <<endl;
return;
}
beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
tcpStream.expires_after(std::chrono::seconds(30));
tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this()));
}

void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){
if(ec){
cerr << "onConnect Error: " << ec.message() <<endl;
return;
}
beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws);
tcpStream.expires_never();
ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client));
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){
req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client"));
}));
host_ += ':' + std::to_string(ep.port());
ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this()));
}

void onHandShake(beast::error_code ec){
if(ec){
cerr << "onHandShake Error: " << ec.message() <<endl;
return;
}
ws.async_write(asio::buffer(text_), beast::bind_front_handler(&Session::onWrite, shared_from_this()));
ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this()));
}

void onWrite(beast::error_code ec, std::size_t bytes_transferred){
boost::ignore_unused(bytes_transferred);
if(ec){
cerr << "onWrite Error: " << ec.message() <<endl;
return;
}
//ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this()));
asyncReadInput();
}

void onRead(beast::error_code ec, std::size_t bytes_transferred){
boost::ignore_unused(bytes_transferred);
if(ec){
cerr << "onRead Error: " << ec.message() <<endl;
return;
}
//ws.async_close(beast::websocket::close_code::normal, beast::bind_front_handler(&Session::onClose, shared_from_this()));
ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this()));
}

void onClose(beast::error_code ec){
if(ec){
cerr << "onClose Error: " << ec.message() <<endl;
return;
}
cout << "Receive: " << beast::make_printable(buffer.data()) << endl;
}
/// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ //增加读取输入:
void asyncReadInput(){
auto self = shared_from_this();
asio::async_read_until(input_, boardBuf, '\n', [self, this](beast::error_code ec, std::size_t bytes_transferred){
if(ec){
cerr << "read_until Error: " << ec.message() << endl;
return;
}

//直接使用beast::flat_buffer,使用buffers_to_string会尝试对整个缓存区转化,不符合单次转化目的
//string input = beast::buffers_to_string(boardBuf.data());
std::istream iss(&boardBuf);
string input;
std::getline(iss, input);

cout << "11111" <<input << endl;
if(input == "exit"){
ws.async_close(beast::websocket::close_code::normal, beast::bind_front_handler(&Session::onClose, shared_from_this()));
return;
}
ws.async_write(asio::buffer(input), beast::bind_front_handler(&Session::onWrite, shared_from_this()));
});
}
private:
asio::posix::stream_descriptor input_;
asio::streambuf boardBuf;
/// =============================================================

private:
asio::ip::tcp::resolver resolver;
beast::websocket::stream<beast::tcp_stream> ws;
beast::flat_buffer buffer;
// 参数要传入异步任务,只能作成员变量,保持生命周期与类指针一致
string host_;
string text_;
};

int main(){
const char* host = "127.0.0.1";
const char* port = "5555";
const char* text = "Hello Server";
asio::io_context ioc;
auto sessionPtr = std::make_shared<Session>(ioc);
sessionPtr->run(host, port, text);
ioc.run();

return 0;
}