Как расширить контейнер списка бустов C++ для реализации поточно-ориентированной реализации с использованием буст-обновления mutex?
Я написал некоторый пример тестового кода, чтобы проверить функциональность использования мьютексов буст-апгрейда для реализации блокировки мьютексов чтения / записи над контейнером буст-списка. У меня десять тем, 5 читателей, 5 писателей.
Я использовал умные указатели, чтобы упростить управление памятью и позволить одному и тому же объекту содержаться в нескольких списках. Авторы постоянно удаляют и повторно вставляют объекты в свой соответствующий список, в то время как читатели периодически повторяют список. Кажется, все работает должным образом, но при вызове функции-члена стирания списка приходится искать запись, которую нужно удалить, когда она у меня уже есть.
Является ли метод стирания достаточно умным, чтобы знать, что запись должна быть стерта без необходимости ее повторного поиска, или он оптимизирован для исключения поиска, когда элемент списка известен? Если он выполняет поиск, то существует ли прямой способ его расширения, чтобы уникальная блокировка могла применяться только вокруг фактического удаления из списка, а не при поиске элемента списка? Вот код, который я связал с библиотекой boost 1.51 и протестировал с vs2008.
//******************************************************************************
// INCLUDE FILES
//******************************************************************************
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/container/list.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
using namespace std;
//******************************************************************************
// LOCAL DEFINES
//******************************************************************************
#define NUM_THREADS 10
#define NUM_WIDTH 5
#ifdef UNIQUE_MUTEX
#define MAIN_LIST_MUTEX g_listMutex
#define INT_LIST_MUTEX g_intListMutex
#define FLOAT_LIST_MUTEX g_floatListMutex
#else
#define MAIN_LIST_MUTEX g_listMutex
#define INT_LIST_MUTEX g_listMutex
#define FLOAT_LIST_MUTEX g_listMutex
#endif
//******************************************************************************
// LOCAL TYPEDEFS
//******************************************************************************
typedef boost::upgrade_mutex myMutex;
typedef boost::shared_lock<myMutex> SharedLock;
typedef boost::upgrade_to_unique_lock<myMutex> UniqueLock;
typedef boost::upgrade_lock<myMutex> UpgradeLock;
class myDataIntf;
typedef boost::shared_ptr<myDataIntf> myDataIntfPtr;
typedef boost::container::list<myDataIntfPtr> myList;
//******************************************************************************
// LOCAL CLASS DECLARATIONS
//******************************************************************************
class myDataIntf
{
public:
virtual char* getDataType(void) = 0;
};
class intData : public myDataIntf
{
private:
int data;
public:
intData(int new_data) : data(new_data){};
~intData(void)
{
extern int instIntDeletes;
instIntDeletes++;
};
char* getDataType(void)
{
return "Int";
}
int getData(void)
{
return data;
}
void setData(int new_data)
{
data = new_data;
}
};
class floatData : public myDataIntf
{
private:
float data;
public:
floatData(float new_data) : data(new_data){};
~floatData(void)
{
extern int instFloatDeletes;
instFloatDeletes++;
};
char* getDataType(void)
{
return "Float";
}
float getData(void)
{
return data;
}
void setData(float new_data)
{
data = new_data;
}
};
//******************************************************************************
// LOCALLY DEFINED GLOBAL DATA
//******************************************************************************
// Define one mutex per linked list
myMutex g_listMutex;
myMutex g_intListMutex;
myMutex g_floatListMutex;
int instReadFloatCount[NUM_THREADS];
int instWriteFloatCount[NUM_THREADS];
int instReadIntCount[NUM_THREADS];
int instWriteIntCount[NUM_THREADS];
int instFloatDeletes = 0;
int instIntDeletes = 0;
//******************************************************************************
// Worker Thread function
//******************************************************************************
void workerFunc(int inst, myList* assigned_list, myMutex* mutex)
{
boost::posix_time::millisec workTime(1*inst);
myList::iterator i;
int add_delay = 0;
int add_f_count = 0;
int add_i_count = 0;
instReadFloatCount[inst] = 0;
instReadIntCount[inst] = 0;
instWriteIntCount[inst] = 0;
instWriteFloatCount[inst] = 0;
mutex->lock();
cout << "Worker " << inst << ": ";
for (i = assigned_list->begin(); i != assigned_list->end(); ++i)
{
cout << (*i)->getDataType();
if ( 0 == strcmp("Float", (*i)->getDataType() ) )
{
floatData* f = (floatData*)i->get();
cout << " " << f->getData() << " ";
}
if ( 0 == strcmp("Int", (*i)->getDataType() ) )
{
intData* f = (intData*)i->get();
cout << " " << f->getData() << " ";
}
}
cout << endl;
mutex->unlock();
// Do some work for 10 seconds.
for ( int tick = 0; tick < 10000/(1*inst+1); tick++)
{
add_delay++;
boost::this_thread::sleep(workTime);
if ( inst < (NUM_THREADS/2) )
{
// reader - Get a shared lock that allows multiple readers to
// access the linked list. Upgrade locks act as shared locks
// until converted to unique locks, at which point the
// thread converting to the unique lock will block until
// all existing readers are done. New readers will wait
// after the unique lock is released.
SharedLock shared_lock(*mutex);
for (i = assigned_list->begin(); i != assigned_list->end(); ++i)
{
if ( 0 == strcmp("Float", (*i)->getDataType() ) )
{
floatData* f = (floatData*)i->get();
instReadFloatCount[inst]++;
}
if ( 0 == strcmp("Int", (*i)->getDataType() ) )
{
intData* f = (intData*)i->get();
instReadIntCount[inst]++;
}
}
}
else
{
// writer - get the upgrade lock that will allow us
// to make multiple modifications to the linked list
// without being interrupted by other writers (other writers attempting
// to get an upgrade lock will block until the writer that
// has it releases it.)
UpgradeLock upgrade_lock(*mutex);
for (i = assigned_list->begin(); i != assigned_list->end(); )
{
if ( 0 == strcmp("Float", (*i)->getDataType() ) )
{
floatData* f = (floatData*)i->get();
UniqueLock unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock
f->setData(f->getData() + 0.123f);
assigned_list->push_front(*i);
assigned_list->erase(i++);
instWriteFloatCount[inst]++;
// While the unique lock is in scope let's do some additional
// adds & deletes
if ( (add_delay > 100) && (add_f_count < 2) )
{
if ( add_f_count < 1)
{
// Delete the first record
}
else if ( add_f_count < 2)
{
// Add new item using separate allocations for smart pointer & data
assigned_list->insert(assigned_list->end(), new floatData(-(float)(inst*10000+add_f_count)));
}
else
{
// Add new item using make_shared function template. Both objects are created using one allocation.
assigned_list->insert(assigned_list->end(), boost::make_shared<floatData>(-(float)(inst*10000+add_f_count)));
}
add_f_count++;
}
}
else if ( 0 == strcmp("Int", (*i)->getDataType() ) )
{
intData* f = (intData*)i->get();
UniqueLock unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock
f->setData(f->getData() + 123);
assigned_list->push_front(*i);
assigned_list->erase(i++);
instWriteIntCount[inst]++;
// While the unique lock is in scope let's do some additional
// adds & deletes
if ( (add_delay > 100) && (add_i_count < 3) )
{
if ( add_i_count < 1)
{
// Delete the first record
}
else if ( add_i_count < 2)
{
// Add new item using separate allocations for smart pointer & data
assigned_list->insert(assigned_list->end(), new intData(-(int)(inst*10000+add_i_count)));
}
else
{
// Add new item using make_shared function template. Both objects are created using one allocation.
assigned_list->insert(assigned_list->end(), boost::make_shared<intData>(-(int)(inst*10000+add_i_count)));
}
add_i_count++;
}
}
else
{
++i;
}
}
}
}
cout << "Worker: finished" << " " << inst << endl;
}
//******************************************************************************
// Main Function
//******************************************************************************
int main(int argc, char* argv[])
{
{
myList test_list;
myList test_list_ints;
myList test_list_floats;
myList::iterator i;
// Fill the main list with some values
test_list.insert(test_list.end(), new intData(1));
test_list.insert(test_list.end(), new intData(2));
test_list.insert(test_list.end(), new intData(3));
test_list.insert(test_list.end(), new floatData(333.333f));
test_list.insert(test_list.end(), new floatData(555.555f));
test_list.insert(test_list.end(), new floatData(777.777f));
// Display the main list elements and add the specific values
// for each specialized list containing specific types of elements.
// The end result is that each object in the main list will also
// be in the specialized list.
cout << "test:";
for (i = test_list.begin(); i != test_list.end(); ++i)
{
cout << " " << (*i)->getDataType();
if ( 0 == strcmp("Float", (*i)->getDataType() ) )
{
floatData* f = (floatData*)i->get();
cout << " " << f->getData();
test_list_floats.insert(test_list_floats.end(), *i);
}
if ( 0 == strcmp("Int", (*i)->getDataType() ) )
{
intData* f = (intData*)i->get();
cout << " " << f->getData();
test_list_ints.insert(test_list_ints.end(), *i);
}
}
cout << endl;
// Display the list with float type elements
cout << "float test:";
for (i = test_list_floats.begin(); i != test_list_floats.end(); ++i)
{
cout << " " << (*i)->getDataType();
floatData* f = (floatData*)i->get();
cout << " " << f->getData();
}
cout << endl;
// Display the list with integer type elements
cout << "int test:";
for (i = test_list_ints.begin(); i != test_list_ints.end(); ++i)
{
cout << " " << (*i)->getDataType();
intData* f = (intData*)i->get();
cout << " " << f->getData();
}
cout << endl;
// NOTE: To reduce mutex bottleneck coupling in a real application it is recommended that
// each linked list have it's own shareable mutex.
// I used the same mutex here for all three lists to have the output from each thread
// appear in a single line. If I use one mutex per thread then it would appear
// jumbled up and almost unreadable.
// To use a unique mutex per list enable UNIQUE_MUTEX macro.
// For this test I did not notice any performance differences, but that will
// depend largely on how long the unique lock is held.
boost::thread workerThread0(workerFunc, 0, &test_list, &MAIN_LIST_MUTEX);
boost::thread workerThread1(workerFunc, 1, &test_list_ints, &INT_LIST_MUTEX);
boost::thread workerThread2(workerFunc, 2, &test_list_floats, &FLOAT_LIST_MUTEX);
boost::thread workerThread3(workerFunc, 3, &test_list, &MAIN_LIST_MUTEX);
boost::thread workerThread4(workerFunc, 4, &test_list_floats, &FLOAT_LIST_MUTEX);
boost::thread workerThread5(workerFunc, 5, &test_list_ints, &INT_LIST_MUTEX);
boost::thread workerThread6(workerFunc, 6, &test_list, &MAIN_LIST_MUTEX);
boost::thread workerThread7(workerFunc, 7, &test_list_floats, &FLOAT_LIST_MUTEX);
boost::thread workerThread8(workerFunc, 8, &test_list, &MAIN_LIST_MUTEX);
boost::thread workerThread9(workerFunc, 9, &test_list_ints, &INT_LIST_MUTEX);
workerThread0.join();
workerThread1.join();
workerThread2.join();
workerThread3.join();
workerThread4.join();
workerThread5.join();
workerThread6.join();
workerThread7.join();
workerThread8.join();
workerThread9.join();
cout << "*** Test End ***:";
for (i = test_list.begin(); i != test_list.end(); ++i)
{
cout << " " << (*i)->getDataType();
if ( 0 == strcmp("Float", (*i)->getDataType() ) )
{
floatData* f = (floatData*)i->get();
cout << " " << f->getData();
}
if ( 0 == strcmp("Int", (*i)->getDataType() ) )
{
intData* f = (intData*)i->get();
cout << " " << f->getData();
}
}
cout << endl;
cout << "float test end:";
for (i = test_list_floats.begin(); i != test_list_floats.end(); ++i)
{
cout << " " << (*i)->getDataType();
floatData* f = (floatData*)i->get();
cout << " " << f->getData();
}
cout << endl;
cout << "int test end:";
for (i = test_list_ints.begin(); i != test_list_ints.end(); ++i)
{
cout << " " << (*i)->getDataType();
intData* f = (intData*)i->get();
cout << " " << f->getData();
}
cout << endl;
cout << "*** thread counts***" << endl;
for ( int idx = 0; idx < NUM_THREADS; idx++)
{
cout << " thread " << idx;
cout << ": int rd(" << setw(NUM_WIDTH) << instReadIntCount[idx];
cout << ") int wr(" << setw(NUM_WIDTH) << instWriteIntCount[idx];
cout << ") flt rd(" << setw(NUM_WIDTH) << instReadFloatCount[idx];
cout << ") flt wr(" << setw(NUM_WIDTH) << instWriteFloatCount[idx];
cout << ")" << endl;
}
}
// All records in the linked list have now been deallocated automatically(due to smart pointer)
// as the linked list objects have been destroyed due to going out of scope.
cout << "*** Object Deletion counts***" << endl;
cout << " int deletes: " << instIntDeletes << endl;
cout << "float deletes: " << instFloatDeletes << endl;
return 0;
}
1 ответ
Сложность boost::container::list::erase(const_iterator)
амортизируется постоянное время (поиск iterator erase(const_iterator p)
в boost / container / list.hpp). Таким образом, при вызове этой функции повторный поиск не выполняется.
Однако есть пара моментов, которые я хотел бы сделать.
Совсем недавно эксперт по параллелизму сообщил мне, что разумно использовать концепцию UpgradeLockable только после выявления явной необходимости в ней; т.е. после профилирования. Замки, связанные с upgrade_mutex
они обязательно более сложны, чем простые boost::mutex::scoped_lock
с или std::lock_guard
и, следовательно, страдают от худших результатов.
В вашем примере вы, вероятно, обнаружите, что нет существенной разницы в производительности между вашей текущей (более сложной) настройкой и заменой upgrade_mutex
с mutex
и просто всегда исключительно блокировка.
Другой момент заключается в том, что ваши комментарии к коду указывают на то, что вы считаете, что несколько boost::upgrade_lock
на данный upgrade_mutex
может сосуществовать. Это не вариант. Только один поток может содержать upgrade_lock
вовремя.
Несколько других тем могут содержать shared_lock
в то время как upgrade_lock
проводится, но эти shared_lock
s должен быть выпущен до upgrade_lock
может быть улучшен до уникального.
Для получения дополнительной информации см. Документацию по расширению концепции UpgradeLockable.
редактировать
Просто чтобы подтвердить точку зрения, высказанную в комментариях ниже, следующий пример показывает, что новый shared_lock
можно приобрести в то время как upgrade_lock
существует, но не в то время как upgrade_to_unique_lock
существует (протестировано с бустом 1.51):
#include <iostream>
#include <vector>
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>
typedef boost::shared_lock<boost::upgrade_mutex> SharedLock;
typedef boost::upgrade_to_unique_lock<boost::upgrade_mutex> UniqueLock;
typedef boost::upgrade_lock<boost::upgrade_mutex> UpgradeLock;
boost::upgrade_mutex the_mutex;
void Write() {
UpgradeLock upgrade_lock(the_mutex);
std::cout << "\tPreparing to write\n";
boost::this_thread::sleep(boost::posix_time::seconds(1));
UniqueLock unique_lock(upgrade_lock);
std::cout << "\tStarting to write\n";
boost::this_thread::sleep(boost::posix_time::seconds(5));
std::cout << "\tDone writing.\n";
}
void Read() {
SharedLock lock(the_mutex);
std::cout << "Starting to read.\n";
boost::this_thread::sleep(boost::posix_time::seconds(1));
std::cout << "Done reading.\n";
}
int main() {
// Start a read operation
std::vector<boost::thread> reader_threads;
reader_threads.push_back(std::move(boost::thread(Read)));
boost::this_thread::sleep(boost::posix_time::milliseconds(250));
// Start a write operation. This will block trying to upgrade
// the UpgradeLock to UniqueLock since a SharedLock currently exists.
boost::thread writer_thread(Write);
// Start several other read operations. These won't be blocked
// since only an UpgradeLock and SharedLocks currently exist.
for (int i = 0; i < 25; ++i) {
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
reader_threads.push_back(std::move(boost::thread(Read)));
}
// Join the readers. This allows the writer to upgrade to UniqueLock
// since it's currently the only lock.
for (auto& reader_thread : reader_threads)
reader_thread.join();
// Start a new read operation. This will be blocked since a UniqueLock
// currently exists.
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
boost::thread reader_thread(Read);
writer_thread.join();
reader_thread.join();
return 0;
}