How to ensure synchronization with a mutex and a semaphore?

  c++, c++20, gcc, mutex, semaphore

I’m trying to write parallel code in C++.

I’m using C++20 with g++-11 because I need coroutines and semaphores.

However, I get errors at random when I run my code, so I tested the parallel portion with a very simple example, and the problem persists.

Whenever I instantiate more than two worker threads, every run of the same binary fails with one of three errors, at a random stage of execution:

1- munmap_chunk(): invalid pointer.

2- malloc(): invalid next size (unsorted).

3- double free or corruption (out).

The example code is as follows:

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
#include <vector> 
#include <mutex> 

std::counting_semaphore smphSignalMainToThread(0);
std::vector<int> vec ; 
std::mutex vec_mutex ; 


void Threadwhile(){
    
    while(smphSignalMainToThread.try_acquire_for( std::chrono::seconds(2) )  ){
        vec_mutex.lock();
        std::cout << vec.back() << "\t" ;
        // * Some processing on vec.back() *
        vec.pop_back() ; 
        vec_mutex.unlock() ; 
    };
    
};

int main()
{
    int num_threads = 3; // std::thread::hardware_concurrency() -1 ;
    std::vector< std::thread > TV ;
    
    for(int i = 0 ; i < num_threads ; i++ ){
        TV.push_back( std::thread(Threadwhile ) ) ;
    };

    
    for(int i = 0 ; i< 100000 ; i++){
        vec_mutex.lock();
        vec.push_back(i)  ; 
        vec_mutex.unlock() ;
        smphSignalMainToThread.release() ; 
    };
    
    for(int i = 0 ; i < num_threads ; i++ ){
        TV[i].join() ;
    };

    return 0 ; 
}

I thought I was simply throwing too much data into the vector/queue (I also tried with queues), so I changed the code so that the main thread only pushes a new item after a worker signals that it has finished one. I still get errors, e.g.:

1- Segmentation fault.

2- double free or corruption (out).

3- munmap_chunk(): invalid pointer.

New Code:

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
#include <coroutine>
#include <vector> 
#include <mutex> 


std::counting_semaphore smphSignalMainToThread(0),  smphSignalThreadToMain(0);
std::vector<int> vec ; 
std::mutex vec_mutex ;


void Threadwhile(){
    
    while(smphSignalMainToThread.try_acquire_for( std::chrono::seconds(5) )  ){
        vec_mutex.lock();
        std::cout << vec.back() << "\t" ;
        // * Some processing on vec.back() *
        vec.pop_back() ; 
        vec_mutex.unlock() ; 
        smphSignalThreadToMain.release() ; 
    };
    
};

int main()
{
    int num_threads = 3; // std::thread::hardware_concurrency() -1 ;
    std::vector< std::thread > TV ;
        
    for(int i = 0 ; i< 2*num_threads ; i++){
        vec_mutex.lock();
        vec.push_back(i); 
        vec_mutex.unlock() ;
        smphSignalMainToThread.release() ; 
    };
    
    for(int i = 0 ; i < num_threads ; i++ ){

        TV.push_back( std::thread(Threadwhile ) ) ;

    };
    
    int i =  2*num_threads ; 
    
    while( smphSignalThreadToMain.try_acquire_for( std::chrono::seconds(5) ) && i<1000000 ){
        vec_mutex.lock();
        vec.push_back(i)  ; 
        vec_mutex.unlock() ;
        smphSignalMainToThread.release() ;
        i++ ; 
    };
    
    
    for(int i = 0 ; i < num_threads ; i++ ){

        TV[i].join() ;

    };
    return 0 ; 
}

I run the code on Ubuntu 20.04 with g++ version 11.0.1, and my compilation line is as follows (I turn on every optimization flag I know because the original code is very computationally intensive and optimizations reduce the computation time drastically):

g++ -fcoroutines -std=c++20 -lrt -pthread -mtune=native -march=native -O3 semaphore.cpp -o semaphore

Please take into consideration that I’m a physics student without much experience in SE/CS.

I’d be very thankful if anybody could guide me through this or point out any mistakes I made in the code!

Source: Windows Questions C++
