Многопоточная Delphi

У меня проблемы с многопоточностью в Delphi. У меня есть список имен (что-то около 2000 имен), и мне нужно получить данные о каждом имени на моем сайте. Моя система работает отлично, кроме контроля потока.

Я хочу создать 10 потоков, и, когда один поток завершится, создать еще один... до конца списка.

var
 Form1: TForm;
 tCount: Integer;  //threads count

implementation

type
 TCheck = class(TThread)
 public
  constructor Create(Name: string);
  destructor Destroy; Override;
 protected
  procedure Execute; Override;
 end;

 MainT = class(TThread)
 protected
  procedure Execute; Override;
 end;

destructor TCheck.Destroy;
begin
 Dec(tCount);
end;

procedure MainT.Execute;
var
 i: Integer;
 Load: TStringList;
begin
 Load:=TStringList.Create;
 Load.LoadFromFile('C:\mynames.txt');

 for i:= 0 to Load.Count -1 do
 begin

  if tCount = 10 then  //if we have 10 threads running...
  begin
   repeat
    Sleep(1);
   until tCount < 10;
  end;

  TCheck.Create(Load.Strings[i]);
  TCheck.Start;
  Inc(tCount);

 end;

end;  // end of procedure

Ну, я не поместил TCheck.Constructor, потому что проблема в том, как я проверяю количество созданных потоков. Я имею в виду, что мои программы просто останавливаются, без каких-либо сообщений об ошибках, иногда проверяют 500 имен, иногда 150 имен...

Извините за плохой английский.

2 ответа

Решение

Вот потокобезопасное решение очереди с использованием обобщений.

Определите, сколько потребительских потоков вы хотите, глубину очереди и просто запустите DoSomeJob процедура из потока.

Определите свою работу, работая со строкой как общая процедура (в CaptureJob).

Когда очередь пуста, потоки потребителя будут уничтожены. DoSomeJob процедура ждет, пока все работы не будут готовы. Вы можете легко превратить это в общий рабочий пул, повторно используя потоки, не разрушая их. Общая структура элементов работы также делает их пригодными для выполнения различных видов работ.

Обратите внимание, что эта очередь работает на XE2 и выше. Если вы используете более старую версию Delphi, ищите аналогичную безопасную очередь, как предложено в комментариях.

uses
  Classes,SyncObjs,Generics.Collections;

Type
  TMyConsumerItem = class(TThread)
  private
    FQueue : TThreadedQueue<TProc>;
    FSignal : TCountDownEvent;
  protected
    procedure Execute; override;
  public
    constructor Create( aQueue : TThreadedQueue<TProc>; aSignal : TCountdownEvent);
  end;

constructor TMyConsumerItem.Create(aQueue: TThreadedQueue<TProc>);
begin
  Inherited Create(false);
  Self.FreeOnTerminate := true;
  FQueue := aQueue;
  FSignal := aSignal;
end;

procedure TMyConsumerItem.Execute;
var
  aProc : TProc;
begin
  try
    repeat
      FQueue.PopItem(aProc);
      if not Assigned(aProc) then
        break; // Drop this thread
      aProc();
    until Terminated;
  finally
    FSignal.Signal;
  end;
end;

procedure DoSomeJob(myListItems : TStringList);
const
  cThreadCount = 10;
  cMyQueueDepth = 100;
var
  i : Integer;
  aQueue : TThreadedQueue<TProc>;
  aCounter : TCountDownEvent;
  function CaptureJob( const aString : string) : TProc;
  begin
    Result :=
      procedure
      begin
        // Do some job with aString
      end;
  end;
begin
  aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
  aCounter := TCountDownEvent.Create(cThreadCount);
  try
    for i := 1 to cThreadCount do
      TMyConsumerItem.Create(aQueue,aCounter);
    for i := 0 to myListItems.Count-1 do begin
      aQueue.PushItem( CaptureJob( myListItems[i]));
    end;
  finally
    for i := 1 to cThreadCount do
      aQueue.PushItem(nil);
    aCounter.WaitFor;  // Wait for threads to finish
    aCounter.Free;
    aQueue.Free;
  end;
end;

NB: Кен объясняет, почему ваша инициализация и запуск потоков неверны. Это предложение показывает лучшую структуру для более общего решения проблем этого типа.

Если вы не объявляете переменную для хранения возвращаемого значения TCheck.Create Вы не можете получить доступ TCheck.Start (нет ни одного случая TCheck Вы можете использовать для доступа к Start метод).

Правильным способом было бы объявить var Check: TCheck; внутри MainT.Execute и затем сохраните возвращенное значение:

Check := TCheck.Create(Load[i]);  { See note below }
Check.Start;
Inc(tCount);

ПРИМЕЧАНИЕ. Свойство по умолчанию TStringList является Strings так что вам не нужно его использовать. Вы можете просто получить доступ Strings прямо как у меня выше. Следующие две строки - это одно и то же (но, очевидно, одна из них короче и проще для ввода):

Load.Strings[i];
Load[i];

Если вы не хотите сохранять ссылку на TCheck просто измените свой код, чтобы быть with блок (включая begin..end и не содержит никакого другого кода в блоке (это единственный способ, который я когда-либо рекомендую использовать with):

with TCheck.Create(Load[i]) do
begin
  Start;
  Inc(tCount);
end;

С учетом вышесказанного, есть гораздо лучшие способы сделать это вместо создания / уничтожения всех видов потоков. Как уже говорили другие, вы можете иметь список из 10 потоков и поставить в очередь работу для них, так что каждый будет обрабатывать элемент из Load и затем вернитесь, чтобы получить другой элемент для обработки, когда закончите, и повторите, пока список не будет завершен. Трудно сказать, как именно вы это сделаете, потому что это будет зависеть от вашей версии Delphi. (Существуют библиотеки, которые сделают большую часть работы за вас, например, OMNIThreadLibrary, но он недоступен для некоторых старых версий Delphi. Последние версии Delphi также поддерживают TQueue а также TObjectQueue и некоторые другие типы и функции, которые могут быть очень полезны.

(Если у вас другой вопрос о том, как сделать это в очереди с ограниченным числом потоков, это должен быть новый вопрос, а не то, что вы добавляете к этому.)

Другие вопросы по тегам