Delphi tIdTCPClient с событиями таймера и другими многопоточными событиями на стороне клиента

У нас есть клиент-серверное приложение Delphi, использующее INDY. Клиент имеет одно соединение tIdTCPClient с сервером, которое является многопоточным. Клиент "теоретически" один поток. Но на практике на клиенте несколько потоков, и в этом моя проблема. Например, подумайте о таймере, который срабатывает каждую минуту, чтобы получить данные с сервера. И подумайте, что происходит, когда пользователь запускает команду одновременно с этим событием таймера. По правде говоря, моя проблема вызвана нашим инструментом отчетности "Построитель отчетов", который (досадно) настаивает на загрузке каждой страницы отчета, что занимает некоторое время. В отчете используется наш "специальный" набор данных, который имеет механизм кэширования для одновременной передачи пакетов записей (так что несколько обращений к серверу для получения всех данных). Между тем, если пользователь делает что-то еще в то же время, мы, кажется, получаем перекрестные данные. Похоже, пользователь возвращает данные, предназначенные для отчета.

Кстати, эта ошибка встречается крайне редко, но гораздо реже для одного конкретного клиента, у которого самый медленный в мире интернет (повезло мне, у меня теперь есть тестовая среда).

Так что на клиенте у меня код немного похож на это...

procedure DoCommand(MyIdTCPClient:tIdTCPClient; var DATA:tMemoryStream);
var
  Buffer: TBytes;
  DataSize: Integer;
  CommsVerTest: String;
begin
  //Write Data
  MyIdTCPClient.IOHandler.Write(DATA.Size);
  MyIdTCPClient.IOHandler.Write(RawToBytes(Data.Memory^,DataSize));

  //Read back 6 bytes CommsVerTest should always be the same (ie ABC123)
  SetLength(Buffer,0); //Clear out buffer
  MyIdTCPClient.IOHandler.ReadBytes(Buffer,6); 
  CommsVerTest:=BytesToString(Buffer);
  if CommsVerTest<>'ABC123' then
    raise exception.create('Invalid Comms');      //It bugs out here in rare cases

  //Get Result Data Back from Server
  DataSize:=MyIdTCPClient.IOHandler.ReadLongInt;   
  Data.SetSize(DataSize);                         //Report thread is stuck here
  MyIdTCPClient.IOHandler.ReadBytes(Buffer,DataSize);
end; 

Теперь, когда я отлаживаю его, я могу подтвердить, что он выдает ошибки, когда в середине этой процедуры есть два потока. Основной поток останавливается на исключении. И поток отчета застрял где-то еще в той же процедуре.

Таким образом, мне кажется, что я должен сделать процедуру выше потокобезопасным. Я имею в виду, что если пользователь хочет что-то сделать, ему просто нужно дождаться окончания потока отчета.

Да, я думал, что мое клиентское приложение было однопоточным для отправки данных на сервер!

Я думаю, что использование TThread не будет работать - потому что у меня нет доступа к потоку внутри построителя отчетов. Я думаю, что мне нужно tCriticalSection.

Я думаю, что мне нужно сделать приложение так, чтобы вышеуказанная процедура могла выполняться только одним потоком за раз. Другие темы должны ждать.

Кто-то, пожалуйста, помогите с синтаксисом.

2 ответа

TIdIOHandler имеет Write() а также Read...() перегрузки для отправки / получения TStream данные:

procedure Write(AStream: TStream; ASize: TIdStreamSize = 0; AWriteByteCount: Boolean = False); overload; virtual;

procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1; AReadUntilDisconnect: Boolean = False); virtual;

Вам не нужно копировать TMemoryStream содержание до промежуточного TIdBytes перед отправкой или получить как TIdBytes перед копированием обратно в TStream. На самом деле, в показанном вами коде нет ничего, что нужно использовать TIdBytes прямо на всех:

procedure DoCommand(MyIdTCPClient: TIdTCPClient; var DATA: TMemoryStream);
var
  CommsVerTest: String;
begin
  //Write Data
  MyIdTCPClient.IOHandler.Write(DATA, 0, True);

  //Read back 6 bytes CommsVerTest should always be the same (ie ABC123)
  CommsVerTest := MyIdTCPClient.IOHandler.ReadString(6); 
  if CommsVerTest <> 'ABC123' then
    raise exception.create('Invalid Comms');

  //Get Result Data Back from Server
  DATA.Clear;
  MyIdTCPClient.IOHandler.ReadStream(DATA, -1, False);
end; 

При этом, если у вас есть несколько потоков, записывающих в один и тот же сокет одновременно, или несколько потоков, читающих из одного и того же сокета в одно и то же время, они повредят данные друг друга (или хуже). Вам необходимо синхронизировать доступ к сокету, как минимум, с критическим разделом. Из-за вашего многопоточного использования TIdTCPClientВам действительно нужно переосмыслить свой общий дизайн клиента.

По крайней мере, используя существующую логику, когда вам нужно отправить команду и прочитать ответ, остановите таймер и дождитесь обмена ожидающими данными перед отправкой команды, и не позволяйте ничему другому получить доступ к сокету пока ответ не вернется. Вы пытаетесь сделать слишком много за один раз, не синхронизируя все, чтобы избежать дублирования.

В долгосрочной перспективе было бы гораздо безопаснее выполнить все чтения из одного выделенного потока, а затем передать все полученные данные другим потокам для обработки по мере необходимости. Но это также означает изменение логики отправки для соответствия. Вы можете либо:

  1. Если ваш протокол позволяет вам иметь несколько команд в полете параллельно, то вы можете отправить команду из любого потока в любое время (просто обязательно используйте критический раздел, чтобы избежать совпадений), но не ждите немедленного ответа. Пусть каждый отправляющий поток движется дальше и выполняет другие действия, а поток чтения должен асинхронно уведомлять соответствующий отправляющий поток, когда ожидаемый ответ действительно прибывает.

  2. Если протокол не допускает параллельные команды, но вам все еще нужно, чтобы каждый поток-отправитель ждал своего соответствующего ответа, тогда предоставьте потоку сокета потокобезопасную очередь, в которую другие потоки могут помещать команды при необходимости. Затем поток сокетов может периодически проходить через эту очередь, посылая каждую команду и получая ответ по одному за раз. Каждый поток, который помещает команду в очередь, может включать TEvent чтобы получить сигнал при получении ответа, таким образом они переходят в эффективные состояния ожидания во время ожидания, но вы сохраняете логику ожидания для каждого потока.

Спасибо, Реми.

TCriticalSection решил проблему. У меня нет контроля над такими вещами, как сторонний построитель отчетов. И запуск отчетов полностью в их собственном потоке не будет иметь большого значения - им все еще нужно совместно использовать одно и то же соединение (я не хочу или нуждаюсь в параллельных соединениях). В любом случае основная часть программы выполняется в главном потоке, и редко бывает, что двум потокам необходимо взаимодействовать с сервером одновременно.

Таким образом, TCriticalSection был идеален - он предотвращал выполнение этой процедуры дважды в одно и то же время (т. Е. Один поток должен дождаться завершения первого). И к счастью - это сработало блестяще.

В основном код теперь выглядит так:

procedure DoCommand(
    CS:tCriticalSection; 
    MyIdTCPClient:tIdTCPClient; 
    var DATA:tMemoryStream);
var
  Buffer: TBytes;
  DataSize: Integer;
  CommsVerTest: String;
begin
  CS.Enter;     //enter Critical Section
  try
    //Write Data
    MyIdTCPClient.IOHandler.Write(DATA.Size);
    MyIdTCPClient.IOHandler.Write(RawToBytes(Data.Memory^,DataSize));

    //Read back 6 bytes CommsVerTest should always be the same (ie ABC123)
    SetLength(Buffer,0); //Clear out buffer
    MyIdTCPClient.IOHandler.ReadBytes(Buffer,6); 
    CommsVerTest:=BytesToString(Buffer);
    if CommsVerTest<>'ABC123' then
      raise exception.create('Invalid Comms');      

    //Get Result Data Back from Server
    DataSize:=MyIdTCPClient.IOHandler.ReadLongInt;   
    Data.SetSize(DataSize);                         
    MyIdTCPClient.IOHandler.ReadBytes(Buffer,DataSize);
  finally
    cs.Leave;
  end;
end;
Другие вопросы по тегам