boost::beast
boost::beast
是一个基于boost::asio
封装的一个网络编程库,与asio注重网络底层不同,beast专注于搭建Http和WebSocket服务器框架,其完全由C++实现,支持零拷贝、异步、多线程、协程等,性能极高,支持ssl/tls加密等,但它的缺点也来自C++,例如开箱即用的轮子少、编译时间较长、更注重底层使得应用层兼容性一般、较繁杂的错误处理机制。
示例来自官方示例。
同步服务器-客户端
服务器端:比asio
中的示例多了几步,首先是使用socket
初始化一个websocket
的流对象,设置其http
响应头:websocket
的连接建立是通过http
握手开始的:
websocket的http握手
客户端发送消息: 1 2 3 4
| GET /index.html HTTP/1.1 #起始行 Host: www.example.com #HTTP 头(Header) User-Agent: Mozilla/5.0 Accept: text/html
|
服务器响应: 1 2 3
| HTTP/1.1 200 OK #起始行 Content-Type: text/html #HTTP 头(Header) Content-Length: 123
|
所以: 1 2 3
| ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){ res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_Server")); }));
|
其作用就是在http响应信息中增加server字段:
请求结构体和回复结构体还支持许多可选字段,如field::host
(请求的主机名)、field::user_agent
(客户端信息)、field::accept
(可接受的类型)、field::authorization
(认证信息)、field::content_type
(内容类型)、field::content_length
(内容长度)、field::date
(日期时间)等,详见http协议。
当客户端接受响应,双方会切换到websocket协议,按照ws的二进制的分帧协议进行发送和传输,这部分完全由底层保证,用户无需对每帧进行操作。为了实现http
流和websocket
流的帧高效发送和读取,beast
引入了beast::flat_buffer。
beast::flat_buffer
beast::flat_buffer
是一个基于连续内存的数组结构,用于socket
数据的高效写入和访问,主要设计是:
动态增长内存,类似vector机制;
读写区域分离和零拷贝,其数组分布类似[已消费的数据][可读的数据][可写的数据]
,三个指针
分别管理这这三个区域,但第一个部分不可访问,对于后两部分,当数据写入或者读走,都是直接写入到或者从
asio::const_buffer
读出,通过commit
和consume
确认指针消费移动,实际上均不涉及数据跨态传输或者拷贝,故是零拷贝行为,beast::flat_buffer
例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp>
using namespace std;
namespace beast = boost::beast; namespace asio = boost::asio;
int main(){ beast::flat_buffer buffer; asio::mutable_buffer writeBuf = buffer.prepare(32); void * wptr = writeBuf.data(); printf("%p\n", wptr);
const char* data = "hello1234"; int len = strlen(data); memcpy(wptr,data, len); buffer.commit(len);
asio::mutable_buffer readBuf = buffer.data(); void *rptr = readBuf.data(); printf("%p\n", rptr);
string check(static_cast<const char*>(rptr), len); cout << check <<endl;
buffer.consume(len/2);
asio::mutable_buffer readBuf1 = buffer.data(); void *rptr1 = readBuf1.data(); string check1(static_cast<const char*>(rptr1), len/2); cout << check1 <<endl;
return 0; }
|
同步服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <thread>
using namespace std; namespace asio = boost::asio; namespace beast = boost::beast;
void doSession(asio::ip::tcp::socket socket_){ try{ beast::websocket::stream<asio::ip::tcp::socket> ws(std::move(socket_));
ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){ res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_Server")); }));
ws.accept(); while(true){ beast::flat_buffer buffer; ws.read(buffer);
ws.text(ws.got_text()); ws.write(buffer.data()); }
} catch(beast::system_error& ec){ cerr << "beast::Error: " << ec.what() <<endl; } catch(std::exception& e){ cerr << "std::Error: " << e.what() <<endl; } }
int main(){ string arg1 = "127.0.0.1"; string arg2 = "5555";
const asio::ip::address address = asio::ip::make_address(arg1); const unsigned short port = static_cast<unsigned short>(stoi(arg2));
asio::io_context ioc(1); asio::ip::tcp::acceptor acceptor(ioc, asio::ip::tcp::endpoint(address, port)); while(true){ asio::ip::tcp::socket socket(ioc); acceptor.accept(socket);
std::thread(&doSession, std::move(socket)).detach(); }
return 0; }
|
同步客户端
这里有两个类似的概念,分别是: 1 2
| asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port); asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint);
|
results_type
类型是resolve
后返回的集合,集合中每个对象都是endpoint
和一些额外信息(server_name
和host_name
),resolve
的作用是解析,总的而言提供了三类解析:
DNS解析:当第一个参数为host主机名(域名)时,resolve能够进行DNS解析并且获取到ip和端口;
ip解析:解析ip到通信端点;
服务解析:如localhost,也会返回到ip和端口解析;
可见对于1和3,服务都是可能指向多ip的,而且还有可能需要区分ipv4和ipv6,所以asio的设计干脆返回一个集合,对于2未必也一定返回单个ip和端口节点,毕竟可能协议簇并不一样,记住,TCP底层使用了五元组唯一区分连接,包括源和目的ip和端口、协议。
对于单个对象endpoint,主要是提供了,例如ip地址、端口等。
客户端代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <boost/asio/connect.hpp> #include <boost/asio/ip/tcp.hpp> #include <chrono> #include <thread>
using namespace std; namespace beast = boost::beast; namespace asio = boost::asio;
int main(){ string host = "127.0.0.1"; string port = "5555";
try{ asio::io_context ioc(1); asio::ip::tcp::resolver resolver(ioc); asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port);
beast::websocket::stream<asio::ip::tcp::socket> ws(ioc); ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING) + string("I_am_Client")); }));
asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint); host += ':' + std::to_string(ep.port()); ws.handshake(host, "/"); auto now = std::chrono::steady_clock::now();
while(true){ auto newTime = std::chrono::steady_clock::now(); if(std::chrono::duration_cast<std::chrono::seconds>(newTime - now).count() > 10){ break; } string sendMsg("Hello Server"); ws.write(asio::buffer(sendMsg)); std::this_thread::sleep_for(std::chrono::seconds(1));
beast::flat_buffer rbuffer; ws.read(rbuffer); cout << beast::make_printable(rbuffer.data()) << endl; } ws.close(beast::websocket::close_code::normal); }catch(std::exception&e){ cout << "Error:" << e.what() <<endl; return EXIT_FAILURE; } return 0; }
|
异步服务器-客户端
boost库的asio设计中异步是其“第一公民”,在异步的websocket架构体现的淋漓尽致。这里所谓的异步特指的是linux环境下实现的reactor模式(在windows下ASIO可能依赖的是IOCP的proactor模式,reactor模式是一种异步事件驱动模型,这里的异步指的是线程只需要发起IO,就可以非阻塞地继续执行任务,由内核线程通过epoll等模式进行监听,当事件完成、数据就绪(设备中断触发),会来读写数据并触发注册的回调函数,****这里的读写是同步阻塞的****,是同步IO,所以也有人将rector称为非阻塞同步模型。
而对proactor模型,使用的完全是异步IO,在Linux
Posix平台上的aio接口模拟了这种行为,数据就绪时内核会自动填充数据到用户缓冲区,而非用户使用read/write等发起数据拷贝:
1 2
| struct aiocb cb; aio_read(&cb);
|
然而aio接口在Linux中只是模拟行为,目前并没有急于完整地实现proactor机制,原因只能是现阶段proactor性能上的吸引力显然不足以抵消linux内核设计哲学上让程序控制IO的思潮。
除了同步和异步的说法混淆,另一个容易混淆的点是异步和多线程,因为异步意味着非阻塞,很容易让人误解成“每一个异步操作就是开一个线程去跑IO任务”,诚然这是一开始的设计思想,但随着IO多路复用的引入,数据监听的工作是由内核线程完成的,严格来说单线程与多线程的范畴并不讨论内核线程,否则现代操作系统每个进程至少含一个内核线程和一个用户线程,就没有所谓单线程的说法了,所以linux
ASIO底层使用的是IO多路复用,如epoll等,而并非粗暴地建立线程。
前置知识细节
beast::tcp_stream和asio::ip::tcp::socket
差异:主要在于层次区别,websocket
能够对socket
和tcp_stream
封装,但要知道beast::tcp_stream
本身是对asio::ip::tcp::socket
的封装,对于无ssl加密层情况下,其对应的websocket分别是三级和二级封装,通过next_layer
、get_lowest_layer
等接口可用访问到底层对象:
1 2 3 4 5 6 7 8 9
| beast::websocket::stream<asio::ip::tcp::socket> socket1; beast::websocket::stream<beast::tcp_stream> socket2;
boost::asio::ip::tcp::socket& sl1 = socket1.next_layer();
beast::tcp_stream& tcpStream = socket2.next_layer(); beast::tcp_stream& tcpStream1 = beast::get_lowest_layer(socket2);
boost::asio::ip::tcp::socket& sl2 = tcpStream.socket();
|
对比asio::ip::tcp::socket
,beast::tcp_stream
内置了超时机制,支持expires_after/expires_at/expires_never
等超时接口,更方便衔接ssl。
初始化的共同点:一个容易看晕的点,两种类型的beast::websocket
均既可以使用ioc
进行初始化,也可以使用std::move(sokcet)
初始化:
1 2 3 4
| beast::websocket::stream<beast::tcp_stream> ws; ws(ioc); asio::ip::tcp::socket socket; ws(std::move(socket));
|
从规则而言二者均可,但从通信习惯上来看:
对于服务端,官方示例使用std::move(sokcet)
初始化,因为无论同步异步,经过对endpoint
的绑定、监听,服务端的acceptor
执行accept
后,会返回一个通信socket
,这个socket
常常被用于构建通信Session类用于信息收发和处理。如果没有accept,socket是否初始化都是无用的,因此习惯先得到socket,再顺手初始化一个websocket以通信:
1 2 3
| asio::ip::tcp::acceptor accepetor(ioc, asio::ip::tcp::endpoint{host,port}); accepetor.accept(asio::ip::tcp::socket socket); accepetor.async_accept(ioc, [](beast::error_code ec, asio::ip::tcp::socket socket){...})
|
对于客户端,官方示例使用ioc
进行初始化,因为client
没有独立的通信类,意味着websocket
和resolver
是需要在同一个类经ioc初始化的,只有resolve
解析出endpoint
,通过connect(websocket.next_layer(), endpoint)
来建立TCP连接,故无需创建专门的socket
用于通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| beast::websocket::stream<asio::ip::tcp::socket> ws(ioc); asio::ip::tcp::resolver::results_type endpoint = resolver.resolve(host, port); asio::ip::tcp::endpoint ep = asio::connect(ws.next_layer(), endpoint);
void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){ if(ec){ cerr << "onResolve Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_after(std::chrono::seconds(30)); tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this())); }
void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){ if(ec){ cerr << "onConnect Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_never(); ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client)); ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client")); })); host_ += ':' + std::to_string(ep.port()); ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this())); }
|
当然,客户端如果做成独立通信节点类,先resolve
再建立socket
、再初始化websocket
亦可,如:
1 2 3 4 5 6 7 8 9
| tcp::resolver resolver(ioc); auto const results = resolver.resolve("example.com", "5555");
tcp::socket socket(ioc); boost::asio::connect(socket, results); websocket::stream<tcp::socket> ws(std::move(socket));
ws.handshake("example.com", "/");
|
async_异步接口与beast::bind_front_handler
beast::websocket
实现了从连接建立到通信到关闭socket的全流程异步操作,每个异步操作都会注册一个回调函数,在官方示例中都使用了beast::bind_front_handler(&Session::func, shared_from_this())
这个语法糖,shared_from_this
之前已经剖析过了,在异步任务中安全地获取类指针、增加共享指针引用计数延长类的生命周期非常重要,beast::bind_front_handler(&Session::func, shared_from_this())
的效果其实是代替了匿名函数捕获变量的写法,例如两种等效写法:
1 2 3 4 5 6 7
| ws.async_accept(beast::bind_front_handler(&Session::onAccept, shared_from_this()));
auto self = shared_from_this(); ws.async_accept([self,this](){ });
|
如果大量采用匿名函数写法,函数的耦合程度和代码量会很大,不利于阅读。beast::bind_front_handler
的函数原型为:
1 2
| template <class Function, class... Args> auto bind_front_handler(Function&& f, Args&&... args);
|
当当前的上下文已经有确定的参数了,你可以使用bind_front_handler提前绑定这些参数,其返回值就相当于一个新的函数签名,例如:
1 2 3 4 5 6 7 8 9 10 11 12
| int subtract(int a, int b, int c){ int num = a - b - c; cout << num; return num; }
int main(){ auto func = beast::bind_front_handler(subtract, 3, 9); func(10);
return 0; }
|
但由于beast是专门为网络库设计的,返回的func仍然是beast函数的封装,不能直接返回int函数指针,但C++
20已经开始在标准库引入相关类似特性的函数。
boost::asio::tcp
server的地址复用(SO_REUSEADDR和SO_REUSEPORT)
在官方示例中,设置了地址复用: 1
| acceptor.set_option(asio::socket_base::reuse_address(true), ec);
|
这里牵涉到网络编程中服务侧可配置两个关键字:SO_REUSEADDR
和SO_REUSEPORT
:
SO_REUSEADDR:地址复用。如果开启,则做了两件事情:第一件,当一个服务器绑定了0.0.0.0
时,代表绑定到所有ip,那么另一个服务器一般将不能绑定到任何端口,地址复用允许绑定;第二,TCP协议的服务器退出会有一个Time_Wait
时间,主要用于等待报文全部死亡,持续几十秒到几分钟不等,如果开启地址复用,该地址能够马上被新的服务器bind,而且实践情况下少有被旧报文影响的。注意,只要一个server设置成SO_REUSEADDR
,那么该地址就是被允许重复绑定,bind不会主动检测是否所有socket都设置了地址重用。
SO_REUSEPORT:端口复用。如果开启,那么将允许多个socket同时bind到同一个ip和端口上,内核通过五元组区分流量:<协议,源ip,源端口,目的ip,目的端口>
,此处源ip和源端口指的是是客户端的ip和端口,端口一般是系统临时分配的端口。但注意,只有连接到ip:端口的所有socket均设置了SO_REUSEPORT,该ip和端口才能被复用,否则只要一个socket没有设置,重复连接会失败。再者,SO_REUSEPORT
不包含SO_REUSEADDR
,如果需要TIME_WAIT
时刻仍然正常连接,仍需要设置SO_REUSEADDR
。
ws.text(ws.got_text())与make_printable(buffer.data())
与第一直觉不同,ws.got_text()
返回的不是文本信息,而是一个bool,若true
代表该帧信息是文本信息,否则为二进制信息,ws.text
同样接受bool
参数,来确定发送的是文本还是二进制,二者组合常常表示"将帧原封不动地回显回去",因此如果需要打印信息,只能操作beast::flat_buffer
缓冲区,通过make_printable(buffer.data())
等。
代码实现
异步服务器实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <boost/asio.hpp> #include <thread>
using namespace std; namespace asio = boost::asio; namespace beast = boost::beast;
class Session : public std::enable_shared_from_this<Session>{ public: explicit Session(asio::ip::tcp::socket&& socket): ws(std::move(socket)){} void run(){ asio::dispatch(ws.get_executor(), beast::bind_front_handler(&Session::onRun, shared_from_this())); }
void onRun(){ ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::server)); ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::response_type& res){ res.set(beast::http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + string("I_am_async_server")); })); ws.async_accept(beast::bind_front_handler(&Session::onAccept, shared_from_this())); }
void onAccept(beast::error_code ec){ if(ec){ cerr << "onAccept Error: " << ec.message() << endl; return; } doRead(); } void doRead(){ auto self(shared_from_this()); ws.async_read(buffer,[self,this](beast::error_code ec, std::size_t bytes_transferred){ boost::ignore_unused(bytes_transferred); if(beast::websocket::error::closed == ec) return; if(ec){ cerr << "doRead Error: " << ec.message() <<endl; return; } ws.text(ws.got_text());
string data = beast::buffers_to_string(buffer.data()); cout << "Receive data: " << data << endl; buffer.consume(data.size()); data.insert(0, "Hello Client-"); void* wptr = buffer.prepare(data.size()).data(); memcpy(wptr, reinterpret_cast<const char*>(data.data()), data.size()); buffer.commit(data.size());
ws.async_write(buffer.data(), beast::bind_front_handler(&Session::onWrite, shared_from_this())); }); } void onWrite(beast::error_code ec, std::size_t bytes_transferred){ if(ec){ cerr << "onWrite Error: " << ec.message() <<endl; return; } buffer.consume(buffer.size()); doRead(); }
private: beast::websocket::stream<beast::tcp_stream> ws; beast::flat_buffer buffer; };
class Listener: public std::enable_shared_from_this<Listener>{ public: Listener(asio::io_context& ioc, asio::ip::tcp::endpoint endpoint):ioc(ioc), acceptor(ioc){ beast::error_code ec; acceptor.open(endpoint.protocol(), ec); if(ec){ cerr << "acceptor open Error: " << ec.message() <<endl; throw std::runtime_error("Listener Open Fault"); }
acceptor.set_option(asio::socket_base::reuse_address(true), ec); if(ec){ cerr << "acceptor set_option Error: " << ec.message() <<endl; throw std::runtime_error("Listener set_option Fault"); }
acceptor.bind(endpoint, ec); if(ec){ cerr << "acceptor bind Error: " << ec.message() <<endl; throw std::runtime_error("Listener bind Fault"); }
acceptor.listen(asio::socket_base::max_listen_connections, ec); if(ec){ cerr << "acceptor listen Error: " << ec.message() <<endl; throw std::runtime_error("Listener listen Fault"); } }
void listenerDoAccept(){ auto self(shared_from_this()); acceptor.async_accept(asio::make_strand(ioc), [self, this](beast::error_code ec, asio::ip::tcp::socket socket){ if(ec){ cerr << "listenerDoAccept Error: " << ec.message() << endl; return; } auto sessionPtr = std::make_shared<Session>(std::move(socket)); sessionPtr->run(); listenerDoAccept(); }); }
private: asio::io_context& ioc; asio::ip::tcp::acceptor acceptor; };
int main(){ string arg1 = "127.0.0.1"; string arg2 = "5555"; const int threadNum = 5;
const asio::ip::address ip = asio::ip::make_address(arg1); const unsigned short port = static_cast<unsigned short>(stoi(arg2));
asio::io_context ioc(5);
auto lisptr = std::make_shared<Listener>(ioc, asio::ip::tcp::endpoint{ip,port}); lisptr->listenerDoAccept();
vector<std::thread> threadPool; for(int i=0; i<threadNum-1; i++){ threadPool.emplace_back([&ioc](){ ioc.run(); }); } ioc.run(); return 0; }
|
异步客户端实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <boost/asio.hpp>
using namespace std; namespace beast = boost::beast; namespace asio = boost::asio;
class Session: public enable_shared_from_this<Session>{ public: explicit Session(asio::io_context& ioc): resolver(asio::make_strand(ioc)), ws(asio::make_strand(ioc)){}
void run(const char* host, const char* port, const char* text){ host_ = host; text_ = text; resolver.async_resolve(host, port, beast::bind_front_handler(&Session::onResolve, shared_from_this())); }
void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){ if(ec){ cerr << "onResolve Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_after(std::chrono::seconds(30)); tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this())); }
void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){ if(ec){ cerr << "onConnect Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_never(); ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client)); ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client")); })); host_ += ':' + std::to_string(ep.port()); ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this())); } void onHandShake(beast::error_code ec){ if(ec){ cerr << "onHandShake Error: " << ec.message() <<endl; return; } ws.async_write(asio::buffer(text_), beast::bind_front_handler(&Session::onWrite, shared_from_this())); } void onWrite(beast::error_code ec, std::size_t bytes_transferred){ boost::ignore_unused(bytes_transferred); if(ec){ cerr << "onWrite Error: " << ec.message() <<endl; return; } ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this())); } void onRead(beast::error_code ec, std::size_t bytes_transferred){ boost::ignore_unused(bytes_transferred); if(ec){ cerr << "onRead Error: " << ec.message() <<endl; return; } ws.async_close(beast::websocket::close_code::normal, beast::bind_front_handler(&Session::onClose, shared_from_this())); }
void onClose(beast::error_code ec){ if(ec){ cerr << "onClose Error: " << ec.message() <<endl; return; } cout << "Receive: " << beast::make_printable(buffer.data()) << endl; }
private: asio::ip::tcp::resolver resolver; beast::websocket::stream<beast::tcp_stream> ws; beast::flat_buffer buffer; string host_; string text_; };
int main(){ const char* host = "127.0.0.1"; const char* port = "5555"; const char* text = "Hello Server"; asio::io_context ioc; auto sessionPtr = std::make_shared<Session>(ioc); sessionPtr->run(host, port, text); ioc.run();
return 0; }
|
该客户端发送一次信息即退出,不符合大部分客户端场景,加入stdin读取,并且区分异步读取发送:
循环读取发送客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| #include <iostream> #include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <boost/asio.hpp>
using namespace std; namespace beast = boost::beast; namespace asio = boost::asio;
class Session: public enable_shared_from_this<Session>{ public: explicit Session(asio::io_context& ioc): resolver(asio::make_strand(ioc)), ws(asio::make_strand(ioc)), \ input_(asio::make_strand(ioc), ::dup(STDIN_FILENO)){}
void run(const char* host, const char* port, const char* text){ host_ = host; text_ = text; resolver.async_resolve(host, port, beast::bind_front_handler(&Session::onResolve, shared_from_this())); }
void onResolve(beast::error_code ec, asio::ip::tcp::resolver::results_type results){ if(ec){ cerr << "onResolve Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_after(std::chrono::seconds(30)); tcpStream.async_connect(results, beast::bind_front_handler(&Session::onConnect, shared_from_this())); }
void onConnect(beast::error_code ec, asio::ip::tcp::endpoint ep){ if(ec){ cerr << "onConnect Error: " << ec.message() <<endl; return; } beast::tcp_stream& tcpStream = beast::get_lowest_layer(ws); tcpStream.expires_never(); ws.set_option(beast::websocket::stream_base::timeout::suggested(beast::role_type::client)); ws.set_option(beast::websocket::stream_base::decorator([](beast::websocket::request_type& req){ req.set(beast::http::field::user_agent, string(BOOST_BEAST_VERSION_STRING)+string("I_am_async_client")); })); host_ += ':' + std::to_string(ep.port()); ws.async_handshake(host_, "/", beast::bind_front_handler(&Session::onHandShake, shared_from_this())); }
void onHandShake(beast::error_code ec){ if(ec){ cerr << "onHandShake Error: " << ec.message() <<endl; return; } ws.async_write(asio::buffer(text_), beast::bind_front_handler(&Session::onWrite, shared_from_this())); ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this())); }
void onWrite(beast::error_code ec, std::size_t bytes_transferred){ boost::ignore_unused(bytes_transferred); if(ec){ cerr << "onWrite Error: " << ec.message() <<endl; return; } asyncReadInput(); }
void onRead(beast::error_code ec, std::size_t bytes_transferred){ boost::ignore_unused(bytes_transferred); if(ec){ cerr << "onRead Error: " << ec.message() <<endl; return; } ws.async_read(buffer, beast::bind_front_handler(&Session::onRead, shared_from_this())); }
void onClose(beast::error_code ec){ if(ec){ cerr << "onClose Error: " << ec.message() <<endl; return; } cout << "Receive: " << beast::make_printable(buffer.data()) << endl; }
void asyncReadInput(){ auto self = shared_from_this(); asio::async_read_until(input_, boardBuf, '\n', [self, this](beast::error_code ec, std::size_t bytes_transferred){ if(ec){ cerr << "read_until Error: " << ec.message() << endl; return; }
std::istream iss(&boardBuf); string input; std::getline(iss, input);
cout << "11111" <<input << endl; if(input == "exit"){ ws.async_close(beast::websocket::close_code::normal, beast::bind_front_handler(&Session::onClose, shared_from_this())); return; } ws.async_write(asio::buffer(input), beast::bind_front_handler(&Session::onWrite, shared_from_this())); }); } private: asio::posix::stream_descriptor input_; asio::streambuf boardBuf;
private: asio::ip::tcp::resolver resolver; beast::websocket::stream<beast::tcp_stream> ws; beast::flat_buffer buffer; string host_; string text_; };
int main(){ const char* host = "127.0.0.1"; const char* port = "5555"; const char* text = "Hello Server"; asio::io_context ioc; auto sessionPtr = std::make_shared<Session>(ioc); sessionPtr->run(host, port, text); ioc.run();
return 0; }
|