Managing queue in second thread

Managing queue in second thread



I created thread which is responsible for doing some long operation on passed data. All data are put to the queue in main thread, then second thread gets this data, manipulate them and returns to map which contains ID of executed data and result. Class looks like that:


#include <memory>
#include <future>
#include <vector>
#include <map>
#include <set>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <sstream>

typedef unsigned int Key;

class DatabaseWorker

public:
DatabaseWorker()

startThread();


~DatabaseWorker()

stopThread();


void startThread()

std::cout << "starting thread" << std::endl;
m_futureObj = m_exitSignal.get_future();
m_thread = std::thread(&DatabaseWorker::work, this);


void stopThread()

std::cout << "stopping thread" << std::endl;
m_exitSignal.set_value();
if (m_thread.joinable())
m_thread.join();


void work()

std::cout << "thread started" << std::endl;
bool bEmpty;
std::pair<Key, std::string> data;
while (m_futureObj.wait_for(std::chrono::milliseconds(1)) == std::future_status::timeout)

//Check queue size

std::lock_guard<std::mutex> lock(m_mutex);
bEmpty = m_requests.empty();

if (!bEmpty)

std::cout << "something in queue" << std::endl;
//Get data from queue

std::lock_guard<std::mutex> lock(m_mutex);
data = m_requests.front();
m_requests.pop();


Key id = data.first;
std::string str = data.second;
//HERE do some operation with string
str += "_123";

//Prepare result

std::lock_guard<std::mutex> lock(m_mutex);
m_results.insert(std::make_pair(id, str));

//Inform conditional about data
m_conditionals.find(id)->second.second = true;
m_conditionals.find(id)->second.first->notify_one();





Key addToQueue(const std::string &data)

std::cout << "adding to queue" << std::endl;
std::lock_guard<std::mutex> lock(m_mutex); //lock
m_requests.push(std::make_pair(m_nID, data)); //adddata to queue
m_conditionals.insert(std::make_pair(m_nID, std::make_pair(std::make_shared<std::condition_variable>(), bool(false))));
return m_nID++;


std::string getResult(Key id)

std::cout << "getting result" << std::endl;
std::unique_lock<std::mutex> lock(m_mutex);
//Check if data ready
m_conditionals.find(id)->second.first->wait(lock, [&]()
return m_conditionals.find(id)->second.second; //return bool data
);

//If data ready
auto result = m_results.find(id)->second; //Get data from thread
//Delete data
m_results.erase(m_results.find(id)); //HERE HEAP ISSUE OCCURS SOMETIMES!!!
return result;


private:
std::queue<std::pair<Key, std::string>> m_requests;
std::map<Key, std::string> m_results;
std::map < Key, std::pair< std::shared_ptr<std::condition_variable>, bool>> m_conditionals;

std::promise<void> m_exitSignal;
std::future<void> m_futureObj;
std::mutex m_mutex;
std::thread m_thread;

Key m_nID = 0;
;



Then in main function I'm adding some data and waiting for response from thread:


int main()

DatabaseWorker dbWorker;
std::ostringstream stringStream;
std::string data;
std::vector<Key> keys;
int i;
for (i = 0; i < 100; i++)

stringStream.str("");
stringStream << "data_" << i;
data = stringStream.str();
keys.push_back(dbWorker.addToQueue(data));


for each (Key key in keys)

std::cout << "key: " << key << " data: " << dbWorker.getResult(key) << std::endl;




The problem is that sometimes during calling getResult() method I get memory error in place m_results.erase(m_results.find(id)):


getResult()


m_results.erase(m_results.find(id))


Debug Assertion Failed!
Expression _pFirstBlock == pHead



Where is the lack of my understanding/implementation?





Don't forget for (int i = 0; ...) is allowed and is encouraged.
– tadman
Aug 20 at 16:45


for (int i = 0; ...)





for each (Key key in keys) does not look like C++.
– molbdnilo
Aug 20 at 16:46


for each (Key key in keys)





@drewpol That's not C++11. It looks more like Microsoft's proprietary abomination, C++/CLI.
– molbdnilo
Aug 20 at 16:54





Even though you presently have only one working thread, you should write your code to not depend on that as the above code can fails with 2 or more working threads as you don't the data at the same time you check if you have data. Thus multiple threads could thing that data is available but their is data only for part of them.
– Phil1970
Aug 20 at 19:29





Given that you retrieve data in order and wait for each item to be ready, I think that your code is more complex that it needs to be.
– Phil1970
Aug 20 at 19:37









By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

ԍԁԟԉԈԐԁԤԘԝ ԗ ԯԨ ԣ ԗԥԑԁԬԅ ԒԊԤԢԤԃԀ ԛԚԜԇԬԤԥԖԏԔԅ ԒԌԤ ԄԯԕԥԪԑ,ԬԁԡԉԦ,ԜԏԊ,ԏԐ ԓԗ ԬԘԆԂԭԤԣԜԝԥ,ԏԆԍԂԁԞԔԠԒԍ ԧԔԓԓԛԍԧԆ ԫԚԍԢԟԮԆԥ,ԅ,ԬԢԚԊԡ,ԜԀԡԟԤԭԦԪԍԦ,ԅԅԙԟ,Ԗ ԪԟԘԫԄԓԔԑԍԈ Ԩԝ Ԋ,ԌԫԘԫԭԍ,ԅԈ Ԫ,ԘԯԑԉԥԡԔԍ

How to change the default border color of fbox? [duplicate]

Avoiding race conditions in Kotlin, Smartcast is impossible runtime exception