Пытается разделить буфер загрузки файлов на отдельные потоки
Я пытаюсь загрузить буфер файла в 5 потоков, но кажется, что он искажается.
from numpy import arange
import requests
from threading import Thread
import urllib2
url = 'http://pymotw.com/2/urllib/index.html'
sizeInBytes = r = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length']
splitBy = 5
splits = arange(splitBy + 1) * (float(sizeInBytes)/splitBy)
dataLst = []
def bufferSplit(url, idx, splits):
req = urllib2.Request(url, headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
print {'bytes=%d-%d' % (splits[idx], splits[idx+1])}
dataLst.append(urllib2.urlopen(req).read())
for idx in range(splitBy):
dlth = Thread(target=bufferSplit, args=(url, idx, splits))
dlth.start()
print dataLst
with open('page.html', 'w') as fh:
fh.write(''.join(dataLst))
Обновление: так что я работал над этим и получил немного, но прогресс, однако, если я загружаю jpg, он кажется поврежденным;
from numpy import arange
import os
import requests
import threading
import urllib2
# url ='http://s1.fans.ge/mp3/201109/08/John_Legend_So_High_Remix(fans_ge).mp3'
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
# url = 'http://pymotw.com/2/urllib/index.html'
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
splitBy = 5
dataLst = []
class ThreadedFetch(threading.Thread):
""" docstring for ThreadedFetch
"""
def __init__(self, url, fileName, splitBy=5):
super(ThreadedFetch, self).__init__()
self.__url = url
self.__spl = splitBy
self.__dataLst = []
self.__fileName = fileName
def run(self):
if not sizeInBytes:
print "Size cannot be determined."
return
splits = arange(self.__spl + 1) * (float(sizeInBytes)/self.__spl)
for idx in range(self.__spl):
req = urllib2.Request(self.__url, headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
self.__dataLst.append(urllib2.urlopen(req).read())
def getFileData(self):
return ''.join(self.__dataLst)
fileName = url.split('/')[-1]
dl = ThreadedFetch(url, fileName)
dl.start()
dl.join()
content = dl.getFileData()
if content:
with open(fileName, 'w') as fh:
fh.write(content)
print "Finished Writing file %s" % fileName
Ниже показано, как изображение после загрузки.
3 ответа
Вот еще одна версия проекта. Отличия:
код потока - это одна маленькая функция
каждый поток загружает чанк, а затем сохраняет его в глобальном словаре
темы начнутся, потом
join()
Эд - они все бегут одновременнокогда все сделано, данные собираются в правильном порядке, а затем записываются на диск
дополнительная печать, чтобы убедиться, что все правильно
Размер выходного файла рассчитывается, для дополнительного сравнения
источник
import os, requests
import threading
import urllib2
import time
URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
def buildRange(value, numsplits):
lst = []
for i in range(numsplits):
if i == 0:
lst.append('%s-%s' % (i, int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
else:
lst.append('%s-%s' % (int(round(1 + i * value/(numsplits*1.0),0)), int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
return lst
def main(url=None, splitBy=3):
start_time = time.time()
if not url:
print "Please Enter some url to begin download."
return
fileName = url.split('/')[-1]
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
print "%s bytes to download." % sizeInBytes
if not sizeInBytes:
print "Size cannot be determined."
return
dataDict = {}
# split total num bytes into ranges
ranges = buildRange(int(sizeInBytes), splitBy)
def downloadChunk(idx, irange):
req = urllib2.Request(url)
req.headers['Range'] = 'bytes={}'.format(irange)
dataDict[idx] = urllib2.urlopen(req).read()
# create one downloading thread per chunk
downloaders = [
threading.Thread(
target=downloadChunk,
args=(idx, irange),
)
for idx,irange in enumerate(ranges)
]
# start threads, let run in parallel, wait for all to finish
for th in downloaders:
th.start()
for th in downloaders:
th.join()
print 'done: got {} chunks, total {} bytes'.format(
len(dataDict), sum( (
len(chunk) for chunk in dataDict.values()
) )
)
print "--- %s seconds ---" % str(time.time() - start_time)
if os.path.exists(fileName):
os.remove(fileName)
# reassemble file in correct order
with open(fileName, 'w') as fh:
for _idx,chunk in sorted(dataDict.iteritems()):
fh.write(chunk)
print "Finished Writing file %s" % fileName
print 'file size {} bytes'.format(os.path.getsize(fileName))
if __name__ == '__main__':
main(URL)
выход
102331 bytes to download.
done: got 3 chunks, total 102331 bytes
--- 0.380599021912 seconds ---
Finished Writing file 607800main_kepler1200_1600-1200.jpg
file size 102331 bytes
Вот как я это сделал, если кто-то получил какие-либо предложения по возможному улучшению, пожалуйста.
import os
import requests
import threading
import urllib2
import time
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
def buildRange(value, numsplits):
lst = []
for i in range(numsplits):
if i == 0:
lst.append('%s-%s' % (i, int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
else:
lst.append('%s-%s' % (int(round(1 + i * value/(numsplits*1.0),0)), int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
return lst
class SplitBufferThreads(threading.Thread):
""" Splits the buffer to ny number of threads
thereby, concurrently downloading through
ny number of threads.
"""
def __init__(self, url, byteRange):
super(SplitBufferThreads, self).__init__()
self.__url = url
self.__byteRange = byteRange
self.req = None
def run(self):
self.req = urllib2.Request(self.__url, headers={'Range': 'bytes=%s' % self.__byteRange})
def getFileData(self):
return urllib2.urlopen(self.req).read()
def main(url=None, splitBy=3):
start_time = time.time()
if not url:
print "Please Enter some url to begin download."
return
fileName = url.split('/')[-1]
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
print "%s bytes to download." % sizeInBytes
if not sizeInBytes:
print "Size cannot be determined."
return
dataLst = []
for idx in range(splitBy):
byteRange = buildRange(int(sizeInBytes), splitBy)[idx]
bufTh = SplitBufferThreads(url, byteRange)
bufTh.start()
bufTh.join()
dataLst.append(bufTh.getFileData())
content = ''.join(dataLst)
if dataLst:
if os.path.exists(fileName):
os.remove(fileName)
print "--- %s seconds ---" % str(time.time() - start_time)
with open(fileName, 'w') as fh:
fh.write(content)
print "Finished Writing file %s" % fileName
if __name__ == '__main__':
main(url)
это первый голый код, который я получил, я обнаружил, что если установить bufTh
Поток буфера к Daemon False, затем процесс занимает больше времени для завершения.
Я попросил ChatGPT помочь мне в этом. Я знаю, я обманываю :)
Вот код, который он создал, и он отлично работал с файлом размером 10 ГБ, загрузка которого занимала целую вечность:
import os
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
url = "URL-TO-DOWNLOAD"
num_parts = 10
filename = url.split("/")[-1]
r = requests.head(url)
file_size = int(r.headers['content-length'])
class FilePartDownload:
def __init__(self, start, end, url, part, progress_bar):
self.start = start
self.end = end
self.url = url
self.part = part
self.progress_bar = progress_bar
def download(self):
# Check if part file already exists
if os.path.exists(f"{self.part}_{filename}"):
current_size = os.path.getsize(f"{self.part}_{filename}")
# Adjust start based on what we already downloaded
self.start += current_size
headers = {'Range': f'bytes={self.start}-{self.end}'}
r = requests.get(self.url, headers=headers, stream=True)
# Open the file in append mode
with open(f"{self.part}_{filename}", 'ab') as fp:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
fp.write(chunk)
self.progress_bar.update(len(chunk))
def combine_files(parts, filename):
with open(filename, 'wb') as fp:
for part in parts:
with open(f"{part}_{filename}", 'rb') as fpart:
fp.write(fpart.read())
os.remove(f"{part}_{filename}")
parts = list(range(num_parts))
starts = [file_size//num_parts * i for i in range(num_parts)]
ends = [file_size//num_parts * i - 1 for i in range(1, num_parts)] + [file_size]
progress_bar = tqdm(total=file_size, unit='B', unit_scale=True, desc="Total Progress")
# Create FilePartDownload instances without starting the downloads
downloads = [FilePartDownload(start, end, url, part, progress_bar) for part, start, end in zip(parts, starts, ends)]
# Update the progress bar with the size of already downloaded parts
for download in downloads:
if os.path.exists(f"{download.part}_{filename}"):
progress_bar.update(os.path.getsize(f"{download.part}_{filename}"))
# Start the downloads
with ThreadPoolExecutor() as executor:
for download in downloads:
executor.submit(download.download)
progress_bar.close()
combine_files(parts, filename)