При сканировании удаленной таблицы hbase с использованием Happybase происходит "Ошибка чтения 0 байтов в Tsocket"
Я пытаюсь сканировать удаленную таблицу HBASE, которая содержит более 1 000 000 000 строк. После сканирования, используя отсканированные строки, попробуйте сделать файл CSV, используя в hdfs.
Я пытался решить почти 3 недели, но не могу.
Таким образом я сканирую данные и создаю CSV-файл
источник /host/anaconda3/lib/python3.6/site-packages/thriftpy/transport/socket.py
источник /host/anaconda3/lib/python3.6/site-packages/thriftpy/transport/socket.py
==> Я пробовал протокол Compat, увеличить сетевой буфер памяти TCP, увеличить конфигурацию тайм-аута, установив от 1 до 10000 размер пакета в параметре сканирования и т.д..
Но работает хорошо почти 30 минут, но вдруг происходит ошибка. Почти в 1/50 раза он хорошо заканчивается (работает без ошибок) Пожалуйста, помогите мне. Я пытался найти причину ошибки. Но я не могу получить это.
Кто-нибудь знает, как это решить?
Это мой код
import sys
print ("--sys.version--")
print (sys.version)
from pyhive import hive
import csv
import os
import happybase
import time
import subprocess
import datetime
import chardet
import logging
logging.basicConfig(level=logging.DEBUG)
csv_list=[]
col=[]
def conn_base():
print('conn_base starts')
#SETTING CONNECTION AND CONFIGURATION
conn=happybase.Connection('13.xxx.xxx.xxx',port=9090)
table=conn.table(b'TEMP_TABLE')
#ITERATE DATA AND MAKE CSV FILE PER 100,000 RECORD. AND TAKE A TIME TO SLEEP PER 500000
tmp=[]
print('LET\'S MAKE CSV FILE FROM HBASE')
index=0
st=0
global csv_list
for row_key, data in table.scan():
try:
if (st%1000000==0):
time.sleep(30)
print("COUNT: ",st)
if (st%500000==0):
print("CHANGE CSV _FILE")
index+=1
ta_na='TEMP_TABLE'+str(index)+'_version.csv'
csv_list.append(ta_na)
st+=1
with open('/home/host01/csv_dir/TEMP_TABLE/'+csv_list[index-1] ,'a') as f:
tmp=[]
tmp.append(data[b'CF1:XXXXX'].decode())
tmp.append(data[b'CF1:YYYYY'].decode())
tmp.append(data[b'CF1:DDDDD'].decode())
tmp.append(data[b'CF1:SSSSS'].decode())
tmp.append(data[b'CF1:GGGGG'].decode())
tmp.append(data[b'CF1:HHHHH'].decode())
tmp.append(data[b'CF1:QQQQQ'].decode())
tmp.append(data[b'CF1:WWWWWW'].decode())
tmp.append(data[b'CF1:EEEEE'].decode())
tmp.append(data[b'CF1:RRRRR'].decode())
f.write(",".join(tmp)+'\n')
tmp=[]
except:
pass
#PUT CSV FILES TO HDFS.
st=1
for i in range(len(csv_list)):
try:
st+=1
cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE"+str(csv_list[i])+" /user/hive/warehouse/TEMP_TABLE/"
subprocess.call(cmd,shell=True)
if (st%50==0):
time.sleep(5)
except:
pass
cmd="hdfs dfs -put /home/host01/csv_dir/TEMP_TABLE/*.csv /user/hive/warehouse/TEMP_TABLE/"
subprocess.call(cmd,shell=True)
print("PUT ALL CSV FILES TO HDFS")
conn.close()
1 ответ
Сначала убедитесь, что сервер HBase Thrift запущен и работает. Вы можете запустить Thrift-сервер с помощью следующей команды:
hbase-daemon.sh start thrift [ -p 9090 ]
Если вы хотите указать номер порта, используйте -p. Порт по умолчанию 9090
Вы усложняете его, глядя на приведенный выше код, это всего несколько простых шагов
- Убедитесь, что Hbase Thrift запущен и работает.(Используйте команду, предложенную выше)
- Получить webHdfs можно в файлах настроек HDFS.
- Из пакета hdfs используйте небезопасный клиентский класс (если не аутентифицирован Kerberos) для прямой записи файлов в HDFS(очень просто)