Delphi: Многопоточность, Потокобезопасность не работает

Когда данные отправляются в "туннельный" сокет, они иногда объединяются, реализуют критический раздел, но он не работает..

Что я делаю не так?

type
  my_ff_thread = class;
  my_ss_thread = class;
  Tmy_tunnel_from_MappedPortTCP = class;


  Tmy_thread_list = class
      ff_id   : string;
      ff_connection : TIdTCPConnection;
      constructor Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
  end;


  Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent)
  protected
    procedure InitComponent; override;
  public
    function my_connect:boolean;
  end;


  my_ff_thread = class(TThread)
  protected
    procedure Execute; override;
  public
    constructor Create;
  end;


  my_ss_thread = class(TThread)
  protected
    Fff_id : string;
    Fff_cmd : string;
    Fff_data : TIdBytes;
    procedure Execute; override;
  public
    constructor Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
    function prepare_cmd(cmd:string; id:string; data:string):string;
    function set_nulls_at_begin(s:string):string;
  end;


var my_list : TThreadList;
    CS: TRTLCriticalSection;
    tunnel: TIdTCPConnection;

Implementation



constructor my_ff_thread.Create;
begin
  inherited Create(True);
end;




constructor my_ss_thread.Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
begin
  inherited Create(True);
  Fff_id := ff_id;
  Fff_cmd := ff_cmd;
  Fff_data := ff_data;
end;


constructor Tmy_thread_list.Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
begin
  ff_id   := local_ff_id;
  ff_connection := local_ss_c;
end;





function my_ss_thread.set_nulls_at_begin(s:string):string;
var len, i : integer;
    res : string;
begin
  if s='' then
  begin
    Result := '';
    Exit;
  end;
  res := '';
  len := Length(s);
  if len < 10 then
    for i:=1 to (10 - len) do
    begin
      res := res + '0';
    end;
  Result := res + s;
end;




function my_ss_thread.prepare_cmd(cmd:string; id:string; data:string):string;
var
  packet : string;
begin
  packet := set_nulls_at_begin(IntToStr(Length(cmd))) + cmd;
  packet := packet + set_nulls_at_begin(IntToStr(Length(id))) + id;
  packet := packet + set_nulls_at_begin(IntToStr(Length(data))) + data;
  Result := packet;
end;








function del_ff_from_list(firefox_id:string):boolean;
var i : integer;
begin
  Result := True;
  try
    with my_list.LockList do
    begin
      for i:=0 to Count-1 do
      begin
        if Tmy_thread_list(Items[i]).ff_id = firefox_id then
        begin
          Delete(i);
          break;
        end;
      end;
    end;
  finally
    my_list.UnlockList;
  end;
end;




procedure my_ss_thread.Execute;
var ss : TIdTCPClient;
    unix_time : integer;
    data : TIdBytes;
    packet : string;
    packet_stream: TStringStream;
begin
    ss := TIdTCPClient.Create(nil);
    try
      with TIdTcpClient(ss) do
      begin
        Host := '127.0.0.1';
        Port := 6666;
        ReadTimeout := 1000 * 5;
        Connect;
      end;
    except
      on E:Exception do
      begin
        ss.Disconnect;
        exit;
      end;
    end;





    try
      my_list.LockList.Add(Tmy_thread_list.Create(Fff_id, ss));
    finally
      my_list.UnlockList;
    end;

    try
      ss.Socket.Write(Fff_data);
    except
      on E:Exception do begin {Fmy_memo.Lines.Add('First data not sent!');} end;
    end;


    unix_time := DateTimeToUnix(NOW);


    while True do
    begin
      ss.Socket.CheckForDataOnSource(5);
      if not ss.Socket.InputBufferIsEmpty then
      begin
        SetLength(data, 0);

        ss.Socket.InputBuffer.ExtractToBytes(data);
        packet := prepare_cmd('data_from_ss', Fff_id, TIdEncoderMIME.EncodeBytes(data));
        packet_stream := TStringStream.Create(packet);
        packet_stream.Position := 0;

        ss.Socket.InputBuffer.Clear;
        unix_time := DateTimeToUnix(NOW);

        try
            EnterCriticalSection(CS);
            tunnel.Socket.Write(packet_stream, -1, True);
            LeaveCriticalSection(CS);

        except
          on E:Exception do
          begin
          end;
        end;
      end;


      if (DateTimeToUnix(NOW) - unix_time) > 120 then
      begin
        ss.Disconnect;
        break;
      end;

      if not ss.Connected then
      begin
        break;
      end;

      if not tunnel.Connected then
      begin
        ss.Disconnect;
        break;
      end;


    end;


    try
      if tunnel.Connected then
      begin
        EnterCriticalSection(CS);
          packet := prepare_cmd('disconnect', Fff_id, 'x');
          packet_stream := TStringStream.Create(packet);
          packet_stream.Position := 0;
          tunnel.Socket.Write(packet_stream, -1, True);
        LeaveCriticalSection(CS);
      end;
    except
      on E:Exception do begin end;
    end;


Terminate;
end;










procedure my_ff_thread.Execute;
var
  t : my_ss_thread;
  cmd, id : string;
  i : integer;
  found_ss : TIdTCPConnection;
  list : TList;
  packet : string;
  cmd_len, id_len, data_len : integer;
  data : TIdBytes;
  orig_data : string;
  packet_stream: TStringStream;
  cmd_len_str, id_len_str, data_len_str : string;
begin
  packet_stream := TStringStream.Create;

  while not Terminated do
  begin
        packet_stream.Position := 0;
        try
          tunnel.Socket.ReadStream(packet_stream);
        except
          on E:Exception do begin end;
        end;

        packet := packet_stream.DataString;


        if packet = '0000' then
          continue;




        try
        cmd_len_str := Copy(packet, 1, 10);
        cmd_len := StrToInt(cmd_len_str);
        except
          on E:Exception do begin end;
        end;
        Delete(packet, 1, 10);
        cmd := Copy(packet, 1, cmd_len);
        Delete(packet, 1, cmd_len);

        try
        id_len_str := Copy(packet, 1, 10);
        id_len := StrToInt(id_len_str);
        except
          on E:Exception do begin end;
        end;
        Delete(packet, 1, 10);
        id := Copy(packet, 1, id_len);
        Delete(packet, 1, id_len);

        SetLength(data, 0);
        try
        data_len_str := Copy(packet, 1, 10);
        data_len := StrToInt(data_len_str);
        except
          on E:Exception do begin end;
        end;
        Delete(packet, 1, 10);
        data := TIdDecoderMIME.DecodeBytes(Copy(packet, 1, data_len));
        orig_data := Copy(packet, 1, data_len);
        Delete(packet, 1, data_len);

        found_ss := nil;
        try
          list := my_list.LockList;
          for i:=0 to list.Count-1 do
          begin
            if Tmy_thread_list(list[i]).ff_id = id then
            begin
              found_ss := Tmy_thread_list(list[i]).ff_connection;
              break;
            end;
          end;
        finally
          my_list.UnlockList;
        end;


        if cmd = 'disconnect' then
        begin
          if found_ss <> nil then
            if found_ss.Connected then
            begin
              found_ss.Disconnect;
              del_ff_from_list(id);
              continue;
            end;
        end;


        if found_ss = nil then
        begin
          t := my_ss_thread.Create(id, cmd, data);
          t.Start;
        end
        else
        begin
          if found_ss <> nil then
            try
            if found_ss.Connected then
              begin
                found_ss.Socket.Write(data);
              end;
            except
              on E:Exception do begin end;
            end;
        end;




      if not tunnel.Connected then
      begin
        Terminate;
        break;
      end;


  end;

end;



function Tmy_tunnel_from_MappedPortTCP.my_connect:boolean;
var t : my_ff_thread;
begin
  Result := True;
    try
      with TIdTcpClient(tunnel) do
      begin
        Host := '192.168.0.157';
        Port := 8099;
        Connect;
      end;
    except
      on E:Exception do
      begin
        tunnel.Disconnect;
        exit;
      end;
    end;
    t := my_ff_thread.Create;
    t.Start;

end;


initialization
  InitializeCriticalSection(CS);
  my_list := TThreadList.Create;
  tunnel := TIdTCPClient.Create(nil);
finalization
  DeleteCriticalSection(CS);


end.

1 ответ

Решение

Попробуйте что-то вроде этого:

type 
  my_ff_thread = class; 
  my_ss_thread = class; 
  Tmy_tunnel_from_MappedPortTCP = class; 

  Tmy_thread_list = class 
  public
    ff_id   : string; 
    ff_connection : TIdTCPConnection; 
    constructor Create(const local_ff_id: string; local_ss_c: TIdTCPConnection); 
  end; 

  Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent) 
  protected 
    procedure InitComponent; override; 
  public 
    function my_connect: boolean; 
    function my_disconnect: boolean; 
  end; 

  my_ff_thread = class(TThread) 
  protected 
    procedure Execute; override; 
  public 
    constructor Create;
  end; 

  my_ss_thread = class(TThread) 
  protected 
    Fff_id : string; 
    Fff_cmd : string; 
    Fff_data : TIdBytes; 
    procedure Execute; override; 
  public 
    constructor Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes); 
  end; 

var
  my_list : TThreadList = nil; 
  CS: TCriticalSection = nil; 
  tunnel: TIdTCPClient = nil; 
  tunnel_thread: my_ff_thread = nil;

implementation 

constructor Tmy_thread_list.Create(const local_ff_id: string; local_ss_c: TIdTCPConnection); 
begin 
  ff_id := local_ff_id; 
  ff_connection := local_ss_c; 
end; 

constructor my_ss_thread.Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes); 
begin 
  inherited Create(False); 
  Fff_id   := ff_id; 
  Fff_cmd  := ff_cmd; 
  Fff_data := Copy(ff_data, 0, Length(ff_data)); 
end; 

procedure my_ss_thread.Execute; 
var
  ss : TIdTCPClient; 
  data : TIdBytes; 
  packet : string;

  procedure WriteStrToStream(strm: TStream; const s: String);
  var
    buf: TIdBytes;
    len: Integer;
  begin
    buf := ToBytes(s, IndyUTF8Encoding);
    len := Length(buf);
    strm.WriteBuffer(len, SizeOf(Integer));
    if bytes <> nil then
      strm.WriteBuffer(buf[0], len);
  end;

  procedure WritePacketToTunnel(const cmd: string; const bytes: TIdBytes = nil);
  var
    strm: TMemoryStream;
  begin
    strm := TMemoryStream.Create;
    try
      WriteStrToStream(strm, cmd);
      WriteStrToStream(strm, Fff_id);
      WriteStrToStream(strm, TIdEncoderMIME.EncodeBytes(bytes));

      CS.Enter;
      try 
        tunnel.IOHandler.Write(strm, 0, True);
      finally
        CS.Leave;
      end;
    finally
      strm.Free;
    end;
  end;

begin 
  ss := TIdTCPClient.Create(nil); 
  try
    ss.Host := '127.0.0.1'; 
    ss.Port := 6666; 
    ss.ReadTimeout := 1000 * 120; 

    ss.Connect; 
    try  
      my_list.Add(Tmy_thread_list.Create(Fff_id, ss)); 

      try 
        ss.IOHandler.Write(Fff_data); 
      except 
        {Fmy_memo.Lines.Add('First data not sent!');}
        raise;
      end; 

      while not Terminated do 
      begin 
        SetLength(data, 0); 
        ss.IOHandler.ReadBytes(data, -1);
        if Length(data) = 0 then
          break;

        WritePacketToTunnel('data_from_ss', data);
      end; 

      WritePacketToTunnel('disconnect');
    finally
      ss.Disconnect;
    end;
  finally
    ss.Free;
  end;
end; 

constructor my_ff_thread.Create; 
begin 
  inherited Create(False); 
end; 

procedure my_ff_thread.Execute; 
var 
  cmd, id : string; 
  data : TIdBytes; 
  i : integer; 
  found_ss : TIdTCPConnection; 
  list : TList; 

  function ReadStrFromStream(strm: TStream): string;
  var
    len: Integer;
  begin
    strm.ReadBuffer(len, SizeOf(Integer));
    if len > 0 then
      Result := IdGlobal.ReadStringFromStream(strm, len, IndyUTF8Encoding)
    else
      Result := '';
  end;

  procedure ReadPacketFromTunnel(var v_cmd, v_id: string; var v_data: TIdBytes);
  var
    strm: TMemoryStream;
  begin
    strm := TMemoryStream.Create;
    try
      tunnel.IOHandler.ReadStream(strm, -1, False);
      strm.Position := 0;
      v_cmd  := ReadStrFromStream(strm);
      v_id   := ReadStrFromStream(strm);
      v_data := TIdDecoderMIME.DecodeBytes(ReadStrFromStream(strm));
    finally
      strm.Free;
    end;
  end;

begin 
  while not Terminated do 
  begin 
    ReadPacketFromTunnel(cmd, id, data); 

    found_ss := nil; 
    list := my_list.LockList; 
    try 
      for i := 0 to list.Count-1 do 
      begin 
        if Tmy_thread_list(list[i]).ff_id = id then 
        begin 
          found_ss := Tmy_thread_list(list[i]).ff_connection; 
          break; 
        end; 
      end; 
    finally 
      my_list.UnlockList; 
    end; 

    if cmd = 'disconnect' then 
    begin 
      if found_ss <> nil then
        found_ss.Disconnect; 
      del_ff_from_list(id); 
      continue; 
    end; 

    if found_ss <> nil then 
    begin 
      try 
        found_ss.IOHandler.Write(data); 
      except 
      end; 
      Continue;
    end;

    my_ss_thread.Create(id, cmd, data);
  end; 
end; 

function Tmy_tunnel_from_MappedPortTCP.my_connect: boolean; 
begin 
  Result := True; 
  try 
    tunnel.Host := '192.168.0.157'; 
    tunnel.Port := 8099; 
    tunnel.Connect; 
    tunnel_thread := my_ff_thread.Create(tunnel); 
  except 
    tunnel.Disconnect; 
    Result := False;
  end; 
end; 

function Tmy_tunnel_from_MappedPortTCP.my_disconnect: boolean; 
begin 
  Result := True; 
  try
    if tunnel_thread <> nil then tunnel_thread.Terminate;
    try
      tunnel.Disconnect; 
    finally
      if tunnel_thread <> nil then
      begin
        tunnel_thread.WaitFor;
        FreeAnNil(tunnel_thread);
      end;
    end;
  except 
    Result := False;
  end; 
end; 

initialization 
  CS := TCriticalSection.Create; 
  my_list := TThreadList.Create; 
  tunnel := TIdTCPClient.Create(nil); 
finalization 
  tunnel.Free; 
  my_list.Free; 
  CS.Free; 

end.
Другие вопросы по тегам