Delphi: Многопоточность, Потокобезопасность не работает
Когда данные отправляются в "туннельный" сокет, они иногда объединяются, реализуют критический раздел, но он не работает..
Что я делаю не так?
type
my_ff_thread = class;
my_ss_thread = class;
Tmy_tunnel_from_MappedPortTCP = class;
Tmy_thread_list = class
ff_id : string;
ff_connection : TIdTCPConnection;
constructor Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
end;
Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent)
protected
procedure InitComponent; override;
public
function my_connect:boolean;
end;
my_ff_thread = class(TThread)
protected
procedure Execute; override;
public
constructor Create;
end;
my_ss_thread = class(TThread)
protected
Fff_id : string;
Fff_cmd : string;
Fff_data : TIdBytes;
procedure Execute; override;
public
constructor Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
function prepare_cmd(cmd:string; id:string; data:string):string;
function set_nulls_at_begin(s:string):string;
end;
var my_list : TThreadList;
CS: TRTLCriticalSection;
tunnel: TIdTCPConnection;
Implementation
constructor my_ff_thread.Create;
begin
inherited Create(True);
end;
constructor my_ss_thread.Create(ff_id:string; ff_cmd:string; ff_data:TIdBytes);
begin
inherited Create(True);
Fff_id := ff_id;
Fff_cmd := ff_cmd;
Fff_data := ff_data;
end;
constructor Tmy_thread_list.Create(local_ff_id: string; local_ss_c: TIdTCPConnection);
begin
ff_id := local_ff_id;
ff_connection := local_ss_c;
end;
function my_ss_thread.set_nulls_at_begin(s:string):string;
var len, i : integer;
res : string;
begin
if s='' then
begin
Result := '';
Exit;
end;
res := '';
len := Length(s);
if len < 10 then
for i:=1 to (10 - len) do
begin
res := res + '0';
end;
Result := res + s;
end;
function my_ss_thread.prepare_cmd(cmd:string; id:string; data:string):string;
var
packet : string;
begin
packet := set_nulls_at_begin(IntToStr(Length(cmd))) + cmd;
packet := packet + set_nulls_at_begin(IntToStr(Length(id))) + id;
packet := packet + set_nulls_at_begin(IntToStr(Length(data))) + data;
Result := packet;
end;
function del_ff_from_list(firefox_id:string):boolean;
var i : integer;
begin
Result := True;
try
with my_list.LockList do
begin
for i:=0 to Count-1 do
begin
if Tmy_thread_list(Items[i]).ff_id = firefox_id then
begin
Delete(i);
break;
end;
end;
end;
finally
my_list.UnlockList;
end;
end;
procedure my_ss_thread.Execute;
var ss : TIdTCPClient;
unix_time : integer;
data : TIdBytes;
packet : string;
packet_stream: TStringStream;
begin
ss := TIdTCPClient.Create(nil);
try
with TIdTcpClient(ss) do
begin
Host := '127.0.0.1';
Port := 6666;
ReadTimeout := 1000 * 5;
Connect;
end;
except
on E:Exception do
begin
ss.Disconnect;
exit;
end;
end;
try
my_list.LockList.Add(Tmy_thread_list.Create(Fff_id, ss));
finally
my_list.UnlockList;
end;
try
ss.Socket.Write(Fff_data);
except
on E:Exception do begin {Fmy_memo.Lines.Add('First data not sent!');} end;
end;
unix_time := DateTimeToUnix(NOW);
while True do
begin
ss.Socket.CheckForDataOnSource(5);
if not ss.Socket.InputBufferIsEmpty then
begin
SetLength(data, 0);
ss.Socket.InputBuffer.ExtractToBytes(data);
packet := prepare_cmd('data_from_ss', Fff_id, TIdEncoderMIME.EncodeBytes(data));
packet_stream := TStringStream.Create(packet);
packet_stream.Position := 0;
ss.Socket.InputBuffer.Clear;
unix_time := DateTimeToUnix(NOW);
try
EnterCriticalSection(CS);
tunnel.Socket.Write(packet_stream, -1, True);
LeaveCriticalSection(CS);
except
on E:Exception do
begin
end;
end;
end;
if (DateTimeToUnix(NOW) - unix_time) > 120 then
begin
ss.Disconnect;
break;
end;
if not ss.Connected then
begin
break;
end;
if not tunnel.Connected then
begin
ss.Disconnect;
break;
end;
end;
try
if tunnel.Connected then
begin
EnterCriticalSection(CS);
packet := prepare_cmd('disconnect', Fff_id, 'x');
packet_stream := TStringStream.Create(packet);
packet_stream.Position := 0;
tunnel.Socket.Write(packet_stream, -1, True);
LeaveCriticalSection(CS);
end;
except
on E:Exception do begin end;
end;
Terminate;
end;
procedure my_ff_thread.Execute;
var
t : my_ss_thread;
cmd, id : string;
i : integer;
found_ss : TIdTCPConnection;
list : TList;
packet : string;
cmd_len, id_len, data_len : integer;
data : TIdBytes;
orig_data : string;
packet_stream: TStringStream;
cmd_len_str, id_len_str, data_len_str : string;
begin
packet_stream := TStringStream.Create;
while not Terminated do
begin
packet_stream.Position := 0;
try
tunnel.Socket.ReadStream(packet_stream);
except
on E:Exception do begin end;
end;
packet := packet_stream.DataString;
if packet = '0000' then
continue;
try
cmd_len_str := Copy(packet, 1, 10);
cmd_len := StrToInt(cmd_len_str);
except
on E:Exception do begin end;
end;
Delete(packet, 1, 10);
cmd := Copy(packet, 1, cmd_len);
Delete(packet, 1, cmd_len);
try
id_len_str := Copy(packet, 1, 10);
id_len := StrToInt(id_len_str);
except
on E:Exception do begin end;
end;
Delete(packet, 1, 10);
id := Copy(packet, 1, id_len);
Delete(packet, 1, id_len);
SetLength(data, 0);
try
data_len_str := Copy(packet, 1, 10);
data_len := StrToInt(data_len_str);
except
on E:Exception do begin end;
end;
Delete(packet, 1, 10);
data := TIdDecoderMIME.DecodeBytes(Copy(packet, 1, data_len));
orig_data := Copy(packet, 1, data_len);
Delete(packet, 1, data_len);
found_ss := nil;
try
list := my_list.LockList;
for i:=0 to list.Count-1 do
begin
if Tmy_thread_list(list[i]).ff_id = id then
begin
found_ss := Tmy_thread_list(list[i]).ff_connection;
break;
end;
end;
finally
my_list.UnlockList;
end;
if cmd = 'disconnect' then
begin
if found_ss <> nil then
if found_ss.Connected then
begin
found_ss.Disconnect;
del_ff_from_list(id);
continue;
end;
end;
if found_ss = nil then
begin
t := my_ss_thread.Create(id, cmd, data);
t.Start;
end
else
begin
if found_ss <> nil then
try
if found_ss.Connected then
begin
found_ss.Socket.Write(data);
end;
except
on E:Exception do begin end;
end;
end;
if not tunnel.Connected then
begin
Terminate;
break;
end;
end;
end;
function Tmy_tunnel_from_MappedPortTCP.my_connect:boolean;
var t : my_ff_thread;
begin
Result := True;
try
with TIdTcpClient(tunnel) do
begin
Host := '192.168.0.157';
Port := 8099;
Connect;
end;
except
on E:Exception do
begin
tunnel.Disconnect;
exit;
end;
end;
t := my_ff_thread.Create;
t.Start;
end;
initialization
InitializeCriticalSection(CS);
my_list := TThreadList.Create;
tunnel := TIdTCPClient.Create(nil);
finalization
DeleteCriticalSection(CS);
end.
1 ответ
Решение
Попробуйте что-то вроде этого:
type
my_ff_thread = class;
my_ss_thread = class;
Tmy_tunnel_from_MappedPortTCP = class;
Tmy_thread_list = class
public
ff_id : string;
ff_connection : TIdTCPConnection;
constructor Create(const local_ff_id: string; local_ss_c: TIdTCPConnection);
end;
Tmy_tunnel_from_MappedPortTCP = class(TIdBaseComponent)
protected
procedure InitComponent; override;
public
function my_connect: boolean;
function my_disconnect: boolean;
end;
my_ff_thread = class(TThread)
protected
procedure Execute; override;
public
constructor Create;
end;
my_ss_thread = class(TThread)
protected
Fff_id : string;
Fff_cmd : string;
Fff_data : TIdBytes;
procedure Execute; override;
public
constructor Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes);
end;
var
my_list : TThreadList = nil;
CS: TCriticalSection = nil;
tunnel: TIdTCPClient = nil;
tunnel_thread: my_ff_thread = nil;
implementation
constructor Tmy_thread_list.Create(const local_ff_id: string; local_ss_c: TIdTCPConnection);
begin
ff_id := local_ff_id;
ff_connection := local_ss_c;
end;
constructor my_ss_thread.Create(const ff_id, ff_cmd: string; const ff_data: TIdBytes);
begin
inherited Create(False);
Fff_id := ff_id;
Fff_cmd := ff_cmd;
Fff_data := Copy(ff_data, 0, Length(ff_data));
end;
procedure my_ss_thread.Execute;
var
ss : TIdTCPClient;
data : TIdBytes;
packet : string;
procedure WriteStrToStream(strm: TStream; const s: String);
var
buf: TIdBytes;
len: Integer;
begin
buf := ToBytes(s, IndyUTF8Encoding);
len := Length(buf);
strm.WriteBuffer(len, SizeOf(Integer));
if bytes <> nil then
strm.WriteBuffer(buf[0], len);
end;
procedure WritePacketToTunnel(const cmd: string; const bytes: TIdBytes = nil);
var
strm: TMemoryStream;
begin
strm := TMemoryStream.Create;
try
WriteStrToStream(strm, cmd);
WriteStrToStream(strm, Fff_id);
WriteStrToStream(strm, TIdEncoderMIME.EncodeBytes(bytes));
CS.Enter;
try
tunnel.IOHandler.Write(strm, 0, True);
finally
CS.Leave;
end;
finally
strm.Free;
end;
end;
begin
ss := TIdTCPClient.Create(nil);
try
ss.Host := '127.0.0.1';
ss.Port := 6666;
ss.ReadTimeout := 1000 * 120;
ss.Connect;
try
my_list.Add(Tmy_thread_list.Create(Fff_id, ss));
try
ss.IOHandler.Write(Fff_data);
except
{Fmy_memo.Lines.Add('First data not sent!');}
raise;
end;
while not Terminated do
begin
SetLength(data, 0);
ss.IOHandler.ReadBytes(data, -1);
if Length(data) = 0 then
break;
WritePacketToTunnel('data_from_ss', data);
end;
WritePacketToTunnel('disconnect');
finally
ss.Disconnect;
end;
finally
ss.Free;
end;
end;
constructor my_ff_thread.Create;
begin
inherited Create(False);
end;
procedure my_ff_thread.Execute;
var
cmd, id : string;
data : TIdBytes;
i : integer;
found_ss : TIdTCPConnection;
list : TList;
function ReadStrFromStream(strm: TStream): string;
var
len: Integer;
begin
strm.ReadBuffer(len, SizeOf(Integer));
if len > 0 then
Result := IdGlobal.ReadStringFromStream(strm, len, IndyUTF8Encoding)
else
Result := '';
end;
procedure ReadPacketFromTunnel(var v_cmd, v_id: string; var v_data: TIdBytes);
var
strm: TMemoryStream;
begin
strm := TMemoryStream.Create;
try
tunnel.IOHandler.ReadStream(strm, -1, False);
strm.Position := 0;
v_cmd := ReadStrFromStream(strm);
v_id := ReadStrFromStream(strm);
v_data := TIdDecoderMIME.DecodeBytes(ReadStrFromStream(strm));
finally
strm.Free;
end;
end;
begin
while not Terminated do
begin
ReadPacketFromTunnel(cmd, id, data);
found_ss := nil;
list := my_list.LockList;
try
for i := 0 to list.Count-1 do
begin
if Tmy_thread_list(list[i]).ff_id = id then
begin
found_ss := Tmy_thread_list(list[i]).ff_connection;
break;
end;
end;
finally
my_list.UnlockList;
end;
if cmd = 'disconnect' then
begin
if found_ss <> nil then
found_ss.Disconnect;
del_ff_from_list(id);
continue;
end;
if found_ss <> nil then
begin
try
found_ss.IOHandler.Write(data);
except
end;
Continue;
end;
my_ss_thread.Create(id, cmd, data);
end;
end;
function Tmy_tunnel_from_MappedPortTCP.my_connect: boolean;
begin
Result := True;
try
tunnel.Host := '192.168.0.157';
tunnel.Port := 8099;
tunnel.Connect;
tunnel_thread := my_ff_thread.Create(tunnel);
except
tunnel.Disconnect;
Result := False;
end;
end;
function Tmy_tunnel_from_MappedPortTCP.my_disconnect: boolean;
begin
Result := True;
try
if tunnel_thread <> nil then tunnel_thread.Terminate;
try
tunnel.Disconnect;
finally
if tunnel_thread <> nil then
begin
tunnel_thread.WaitFor;
FreeAnNil(tunnel_thread);
end;
end;
except
Result := False;
end;
end;
initialization
CS := TCriticalSection.Create;
my_list := TThreadList.Create;
tunnel := TIdTCPClient.Create(nil);
finalization
tunnel.Free;
my_list.Free;
CS.Free;
end.