Why can't Boost.Asio's async_read_until read any more data?

  asio, asyncsocket, boost, c++

1. The server uses the HTTP protocol.

2. I test it with the ApacheBench (ab) tool: ab -n30000 -c2000 -p call.json -T "application/json" http://0.0.0.0:8025/

After receiving data a few times, async_read_until stalls: no new data is received and the completion callback is never triggered. The AB tool only continues if I call connection_manager_.stop(self). I think something is wrong with my HTTP server. What do I need to pay attention to so that the AB tool works properly?

#include "connection.hpp"
#include "connection_manager.hpp"

#include "glog/logging.h"

#include <iostream>
#include <unordered_map>
namespace tcp {

// Characters trimmed from both ends of HTTP header names and values:
// space, horizontal tab, line feed, carriage return, vertical tab, form feed.
#define STR_WHITESPACE      " \t\n\r\v\f"
// The only request Content-Type the HTTP read path accepts (compared lower-cased).
#define APPLICATION_JSON    "application/json"

// Builds a connection whose socket and read-timeout timer both run on
// `io_service`, and which reports to / is stopped through `manager`.
//
// io_service: Asio service used to construct socket_ and steady_timer_.
// manager:    Connection manager bound to connection_manager_.
Connection::Connection(asio::io_service& io_service, ConnectionManager& manager)
    : socket_(io_service),
      steady_timer_(io_service),
      connection_manager_(manager),
      timeout_seconds_(0),  // 0 makes start/stopSocketReadTimedout() no-ops
      fast_close_(1) {      // NOTE(review): not used in this file; meaning not visible here

}

void Connection::start() {
    asio::ip::tcp::no_delay no_delay(true);
    socket_.set_option(no_delay);
    socket_.set_option(asio::detail::socket_option::boolean<IPPROTO_TCP, TCP_QUICKACK>(true));

    netSocketReadHandle(1);
}

void Connection::stop(bool internal) {
    if (!socket_.is_open()) {
        return;
    }

    if (!internal) {
        connection_manager_.stop(shared_from_this());
    } else {
        asio::error_code ec;
        socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
        socket_.close(ec);
    }

    stopSocketReadTimedout();
}

void Connection::send(const char* data, std::size_t n) {
    send(std::string(data, n));
}

void Connection::send(std::string data) {
    std::string transport_data;

    if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_HTTP) {
        transport_data = constructHttpResponse(data);
    } else if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_CUSTOM) {
        transport_data = constructCustomResponse(data);
    }

    SharedConstBuffer send_buffer(transport_data);

    asio::async_write(socket_, send_buffer,
        [this, size = transport_data.size(), self = shared_from_this()](const asio::error_code& error, std::size_t bytes_transferred) -> void {
            if (!error) {
                if (write_complete_fn_ && size == bytes_transferred) {
                    write_complete_fn_(self);
                }

                LOG(INFO) << "send success";
                //connection_manager_.stop(self);
            } else {
                connection_manager_.stop(self);
            }
        }
    );
}

// Wraps `content` in a minimal HTTP/1.1 "200 OK" response.
//
// content: Response body; sent as-is and described as application/json.
// Returns: status line, Content-Length and Content-Type headers, the blank
//          line that ends the header section, then the body. Every header
//          line ends with CRLF as HTTP requires.
std::string Connection::constructHttpResponse(std::string& content) {
    const std::string crlf = "\r\n";

    std::string response = "HTTP/1.1 200 OK" + crlf;
    response += "Content-Length: " + std::to_string(content.size()) + crlf;
    response += "Content-Type: application/json" + crlf;

    // An empty line separates the headers from the body.
    response += crlf;

    response += content;

    return response;
}

// Frames `content` for the custom protocol: a 4-byte length prefix in
// network byte order followed by the payload bytes.
//
// content: Payload; may contain NUL bytes.
// Returns: A string of exactly sizeof(int32_t) + content.size() bytes.
//
// The frame is assembled with explicit lengths. Building it through a
// C-string would cut it at the first zero byte, and the big-endian length
// prefix of any payload shorter than 16 MiB starts with one.
std::string Connection::constructCustomResponse(std::string& content) {
    const uint32_t net_length = htonl(static_cast<uint32_t>(content.size()));

    std::string response;
    response.reserve(sizeof(net_length) + content.size());
    response.append(reinterpret_cast<const char*>(&net_length), sizeof(net_length));
    response.append(content);

    return response;
}

void Connection::startSocketReadTimedout() {
    if (timeout_seconds_ == 0) {
        return;
    }

    steady_timer_.expires_from_now(std::chrono::seconds(timeout_seconds_));

    steady_timer_.async_wait(
        [this, self = shared_from_this()](const asio::error_code& error) ->void {
            if (!error) {
                if (timedout_fn_) {
                    timedout_fn_(self);
                }
            }

            connection_manager_.stop(self);
        }
    );
}

void Connection::stopSocketReadTimedout() {
    if (timeout_seconds_ == 0) {
        return;
    }

    asio::error_code ec;
    steady_timer_.cancel(ec);
}

void Connection::netSocketReadHandle(int is_header) {
    auto self(shared_from_this());

    startSocketReadTimedout();

    if (is_header) {
        if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_HTTP) {
            asio::async_read_until(socket_, read_stream_buffer_, "rnrn",
                [this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
                    if (!ec) {
                        stopSocketReadTimedout();
                        std::size_t bytes_additional = read_stream_buffer_.size() - bytes_transferred;

                        std::string header;
                        std::istream request_stream(&read_stream_buffer_);
                        std::unordered_map<std::string, std::string> http_header_map;

                        while (std::getline(request_stream, header) && header != "r") {
                            std::string::size_type index = header.find(":", 1);
                            if (index == std::string::npos) {
                                // Request line or others error.
                                continue;
                            }

                            std::string::size_type begin = std::string::npos;
                            std::string::size_type end = std::string::npos;

                            std::string field = header.substr(0, index);
                            std::string value = header.substr(index + 1, -1);

                            begin = field.find_first_not_of(STR_WHITESPACE);
                            end = field.find_last_not_of(STR_WHITESPACE);
                            field = field.substr(begin, end - begin + 1);

                            begin = value.find_first_not_of(STR_WHITESPACE);
                            end = value.find_last_not_of(STR_WHITESPACE);
                            value = value.substr(begin, end - begin + 1);

                            std::transform(field.begin(), field.end(), field.begin(), ::tolower);
                            std::transform(value.begin(), value.end(), value.begin(), ::tolower);

                            http_header_map[field] = value;
                        }

                        std::string content_type = http_header_map["content-type"];
                        std::string content_length = http_header_map["content-length"];

                        if (content_type != std::string(APPLICATION_JSON)) {
                            connection_manager_.stop(self);
                            return;
                        }

                        if (!content_length.size()) {
                            connection_manager_.stop(self);
                            return;
                        }

                        data_length_ = std::stol(content_length);

                        if (data_length_ > bytes_additional) {
                            data_length_ -= bytes_additional;
                            if (socket_.is_open()) {
                                netSocketReadHandle(0);
                            }
                        } else {
                            char* data = (char*)calloc(1, sizeof(char) * data_length_);
                            memcpy(data, asio::buffer_cast<const void*>(read_stream_buffer_.data()), data_length_);

                            if (read_fn_) {
                                read_fn_(self, data);
                            }

                            free(data);
                            data = nullptr;

                            read_stream_buffer_.consume(data_length_);

                            if (socket_.is_open()) {
                                netSocketReadHandle(1);
                            }
                        }
                    } else {
                        if (closed_fn_ && bytes_transferred == 0) {
                            closed_fn_(self);
                        }

                        connection_manager_.stop(self);
                    }
                }
            );
        } else if (connection_manager_.getMessageProtocol() == MESSAGE_PROTOCOL_CUSTOM) {
            asio::async_read(socket_, asio::buffer(&data_length_, sizeof(int32_t)),
                [this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
                    if (!ec) {
                        stopSocketReadTimedout();

                        if (bytes_transferred < sizeof(int32_t)) {
                            return;
                        }

                        data_length_ = ntohl(data_length_);

                        read_stream_buffer_.consume(sizeof(int32_t));

                        if (socket_.is_open()) {
                            netSocketReadHandle(0);
                        }
                    } else {
                        if (closed_fn_ && bytes_transferred == 0) {
                            closed_fn_(self);
                        }

                        connection_manager_.stop(self);
                    }
                }
            );
        }
    } else {
        asio::async_read(socket_, read_stream_buffer_, asio::transfer_exactly(data_length_),
            [this, self](const asio::error_code& ec, std::size_t bytes_transferred) -> void {
                if (!ec && bytes_transferred == data_length_) {
                    stopSocketReadTimedout();

                    char* data = (char*)calloc(1, sizeof(char) * bytes_transferred);
                    memcpy(data, asio::buffer_cast<const void*>(read_stream_buffer_.data()), bytes_transferred);

                    if (read_fn_) {
                        read_fn_(self, data);
                    }

                    free(data);
                    data = nullptr;

                    read_stream_buffer_.consume(bytes_transferred);

                    if (socket_.is_open()) {
                        netSocketReadHandle(1);
                    }
                } else {
                    if (closed_fn_ && bytes_transferred == 0) {
                        closed_fn_(self);
                    }

                    connection_manager_.stop(self);
                }
            }
        );
    }
}

} /* end namespace tcp */

Source: Windows Questions C++

LEAVE A COMMENT