Параллельный метод асинхронного обратного вызова ввода-вывода
Я реализовал асинхронный ввод-вывод с обратным вызовом, когда я беспокоюсь о параллелизме. Я оспариваю вам, что, поскольку я всегда работаю с одним и тем же файлом, а физический ввод-вывод файла ОС по сути является синхронной операцией, мне не понадобится механизм блокировки в моем методе обратного вызова - но я вполне могу ошибаться здесь - введите SO:o) У меня есть менеджер буфера, который помещает считанные данные в свой буферный кеш после завершения операции чтения, и механизм состояний для каждой перекрытой операции на основе состояний перечисления EOverlappedStates; "I/O не запущен", "Success" и "Error". Как вы думаете, мне нужно заблокировать метод обратного вызова, чтобы обеспечить параллелизм в многопоточной программе, такой как наша?
Открыть файл:
OS_FILE_HANDLE CUniformDiskInterface::OpenFile(const wchar_t *fileName, bool *fileExists, bool readData, bool writeData, bool overlap,
bool disableDiskCache, bool disableOsCache, bool randomAccess, bool sequentalScan) {
// Set access method
DWORD desiredAccess = readData ? GENERIC_READ : 0;
desiredAccess |= writeData ? GENERIC_WRITE : 0;
// Set file flags
DWORD fileFlags = disableDiskCache ? FILE_FLAG_WRITE_THROUGH : 0;
fileFlags |= disableOsCache ? FILE_FLAG_NO_BUFFERING : 0;
fileFlags |= randomAccess ? FILE_FLAG_RANDOM_ACCESS : 0;
fileFlags |= sequentalScan ? FILE_FLAG_SEQUENTIAL_SCAN : 0;
fileFlags |= !fileFlags ? FILE_ATTRIBUTE_NORMAL : 0;
fileFlags |= overlap ? FILE_FLAG_OVERLAPPED : 0;
HANDLE hOutputFile = CreateFile(
fileName,
desiredAccess,
0,
NULL,
OPEN_EXISTING,
fileFlags,
NULL);
Читать файл:
_UINT64 CUniformDiskInterface::ReadFromFile(OS_FILE_HANDLE hFile, void *outData, _UINT64 bytesToRead, OVERLAPPED *overlapped, LPOVERLAPPED_COMPLETION_ROUTINE completionRoutine) {
DWORD wBytesRead = 0;
BOOL result = completionRoutine ?
ReadFileEx(hFile, outData, (DWORD)(bytesToRead), overlapped, completionRoutine) :
ReadFile(hFile, outData, (DWORD)(bytesToRead), &wBytesRead, overlapped);
if (!result)
{
int errorCode = GetLastError();
if (errorCode != ERROR_IO_PENDING )
{
wstringstream err(wstringstream::in | wstringstream::out);
err << L"Can't read sectors from file. [ReadFile] error #" << errorCode << L".";
throw new FileIOException(L"CUniformDiskInterface", L"ReadFromFile", err.str().c_str(), GETDATE, GETFILE, GETLINE);
}
}
return (_UINT64)wBytesRead; }
Расширенная перекрывающаяся структура:
/*!
\enum EOverlappedStates
\brief The different overlapped states
\details Used as inter-thread communication while waiting for the I/O operation to complete
*/
enum EOverlappedStates
{
/** The I/O operation has not started or in in-progress */
EOverlappedNotStarted,
/** The I/O operation is done and was successful */
EOverlappedSuccess,
/** The I/O operation is done but there was an error */
EOverlappedError
};
/*!
\struct OverlappedEx
\brief Extended overlapped structure
*/
struct OverlappedEx : OVERLAPPED
{
/** The buffer manager that is designated to cache the record when it's loaded */
CBufferManager *bufferManger;
/** Transaction ID related to this disk I/O operation */
_UINT64 transactionId;
/** Start disk sector of the record */
_UINT64 startDiskSector;
/** Buffer */
void *buffer;
/** Number of bytes in \c buffer */
_UINT64 bufferSize;
/** Current overlapped I/O state. Used for inter-thread communication while waiting for the I/O to complete */
EOverlappedStates state;
/** Error code, or \c 0 if no error */
_UINT32 errorCode;
};
Метод обратного вызова:
/*! \brief Callback routine after a overlapped read has completed
\details Fills the buffer managers buffer cache with the read data
\todo This callback method may be a bottleneck, so look into how to handle this better
*/
VOID WINAPI CompletedReadRoutine(DWORD dwErr, DWORD cbBytesRead, LPOVERLAPPED lpOverLap)
{
OverlappedEx *overlapped = (OverlappedEx*)lpOverLap;
overlapped->errorCode = (_UINT32)dwErr;
if (!dwErr && cbBytesRead)
{
overlapped->state = EOverlappedSuccess;
overlapped->bufferManger->AddBuffer(overlapped->startDiskSector, overlapped->buffer, overlapped->bufferSize);
}
else
{
// An error occurred
overlapped->state = EOverlappedError;
}
}
Использование:
_UINT64 startDiskSector = location / sectorByteSize;
void *buffer = bufferManager->GetBuffer(startDiskSector);
if (!buffer)
{
/*
The disk sector was not cached, so get the data from the disk and cache in internal memory with
the buffer manager
*/
buffer = new char[recordByteSize];
// Create a overlapped structure to enable disk async I/O operations
OverlappedEx *overlapped = new OverlappedEx;
memset(overlapped, 0, sizeof(OverlappedEx));
overlapped->Offset = (DWORD)(startDiskSector & 0xffffffffULL);
overlapped->OffsetHigh = (DWORD)(startDiskSector >> 31ULL);
overlapped->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
overlapped->bufferManger = bufferManager;
overlapped->startDiskSector = startDiskSector;
overlapped->buffer = buffer;
overlapped->bufferSize = recordByteSize;
overlapped->state = EOverlappedNotStarted;
// Read from disk
diskApi.ReadFromFile(fileHandle, buffer, sectorByteSize, overlapped, CompletedReadRoutine);
return overlapped;
}
1 ответ
Согласно документации по MSDN, функция обратного вызова будет вызываться только в том же потоке, который вызвал ReadFileEx
функция и только тогда, когда поток ожидает событий. Таким образом, не гарантируется никаких проблем синхронизации между вызовом ReadFileEx
и вызов обратного вызова.
Это означает, что нет необходимости синхронизировать доступ к OverlappedEx
структура данных, если только один поток пытается прочитать конкретный экземпляр этой структуры, что эквивалентно чтению определенного файла только из одного потока. Если вы попытаетесь прочитать один файл из нескольких потоков, вполне вероятно, что вы столкнетесь с проблемами в самой Windows (я не думаю, что асинхронный ввод-вывод сам по себе является потокобезопасным), поэтому блокировка мьютекса не поможет вам в таком случае.