Клонирование / копирование / дублирование потоков в Lazarus
Я разработал процедуру, которая получает TStream; но основной тип, чтобы разрешить отправку всех типов потоковых наследников.
Эта процедура предназначена для создания одного потока для каждого ядра или нескольких потоков. Каждый поток будет выполнять подробный анализ потоковых данных (только для чтения), и, поскольку классы Паскаля назначаются по ссылке, а не по значению, произойдет столкновение потоков, поскольку позиция чтения является интеркалярной.
Чтобы это исправить, я хочу, чтобы процедура выполняла всю работу, чтобы удвоить последний TStream в памяти, выделив ему новую переменную. Таким образом, я могу дублировать TStream в достаточном количестве, чтобы каждый поток имел уникальный TStream. После окончания самой нити библиотеки памяти.
Примечание: процедура находится внутри DLL, поток работает.
Примечание 2: цель состоит в том, чтобы процедура выполняла все необходимые услуги, т.е. без вмешательства кода, вызывающего; Вы можете легко передать массив TStream, а не просто TStream. Но я этого не хочу! Цель состоит в том, чтобы услуга предоставлялась полностью в соответствии с процедурой.
У вас есть идеи, как это сделать?
Спасибо.
Дополнение:
У меня была идея низкого уровня, но мои знания в Паскале ограничены.
- Определите адрес объекта в памяти и его размер.
- создать в памяти новый адрес того же размера, что и исходный объект.
- скопируйте весь объект содержимого (raw) на этот новый адрес.
- Я создаю указатель на TStream, который указывает на этот новый адрес в памяти.
Это будет работать, или это глупо? Если да, как действовать? Пример пожалуйста!
2º сложение:
В качестве примера, предположим, что программа выполняет атаки методом грубой силы на зашифрованные потоки (просто пример, потому что это не применимо):
Сцена: файл 30 ГБ в ЦП с 8 ядрами:
1º - TMemoryStream:
Создайте 8 TMemoryStream и скопируйте все содержимое файла для каждого из TMemoryStreams. Это приведет к одновременному использованию 240 ГБ ОЗУ. Я считаю эту нарушенную идею. Кроме того, это увеличит время обработки до такой степени, что не будет использоваться многопоточность. Я должен был бы прочитать весь файл в память, а затем загрузить, начать анализировать его. Сломался!
* Плохая альтернатива TMemoryStream - медленно копировать файл в TMemoryStream партиями по 100 МБ / ядро (800 МБ), чтобы не занимать память. Таким образом, каждый поток выглядит только 100 МБ, освобождает память, пока вы не завершите весь файл. Но проблема в том, что для этого потребовалась бы функция Synchronize() в DLL, которая, как мы знаем, не работает, так как я открываю вопрос в DLL Synchronize(), зависает без ошибок и вылетает
2º - TFileStream:
Это хуже на мой взгляд. Смотрите, я получаю TStream, создаю 8 TFileStream и копирую все 30 ГБ для каждого TFileStream. Это отстой, потому что занимают 240 ГБ на диске, что является высоким значением даже для жесткого диска. Время чтения и записи (копирования) в HD приведет к тому, что реализация многопоточной обработки займет больше времени, чем один поток. Сломался!
Вывод: два вышеуказанных подхода требуют использования synchronize(), чтобы поставить в очередь каждый поток для чтения файла. Поэтому потоки не работают одновременно даже на многоядерном процессоре. Я знаю, что даже если бы он мог одновременный доступ к файлу (непосредственно создав несколько TFileStream), операционная система все еще enfileiraria потоки для чтения файла по одному, потому что жесткий диск не является действительно потокобезопасным, он не может прочитать две данные в то же время. Это физическое ограничение жесткого диска! Тем не менее, управление очередями в ОС намного эффективнее и эффективно уменьшает скрытые узкие места, в отличие от того, если я реализую вручную синхронизировать (). Это оправдывает мою идею клонировать TStream, оставив SO всю работу по управлению очередью доступа к файлам; без какого-либо вмешательства - и я знаю, что он сделает это лучше меня.
пример
В приведенном выше примере я хочу, чтобы 8 потоков анализировали по-разному и одновременно один и тот же поток, зная, что потоки не знают, какой поток предоставлен, это может быть файл Stream, поток из Интернета или даже небольшой TStringStream . Основная программа создаст только один Strean, и будет с параметрами конфигурации. Простой пример:
TModeForceBrute = (M1, M2, M3, M4, M5...)
TModesFB = set of TModeForceBrute;
TService = record
stream: TStream;
modes: array of TModesFB;
end;
Например, должна быть возможность анализировать только поток M1, только M2 или оба [M1, M2]. Состав TModesFB изменяет способ анализа потока. Каждый элемент в массиве "mode", который функционирует как список задач, будет обрабатываться другим потоком. Пример списка задач (представление JSON):
{
Stream: MyTstream,
modes: [
[M1, m5],
[M1],
[M5, m2],
[M5, m2, m4, m3],
[M1, m1, m3]
]
}
Примечание: в анализаторе [m1] + [m2] <> [m1, m2].
В программе:
function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';
В DLL:
// Basic, simple and fasted Exemple! May contain syntax errors or logical.
function analysis(Task: TService; maxCores: integer): TMyResultType;
var
i, processors : integer;
begin
processors := getCPUCount();
if (maxCores < processors) and (maxCores > 0) then
processors := maxCores;
setlength (globalThreads, processors);
for i := 0 to processors - 1 do
// It is obvious that the counter modes in the original is not the same counter processors.
if i < length(Task.modes) then begin
globalThreads[i] := TAnalusysThread.create(true, Task.stream, Task.modes[i])
globalThreads[i].start();
end;
[...]
end;
Примечание: с одним потоком программа работает прекрасно, без известных ошибок.
Я хочу, чтобы каждый поток позаботился о типе анализа, и я не могу использовать Synchronize() в DLL. Понимаю? Есть адекватное и чистое решение?
2 ответа
Клонирование потока - это код, подобный следующему:
streamdest:=TMemoryStream.create;
streamsrc.position:=0;
streamdest.copyfrom(streamdest);
streamsrc.position:=0;
streamdest.position:=0;
Однако делать что-либо за пределами DLL сложно, так как у DLL есть собственная копия библиотек и состояние библиотеки. В настоящее время это не рекомендуется.
Я отвечаю на мой вопрос, потому что я решил, что ни у кого не было действительно хорошего решения. Возможно, потому что нет ни одного!
Поэтому я адаптировал идею Марко ван де Воорта и Кена Уайта для решения, которое работает с использованием TMemoryStream с частичной загрузкой в пакете памяти 50 МБ, используя TRTLCriticalSection для синхронизации.
Решение также содержит те же недостатки, упомянутые в добавлении 2; они:
- Очередь доступа к жесткому диску является ответственностью моей программы, а не операционной системы;
- Один поток несет в памяти дважды одинаковые данные.
- В зависимости от скорости процессора, поток может хорошо анализировать быстрые 50 МБ памяти; С другой стороны, загрузка памяти может быть очень медленной. Это позволило бы использовать несколько потоков, запускаемых последовательно, теряя преимущество использования многопоточности, потому что каждый поток перегружен доступом к файлу, работающему последовательно, как если бы они были одним потоком.
Поэтому я считаю это решение грязным решением. Но пока это работает!
Ниже я приведу простой пример. Это означает, что эта адаптация может содержать очевидные ошибки логики и / или синтаксиса. Но этого достаточно, чтобы продемонстрировать.
Используя тот же пример проблемы, вместо передачи тока в "анализ" передается указатель на процесс. Эта процедура отвечает за синхронизацию чтения пакета потока 50 МБ.
И DLL, и программа:
TLotLoadStream = function (var toStm: TMemoryStream; lot, id: integer): int64 of object;
TModeForceBrute = (M1, M2, M3, M4, M5...)
TModesFB = set of TModeForceBrute;
TaskTService = record
reader: TLotLoadStream; {changes here <<<<<<< }
modes: array of TModesFB;
end;
В программе:
type
{ another code here }
TForm1 = class(TForm)
{ another code here }
CS : TRTLCriticalSection;
stream: TFileStream;
function MyReader(var toStm: TMemoryStream; lot: integer): int64 of object;
{ another code here }
end;
function analysis(Task: TService; maxCores: integer): TMyResultType; external 'mydll.dll';
{ another code here }
implementation
{ another code here }
function TForm1.MyReader(var toStm: TMemoryStream; lot: integer): int64 of object;
const
lotSize = (1024*1024) * 50; // 50MB
var
ler: int64;
begin
result := -1;
{
MUST BE PERFORMED PREVIOUSLY - FOR EXAMPLE IN TForm1.create()
InitCriticalSection (self.CriticalSection);
}
toStm.Clear;
ler := 0;
{ ENTERING IN CRITICAL SESSION }
EnterCriticalSection(self.CS);
{ POSITIONING IN LOT OF BEGIN}
self.streamSeek(lot * lotSize, soBeginning);
if (lot = 0) and (lotSize >= self.stream.size) then
ler := self.stream.size
else
if self.stream.Size >= (lotSize + (lot * lotSize)) THEN
ler := lotSize
else
ler := (self.stream.Size) - self.stream.Position; // stream inicia em 0?
{ COPYNG }
if (ler > 0) then
toStm.CopyFrom(self.stream, ler);
{ LEAVING THE CRITICAL SECTION }
LeaveCriticalSection(self.CS);
result := ler;
end;
В DLL:
{ another code here }
// Basic, simple and fasted Exemple! May contain syntax errors or logical.
function analysis(Task: TService; maxCores: integer): TMyResultType;
var
i, processors : integer;
begin
processors := getCPUCount();
if (maxCores < processors) and (maxCores > 0) then
processors := maxCores;
setlength (globalThreads, processors);
for i := 0 to processors - 1 do
// It is obvious that the counter modes in the original is not the same counter processors.
if i < length(Task.modes) then begin
globalThreads[i] := TAnalusysThread.create(true, Task.reader, Task.modes[i])
globalThreads[i].start();
end;
{ another code here }
end;
В потоке DLL класса:
type
{ another code here }
MyThreadAnalysis = class(TThread)
{ another code here }
reader: TLotLoadStream;
procedure Execute;
{ another code here }
end;
{ another code here }
implementation
{ another code here }
procedure MyThreadAnalysis.Execute;
var
Stream: TMemoryStream;
lot: integer;
{My analyzer already all written using buff, the job of rewriting it is too large, then it is so, two readings, two loads in memory, as I already mentioned in the question!}
buf: array[1..$F000] of byte; // 60K
begin
lot := 0;
Stream := TMemoryStream.Create;
self.reader(stream, lot);
while (assigned(Stream)) and (Stream <> nil) and (Stream.Size > 0) then begin
Stream.Seek(0, soBeginning);
{ 2º loading to memory buf }
while (Stream.Position < Stream.Size) do begin
n := Stream.read(buf, sizeof(buf));
{ MY CODE HERE }
end;
inc(lot);
self.reader(stream, lot, integer(Pchar(name)));
end;
end;
Итак, как видно, это временное решение. Я все еще надеюсь найти чистое решение, которое позволит мне удвоить контроллер потока таким образом, чтобы доступ к данным был обязан операционной системе, а не моей программе.