Heiken Ashi Использование панды питона

Я определял функцию Heiken Ashi, которая является одним из популярных типов диаграмм в техническом анализе. Я писал на нем функцию с использованием панд, но обнаружил небольшие трудности. Вот так выглядит Хейкен Аши [ХА]

                 Heikin-Ashi Candle Calculations
           HA_Close = (Open + High + Low + Close) / 4
           HA_Open = (previous HA_Open + previous HA_Close) / 2
           HA_Low = minimum of Low, HA_Open, and HA_Close
           HA_High = maximum of High, HA_Open, and HA_Close

               Heikin-Ashi Calculations on First Run
            HA_Close = (Open + High + Low + Close) / 4
                   HA_Open = (Open + Close) / 2
                           HA_Low = Low
                           HA_High = High

На различных веб-сайтах доступно много вещей, использующих for loop и чистый python, но я думаю, что Pandas также может хорошо выполнять свою работу. Это мой прогресс

   def HA(df):

       df['HA_Close']=(df['Open']+ df['High']+ df['Low']+ df['Close'])/4

       ha_o=df['Open']+df['Close']  #Creating a Variable
       #(for 1st row)

       HA_O=df['HA_Open'].shift(1)+df['HA_Close'].shift(1) #Another variable
       #(for subsequent rows)

       df['HA_Open']=[ha_o/2 if df['HA_Open']='nan' else HA_O/2]     
       #(error Part Where am i going wrong?)

       df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)

       df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)

       return df

Может ли кто-нибудь помочь мне с этим, пожалуйста?`Это не работает.... Я пытался на этом-

  import pandas_datareader.data as web 
  import HA
  import pandas as pd
  start='2016-1-1'
  end='2016-10-30'
  DAX=web.DataReader('^GDAXI','yahoo',start,end)

Это новый код, который я написал

    def HA(df):
            df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
...:        ha_o=df['Open']+df['Close']
...:        df['HA_Open']=0.0
...:        HA_O=df['HA_Open'].shift(1)+df['HA_Close'].shift(1)
...:        df['HA_Open']= np.where( df['HA_Open']==np.nan, ha_o/2, HA_O/2 )
...:        df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
...:        df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
...:        return df

Но все же результат HA_Open не был удовлетворительным

13 ответов

Вот самая быстрая, точная и эффективная реализация в соответствии с моими тестами:

def HA(df):
    df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    idx = df.index.name
    df.reset_index(inplace=True)

    for i in range(0, len(df)):
        if i == 0:
            df.set_value(i, 'HA_Open', ((df.get_value(i, 'Open') + df.get_value(i, 'Close')) / 2))
        else:
            df.set_value(i, 'HA_Open', ((df.get_value(i - 1, 'HA_Open') + df.get_value(i - 1, 'HA_Close')) / 2))

    if idx:
        df.set_index(idx, inplace=True)

    df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
    df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
    return df

Вот мой тестовый алгоритм (по сути, я использовал алгоритм, представленный в этом посте, для сравнения результатов по скорости):

import quandl
import time

df = quandl.get("NSE/NIFTY_50", start_date='1997-01-01')

def test_HA():
    print('HA Test')
    start = time.time()
    HA(df)
    end = time.time()
    print('Time taken by set and get value functions for HA {}'.format(end-start))

    start = time.time()
    df['HA_Close_t']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    from collections import namedtuple
    nt = namedtuple('nt', ['Open','Close'])
    previous_row = nt(df.ix[0,'Open'],df.ix[0,'Close'])
    i = 0
    for row in df.itertuples():
        ha_open = (previous_row.Open + previous_row.Close) / 2
        df.ix[i,'HA_Open_t'] = ha_open
        previous_row = nt(ha_open, row.Close)
        i += 1

    df['HA_High_t']=df[['HA_Open_t','HA_Close_t','High']].max(axis=1)
    df['HA_Low_t']=df[['HA_Open_t','HA_Close_t','Low']].min(axis=1)
    end = time.time()
    print('Time taken by ix (iloc, loc) functions for HA {}'.format(end-start))

Вот вывод, который я получил на своем процессоре i7 (обратите внимание, что результаты могут отличаться в зависимости от скорости вашего процессора, но я предполагаю, что результаты будут похожими):

HA Test
Time taken by set and get value functions for HA 0.05005788803100586
Time taken by ix (iloc, loc) functions for HA 0.9360761642456055

Мой опыт работы с Pandas показывает, что функционирует как ix, loc, iloc медленнее по сравнению с set_value а также get_value функции. Кроме того, вычисляя значение для столбца на себя, используя shift Функция дает ошибочные результаты.

К сожалению, set_value() и get_value() устарели. Основываясь на ответе Аркокхара, я смог получить увеличение скорости на 75%, используя следующий метод понимания списка с моими собственными данными OHLC (7000 строк данных). Это быстрее, чем использовать at и iat.

def HA( dataframe ):

    df = dataframe.copy()

    df['HA_Close']=(df.Open + df.High + df.Low + df.Close)/4

    df.reset_index(inplace=True)

    ha_open = [ (df.Open[0] + df.Close[0]) / 2 ]
    [ ha_open.append((ha_open[i] + df.HA_Close.values[i]) / 2) \
    for i in range(0, len(df)-1) ]
    df['HA_Open'] = ha_open

    df.set_index('index', inplace=True)

    df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
    df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)

    return df
def heikenashi(df):
    df['HA_Close'] = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
    df['HA_Open'] = (df['Open'].shift(1) + df['Open'].shift(1)) / 2
    df.iloc[0, df.columns.get_loc("HA_Open")] = (df.iloc[0]['Open'] + df.iloc[0]['Close'])/2
    df['HA_High'] = df[['High', 'Low', 'HA_Open', 'HA_Close']].max(axis=1)
    df['HA_Low'] = df[['High', 'Low', 'HA_Open', 'HA_Close']].min(axis=1)
    df = df.drop(['Open', 'High', 'Low', 'Close'], axis=1)  # remove old columns
    df = df.rename(columns={"HA_Open": "Open", "HA_High": "High", "HA_Low": "Low", "HA_Close": "Close", "Volume": "Volume"})
    df = df[['Open', 'High', 'Low', 'Close', 'Volume']]  # reorder columns
    return df

Я не очень разбираюсь в Python или Pandas, но после некоторого исследования, я думаю, это было бы хорошим решением.

Пожалуйста, не стесняйтесь добавлять любые комментарии. Я очень ценю.

Я использовал namedtuples и itertuples (кажется, самый быстрый, если проходит цикл через DataFrame).

Я надеюсь, что это помогает!

def HA(df):
    df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    nt = namedtuple('nt', ['Open','Close'])
    previous_row = nt(df.ix[0,'Open'],df.ix[0,'Close'])
    i = 0
    for row in df.itertuples():
        ha_open = (previous_row.Open + previous_row.Close) / 2
        df.ix[i,'HA_Open'] = ha_open
        previous_row = nt(ha_open, row.Close)
        i += 1

    df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
    df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
    return df

Прекрасно работающая функция HekinAshi. Я не являюсь первоначальным автором этого кода. Я нашел это на Github (https://github.com/emreturan/heikin-ashi/blob/master/heikin_ashi.py)

      def heikin_ashi(df):
        heikin_ashi_df = pd.DataFrame(index=df.index.values, columns=['open', 'high', 'low', 'close'])
    
    heikin_ashi_df['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
    
    for i in range(len(df)):
        if i == 0:
            heikin_ashi_df.iat[0, 0] = df['open'].iloc[0]
        else:
            heikin_ashi_df.iat[i, 0] = (heikin_ashi_df.iat[i-1, 0] + heikin_ashi_df.iat[i-1, 3]) / 2
        
    heikin_ashi_df['high'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['high']).max(axis=1)
    
    heikin_ashi_df['low'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['low']).min(axis=1)
    
    return heikin_ashi_df

Я скорректировал код, чтобы он работал с Python 3.7.

def HA(df):
    df_HA = df
    df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    #idx = df_HA.index.name
    #df_HA.reset_index(inplace=True)

    for i in range(0, len(df)):
        if i == 0:
            df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
        else:
            df_HA['Open'][i] = ( (df['Open'][i-1] + df['Close'][i-1] )/ 2)


    #if idx:
        #df_HA.set_index(idx, inplace=True)

    df_HA['High']=df[['Open','Close','High']].max(axis=1)
    df_HA['Low']=df[['Open','Close','Low']].min(axis=1)
    return df_HA

Версия Numpy, работающая с Numba

      @jit(nopython=True)
def heiken_ashi_numpy(c_open, c_high, c_low, c_close):
    ha_close = (c_open + c_high + c_low + c_close) / 4
    ha_open = np.empty_like(ha_close)
    ha_open[0] = (c_open[0] + c_close[0]) / 2
    for i in range(1, len(c_close)):
        ha_open[i] = (c_open[i - 1] + c_close[i - 1]) / 2
    ha_high = np.maximum(np.maximum(ha_open, ha_close), c_high)
    ha_low = np.minimum(np.minimum(ha_open, ha_close), c_low)
    return ha_open, ha_high, ha_low, ha_close

Будет быстрее с NumPy.

 def HEIKIN(O, H, L, C, oldO, oldC):
     HA_Close = (O + H + L + C)/4
     HA_Open = (oldO + oldC)/2
     elements = numpy.array([H, L, HA_Open, HA_Close])
     HA_High = elements.max(0)
     HA_Low = elements.min(0)
     out = numpy.array([HA_Close, HA_Open, HA_High, HA_Low])  
     return out

Решение без цикла для DataFrames

Это было самое простое и понятное решение без циклов, которое я смог придумать для фреймов данных .

  • Временно хранить выходные данные Heikin-Ashi в столбцах «o», «h», «l», «c»
  • 'h' на основе вчерашних значений, поэтому мы можем использовать .shift(1)и скопируйте первую запись
  • Замените «Открыть», «Высокий», «Низкий», «Закрыть» на «o», «h», «l», «c».

Питон 3.9.7

      def heikin_ashi(df):
    df = df.copy()
    df['c'] = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
    df['o'] = ((df['Open'] + df['Close']) / 2).shift(1)
    df.iloc[0,-1] = df['o'].iloc[1]
    df['h'] = df[['High', 'o', 'c']].max(axis=1)
    df['l'] = df[['Low', 'o', 'c']].min(axis=1)
    df['Open'], df['High'], df['Low'], df['Close'] = df['o'], df['h'], df['l'], df['c']
    return df.drop(['o', 'h', 'l', 'c'], axis=1)

импортировать pandas_ta как ta # TA-lib
импортировать панды как pd

Использование реализации ta в Pandas было для меня самым простым и быстрым.

      dfHA = df.ta.ha()

Я предполагаю, что это не было доступно на момент задания вопроса.

      def HA(df):
    df_HA = df
    df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4


    for i in range(0, len(df)):
        if i == 0:
            df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
        else:
            df_HA['Open'][i] = ( (df['Open'][i-1] + df['Close'][i-1] )/ 2)


    df_HA['High']=df[['Open','Close','High']].max(axis=1)
    df_HA['Low']=df[['Open','Close','Low']].min(axis=1)
    return df_HA

Этот код работает, но неправильно рассчитывает свечи HA. Оператор Else ищет обычные свечи для открытия и закрытия вместо HA для расчета следующего HA Open. Заменить:

          for i in range(0, len(df)):
    if i == 0:
        df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
    else:
        df_HA['Open'][i] = ( (df_HA['Open'][i-1] + df_HA['Close'][i-1] )/ 2)

Далее HA High и Low. Расчеты не правильные.

          df_HA['High']=df[['Open','Close','High']].max(axis=1)
    df_HA['Low']=df[['Open','Close','Low']].min(axis=1)

Опять сравнивается только с обычными свечами, а не с текущими обычными свечами High, HA Open и HA Close. этот код устраняет проблему:

      def HA_Initialise(df):
    df_HA = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close'])

    df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    for i in range(0, len(df)):
        if i == 0:
            df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
        else:
            test = []
            df_HA['Open'][i] = ( (df_HA['Open'][i-1] + df_HA['Close'][i-1] )/ 2)
            test.append(df['High'][i])
            test.append(df['Low'][i])
            test.append(df_HA['Open'][i])
            test.append(df_HA['Close'][i])

            high = max(test)
            low = min(test)
            df_HA['High'][i] = high
            df_HA['Low'][i] = low

    return df_HA

df — это фрейм данных с обычными данными свечей, а df_HA — это то, что мы строим и изучаем, пока выполняется код для необходимых вычислений.

Предполагая, что у вас есть все в списке списков; где каждая строка имеет: время, открытие, закрытие, высокий, низкий, объем.

              if candles:
            close_values = [sum(row[1:5]) / 4 for row in candles]

            previous_close = close_values[0]
            previous_open = (candles[0][1] + previous_close) / 2

            opens = collections.deque()
            opens.append(previous_open)
            for close_value in close_values[1:]:
                previous_open = (previous_open + previous_close) / 2
                opens.append(previous_open)
                previous_close = close_value

            candles = [[row[0], o, c, max(row[3], o, c), min(row[4], o, c), row[5]] 
for row, o, c in zip(candles, opens, close_values)]

В этом решении используются только генераторы списков и модуль коллекций.

Если вы хотите вернуть кадр данных:

      return pd.DataFrame.from_records(
            data=candles,
            columns=['Time', 'Open', 'Close', 'High', 'Low', 'Volume'],
            index='Time',
            coerce_float=True,
        )

Самое быстрое решение, которое я нашел.

      def HA(df):
    df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4

    idx = df.index.name
    df.reset_index(inplace=True)

    ha_close_values = self.data['HA_Close'].values

    length = len(df)
    ha_open = np.zeros(length, dtype=float)
    ha_open[0] = (df['Open'][0] + df['Close'][0]) / 2

    for i in range(0, length - 1):
        ha_open[i + 1] = (ha_open[i] + ha_close_values[i]) / 2

    df['HA_Open'] = ha_open

    df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
    df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
    return df

Это решение аналогично user11186769 с двумя дополнительными оптимизациями.

Основные оптимизации, которые дали ускорение в 3,5-4 раза, - это эта часть:

      ha_close_values = self.data['HA_Close'].values

length = len(df)
ha_open = np.zeros(length, dtype=float)
ha_open[0] = (df['Open'][0] + df['Close'][0]) / 2

for i in range(0, length - 1):
    ha_open[i + 1] = (ha_open[i] + ha_close_values[i]) / 2

против этого:

      [ha_open.append((ha_open[i] + df.HA_Close.values[i]) / 2) for i in range(0, len(df)-1)]

Первое отличие состоит в том, что в этом ответе на каждой итерации присутствует ненужный и дорогостоящий вызов. Что это: df.HA_Close.values[i]. (Он преобразует серию в массив numpy на каждой итерации.)

Как видите, в своем решении я рассчитал это значение только один раз и сохранил его следующим образом: ha_close_values = self.data['HA_Close'].values, и использовал это значение в цикле for.

Другая оптимизация заключается в использовании массива numpy с фиксированным размером вместо списка Python. Вместо того, чтобы добавлять в этот список на каждой итерации, я просто использовал текущий индекс + 1, чтобы установить значения ha_open.

Другие вопросы по тегам