Проблемы управления памятью Pypy CFFI
T попытался ускорить мой код Pypy, добавив некоторые функции c. Проблема в том, что использование памяти постоянно увеличивается! Я видел несколько постов на эту тему и попытался сделать простой тест, чтобы проиллюстрировать это. В моем тесте ниже я могу освободить память в соответствии с этим постом ( проблемы управления памятью Python CFFI), но мой код вылетает следующим образом: free(): неверный следующий размер (нормальный). Кажется, я дважды освобождаю память...
Может ли кто-нибудь помочь мне решить мою проблему?
import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()
Create_TypeStructure = """
typedef struct _STRUCT1
{
int Data1;
int Data2;
int Data3;
double Data4;
double Data5;
double Data6;
double Data7;
double Data8;
}STRUCT1, *PSTRUCT1;
"""
Create_DataStructure = """
PSTRUCT1 CreateDataStructure()
{
PSTRUCT1 pStruct1 = ( PSTRUCT1 ) malloc( sizeof( STRUCT1 )*6000 );
SetDummyValues(&pStruct1);
if(pStruct1 != NULL) printf("SECOND TEST: ptr is not null/n");
else printf("SECOND TEST: ptr is null/n");
return pStruct1;
}
"""
Set_DummyValues = """
void SetDummyValues( PSTRUCT1 ptr )
{
ptr = NULL;
}
"""
Free_DataStructure = """
void FreeDataStructure(PSTRUCT1 ptr)
{
free(ptr);
}
"""
Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p)
{
int tmp=-1;
int numline = 5999;
while (tmp < numline)
{
tmp++ ;
{
p[tmp].Data1 = 1000000;
p[tmp].Data2 = 1000000;
p[tmp].Data3 = 1000000;
p[tmp].Data4 = 2125585.265;
p[tmp].Data5 = 2125585.265;
p[tmp].Data6 = 2125585.265;
p[tmp].Data7 = 2125585.265;
p[tmp].Data8 = 2125585.265;
}
}
return p;
}
"""
ffibuilder.cdef(Create_TypeStructure) #declare strucutres
ffibuilder.cdef('PSTRUCT1 CreateDataStructure();') #declare function
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('void SetDummyValues(PSTRUCT1 pStruct1);') #declare function
ffibuilder.set_source("c", Create_TypeStructure + Set_DummyValues + Create_DataStructure + Free_DataStructure + Some_Function )
ffibuilder.compile(verbose=True)
from c import ffi, lib
def worker_process(channel):
# Import modules
import gc
from time import sleep
import execnet
from c import ffi, lib
# task processor, sits on each CPU
channel.send("ready")
for x in channel:
if x is None: # we can shutdown
break
data_list = {}
for i in range(15000):
# Create a pointer to the data structure and tell the garbage collector how to destroy it
gc_c_pDataStructure = ffi.gc( lib.CreateDataStructure(), lib.FreeDataStructure )
lib.SomeCFunction( gc_c_pDataStructure ) # Populate the data structure
data_list[i]= gc_c_pDataStructure # Store some data
#lib.FreeDataStructure(data_list[i])
for i in range(15000):
lib.FreeDataStructure(data_list[i])
#sleep(15)
channel.send("Ok!")
numberOfTasks = 500 # simulations to launch
workerCount = 1 # CPUs
group = execnet.Group()
for i in range(workerCount): # CPUs
group.makegateway()
# execute taskprocessor everywhere
mch = group.remote_exec(worker_process)
# get a queue that gives us results
q = mch.make_receive_queue(endmarker="Stop")
tasks = range(numberOfTasks) # a list of tasks, here just integers
terminated = 0
while 1:
channel, item = q.get()
if item == "Stop":
terminated += 1
print "Terminated task on channel %s" % channel.gateway.id
if terminated == len(mch):
print "Got all results, Finish!"
break
continue
if item != "ready":
print "%s: Terminated: %s" % (channel.gateway.id, item)
if not tasks:
print "No tasks remain, sending termination request to all"
mch.send_each(None)
tasks = -1
if tasks and tasks != -1:
task = tasks.pop()
channel.send(task)
print "Sent task %r to channel %s" % (task, channel.gateway.id)
group.terminate()
1 ответ
Мое решение: заставить сборщик мусора Pypy работать и убрать ручное освобождение. Также не нужно устанавливать указатель на NULL.
import gc
from cffi import FFI
from time import sleep
import execnet
ffibuilder = FFI()
import __pypy__
Create_TypeStructure = """
typedef struct _STRUCT1
{
int Data1;
int Data2;
int Data3;
double Data4;
double Data5;
double Data6;
double Data7;
double Data8;
}STRUCT1, *PSTRUCT1;
"""
Create_DataStructure = """
PSTRUCT1 CreateDataStructure()
{
PSTRUCT1 pStruct1 = ( PSTRUCT1 ) malloc( sizeof( STRUCT1 )*6000 );
return pStruct1;
}
"""
Free_DataStructure = """
void FreeDataStructure(PSTRUCT1 ptr)
{
free(ptr);
}
"""
Some_Function = """
PSTRUCT1 SomeCFunction(STRUCT1 *p)
{
int tmp=-1;
int numline = 5999;
while (tmp < numline)
{
tmp++ ;
{
p[tmp].Data1 = 1000000;
p[tmp].Data2 = 1000000;
p[tmp].Data3 = 1000000;
p[tmp].Data4 = 2125585.265;
p[tmp].Data5 = 2125585.265;
p[tmp].Data6 = 2125585.265;
p[tmp].Data7 = 2125585.265;
p[tmp].Data8 = 2125585.265;
}
}
return p;
}
"""
ffibuilder.cdef(Create_TypeStructure) #declare strucutres
ffibuilder.cdef('PSTRUCT1 CreateDataStructure();') #declare function
ffibuilder.cdef('void FreeDataStructure(PSTRUCT1 ptr);') #declare function
ffibuilder.cdef('PSTRUCT1 SomeCFunction(PSTRUCT1 ptr);') #declare function
ffibuilder.set_source("c", Create_TypeStructure + Create_DataStructure + Free_DataStructure + Some_Function )
ffibuilder.compile(verbose=True)
from c import ffi, lib
def worker_process(channel):
# Import modules
import gc
from time import sleep
import execnet
from c import ffi, lib
import __pypy__
# task processor, sits on each CPU
channel.send("ready")
for x in channel:
if x is None: # we can shutdown
break
data_list = {}
for i in range(5000):
# Create a pointer to the data structure and tell the garbage collector how to destroy it
gc_c_pDataStructure = ffi.gc( lib.CreateDataStructure(), lib.FreeDataStructure )
__pypy__.add_memory_pressure(6000)
lib.SomeCFunction( gc_c_pDataStructure ) # Populate the data structure
data_list[i]= gc_c_pDataStructure # Store some data
#lib.FreeDataStructure(data_list[i])
#for i in range(5000):
#lib.FreeDataStructure(data_list[i])
#sleep(15)
channel.send("Ok!")
numberOfTasks = 500 # simulations to launch
workerCount = 3 # CPUs
group = execnet.Group()
for i in range(workerCount): # CPUs
group.makegateway()
# execute taskprocessor everywhere
mch = group.remote_exec(worker_process)
# get a queue that gives us results
q = mch.make_receive_queue(endmarker="Stop")
tasks = range(numberOfTasks) # a list of tasks, here just integers
terminated = 0
while 1:
channel, item = q.get()
if item == "Stop":
terminated += 1
print "Terminated task on channel %s" % channel.gateway.id
if terminated == len(mch):
print "Got all results, Finish!"
break
continue
if item != "ready":
print "%s: Terminated: %s" % (channel.gateway.id, item)
if not tasks:
print "No tasks remain, sending termination request to all"
mch.send_each(None)
tasks = -1
if tasks and tasks != -1:
task = tasks.pop()
channel.send(task)
print "Sent task %r to channel %s" % (task, channel.gateway.id)
group.terminate()