epoll_wait() breaks at unexpected events

  c++, epoll, linux, sockets

I am trying to build a class that can connect sockets and communicate through them using epoll. I add a connectService, an acceptService to the object, and both of these will spawn a readService, when connected. The connectService will then terminate and the acceptService will continue to run.
When I try it it works most of the time, but every once in a while it fails. I think it is read() that fails with some kind of bufferoverrun. The readService is called many times before the connection has completed.
It looks like some kind of race condition, but I cannot get my head around, whats wrong.

#ifndef SOCKET_THREAD_H_
#define SOCKET_THREAD_H_

#include <functional>
#include <thread>
#include <vector>
#include <sys/epoll.h>
#include <atomic>
#include <mutex>

#define MAX_EPOLL_EVENTS 64

class SocketThread
{
public:
    SocketThread(void);
    ~SocketThread(void);

    void epollService();
    void connectService();
    void addService(int addSocket, std::function<void()>);   
    void addService(std::function<bool()>);
    void breakWait(int epoll);

private:
    std::thread epollThread;
    std::thread connectThread;

    std::atomic<bool> keepAlive;
    int epoll;

    struct epoll_event event;
    struct epoll_event events[MAX_EPOLL_EVENTS];

    std::vector<std::function<void()>> call;
    std::vector<std::function<bool()>> connect;
};

#endif /* SOCKET_THREAD_H_ */

and the code

#include "SocketThread.h"
#include <unistd.h>
#include <iostream>

#define ERROR_CODE  -1
#define PIPE_PAIR   2

SocketThread::SocketThread(void) :
    connectThread([=]() { connectService(); }),
    epollThread([=]() { epollService(); }),
    keepAlive(true)
{
    epoll = epoll_create(10);
    if (epoll == ERROR_CODE)
        std::cout << "Error creating epoll." << std::endl;
}

SocketThread::~SocketThread(void)
{
    keepAlive = false;
    breakWait(epoll);
}

void SocketThread::addService(std::function<bool()> lambda)
{
    connect.push_back(lambda);
}

void SocketThread::addService(int sock, std::function<void()> lambda)
{
    event.events = EPOLLIN | EPOLLOUT;
    event.data.fd = call.size();

    call.push_back(lambda);

    if (epoll_ctl(epoll, EPOLL_CTL_ADD, sock, &event) == ERROR_CODE)
        std::cout << "Error handling epoll callback." << std::endl;
}

void SocketThread::connectService(void)
{
    while (keepAlive)
    {
        int size = connect.size();
        for (int i = 0; i < size; i++) 
        {
            if (connect[i])
            {
                if (connect[i]())
                    connect.erase(connect.begin() + i);
                if (i>0)
                    i--;
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void SocketThread::epollService(void)
{
    int size = 0;

    while (keepAlive)
    {
        errno = 0;
        size = epoll_wait(epoll, events, MAX_EPOLL_EVENTS, -1);
        if (size == -1)
        {
            std::cout << "Error on wait." << std::endl;
        }
        else
        if (size >= MAX_EPOLL_EVENTS)
        {
            std::cout << "Error number of epoll events exceeded " << std::endl;
            size = MAX_EPOLL_EVENTS - 1;
        }
        else
        if (size > 0)
        {
            for (int i = 0; i < size; i++) 
            {
                if(events[i].events & (EPOLLIN | EPOLLOUT)) 
                {
                    if (call[events[i].data.fd])
                    {
                        call[events[i].data.fd]();
                    }
                }
            }
        }
    }
}

void SocketThread::breakWait(int epoll)
{
    if (epoll == ERROR_CODE)
        return;

    struct epoll_event event;
    int selfpipe[PIPE_PAIR];

    if (pipe(selfpipe) < 0)
        std::cout << "Error on self pipe." << std::endl;

    if (epoll_ctl(epoll, EPOLL_CTL_ADD, selfpipe[0], &event) == ERROR_CODE)
        std::cout << "Error breaking epoll." << std::endl;

    int temp = 0;
    write(selfpipe[1], &temp, sizeof(temp));   
}

And my services look like this.
The size of the transmitted data is transmitted first.
Only acceptService and readService is controlled by epoll.

void acceptService(Communication* comm, int mainsocket)
{
    int sock = accept(mainsocket, address, &addressLength);

    if (sock >= 0)
    {
        if (comm != nullptr)
        {
            comm->onAccept();
            thread.addService(sock, [=]() { readService(comm, sock); });
        }
    }
}

bool connectService(Communication* comm, int sock)
{
    bool connected = connect(sock, address, addressLength);

    if (connected && !running)
    {
        if (comm != nullptr)
        {
            comm->onConnect();
            thread.addService(sock, [=]() { readService(comm, sock); });
        }
    }

    return connected;
}

void readService(Communication* comm, int sock)
{
    readAlive = true;

    auto bytes = 0;
    uint64_t localsize = 0;

    if (dynamicBuffer)
        read(sock, &localsize, sizeof(localsize));
    else
        localsize = totalSize;
    newBufferSize(localsize);
    bytes = read(sock, buffer, totalSize);

    if (bytes > 0)
    {
        if (comm != nullptr)
            comm->callback(index, buffer, bytes);

        updated = true;
    }
}

Source: Windows Questions C++

LEAVE A COMMENT