Использование pd.read_sql() для извлечения больших данных (>5 миллионов записей) из базы данных Oracle, что делает выполнение SQL очень медленным

  1. Первоначально пытался использовать pd.read_sql().
  2. Затем я попытался использовать sqlalchemy, объекты запросов, но ни один из этих методов не полезен, так как sql выполняется долго и никогда не заканчивается.
  3. Я пытался использовать подсказки.
  4. Я предполагаю, что проблема заключается в следующем: Pandas создает объект курсора на заднем плане. С помощью cx_Oracle мы не можем влиять на параметр "arraysize", который будет использоваться при этом, то есть всегда будет использоваться значение по умолчанию 100, которое слишком мало.

    CODE:
    
      import pandas as pd
      import Configuration.Settings as CS
      import DataAccess.Databases as SDB
      import sqlalchemy
      import cx_Oracle
    
      dfs = []
    
      DBM = SDB.Database(CS.DB_PRM,PrintDebugMessages=False,ClientInfo="Loader")
    
      sql = '''
                                  WITH            
                                      l AS
                                    (
                                    SELECT DISTINCT /*+ materialize */
                                      hcz.hcz_lwzv_id AS lwzv_id
    
                                    FROM
                                      pm_mbt_materialbasictypes mbt
                                      INNER JOIN pm_mpt_materialproducttypes mpt ON mpt.mpt_mbt_id = mbt.mbt_id
                                      INNER JOIN pm_msl_materialsublots msl ON msl.msl_mpt_id = mpt.mpt_id
                                      INNER JOIN pm_historycompattributes hca ON hca.hca_msl_id = msl.msl_id AND hca.hca_ignoreflag = 0 
                                      INNER JOIN pm_tpm_testdefprogrammodes tpm ON tpm.tpm_id = hca.hca_tpm_id 
                                      inner join pm_tin_testdefinsertions tin on tin.tin_id = tpm.tpm_tin_id             
                                      INNER JOIN pm_hcz_history_comp_zones hcz ON hcz.hcz_hcp_id = hca.hca_hcp_id
                                    WHERE
                                      mbt.mbt_name = :input1 and tin.tin_name = 'x1' and
                                     hca.hca_testendday < '2018-5-31' and hca.hca_testendday > '2018-05-30'                  
                                    ),
                                TPL as 
                                    (
                                    select /*+ materialize */
                                      *
                                      from
                                      (
                                      select
                                        ut.ut_id,
                                        ut.ut_basic_type,
                                        ut.ut_insertion,
                                        ut.ut_testprogram_name,
                                        ut.ut_revision  
                                      from
                                        pm_updated_testprogram ut
                                      where 
                                        ut.ut_basic_type = :input1 and ut.ut_insertion = :input2 
                                      order by
                                        ut.ut_revision desc  
                                      ) where rownum = 1
                                    )
    
                                    SELECT /*+ FIRST_ROWS */
                                      rcl.rcl_lotidentifier                                           AS LOT, 
                                      lwzv.lwzv_wafer_id                                              AS WAFER,
                                      pzd.pzd_zone_name                                               AS ZONE,
                                      tte.tte_tpm_id||'~'||tte.tte_testnumber||'~'||tte.tte_testname  AS Test_Identifier,
                                      case when ppd.ppd_measurement_result > 1e15 then NULL else SFROUND(ppd.ppd_measurement_result,6) END AS Test_Results
    
                                   FROM
                                      TPL 
                                      left JOIN pm_pcm_details pcm on pcm.pcm_ut_id = TPL.ut_id 
                                      left JOIN pm_tin_testdefinsertions tin ON tin.tin_name = TPL.ut_insertion 
                                      left JOIN pm_tpr_testdefprograms tpr ON tpr.tpr_name = TPL.ut_testprogram_name and tpr.tpr_revision = TPL.ut_revision
                                      left JOIN pm_tpm_testdefprogrammodes tpm ON tpm.tpm_tpr_id = tpr.tpr_id and tpm.tpm_tin_id = tin.tin_id 
                                      left JOIN pm_tte_testdeftests tte on tte.tte_tpm_id = tpm.tpm_id and tte.tte_testnumber = pcm.pcm_testnumber 
                                      cross join l 
                                      left JOIN pm_lwzv_info lwzv ON lwzv.lwzv_id = l.lwzv_id 
                                      left JOIN pm_rcl_resultschipidlots rcl ON rcl.rcl_id = lwzv.lwzv_rcl_id                                   
                                      left JOIN pm_pcm_zone_def pzd ON pzd.pzd_basic_type = TPL.ut_basic_type and pzd.pzd_pcm_x = lwzv.lwzv_pcm_x and pzd.pzd_pcm_y = lwzv.lwzv_pcm_y 
                                      left JOIN pm_pcm_par_data ppd ON ppd.ppd_lwzv_id = l.lwzv_id and ppd.ppd_tte_id = tte.tte_id
    
        '''
    #method1: using query objects.
    
    Q = DBM.getQueryObject(sql)
    Q.execute({"input1":'xxxx',"input2":'yyyy'})
    
    while not Q.AtEndOfResultset:
      print Q
    
    #method2: using sqlalchemy
    
     connectstring = "oracle+cx_oracle://username:Password@(description= 
     (address_list=(address=(protocol=tcp)(host=tnsconnect string) 
     (port=pertnumber)))(connect_data=(sid=xxxx)))"
     engine = sqlalchemy.create_engine(connectstring, arraysize=10000)
     df_p = pd.read_sql(sql, params= 
     {"input1":'xxxx',"input2":'yyyy'}, con=engine)
    
    #method3: using pd.read_sql()
    
    df_p = pd.read_sql_query(SQL_PCM, params= 
    {"input1":'xxxx',"input2":'yyyy'}, 
    coerce_float=True, con= DBM.Connection)
    

Было бы здорово, если бы кто-нибудь помог мне в этом. Заранее спасибо.

2 ответа

И еще одна возможность отрегулировать размер массива без необходимости создавать oraaccess.xml, как предложено Крисом. Это может не работать с остальным кодом, как есть, но оно должно дать вам представление о том, как действовать, если вы хотите попробовать этот подход!

class Connection(cx_Oracle.Connection):

    def __init__(self):
        super(Connection, self).__init__("user/pw@dsn")

    def cursor(self):
        c = super(Connection, self).cursor()
        c.arraysize = 5000
        return c

engine = sqlalchemy.create_engine(creator=Connection)
pandas.read_sql(sql, engine)

Вот еще одна альтернатива для экспериментов.

Установите размер предварительной выборки, используя внешнюю конфигурацию, доступную для программ Oracle Call Interface, таких как cx_Oracle. Это переопределяет внутренние настройки, используемые программами OCI. Создайте файл oraaccess.xml:

<?xml version="1.0"?>
 <oraaccess xmlns="http://xmlns.oracle.com/oci/oraaccess"
  xmlns:oci="http://xmlns.oracle.com/oci/oraaccess"
  schemaLocation="http://xmlns.oracle.com/oci/oraaccess
  http://xmlns.oracle.com/oci/oraaccess.xsd">
  <default_parameters>
    <prefetch>
      <rows>1000</rows>
    </prefetch>
  </default_parameters>
</oraaccess>

Если вы используете tnsnames.ora или sqlnet.ora для cx_Oracle, поместите файл oraaccess.xml в тот же каталог. В противном случае создайте новый каталог и задайте для переменной окружения TNS_ADMIN это имя каталога.

cx_Oracle должен использовать библиотеки Oracle Client 12c или более поздней версии.

Экспериментируйте с разными размерами.

См. Параметры развертывания на стороне клиента OCI с использованием oraaccess.xml.

Другие вопросы по тегам