Многопоточная Delphi
У меня проблемы с многопоточностью в Delphi. У меня есть список имен (что-то около 2000 имен), и мне нужно получить данные о каждом имени на моем сайте. Моя система работает отлично, кроме контроля потока.
Я хочу создать 10 потоков, и, когда один поток завершится, создать еще один... до конца списка.
var
Form1: TForm;
tCount: Integer; //threads count
implementation
type
TCheck = class(TThread)
public
constructor Create(Name: string);
destructor Destroy; Override;
protected
procedure Execute; Override;
end;
MainT = class(TThread)
protected
procedure Execute; Override;
end;
destructor TCheck.Destroy;
begin
Dec(tCount);
end;
procedure MainT.Execute;
var
i: Integer;
Load: TStringList;
begin
Load:=TStringList.Create;
Load.LoadFromFile('C:\mynames.txt');
for i:= 0 to Load.Count -1 do
begin
if tCount = 10 then //if we have 10 threads running...
begin
repeat
Sleep(1);
until tCount < 10;
end;
TCheck.Create(Load.Strings[i]);
TCheck.Start;
Inc(tCount);
end;
end; // end of procedure
Ну, я не поместил TCheck.Constructor, потому что проблема в том, как я проверяю количество созданных потоков. Я имею в виду, что мои программы просто останавливаются, без каких-либо сообщений об ошибках, иногда проверяют 500 имен, иногда 150 имен...
Извините за плохой английский.
2 ответа
Вот потокобезопасное решение очереди с использованием обобщений.
Определите, сколько потребительских потоков вы хотите, глубину очереди и просто запустите DoSomeJob
процедура из потока.
Определите свою работу, работая со строкой как общая процедура (в CaptureJob
).
Когда очередь пуста, потоки потребителя будут уничтожены. DoSomeJob
процедура ждет, пока все работы не будут готовы. Вы можете легко превратить это в общий рабочий пул, повторно используя потоки, не разрушая их. Общая структура элементов работы также делает их пригодными для выполнения различных видов работ.
Обратите внимание, что эта очередь работает на XE2 и выше. Если вы используете более старую версию Delphi, ищите аналогичную безопасную очередь, как предложено в комментариях.
uses
Classes,SyncObjs,Generics.Collections;
Type
TMyConsumerItem = class(TThread)
private
FQueue : TThreadedQueue<TProc>;
FSignal : TCountDownEvent;
protected
procedure Execute; override;
public
constructor Create( aQueue : TThreadedQueue<TProc>; aSignal : TCountdownEvent);
end;
constructor TMyConsumerItem.Create(aQueue: TThreadedQueue<TProc>);
begin
Inherited Create(false);
Self.FreeOnTerminate := true;
FQueue := aQueue;
FSignal := aSignal;
end;
procedure TMyConsumerItem.Execute;
var
aProc : TProc;
begin
try
repeat
FQueue.PopItem(aProc);
if not Assigned(aProc) then
break; // Drop this thread
aProc();
until Terminated;
finally
FSignal.Signal;
end;
end;
procedure DoSomeJob(myListItems : TStringList);
const
cThreadCount = 10;
cMyQueueDepth = 100;
var
i : Integer;
aQueue : TThreadedQueue<TProc>;
aCounter : TCountDownEvent;
function CaptureJob( const aString : string) : TProc;
begin
Result :=
procedure
begin
// Do some job with aString
end;
end;
begin
aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
aCounter := TCountDownEvent.Create(cThreadCount);
try
for i := 1 to cThreadCount do
TMyConsumerItem.Create(aQueue,aCounter);
for i := 0 to myListItems.Count-1 do begin
aQueue.PushItem( CaptureJob( myListItems[i]));
end;
finally
for i := 1 to cThreadCount do
aQueue.PushItem(nil);
aCounter.WaitFor; // Wait for threads to finish
aCounter.Free;
aQueue.Free;
end;
end;
NB: Кен объясняет, почему ваша инициализация и запуск потоков неверны. Это предложение показывает лучшую структуру для более общего решения проблем этого типа.
Если вы не объявляете переменную для хранения возвращаемого значения TCheck.Create
Вы не можете получить доступ TCheck.Start
(нет ни одного случая TCheck
Вы можете использовать для доступа к Start
метод).
Правильным способом было бы объявить var Check: TCheck;
внутри MainT.Execute
и затем сохраните возвращенное значение:
Check := TCheck.Create(Load[i]); { See note below }
Check.Start;
Inc(tCount);
ПРИМЕЧАНИЕ. Свойство по умолчанию TStringList
является Strings
так что вам не нужно его использовать. Вы можете просто получить доступ Strings
прямо как у меня выше. Следующие две строки - это одно и то же (но, очевидно, одна из них короче и проще для ввода):
Load.Strings[i];
Load[i];
Если вы не хотите сохранять ссылку на TCheck
просто измените свой код, чтобы быть with
блок (включая begin..end
и не содержит никакого другого кода в блоке (это единственный способ, который я когда-либо рекомендую использовать with
):
with TCheck.Create(Load[i]) do
begin
Start;
Inc(tCount);
end;
С учетом вышесказанного, есть гораздо лучшие способы сделать это вместо создания / уничтожения всех видов потоков. Как уже говорили другие, вы можете иметь список из 10 потоков и поставить в очередь работу для них, так что каждый будет обрабатывать элемент из Load
и затем вернитесь, чтобы получить другой элемент для обработки, когда закончите, и повторите, пока список не будет завершен. Трудно сказать, как именно вы это сделаете, потому что это будет зависеть от вашей версии Delphi. (Существуют библиотеки, которые сделают большую часть работы за вас, например, OMNIThreadLibrary
, но он недоступен для некоторых старых версий Delphi. Последние версии Delphi также поддерживают TQueue
а также TObjectQueue
и некоторые другие типы и функции, которые могут быть очень полезны.
(Если у вас другой вопрос о том, как сделать это в очереди с ограниченным числом потоков, это должен быть новый вопрос, а не то, что вы добавляете к этому.)