Многопоточная очередь в Delphi?

Это мой второй вопрос по этому поводу, у меня проблемы с этим>.<

Ну, я просто хочу создать ограниченное количество потоков (в данном случае я хочу 10 потоков), и тогда каждый поток выберет имя в моем списке и получит некоторые данные на моем сайте.

Моя система работает довольно хорошо, но моя многопоточная система все еще не работает =(

-

Я попробовал код, опубликованный LU RD, но основной поток не ждет, когда потоки закончат очередь, и просто останавливается =(

Код:

uses
Classes,SyncObjs,Generics.Collections;

Type
TMyConsumerItem = class(TThread)
private
 FQueue : TThreadedQueue<TProc>;
 FSignal : TCountDownEvent;
protected
 procedure Execute; override;
public
 constructor Create( aQueue : TThreadedQueue<TProc>; aSignal : TCountdownEvent);
end;

constructor TMyConsumerItem.Create(aQueue: TThreadedQueue<TProc>; aSignal : TCountDownEvent);
begin
 Inherited Create(false);
 Self.FreeOnTerminate := true;
 FQueue := aQueue;
 FSignal := aSignal;
end;

procedure TMyConsumerItem.Execute;
var
aProc : TProc;
begin
 try
 repeat
  FQueue.PopItem(aProc);
  if not Assigned(aProc) then
   break; // Drop this thread
  aProc();
 until Terminated;
 finally
  FSignal.Signal;
 end;
end;

procedure DoSomeJob(myListItems : TStringList);
const
 cThreadCount = 10;
 cMyQueueDepth = 100;
var
i : Integer;
aQueue : TThreadedQueue<TProc>;
aCounter : TCountDownEvent;
function CaptureJob( const aString : string) : TProc;
begin
 Result :=
  procedure
  begin
    // Do some job with aString
  end;
end;
begin
aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
aCounter := TCountDownEvent.Create(cThreadCount);
try
 for i := 1 to cThreadCount do
  TMyConsumerItem.Create(aQueue,aCounter);
 for i := 0 to myListItems.Count-1 do begin
  aQueue.PushItem( CaptureJob( myListItems[i]));
 end;
finally
 for i := 1 to cThreadCount do
  aQueue.PushItem(nil);
 aCounter.WaitFor;  // Wait for threads to finish
 aCounter.Free;
 aQueue.Free;
end;
end;

Мой другой вопрос: Multi Thread Delphi

Я использую Delphi XE3.

1 ответ

Решение
  • Во-первых, если вы хотите вызвать процедуру DoSomeJob() и блокировать до готовности из основного потока, есть предостережение. Если ваши рабочие потоки синхронизируются с основным потоком, возникает ситуация взаимоблокировки с aCounter.WaitFor а также TThread.Synchronize(),

Я предполагаю, что это то, что происходит с вами, догадываясь здесь.

Есть способ справиться с этим, как я покажу в этом ответе.

  • Во-вторых, обычно рабочие потоки должны обрабатываться пулом потоков, чтобы избежать постоянного создания / уничтожения потоков. Передайте свою работу в пул потоков, чтобы все выполнялось и ожидалось внутри потока. Это позволяет избежать блокировки основного потока. Я оставлю это до вас. Как только эта структура написана, многопоточность будет легче. Если это кажется сложным, попробуйте OTL вместо этого

Вот пример, где основной поток может ждать DoSomeJob() безопасным образом. Создан анонимный поток для ожидания aCounter сигнализировать. В этом примере используется TMemo и TButton, Просто создайте форму с этими компонентами и подключите кнопку OnClick событие в ButtonClick метод.

unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
  System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls;

type
  TForm1 = class(TForm)
    Button1: TButton;
    Memo1: TMemo;
    procedure Button1Click(Sender: TObject);
  private
    { Private declarations }
    procedure DoSomeJob( myListItems : TStringList);
  public
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

uses
  SyncObjs, Generics.Collections;

{- Include TMyConsumerItem class here }

procedure TForm1.Button1Click(Sender: TObject);
var
  aList : TStringList;
  i : Integer;
begin
  aList := TStringList.Create;
  Screen.Cursor := crHourGlass;
  try
    for i := 1 to 20 do aList.Add(IntToStr(i));
    DoSomeJob(aList);
  finally
    aList.Free;
    Screen.Cursor := crDefault;
  end;
end;

procedure TForm1.DoSomeJob(myListItems: TStringList);
const
  cThreadCount = 10;
  cMyQueueDepth = 100;
var
  i: Integer;
  aQueue: TThreadedQueue<TProc>;
  aCounter: TCountDownEvent;

  function CaptureJob(const aString: string): TProc;
  begin
    Result :=
      procedure
      var
        i,j : Integer;
      begin
        // Do some job with aString
        for i := 0 to 1000000 do
          j := i;
        // Report status to main thread
        TThread.Synchronize(nil,
          procedure
          begin
            Memo1.Lines.Add('Job with:'+aString+' done.');
          end
        );

      end;
  end;
var
  aThread : TThread;
begin
  aQueue := TThreadedQueue<TProc>.Create(cMyQueueDepth);
  aCounter := TCountDownEvent.Create(cThreadCount);
  try
    for i := 1 to cThreadCount do
      TMyConsumerItem.Create(aQueue, aCounter);
    for i := 0 to myListItems.Count - 1 do
    begin
      aQueue.PushItem(CaptureJob(myListItems[i]));
    end;
    // Kill the worker threads
    for i := 1 to cThreadCount do
      aQueue.PushItem(nil);
  finally
    // Since the worker threads synchronizes with the main thread,
    // we must wait for them in another thread.
    aThread := TThread.CreateAnonymousThread(
      procedure
      begin
        aCounter.WaitFor; // Wait for threads to finish
        aCounter.Free;
        aQueue.Free;
      end
    );
    aThread.FreeOnTerminate := false;
    aThread.Start;
    aThread.WaitFor;  // Safe to wait for the anonymous thread
    aThread.Free;
  end;
end;

end.  
Другие вопросы по тегам