
Boost ASIO crashes while calling callback function

Reposted. Author: bug小助手. Updated: 2023-10-28 10:12:40



I am new to the Boost library. I have written a server using the Boost.Beast async methods, but it sometimes crashes. The crash is rare: it can take 3-4 hours of running before it happens. I am sharing a screenshot of the crash point, taken from the application's crash dump file.


Please help me find the cause of the crash.


[Image: screenshot of the crash point from the crash dump]


I did some research on Google and also looked at "https://www.boost.org/doc/libs/1_54_0/boost/asio/detail/deadline_timer_service.hpp", but I was not able to understand it.


Header file WebSocketServer.h



#include "server_certificate.hpp"
#include "xlogger.h"
#include "TSWebSocket.h"

#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/mutex.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <map>
#include <queue>

namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

//------------------------------------------------------------------------------

// The following ifdef block is the standard way of creating macros which make exporting
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
class WebSocketSession;
// Accepts incoming connections and launches the sessions
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener>
{
private:

std::shared_ptr<ISocketEvents> m_pEvent;
CRITICAL_SECTION m_hCSClientidSession;
std::map<UINT, std::shared_ptr<WebSocketSession>> m_mapClientidSession;
public:
WebSocketListener(tcp::endpoint endpoint);
~WebSocketListener();
void Run();
void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
bool SendMessageToClient(UINT uClientid, std::string strMessage);
bool RemoveClient(UINT uClientid);
private:
void do_accept();
void on_accept(beast::error_code ec, tcp::socket socket);
};

//------------------------------------------------------------------------------

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession>
{
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
beast::flat_buffer m_ReadBuffer;
beast::flat_buffer m_WriteBuffer;
std::shared_ptr<ISocketEvents> m_pEvent;
SOCKET m_SocketDescriptor;
bool bHandShake;
public:
// Take ownership of the socket
WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId);
void run();
void closeSocket();
void on_run();
void on_handshake(beast::error_code ec);
void on_accept(beast::error_code ec);
void on_upgrade(beast::error_code ec, size_t);
void do_read();
void on_read(beast::error_code ec, std::size_t bytes_transferred);
UINT getSocketDescriptor();
bool SendMessageToClient(std::string strMessage);
void do_Disconnect();
~WebSocketSession();
};

//------------------------------------------------------------------------------

// Factory function that creates instances of the Server Protocol object.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey);
//------------------------------------------------------------------------------

Source file WebSocketServer.cpp


#include "stdafx.h"
#include "WebSocketServer.h"
//------------------------------------------------------------------------------

#define NO_OF_THREAD 50

static std::shared_ptr<WebSocketListener> objWebSocketListener = nullptr;

// The io_context is required for all I/O
boost::asio::io_context g_ioc{ NO_OF_THREAD };

// The SSL context is required, and holds certificates
ssl::context g_ctx{ ssl::context::tlsv12 };

tcp::acceptor g_acceptor(net::make_strand(g_ioc));
http::request<http::string_body> g_upgrade_request;

void RunIOContextThread();

//It initialize the WebSocketListener
std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey)
{
auto const address = net::ip::make_address("0.0.0.0");

// This holds the self-signed certificate used by the server
load_server_certificate(g_ctx, certificate, privateKey);

// Create and launch a listening port
objWebSocketListener = std::make_shared<WebSocketListener>(tcp::endpoint{ address, port });
if(objWebSocketListener)
objWebSocketListener->Run();

// Run the I/O service on the requested number of threads
for (int i = 0; i < NO_OF_THREAD; ++i)
{
std::thread ioContextThread(&RunIOContextThread);
ioContextThread.detach();
}

return objWebSocketListener->shared_from_this();
}

void RunIOContextThread()
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread thread started %d", std::this_thread::get_id());
g_ioc.run();
}

//------------------------------------------------------------------------------

// Listener class
WebSocketListener::~WebSocketListener()
{
}

WebSocketListener::WebSocketListener(tcp::endpoint endpoint) :
m_pEvent(NULL)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Enter");
InitializeCriticalSection(&m_hCSClientidSession);
beast::error_code ec;

// Open the acceptor
g_acceptor.open(endpoint.protocol(), ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener open failed, error message: %s", ec.message().c_str());
return;
}

// Allow address reuse
g_acceptor.set_option(net::socket_base::reuse_address(true), ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option failed, error message: %s", ec.message().c_str());
return;
}


g_acceptor.set_option(tcp::no_delay(true), ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s", ec.message().c_str());
return;
}

// Bind to the server address
g_acceptor.bind(endpoint, ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener bind failed, error message: %s", ec.message().c_str());
return;
}

// Start listening for connections
g_acceptor.listen(net::socket_base::max_listen_connections, ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener listen failed, error message: %s", ec.message().c_str());
return;
}
else
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::WebSocketListener Started listening at %s:%hu", g_acceptor.local_endpoint().address().to_string().c_str(), g_acceptor.local_endpoint().port());
}

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Leave");
}

bool WebSocketListener::RemoveClient(UINT uClientid)
{
bool bRet = false;
try
{
EnterCriticalSection(&m_hCSClientidSession);
auto iter = m_mapClientidSession.find(uClientid);
if (iter != m_mapClientidSession.end())
{
iter->second->closeSocket();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::RemoveClient Removed from map uClientid : %d refCount: %d", uClientid, iter->second.use_count());
m_mapClientidSession.erase(iter);
bRet = true;
}
LeaveCriticalSection(&m_hCSClientidSession);
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Generic Catch");
}
return bRet;
}

bool WebSocketListener::SendMessageToClient(UINT uClientid, std::string strMessage)
{
bool bRet = false;
try
{

EnterCriticalSection(&m_hCSClientidSession);
auto iter = m_mapClientidSession.find(uClientid);
if (iter != m_mapClientidSession.end())
{
bRet = iter->second->SendMessageToClient(strMessage);
}
else
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
}
LeaveCriticalSection(&m_hCSClientidSession);
}
catch (exception& e)
{
LeaveCriticalSection(&m_hCSClientidSession);
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
}
catch (...)
{
LeaveCriticalSection(&m_hCSClientidSession);
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Generic Catch");
}
return bRet;
}

void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent)
{
m_pEvent = pEvent;
}

// Start accepting incoming connections
void WebSocketListener::Run()
{
do_accept();
}

void WebSocketListener::do_accept()
{
try
{
// The new connection gets its own strand
g_acceptor.async_accept(
net::make_strand(g_ioc),
beast::bind_front_handler(
&WebSocketListener::on_accept,
shared_from_this()));
}
catch(exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Generic Catch");
}
}

void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket)
{
try
{
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s", socket.native_handle(), ec.message().c_str());
}
else
{
socket.set_option(tcp::no_delay(true), ec);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept set_option(No delay) failed, error message: %s", ec.message().c_str());
return;
}

std::string sClientIp = socket.remote_endpoint().address().to_string();
unsigned short uiClientPort = socket.remote_endpoint().port();
SOCKET socketId = socket.native_handle();
// Create the WebSocketSession and run it
std::shared_ptr<WebSocketSession> objSession = std::make_shared<WebSocketSession>(std::move(socket), g_ctx, m_pEvent, socketId);
if (objSession != nullptr)
{
objSession->run();

EnterCriticalSection(&m_hCSClientidSession);
auto itr = m_mapClientidSession.find(socketId);
if (itr != m_mapClientidSession.end())
{
itr->second = objSession;
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept update client id : %d", socketId);
}
else
{
m_mapClientidSession.insert(pair<unsigned int, std::shared_ptr<WebSocketSession >>(socketId, objSession));
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept insert client id : %d", socketId);
}
LeaveCriticalSection(&m_hCSClientidSession);

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::on_accept Incoming connection request from SocketID: %d, IP: %s Port: %hu", socketId, sClientIp.c_str(), uiClientPort);
}
}
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Generic Catch");
}
// Accept another connection
do_accept();
}

//------------------------------------------------------------------------------

// Session class
WebSocketSession::WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId)
: ws_(std::move(socket), ctx), m_pEvent(pEvent), m_SocketDescriptor(socketId)
{
m_WriteBuffer.reserve(1000);
m_ReadBuffer.reserve(1000);
bHandShake = false;
}

WebSocketSession::~WebSocketSession()
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d Closing ", m_SocketDescriptor);
if (ws_.is_open())
{
beast::websocket::close_reason closeReason;
beast::error_code ec;
ws_.close(closeReason, ec);
if (ec)
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s", ec.message().c_str());
}
}

// Get on the correct executor
void WebSocketSession::run()
{
try {
// We need to be executing within a strand to perform async operations
// on the I/O objects in this WebSocketSession. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(ws_.get_executor(),
beast::bind_front_handler(
&WebSocketSession::on_run,
shared_from_this()));
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Generic Catch");
}
}

void WebSocketSession::closeSocket()
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket Closing SocketID: %d", m_SocketDescriptor);
beast::websocket::close_reason closeReason;
beast::error_code ec;
UINT ID = m_SocketDescriptor;
ws_.async_close(closeReason, [=](beast::error_code ec)
{
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket SocketID: %d, Code: %d Message: %s", ID, closeReason, ec.message().c_str());
}
}
);
}

// Start the asynchronous operation
void WebSocketSession::on_run()
{
try
{
// Set the timeout.
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

// Perform the SSL handshake
ws_.next_layer().async_handshake(
ssl::stream_base::server,
beast::bind_front_handler(
&WebSocketSession::on_handshake,
shared_from_this()));
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Generic Catch");
}
}

void WebSocketSession::on_handshake(beast::error_code ec)
{
try
{
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
return;
}
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();

// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::server));

// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async-ssl");
}));

http::async_read(ws_.next_layer(), m_ReadBuffer, g_upgrade_request,
beast::bind_front_handler(
&WebSocketSession::on_upgrade,
shared_from_this()));
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Generic Catch");
}
}

void WebSocketSession::on_upgrade(beast::error_code ec, size_t)
{
try
{
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
return;
}

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s", m_SocketDescriptor, boost::lexical_cast<std::string>(g_upgrade_request.base()));

if (m_pEvent != nullptr)
{
m_pEvent->OnConnect(m_SocketDescriptor, true, "", 0);
}
// Accept the websocket handshake
ws_.async_accept(
g_upgrade_request,
beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Generic Catch");
}
}

void WebSocketSession::on_accept(beast::error_code ec)
{
if (ec)
{
do_Disconnect();
return;
}
// Read a message
do_read();
}

void WebSocketSession::do_read()
{
static boost::mutex objMutex;
try
{
objMutex.lock();
if (m_pEvent && !bHandShake)
{
bHandShake = true;
m_pEvent->onHandShake(m_SocketDescriptor, bHandShake);
}
// Read a message into our buffer

ws_.async_read(
m_ReadBuffer,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()));
objMutex.unlock();
}
catch (exception& e)
{
objMutex.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Catch msg: %s", e.what());
}
catch (...)
{
objMutex.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Generic Catch");
}
}

void WebSocketSession::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
try
{
boost::ignore_unused(bytes_transferred);

// This indicates that the WebSocketSession was closed
if (ec == websocket::error::closed)
{
do_Disconnect();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client closed SocketID: %d", m_SocketDescriptor);
return;
}

if (ec)
{
do_Disconnect();
return;
}

if (m_pEvent != nullptr)
{
if (m_ReadBuffer.cdata().size() > 0)
{
const unsigned int iLen = m_ReadBuffer.cdata().size();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client SocketID: %d dataRecv: %d cpacity: %d", m_SocketDescriptor, iLen, m_ReadBuffer.capacity());

std::shared_ptr<char> objData(new char[iLen], std::default_delete<char[]>());
::memset(objData.get(), 0, iLen);
if (objData)
{
memcpy(objData.get(), m_ReadBuffer.cdata().data(), iLen);
m_pEvent->OnReceive(m_SocketDescriptor, objData.get(), iLen);
}
}
}
m_ReadBuffer.consume(bytes_transferred);
}
catch (exception& e)
{

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Generic Catch");
}
do_read();
}

UINT WebSocketSession::getSocketDescriptor()
{
return m_SocketDescriptor;
}

void testDelete(char* p)
{
delete[] p;
}

struct DeleteChar {
void operator()(char* p) const {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"Deleting the buffer");
delete[] p;
}
};

bool WebSocketSession::SendMessageToClient(std::string strMessage)
{
bool bRet = false;
static std::mutex obj;
try {
obj.lock();

if (strMessage.length() > 0)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u msgLen: %d, msg:%s",
m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity(), strMessage.length(), strMessage.c_str());

if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, Disconnecting client due to pending buffer limit crossed",
m_SocketDescriptor, m_WriteBuffer.size());
do_Disconnect();
}
else
{
boost::beast::ostream(m_WriteBuffer) << strMessage << ":";

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u",
m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity());

ws_.async_write(m_WriteBuffer.data(),
[this](beast::error_code ec, std::size_t transfer)
{
boost::ignore_unused(transfer);
if (ec)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessageToClient callback error SocketID: %d, ErrorMsg:%s"/* ,Message: %s"*/, m_SocketDescriptor, ec.message().c_str()/*,strMessage.c_str()*/);

do_Disconnect();
}

obj.lock();
m_WriteBuffer.consume(transfer);
obj.unlock();
});

bRet = true;
}
}
obj.unlock();
}
catch (exception& e)
{
obj.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Catch msg: %s", e.what());
}
catch (...)
{
obj.unlock();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Generic Catch");
}
return bRet;
}

void WebSocketSession::do_Disconnect()
{
try {
if (m_pEvent)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::do_Disconnect disconnecting SocketID: %d", m_SocketDescriptor);
m_pEvent->OnConnect(m_SocketDescriptor, false, "", 0);
}
else
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::do_Disconnect Removing SocketID: %d", m_SocketDescriptor);
objWebSocketListener->RemoveClient(m_SocketDescriptor);
}
}
catch (exception& e)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Catch msg: %s", e.what());
}
catch (...)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Generic Catch");
}
}

Comments

We need to see the code to find where you invoke undefined behaviour.

Out on a limb, I'd say the timer is destructed after the execution context (e.g. io_context) is.

@sehe I have added the code and exception info. Please check it. What am I doing wrong? If needed, please contact me by email: [email protected]

Stack Overflow is a Q&A site. I prefer to use it because the help serves many more people. You could have guessed that from the fact that this is where I answered your question :)

Recommended answer

I've just spent... hours getting your code fixed up enough so it compiles. I remember doing the same when you posted it at https://github.com/chriskohlhoff/asio/issues/1335 and in your previous question. Rough result: Live On Coliru


The number of problems in the code is scarily high.




  • Sockets aren't UINT,

  • they're not unique during the lifetime of a process,

  • a local mutex will not protect anything,

  • the majority of format specifiers are wrong,

  • some format arguments are missing,

  • some are invoking UB by passing non-trivial objects through varargs,

  • concurrency hints are not numbers of threads,

  • the threads just run wild (detached),

  • strands should actually remove the need for critical sections,

  • maxThreads is unused,

  • there will necessarily always be a race condition around start of the listener and setting the event handlers,

  • all the sessions share the g_upgrade_request variable - regardless of threading even - that's just a data race and therefore UB;

  • the mix of WIN32 API, boost and standard library threading primitives suggests that all of the code is basically copy-pasted from random sources.
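To make the varargs points concrete: the question's code passes `std::this_thread::get_id()` to a `%d` conversion and a `std::string` (the `boost::lexical_cast` result) to `%s`. A minimal sketch of one way to avoid that follows, assuming a printf-style logger. The helper names `thread_id_text` and `format_started` are hypothetical and are not part of the original code or of any library.

```cpp
#include <cstdio>
#include <sstream>
#include <string>
#include <thread>

// Hypothetical helper: std::thread::id has no matching printf conversion,
// so render it to text through its stream operator first.
std::string thread_id_text(std::thread::id id) {
    std::ostringstream os;
    os << id;
    return os.str();
}

// Hypothetical helper: build a log line with snprintf. Every variadic
// argument is a trivial type that matches its conversion specifier:
// %s gets a char const*, %llu gets an unsigned long long.
std::string format_started(std::thread::id id, unsigned long long handle) {
    std::string const tid = thread_id_text(id);
    char buf[128];
    std::snprintf(buf, sizeof buf, "thread %s started, handle %llu", tid.c_str(), handle);
    return buf; // snprintf always null-terminates, truncating if needed
}
```

The same rule applies to any `std::string` argument: pass `.c_str()`, never the object itself. Compiling with format checking enabled (the answer's stub uses `[[gnu::format(printf, 3, 4)]]` for that) lets the compiler flag most of these mismatches.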


Regardless of everything, your original stacktrace (which you removed?) showed a timer completion. That implies the source is a socket operation on ws_ timing out (since you don't have any other timers):



beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

And that probably happens after the g_ioc instance is destructed. That's completely expected since all your IO threads are... detached, so they will happily continue to run after main exits.


So, make sure your main doesn't exit before the execution context runs out of work. Better yet, don't make that a global anyways.
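As a rough illustration of keeping main from exiting while workers still run, here is a standard-library-only sketch. `Workers` and `run_and_wait` are hypothetical names made up for this example. With Asio the same role is played by `boost::asio::thread_pool` and its `join()` member.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for a pool of I/O threads. The point is ownership:
// the threads are stored and joined, never detached.
class Workers {
  public:
    template <typename F>
    Workers(unsigned count, F work) {
        for (unsigned i = 0; i < count; ++i)
            threads_.emplace_back(work); // each thread gets its own copy of `work`
    }

    // Block until every worker has returned.
    void join() {
        for (auto& t : threads_)
            if (t.joinable())
                t.join();
    }

    ~Workers() { join(); } // the pool never goes away before its threads do

    Workers(Workers const&) = delete;
    Workers& operator=(Workers const&) = delete;

  private:
    std::vector<std::thread> threads_;
};

// Runs `count` workers that each bump a counter and returns only after
// all of them have finished.
int run_and_wait(unsigned count) {
    std::atomic<int> done{0};
    {
        Workers pool(count, [&done] { ++done; });
    } // destructor joins here, so `done` outlives every worker
    return done.load();
}
```

Because the join happens in the destructor, anything the workers touch only has to be declared before the pool. A detached thread gives no such guarantee, which matches the crash described above: a completion handler runs against objects that are already gone.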





Friends Don't Let Friends Use Globals


Just fixing everything by applying basic scope hygiene:



  • moving most things into the listener

  • moving the upgrade request into the session

  • using thread_pool to avoid managing the threads manually (and poorly)

  • adding Stop that waits

  • removing the useless mutexes in favor of the strands - you were trying to use them anyways

  • relying on the shared_from_this (that you... were using anyways) by
    storing weak_ptr in your sessions map. Note the session map no longer
    requires the unsafe key either

  • improved and simplified error handling
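The weak_ptr idea from the list above can be sketched with the standard library alone. `Session` and `Registry` are hypothetical stand-ins, not the classes from the rewrite. The sketch is not thread-safe by itself and assumes all calls are serialized, for example by running them on one strand.

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a session. A real one would own the websocket
// stream and keep itself alive through shared_from_this in its handlers.
struct Session {
    std::string last_sent;
    void send(std::string msg) { last_sent = std::move(msg); }
};

// Tracks sessions without owning them, so a session's lifetime is decided
// by its pending operations and not by the registry.
class Registry {
  public:
    void add(std::shared_ptr<Session> const& s) { sessions_.push_back(s); }

    // Deliver to every session that is still alive and drop expired
    // entries on the way. Returns how many sessions were reached.
    std::size_t broadcast(std::string const& msg) {
        std::size_t reached = 0;
        for (auto it = sessions_.begin(); it != sessions_.end();) {
            if (auto s = it->lock()) {
                s->send(msg);
                ++reached;
                ++it;
            } else {
                it = sessions_.erase(it); // the session is gone, forget it
            }
        }
        return reached;
    }

    std::size_t tracked() const { return sessions_.size(); }

  private:
    std::list<std::weak_ptr<Session>> sessions_;
};
```

Since entries expire on their own, no numeric key is needed to remove a client, which sidesteps the problem that socket handles are not unique over the lifetime of a process.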


Live On Coliru



#if 0
// The following ifdef block is the standard way of creating macros which make exporting
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
#include "TSWebSocket.h"
#include "server_certificate.hpp"
#else
#include <cstdarg>
#include <cstdio>
#define WEBSOCKETSERVER_API
#define STDCALL // __stdcall

namespace stl::log {
enum { LOG_GROUP_ERROR };
enum { LOG_LEVEL_INFO, LOG_LEVEL_ERROR };
[[gnu::format(printf, 3, 4)]] void trace([[maybe_unused]] auto section, [[maybe_unused]] auto level,
[[maybe_unused]] char const* fmt, ...) {
std::va_list argptr;
va_start(argptr, fmt);
std::vfprintf(stderr, fmt, argptr);
std::puts(""); // poor man's newline
va_end(argptr);
}
} // namespace stl::log

#endif

#include <algorithm>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <list>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>

namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

static void load_server_certificate(ssl::context& ctx, std::string, std::string) {
// TODO
ctx.use_certificate_file("server.pem", ssl::context::pem);
ctx.use_private_key_file("server.pem", ssl::context::pem);
ctx.set_default_verify_paths();
ctx.set_password_callback([](size_t, auto /*purpose*/) { return "test"; });
}

using SOCKET = tcp::socket::native_handle_type;
struct ISocketEvents {
virtual ~ISocketEvents() = default;
virtual void OnConnect(SOCKET, bool, std::string_view, int) {
stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__);
}
virtual void onHandShake(SOCKET, bool) {
stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__);
}
virtual void OnReceive(SOCKET, char const*, size_t) {
stl::log::trace(0, 0, "Event handler: %s\n", __FUNCTION__);
}
};
//------------------------------------------------------------------------------

class WebSocketSession;
// Accepts incoming connections and launches the sessions
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener> {
private:
using Handle = std::weak_ptr<WebSocketSession>;
std::list<Handle> m_sessions;
std::shared_ptr<ISocketEvents> m_pEvent;

boost::asio::thread_pool m_ctx;
tcp::acceptor m_acceptor{net::make_strand(m_ctx)};

public:
// The SSL context is required, and holds certificates
ssl::context m_sslctx{
ssl::context::tlsv12}; // TODO make private, probably by moving load_server_certificate

WebSocketListener(tcp::endpoint endpoint, size_t num_threads);
~WebSocketListener();
void Run();
void Stop();
void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
bool SendMessageToClient(SOCKET uClientid, std::string strMessage);

private:
void do_accept();
void on_accept(beast::error_code ec, tcp::socket socket);
};

//------------------------------------------------------------------------------

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;

beast::flat_buffer m_ReadBuffer;
beast::flat_buffer m_WriteBuffer;
std::shared_ptr<ISocketEvents> m_pEvent;
bool bHandShake = false;
http::request<http::string_body> m_upgrade_request;
beast::websocket::close_reason m_closeReason{};

public:
// Take ownership of the socket
WebSocketSession(tcp::socket socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent);
~WebSocketSession();
void run();
void closeSocket();
void on_run();
void on_handshake(beast::error_code ec);
void on_accept(beast::error_code ec);
void on_upgrade(beast::error_code ec, size_t);
void do_read();
void on_read(beast::error_code ec, size_t bytes_transferred);
SOCKET getId();
void SendMessage(std::string strMessage);
};

//------------------------------------------------------------------------------

// Factory function that creates instances of the Server Protocol object.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> STDCALL InitializeWebSocketListener( //
unsigned short port, int maxThreads, std::string certificate, std::string privateKey,
std::shared_ptr<ISocketEvents> handlers);
//--------
//

// Initializes the WebSocketListener
std::shared_ptr<WebSocketListener>
STDCALL InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate,
std::string privateKey, std::shared_ptr<ISocketEvents> handlers) {

// Create the listener and load the certificate before any connection can be accepted
auto obj = std::make_shared<WebSocketListener>(tcp::endpoint{{}, port}, maxThreads);
load_server_certificate(obj->m_sslctx, certificate, privateKey);
obj->setEventHandler(handlers);
obj->Run();

return obj;
}

//------------------------------------------------------------------------------

// Listener class
WebSocketListener::~WebSocketListener() {}

WebSocketListener::WebSocketListener(tcp::endpoint endpoint, size_t num_threads) : m_ctx(num_threads) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener Constructor Enter");
beast::error_code ec;

// Open the acceptor
m_acceptor.open(endpoint.protocol(), ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener open failed, error message: %s",
ec.message().c_str());
return;
}

// Allow address reuse
m_acceptor.set_option(net::socket_base::reuse_address(true), ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener set_option failed, error message: %s",
ec.message().c_str());
return;
}

m_acceptor.set_option(tcp::no_delay(true), ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s",
ec.message().c_str());
return;
}

// Bind to the server address
m_acceptor.bind(endpoint, ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener bind failed, error message: %s",
ec.message().c_str());
return;
}

// Start listening for connections
m_acceptor.listen(net::socket_base::max_listen_connections, ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener listen failed, error message: %s",
ec.message().c_str());
return;
} else {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
"WebSocketListener::WebSocketListener Started listening at %s:%hu",
m_acceptor.local_endpoint().address().to_string().c_str(),
m_acceptor.local_endpoint().port());
}

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::WebSocketListener Constructor Leave");
}

bool WebSocketListener::SendMessageToClient(SOCKET uClientid, std::string strMessage) {
try {
// TODO maybe make `m_sessions` a map again
for (Handle const& handle : m_sessions) {
if (auto session = handle.lock()) {
if (session->getId() != uClientid)
continue;

session->SendMessage(strMessage);
return true;
}
}
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::SendMessageToClient Inside Generic Catch");
}
return false;
}

void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent) {
net::post(m_acceptor.get_executor(), // post on strand
[this, pEvent, self = shared_from_this()] { m_pEvent = pEvent; });
}

// Start accepting incoming connections
void WebSocketListener::Run() {
net::post(m_acceptor.get_executor(), // post on strand
[this, self = shared_from_this()] { //
do_accept();
});
}

void WebSocketListener::Stop() {
net::post(m_acceptor.get_executor(), [this, self = shared_from_this()] {
m_acceptor.cancel();
for (Handle& handle : m_sessions)
if (auto session = handle.lock())
session->closeSocket();
});
m_ctx.join(); // waits for all IO to finish
}

void WebSocketListener::do_accept() {
m_acceptor.async_accept( //
net::make_strand(m_ctx),
beast::bind_front_handler(&WebSocketListener::on_accept, shared_from_this()));
}

void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket) {
try {
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s",
socket.native_handle(), ec.message().c_str());
} else {
// Accept another connection
do_accept();

socket.set_option(tcp::no_delay(true), ec);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::on_accept set_option(No delay) failed, error message: %s",
ec.message().c_str());
return;
}

std::string sClientIp = socket.remote_endpoint().address().to_string();
unsigned short uiClientPort = socket.remote_endpoint().port();

// Create the WebSocketSession and run it
auto session = std::make_shared<WebSocketSession>(std::move(socket), m_sslctx, m_pEvent);

m_sessions.emplace_back(session);
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::on_accept insert client id : %d", session->getId());

// optionally: garbage collect expired sessions
m_sessions.remove_if(std::mem_fn(&Handle::expired));

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
"WebSocketListener::on_accept Incoming connection request from SocketID: %d, "
"IP: %s Port: %hu",
session->getId(), sClientIp.c_str(), uiClientPort);

session->run();
}
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::on_accept Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketListener::on_accept Inside Generic Catch");
}
}

//------------------------------------------------------------------------------

// Session class
WebSocketSession::WebSocketSession(tcp::socket socket, ssl::context& ctx,
std::shared_ptr<ISocketEvents> pEvent)
: ws_(std::move(socket), ctx)
, m_pEvent(pEvent) //
{
m_WriteBuffer.reserve(1000);
m_ReadBuffer.reserve(1000);
}

WebSocketSession::~WebSocketSession() try {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::~WebSocketSession SocketID: %d Closing ", getId());
if (ws_.is_open()) {
beast::error_code ec;
ws_.close(m_closeReason, ec);
if (ec)
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::~WebSocketSession SocketID: %d closing error msg: %s",
getId(), ec.message().c_str());
if (m_pEvent)
m_pEvent->OnConnect(getId(), false, "", 0);
}
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"~WebSocketSession Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"~WebSocketSession Inside Generic Catch");
}

// Get on the correct executor
void WebSocketSession::run() {
// We need to be executing within a strand to perform async operations
// on the I/O objects in this WebSocketSession. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(ws_.get_executor(),
beast::bind_front_handler(&WebSocketSession::on_run, shared_from_this()));
}

void WebSocketSession::closeSocket() {
auto id = getId();
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::closeSocket Closing SocketID: %d", id);

auto self = shared_from_this();
//// ideally we cleanly close, however this will not reliably close the pending read
// net::post(ws_.get_executor(), [this, self] { get_lowest_layer(ws_).cancel(); });

net::post(ws_.get_executor(), [this, self, id] {
// m_closeReason must stay valid!
ws_.async_close(m_closeReason, [self, id](beast::error_code ec) {
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::closeSocket SocketID: %d Message: %s", id,
ec.message().c_str());
}
});
});
}

// Start the asynchronous operation
void WebSocketSession::on_run() {
// Set the timeout.
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

// Perform the SSL handshake
ws_.next_layer().async_handshake(
ssl::stream_base::server,
beast::bind_front_handler(&WebSocketSession::on_handshake, shared_from_this()));
}

void WebSocketSession::on_handshake(beast::error_code ec) {
try {
if (ec) {
stl::log::trace(
stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", getId(),
ec.message().c_str());
return;
}
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();

// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async-ssl");
}));

http::async_read(ws_.next_layer(), m_ReadBuffer, m_upgrade_request,
beast::bind_front_handler(&WebSocketSession::on_upgrade, shared_from_this()));
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_handshake Inside Generic Catch");
}
}

void WebSocketSession::on_upgrade(beast::error_code ec, size_t) {
try {
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s",
getId(), ec.message().c_str());
return;
}

stl::log::trace(
stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
"WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s",
getId(), boost::lexical_cast<std::string>(m_upgrade_request.base()).c_str());

if (m_pEvent)
m_pEvent->OnConnect(getId(), true, "", 0);

// Accept the websocket handshake
ws_.async_accept(m_upgrade_request,
beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_upgrade Inside Generic Catch");
}
}

void WebSocketSession::on_accept(beast::error_code ec) {
if (!ec)
do_read(); // Read a message
}

void WebSocketSession::do_read() {
try {
if (m_pEvent && !bHandShake) {
bHandShake = true;
m_pEvent->onHandShake(getId(), bHandShake);
}
// Read a message into our buffer
ws_.async_read(m_ReadBuffer,
beast::bind_front_handler(&WebSocketSession::on_read, shared_from_this()));
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::do_read Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::do_read Inside Generic Catch");
}
}

void WebSocketSession::on_read(beast::error_code ec, size_t bytes_transferred) {
try {
boost::ignore_unused(bytes_transferred);

// This indicates that the WebSocketSession was closed
if (ec) {
if (ec == websocket::error::closed) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
"WebSocketSession::on_read Client closed SocketID: %d", getId());
}
return;
}

if (auto iLen = m_ReadBuffer.cdata().size()) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO,
"WebSocketSession::on_read Client SocketID: %d dataRecv: %zu capacity: %zu",
getId(), iLen, m_ReadBuffer.capacity());

std::string data(net::buffer_cast<char const*>(m_ReadBuffer.cdata()), iLen);
if (m_pEvent)
m_pEvent->OnReceive(getId(), data.data(), iLen);
}
m_ReadBuffer.consume(bytes_transferred);
} catch (std::exception const& e) {

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_read Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::on_read Inside Generic Catch");
}
do_read();
}

SOCKET WebSocketSession::getId() { return beast::get_lowest_layer(ws_).socket().native_handle(); }

void WebSocketSession::SendMessage(std::string strMessage) {
net::post(ws_.get_executor(), [this, msg = std::move(strMessage), self=shared_from_this()] {
try {
if (msg.length() > 0) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"BufferCapacity: %zu msgLen:%zu, msg:%s",
getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity(), msg.length(),
msg.c_str());

if (m_WriteBuffer.size() > 40960) // Pending data exceeds 40 KiB (40960 bytes)
{
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"Disconnecting client due to pending buffer limit crossed",
getId(), m_WriteBuffer.size());
} else {
boost::beast::ostream(m_WriteBuffer) << msg << ":";

stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage SocketID: %d, BufferSize: %zu, "
"BufferCapacity: %zu",
getId(), m_WriteBuffer.size(), m_WriteBuffer.capacity());

// Capture self as well, so the session outlives the pending write
ws_.async_write(m_WriteBuffer.data(), [this, self](beast::error_code ec, size_t transferred) {
boost::ignore_unused(transferred);
if (ec) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage callback error SocketID: %d, "
"ErrorMsg:%s" /* ,Message: %s"*/,
getId(), ec.message().c_str() /*,msg.c_str()*/);

}

m_WriteBuffer.consume(transferred);
});
}
}
} catch (std::exception const& e) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage Inside Catch msg: %s", e.what());
} catch (...) {
stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
"WebSocketSession::SendMessage Inside Generic Catch");
}
});
}

int main() {
auto handlers = std::make_shared<ISocketEvents>();
auto wsl = InitializeWebSocketListener(8989, 50, "CERT", "KEY", handlers);

std::this_thread::sleep_for(std::chrono::seconds(10));
wsl->Stop();
}

With a local demo:



[Screenshot of the local demo run]


Summary, Caveats


In summary, pay much closer attention to object lifetimes (m_closeReason is one of the subtler cases) and to shutdown.
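To illustrate the lifetime point, here is a minimal, library-independent sketch. It is not the code from the listing above: `FakeExecutor`, `Session`, and `session_outlives_owner` are hypothetical names, and `FakeExecutor` merely stands in for posting a handler to an Asio executor. It shows why handlers capture `self = shared_from_this()`: the handler co-owns the object, so `this` remains valid until the handler has run.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for an executor (assumption): handlers are stored now and run later.
struct FakeExecutor {
    std::vector<std::function<void()>> pending;
    void post(std::function<void()> f) { pending.push_back(std::move(f)); }
    void run() {
        auto handlers = std::move(pending);
        pending.clear();
        for (auto& h : handlers) h();
    }   // `handlers` is destroyed here, releasing every captured `self`
};

struct Session : std::enable_shared_from_this<Session> {
    explicit Session(bool& destroyed) : destroyed_(destroyed) {}
    ~Session() { destroyed_ = true; }

    // The handler holds a shared_ptr to the session, so `this` stays valid
    // until the handler has run and been destroyed.
    void start(FakeExecutor& ex) {
        ex.post([this, self = shared_from_this()] { ++handled_; });
    }

    int handled_ = 0;
    bool& destroyed_;
};

// Returns true if the session survived its last external owner and was
// destroyed only after the pending handler had run.
bool session_outlives_owner() {
    bool destroyed = false;
    FakeExecutor ex;
    {
        auto s = std::make_shared<Session>(destroyed);
        s->start(ex);
    }                                   // last external owner is gone
    bool alive_while_pending = !destroyed;
    ex.run();                           // handler runs, then releases `self`
    return alive_while_pending && destroyed;
}
```

A handler that captured only `this` would leave the session destroyed at the closing brace, and the later call would touch a dead object. The same reasoning applies to anything an async operation refers to by reference, such as `m_closeReason`.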


Issues remaining:




  • closeSocket() relies in part on the peer cooperating with the closing handshake defined by the WebSocket specification. You may want to add logic that forcibly closes the socket if the peer does not respond in time.



  • SendMessage needs more protection: as written, it can issue overlapping writes. That violates Beast's documented requirement that only one write be outstanding on a stream at a time. You can solve that by adding an outgoing message queue.
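The outgoing queue mentioned in the last point could look roughly like the sketch below. This is a library-independent model of the queueing logic only, not a drop-in replacement for SendMessage: `Outbox`, `send`, `on_write_complete`, and the `start_write` hook are hypothetical names, and `start_write` stands in for a call to `ws_.async_write`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Model of a per-session outgoing queue: at most one write is in flight.
// `start_write` is an assumed hook; the owner must call on_write_complete()
// from that write's completion handler.
class Outbox {
  public:
    explicit Outbox(std::function<void(std::string const&)> start_write)
        : start_write_(std::move(start_write)) {}

    // Call on the session's strand only.
    void send(std::string msg) {
        queue_.push_back(std::move(msg));
        if (queue_.size() == 1)            // nothing in flight: start now
            start_write_(queue_.front());
    }

    void on_write_complete() {
        assert(!queue_.empty());
        queue_.pop_front();                // front() was the message just written
        if (!queue_.empty())
            start_write_(queue_.front());  // chain the next write
    }

    std::size_t pending() const { return queue_.size(); }

  private:
    // std::deque keeps references to existing elements valid across
    // push_back, so front() can serve as the buffer of the pending write.
    std::deque<std::string> queue_;
    std::function<void(std::string const&)> start_write_;
};
```

In a real session, `start_write` would presumably call `ws_.async_write(net::buffer(msg), handler)`, with the handler capturing `shared_from_this()` and calling `on_write_complete()`. Both `send` and the handler have to run on the session's strand.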




Comments

Thanks for your effort and time. I really appreciate it. I will work on the points you highlighted.

You're welcome. Also, if it turns out to be helpful, please remember to vote in the end.

I have voted, but the vote is still pending because I don't have 15 reputation at the moment.

I have implemented the message queue in the code like this: 1. We insert the message into the queue and check whether the size is one; if so, we send it using async, otherwise it just stays in the queue. 2. When we receive the callback, we check whether the queue size is greater than 1; if so, we pop and send the next message using async. But there is one case: when we receive messages very fast, the pending queue size for a client increases rapidly. What can we do for this case in Boost?

That sounds about right (assuming some of the wording is just unfortunately confusing). "What can we do for this case in boost" - this has nothing to do with boost. You have to put a limit in code, just like you would without boost.
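One possible way to put such a limit in code is sketched below. It is only an illustration: `BoundedQueue`, `Enqueue`, and the byte-based limit are assumptions rather than anything from the code above, and what happens on rejection (dropping the message or disconnecting the slow client) is left to the caller.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical result type: tells the caller whether the message was queued.
enum class Enqueue { accepted, rejected_full };

// Outgoing queue with a cap on the total number of pending bytes.
class BoundedQueue {
  public:
    explicit BoundedQueue(std::size_t max_bytes) : max_bytes_(max_bytes) {}

    Enqueue push(std::string msg) {
        if (bytes_ + msg.size() > max_bytes_)
            return Enqueue::rejected_full;  // caller decides: drop or disconnect
        bytes_ += msg.size();
        queue_.push_back(std::move(msg));
        return Enqueue::accepted;
    }

    // Call once the front message has been written completely.
    void pop() {
        if (queue_.empty()) return;
        bytes_ -= queue_.front().size();
        queue_.pop_front();
    }

    std::size_t bytes() const { return bytes_; }
    std::size_t size() const { return queue_.size(); }

  private:
    std::size_t max_bytes_;
    std::size_t bytes_ = 0;
    std::deque<std::string> queue_;
};
```

Counting bytes rather than messages keeps the bound meaningful when message sizes vary. A count-based limit would work the same way with `queue_.size()`.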

