Heiken Ashi Использование панды питона
Я определял функцию Heiken Ashi, которая является одним из популярных типов диаграмм в техническом анализе. Я писал на нем функцию с использованием панд, но обнаружил небольшие трудности. Вот так выглядит Хейкен Аши [ХА]
Heikin-Ashi Candle Calculations
HA_Close = (Open + High + Low + Close) / 4
HA_Open = (previous HA_Open + previous HA_Close) / 2
HA_Low = minimum of Low, HA_Open, and HA_Close
HA_High = maximum of High, HA_Open, and HA_Close
Heikin-Ashi Calculations on First Run
HA_Close = (Open + High + Low + Close) / 4
HA_Open = (Open + Close) / 2
HA_Low = Low
HA_High = High
На различных веб-сайтах доступно много вещей, использующих for loop и чистый python, но я думаю, что Pandas также может хорошо выполнять свою работу. Это мой прогресс
def HA(df):
df['HA_Close']=(df['Open']+ df['High']+ df['Low']+ df['Close'])/4
ha_o=df['Open']+df['Close'] #Creating a Variable
#(for 1st row)
HA_O=df['HA_Open'].shift(1)+df['HA_Close'].shift(1) #Another variable
#(for subsequent rows)
df['HA_Open']=[ha_o/2 if df['HA_Open']='nan' else HA_O/2]
#(error Part Where am i going wrong?)
df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
return df
Может ли кто-нибудь помочь мне с этим, пожалуйста?`Это не работает.... Я пытался на этом-
import pandas_datareader.data as web
import HA
import pandas as pd
start='2016-1-1'
end='2016-10-30'
DAX=web.DataReader('^GDAXI','yahoo',start,end)
Это новый код, который я написал
def HA(df):
df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
...: ha_o=df['Open']+df['Close']
...: df['HA_Open']=0.0
...: HA_O=df['HA_Open'].shift(1)+df['HA_Close'].shift(1)
...: df['HA_Open']= np.where( df['HA_Open']==np.nan, ha_o/2, HA_O/2 )
...: df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
...: df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
...: return df
Но все же результат HA_Open не был удовлетворительным
13 ответов
Вот самая быстрая, точная и эффективная реализация в соответствии с моими тестами:
def HA(df):
df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
idx = df.index.name
df.reset_index(inplace=True)
for i in range(0, len(df)):
if i == 0:
df.set_value(i, 'HA_Open', ((df.get_value(i, 'Open') + df.get_value(i, 'Close')) / 2))
else:
df.set_value(i, 'HA_Open', ((df.get_value(i - 1, 'HA_Open') + df.get_value(i - 1, 'HA_Close')) / 2))
if idx:
df.set_index(idx, inplace=True)
df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
return df
Вот мой тестовый алгоритм (по сути, я использовал алгоритм, представленный в этом посте, для сравнения результатов по скорости):
import quandl
import time
df = quandl.get("NSE/NIFTY_50", start_date='1997-01-01')
def test_HA():
print('HA Test')
start = time.time()
HA(df)
end = time.time()
print('Time taken by set and get value functions for HA {}'.format(end-start))
start = time.time()
df['HA_Close_t']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
from collections import namedtuple
nt = namedtuple('nt', ['Open','Close'])
previous_row = nt(df.ix[0,'Open'],df.ix[0,'Close'])
i = 0
for row in df.itertuples():
ha_open = (previous_row.Open + previous_row.Close) / 2
df.ix[i,'HA_Open_t'] = ha_open
previous_row = nt(ha_open, row.Close)
i += 1
df['HA_High_t']=df[['HA_Open_t','HA_Close_t','High']].max(axis=1)
df['HA_Low_t']=df[['HA_Open_t','HA_Close_t','Low']].min(axis=1)
end = time.time()
print('Time taken by ix (iloc, loc) functions for HA {}'.format(end-start))
Вот вывод, который я получил на своем процессоре i7 (обратите внимание, что результаты могут отличаться в зависимости от скорости вашего процессора, но я предполагаю, что результаты будут похожими):
HA Test
Time taken by set and get value functions for HA 0.05005788803100586
Time taken by ix (iloc, loc) functions for HA 0.9360761642456055
Мой опыт работы с Pandas показывает, что функционирует как ix
, loc
, iloc
медленнее по сравнению с set_value
а также get_value
функции. Кроме того, вычисляя значение для столбца на себя, используя shift
Функция дает ошибочные результаты.
К сожалению, set_value() и get_value() устарели. Основываясь на ответе Аркокхара, я смог получить увеличение скорости на 75%, используя следующий метод понимания списка с моими собственными данными OHLC (7000 строк данных). Это быстрее, чем использовать at и iat.
def HA( dataframe ):
df = dataframe.copy()
df['HA_Close']=(df.Open + df.High + df.Low + df.Close)/4
df.reset_index(inplace=True)
ha_open = [ (df.Open[0] + df.Close[0]) / 2 ]
[ ha_open.append((ha_open[i] + df.HA_Close.values[i]) / 2) \
for i in range(0, len(df)-1) ]
df['HA_Open'] = ha_open
df.set_index('index', inplace=True)
df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
return df
def heikenashi(df):
df['HA_Close'] = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
df['HA_Open'] = (df['Open'].shift(1) + df['Open'].shift(1)) / 2
df.iloc[0, df.columns.get_loc("HA_Open")] = (df.iloc[0]['Open'] + df.iloc[0]['Close'])/2
df['HA_High'] = df[['High', 'Low', 'HA_Open', 'HA_Close']].max(axis=1)
df['HA_Low'] = df[['High', 'Low', 'HA_Open', 'HA_Close']].min(axis=1)
df = df.drop(['Open', 'High', 'Low', 'Close'], axis=1) # remove old columns
df = df.rename(columns={"HA_Open": "Open", "HA_High": "High", "HA_Low": "Low", "HA_Close": "Close", "Volume": "Volume"})
df = df[['Open', 'High', 'Low', 'Close', 'Volume']] # reorder columns
return df
Я не очень разбираюсь в Python или Pandas, но после некоторого исследования, я думаю, это было бы хорошим решением.
Пожалуйста, не стесняйтесь добавлять любые комментарии. Я очень ценю.
Я использовал namedtuples и itertuples (кажется, самый быстрый, если проходит цикл через DataFrame).
Я надеюсь, что это помогает!
def HA(df):
df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
nt = namedtuple('nt', ['Open','Close'])
previous_row = nt(df.ix[0,'Open'],df.ix[0,'Close'])
i = 0
for row in df.itertuples():
ha_open = (previous_row.Open + previous_row.Close) / 2
df.ix[i,'HA_Open'] = ha_open
previous_row = nt(ha_open, row.Close)
i += 1
df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
return df
Прекрасно работающая функция HekinAshi. Я не являюсь первоначальным автором этого кода. Я нашел это на Github (https://github.com/emreturan/heikin-ashi/blob/master/heikin_ashi.py)
def heikin_ashi(df):
heikin_ashi_df = pd.DataFrame(index=df.index.values, columns=['open', 'high', 'low', 'close'])
heikin_ashi_df['close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
for i in range(len(df)):
if i == 0:
heikin_ashi_df.iat[0, 0] = df['open'].iloc[0]
else:
heikin_ashi_df.iat[i, 0] = (heikin_ashi_df.iat[i-1, 0] + heikin_ashi_df.iat[i-1, 3]) / 2
heikin_ashi_df['high'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['high']).max(axis=1)
heikin_ashi_df['low'] = heikin_ashi_df.loc[:, ['open', 'close']].join(df['low']).min(axis=1)
return heikin_ashi_df
Я скорректировал код, чтобы он работал с Python 3.7.
def HA(df):
df_HA = df
df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
#idx = df_HA.index.name
#df_HA.reset_index(inplace=True)
for i in range(0, len(df)):
if i == 0:
df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
else:
df_HA['Open'][i] = ( (df['Open'][i-1] + df['Close'][i-1] )/ 2)
#if idx:
#df_HA.set_index(idx, inplace=True)
df_HA['High']=df[['Open','Close','High']].max(axis=1)
df_HA['Low']=df[['Open','Close','Low']].min(axis=1)
return df_HA
Версия Numpy, работающая с Numba
@jit(nopython=True)
def heiken_ashi_numpy(c_open, c_high, c_low, c_close):
ha_close = (c_open + c_high + c_low + c_close) / 4
ha_open = np.empty_like(ha_close)
ha_open[0] = (c_open[0] + c_close[0]) / 2
for i in range(1, len(c_close)):
ha_open[i] = (c_open[i - 1] + c_close[i - 1]) / 2
ha_high = np.maximum(np.maximum(ha_open, ha_close), c_high)
ha_low = np.minimum(np.minimum(ha_open, ha_close), c_low)
return ha_open, ha_high, ha_low, ha_close
Будет быстрее с NumPy.
def HEIKIN(O, H, L, C, oldO, oldC):
HA_Close = (O + H + L + C)/4
HA_Open = (oldO + oldC)/2
elements = numpy.array([H, L, HA_Open, HA_Close])
HA_High = elements.max(0)
HA_Low = elements.min(0)
out = numpy.array([HA_Close, HA_Open, HA_High, HA_Low])
return out
Решение без цикла для DataFrames
Это было самое простое и понятное решение без циклов, которое я смог придумать для фреймов данных .
- Временно хранить выходные данные Heikin-Ashi в столбцах «o», «h», «l», «c»
- 'h' на основе вчерашних значений, поэтому мы можем использовать
.shift(1)
и скопируйте первую запись - Замените «Открыть», «Высокий», «Низкий», «Закрыть» на «o», «h», «l», «c».
Питон 3.9.7
def heikin_ashi(df):
df = df.copy()
df['c'] = (df['Open'] + df['High'] + df['Low'] + df['Close']) / 4
df['o'] = ((df['Open'] + df['Close']) / 2).shift(1)
df.iloc[0,-1] = df['o'].iloc[1]
df['h'] = df[['High', 'o', 'c']].max(axis=1)
df['l'] = df[['Low', 'o', 'c']].min(axis=1)
df['Open'], df['High'], df['Low'], df['Close'] = df['o'], df['h'], df['l'], df['c']
return df.drop(['o', 'h', 'l', 'c'], axis=1)
импортировать pandas_ta как ta # TA-lib
импортировать панды как pd
Использование реализации ta в Pandas было для меня самым простым и быстрым.
dfHA = df.ta.ha()
Я предполагаю, что это не было доступно на момент задания вопроса.
def HA(df):
df_HA = df
df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
for i in range(0, len(df)):
if i == 0:
df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
else:
df_HA['Open'][i] = ( (df['Open'][i-1] + df['Close'][i-1] )/ 2)
df_HA['High']=df[['Open','Close','High']].max(axis=1)
df_HA['Low']=df[['Open','Close','Low']].min(axis=1)
return df_HA
Этот код работает, но неправильно рассчитывает свечи HA. Оператор Else ищет обычные свечи для открытия и закрытия вместо HA для расчета следующего HA Open. Заменить:
for i in range(0, len(df)):
if i == 0:
df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
else:
df_HA['Open'][i] = ( (df_HA['Open'][i-1] + df_HA['Close'][i-1] )/ 2)
Далее HA High и Low. Расчеты не правильные.
df_HA['High']=df[['Open','Close','High']].max(axis=1)
df_HA['Low']=df[['Open','Close','Low']].min(axis=1)
Опять сравнивается только с обычными свечами, а не с текущими обычными свечами High, HA Open и HA Close. этот код устраняет проблему:
def HA_Initialise(df):
df_HA = pd.DataFrame(columns=['Date', 'Open', 'High', 'Low', 'Close'])
df_HA['Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
for i in range(0, len(df)):
if i == 0:
df_HA['Open'][i]= ( (df['Open'][i] + df['Close'][i] )/ 2)
else:
test = []
df_HA['Open'][i] = ( (df_HA['Open'][i-1] + df_HA['Close'][i-1] )/ 2)
test.append(df['High'][i])
test.append(df['Low'][i])
test.append(df_HA['Open'][i])
test.append(df_HA['Close'][i])
high = max(test)
low = min(test)
df_HA['High'][i] = high
df_HA['Low'][i] = low
return df_HA
df — это фрейм данных с обычными данными свечей, а df_HA — это то, что мы строим и изучаем, пока выполняется код для необходимых вычислений.
Предполагая, что у вас есть все в списке списков; где каждая строка имеет: время, открытие, закрытие, высокий, низкий, объем.
if candles:
close_values = [sum(row[1:5]) / 4 for row in candles]
previous_close = close_values[0]
previous_open = (candles[0][1] + previous_close) / 2
opens = collections.deque()
opens.append(previous_open)
for close_value in close_values[1:]:
previous_open = (previous_open + previous_close) / 2
opens.append(previous_open)
previous_close = close_value
candles = [[row[0], o, c, max(row[3], o, c), min(row[4], o, c), row[5]]
for row, o, c in zip(candles, opens, close_values)]
В этом решении используются только генераторы списков и модуль коллекций.
Если вы хотите вернуть кадр данных:
return pd.DataFrame.from_records(
data=candles,
columns=['Time', 'Open', 'Close', 'High', 'Low', 'Volume'],
index='Time',
coerce_float=True,
)
Самое быстрое решение, которое я нашел.
def HA(df):
df['HA_Close']=(df['Open']+ df['High']+ df['Low']+df['Close'])/4
idx = df.index.name
df.reset_index(inplace=True)
ha_close_values = self.data['HA_Close'].values
length = len(df)
ha_open = np.zeros(length, dtype=float)
ha_open[0] = (df['Open'][0] + df['Close'][0]) / 2
for i in range(0, length - 1):
ha_open[i + 1] = (ha_open[i] + ha_close_values[i]) / 2
df['HA_Open'] = ha_open
df['HA_High']=df[['HA_Open','HA_Close','High']].max(axis=1)
df['HA_Low']=df[['HA_Open','HA_Close','Low']].min(axis=1)
return df
Это решение аналогично user11186769 с двумя дополнительными оптимизациями.
Основные оптимизации, которые дали ускорение в 3,5-4 раза, - это эта часть:
ha_close_values = self.data['HA_Close'].values
length = len(df)
ha_open = np.zeros(length, dtype=float)
ha_open[0] = (df['Open'][0] + df['Close'][0]) / 2
for i in range(0, length - 1):
ha_open[i + 1] = (ha_open[i] + ha_close_values[i]) / 2
против этого:
[ha_open.append((ha_open[i] + df.HA_Close.values[i]) / 2) for i in range(0, len(df)-1)]
Первое отличие состоит в том, что в этом ответе на каждой итерации присутствует ненужный и дорогостоящий вызов. Что это:
df.HA_Close.values[i]
. (Он преобразует серию в массив numpy на каждой итерации.)
Как видите, в своем решении я рассчитал это значение только один раз и сохранил его следующим образом:
ha_close_values = self.data['HA_Close'].values
, и использовал это значение в цикле for.
Другая оптимизация заключается в использовании массива numpy с фиксированным размером вместо списка Python. Вместо того, чтобы добавлять в этот список на каждой итерации, я просто использовал текущий индекс + 1, чтобы установить значения
ha_open
.