1. The server uses the HTTP protocol.
2. I tested it with the ab (ApacheBench) tool: ab -n30000 -c2000 -p call.json -T "application/json" http://0.0.0.0:8025/
After receiving data several times, async_read_until blocks: no new data is received and the
completion callback is never triggered. I have to call connection_manager_.stop(self) before the AB
tool will continue. I think something is wrong with my HTTP server. What do I need to pay attention
to in order for the AB tool to work properly?
#include "connection.hpp"
#include "connection_manager.hpp"
#include "glog/logging.h"
#include <iostream>
#include <unordered_map>
namespace tcp {
// Characters trimmed from both ends of HTTP header names and values: space,
// tab, newline, carriage return, vertical tab and form feed. The escape
// sequences matter: without the backslashes the set would contain the letters
// t, n, r, v and f instead and would no longer strip the trailing '\r' that
// std::getline leaves on every header line.
#define STR_WHITESPACE " \t\n\r\v\f"
// The only request Content-Type the HTTP reader accepts (compared lowercased).
#define APPLICATION_JSON "application/json"
/**
 * Builds a connection whose socket and read-timeout timer are both created on
 * `io_service`.
 *
 * @param io_service  I/O service passed to socket_ and steady_timer_.
 * @param manager     Used to initialise connection_manager_; the other methods
 *                    call stop() and getMessageProtocol() on it.
 */
Connection::Connection(asio::io_service& io_service, ConnectionManager& manager)
: socket_(io_service),
steady_timer_(io_service),
connection_manager_(manager),
// 0 makes startSocketReadTimedout() and stopSocketReadTimedout() return early,
// so no read timeout is armed until this value is changed elsewhere.
timeout_seconds_(0),
// NOTE(review): fast_close_ is not read in the visible code; check the header
// for its meaning before relying on this initial value.
fast_close_(1) {
}
void Connection::start() {
asio::ip::tcp::no_delay no_delay(true);
socket_.set_option(no_delay);
socket_.set_option(asio::detail::socket_option::boolean<IPPROTO_TCP, TCP_QUICKACK>(true));
netSocketReadHandle(1);
}
void Connection::stop(bool internal) {
if (!socket_.is_open()) {
return;
}
if (!internal) {
connection_manager_.stop(shared_from_this());
} else {
asio::error_code ec;
socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket_.close(ec);
}
stopSocketReadTimedout();
}
void Connection::send(const char* data, std::size_t n) {
send(std::string(data, n));
}
void Connection::send(std::string data) {
std::string transport_data;
if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_HTTP) {
transport_data = constructHttpResponse(data);
} else if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_CUSTOM) {
transport_data = constructCustomResponse(data);
}
SharedConstBuffer send_buffer(transport_data);
asio::async_write(socket_, send_buffer,
[this, size = transport_data.size(), self = shared_from_this()](const asio::error_code& error, std::size_t bytes_transferred) -> void {
if (!error) {
if (write_complete_fn_ && size == bytes_transferred) {
write_complete_fn_(self);
}
LOG(INFO) << "send success";
//connection_manager_.stop(self);
} else {
connection_manager_.stop(self);
}
}
);
}
/**
 * Wraps `content` in a minimal "200 OK" HTTP/1.1 response.
 *
 * Emits the status line, Content-Length (byte size of `content`) and
 * Content-Type: application/json, then a blank line and the body. Every header
 * line ends with CRLF ("\r\n"); a plain "rn" here would produce a response no
 * HTTP client can parse.
 *
 * @param content  Response body; not modified.
 * @return The complete response, headers plus body.
 */
std::string Connection::constructHttpResponse(std::string& content) {
    const std::string length_text = std::to_string(content.size());

    std::string response;
    response.reserve(80 + length_text.size() + content.size());
    response += "HTTP/1.1 200 OK\r\n";
    response += "Content-Length: ";
    response += length_text;
    response += "\r\n";
    response += "Content-Type: application/json\r\n";
    response += "\r\n";  // blank line separates headers from body
    response += content;
    return response;
}
/**
 * Frames `content` for the custom protocol: a 4-byte length in network byte
 * order followed by the payload bytes.
 *
 * The frame is built with explicit lengths. Constructing a std::string from a
 * bare char* would stop at the first NUL byte, and the big-endian length
 * prefix of any payload shorter than 16 MiB starts with a zero byte, so the
 * frame would come out empty.
 *
 * @param content  Payload; not modified. Its size is sent as a 32-bit value.
 * @return Length prefix followed by the payload (sizeof(int32_t) + content.size() bytes).
 */
std::string Connection::constructCustomResponse(std::string& content) {
    const uint32_t net_length = htonl(static_cast<uint32_t>(content.size()));

    std::string response;
    response.reserve(sizeof(net_length) + content.size());
    response.append(reinterpret_cast<const char*>(&net_length), sizeof(net_length));
    response.append(content);
    return response;
}
void Connection::startSocketReadTimedout() {
if (timeout_seconds_ == 0) {
return;
}
steady_timer_.expires_from_now(std::chrono::seconds(timeout_seconds_));
steady_timer_.async_wait(
[this, self = shared_from_this()](const asio::error_code& error) ->void {
if (!error) {
if (timedout_fn_) {
timedout_fn_(self);
}
}
connection_manager_.stop(self);
}
);
}
void Connection::stopSocketReadTimedout() {
if (timeout_seconds_ == 0) {
return;
}
asio::error_code ec;
steady_timer_.cancel(ec);
}
void Connection::netSocketReadHandle(int is_header) {
auto self(shared_from_this());
startSocketReadTimedout();
if (is_header) {
if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_HTTP) {
asio::async_read_until(socket_, read_stream_buffer_, "rnrn",
[this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
if (!ec) {
stopSocketReadTimedout();
std::size_t bytes_additional = read_stream_buffer_.size() - bytes_transferred;
std::string header;
std::istream request_stream(&read_stream_buffer_);
std::unordered_map<std::string, std::string> http_header_map;
while (std::getline(request_stream, header) && header != "r") {
std::string::size_type index = header.find(":", 1);
if (index == std::string::npos) {
// Request line or others error.
continue;
}
std::string::size_type begin = std::string::npos;
std::string::size_type end = std::string::npos;
std::string field = header.substr(0, index);
std::string value = header.substr(index + 1, -1);
begin = field.find_first_not_of(STR_WHITESPACE);
end = field.find_last_not_of(STR_WHITESPACE);
field = field.substr(begin, end - begin + 1);
begin = value.find_first_not_of(STR_WHITESPACE);
end = value.find_last_not_of(STR_WHITESPACE);
value = value.substr(begin, end - begin + 1);
std::transform(field.begin(), field.end(), field.begin(), ::tolower);
std::transform(value.begin(), value.end(), value.begin(), ::tolower);
http_header_map[field] = value;
}
std::string content_type = http_header_map["content-type"];
std::string content_length = http_header_map["content-length"];
if (content_type != std::string(APPLICATION_JSON)) {
connection_manager_.stop(self);
return;
}
if (!content_length.size()) {
connection_manager_.stop(self);
return;
}
data_length_ = std::stol(content_length);
if (data_length_ > bytes_additional) {
data_length_ -= bytes_additional;
if (socket_.is_open()) {
netSocketReadHandle(0);
}
} else {
char* data = (char*)calloc(1, sizeof(char) * data_length_);
memcpy(data, asio::buffer_cast<const void*>(read_stream_buffer_.data()), data_length_);
if (read_fn_) {
read_fn_(self, data);
}
free(data);
data = nullptr;
read_stream_buffer_.consume(data_length_);
if (socket_.is_open()) {
netSocketReadHandle(1);
}
}
} else {
if (closed_fn_ && bytes_transferred == 0) {
closed_fn_(self);
}
connection_manager_.stop(self);
}
}
);
} else if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_CUSTOM) {
asio::async_read(socket_, asio::buffer(&data_length_, sizeof(int32_t)),
[this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
if (!ec) {
stopSocketReadTimedout();
if (bytes_transferred < sizeof(int32_t)) {
return;
}
data_length_ = ntohl(data_length_);
read_stream_buffer_.consume(sizeof(int32_t));
if (socket_.is_open()) {
netSocketReadHandle(0);
}
} else {
if (closed_fn_ && bytes_transferred == 0) {
closed_fn_(self);
}
connection_manager_.stop(self);
}
}
);
}
} else {
asio::async_read(socket_, read_stream_buffer_, asio::transfer_exactly(data_length_),
[this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
if (!ec && bytes_transferred == data_length_) {
stopSocketReadTimedout();
char* data = (char*)calloc(1, sizeof(char) * bytes_transferred);
memcpy(data, asio::buffer_cast<const void*>(read_stream_buffer_.data()), bytes_transferred);
if (read_fn_) {
read_fn_(self, data);
}
free(data);
data = nullptr;
read_stream_buffer_.consume(bytes_transferred);
if (socket_.is_open()) {
netSocketReadHandle(1);
}
} else {
if (closed_fn_ && bytes_transferred == 0) {
closed_fn_(self);
}
connection_manager_.stop(self);
}
}
);
}
}
} /* end namespace tcp */
Source: Windows Questions C++