Python Google Drive API - список всего дерева файлов диска
Я создаю приложение на python, которое использует API Google Drive, поэтому разработка хороша, но у меня проблема с получением полного дерева файлов Google, мне нужно это для двух целей:
- Проверьте, существует ли путь, поэтому, если я хочу загрузить test.txt в корневой каталог /folder1/folder2, я хочу проверить, существует ли файл, и в случае обновить его
- Создайте визуальный файловый менеджер, теперь я знаю, что Google предоставляет его (я не могу вспомнить имя, но я знаю, что он существует), но я хочу ограничить файловый менеджер определенными папками.
На данный момент у меня есть функция, которая извлекает корень из Gdrive, и я могу построить три, рекурсивно вызывая функцию, которая выводит мне содержимое одной папки, но она чрезвычайно медленная и потенциально может сделать тысячи запросов в Google, и это неприемлемый.
Вот функция получения рута:
def drive_get_root():
"""Retrieve a root list of File resources.
Returns:
List of dictionaries.
"""
#build the service, the driveHelper module will take care of authentication and credential storage
drive_service = build('drive', 'v2', driveHelper.buildHttp())
# the result will be a list
result = []
page_token = None
while True:
try:
param = {}
if page_token:
param['pageToken'] = page_token
files = drive_service.files().list(**param).execute()
#add the files in the list
result.extend(files['items'])
page_token = files.get('nextPageToken')
if not page_token:
break
except errors.HttpError, _error:
print 'An error occurred: %s' % _error
break
return result
а вот тот, чтобы получить файл из папки
def drive_files_in_folder(folder_id):
"""Print files belonging to a folder.
Args:
folder_id: ID of the folder to get files from.
"""
#build the service, the driveHelper module will take care of authentication and credential storage
drive_service = build('drive', 'v2', driveHelper.buildHttp())
# the result will be a list
result = []
#code from google, is working so I didn't touch it
page_token = None
while True:
try:
param = {}
if page_token:
param['pageToken'] = page_token
children = drive_service.children().list(folderId=folder_id, **param).execute()
for child in children.get('items', []):
result.append(drive_get_file(child['id']))
page_token = children.get('nextPageToken')
if not page_token:
break
except errors.HttpError, _error:
print 'An error occurred: %s' % _error
break
return result
и, например, теперь, чтобы проверить, существует ли файл, я использую это:
def drive_path_exist(file_path, list = False):
"""
This is a recursive function to che check if the given path exist
"""
#if the list param is empty set the list as the root of Gdrive
if list == False:
list = drive_get_root()
#split the string to get the first item and check if is in the root
file_path = string.split(file_path, "/")
#if there is only one element in the filepath we are at the actual filename
#so if is in this folder we can return it
if len(file_path) == 1:
exist = False
for elem in list:
if elem["title"] == file_path[0]:
#set exist = to the elem because the elem is a dictionary with all the file info
exist = elem
return exist
#if we are not at the last element we have to keep searching
else:
exist = False
for elem in list:
#check if the current item is in the folder
if elem["title"] == file_path[0]:
exist = True
folder_id = elem["id"]
#delete the first element and keep searching
file_path.pop(0)
if exist:
#recursive call, we have to rejoin the filpath as string an passing as list the list
#from the drive_file_exist function
return drive_path_exist("/".join(file_path), drive_files_in_folder(folder_id))
есть идеи как решить мою проблему? Я видел несколько обсуждений здесь о переполнении, и в некоторых ответах люди писали, что это возможно, но, конечно, не сказал, как!
Спасибо
5 ответов
Перестаньте думать о Драйве как о древовидной структуре. Это не так. "Папки" - это просто ярлыки, например. файл может иметь несколько родителей.
Чтобы построить представление дерева в вашем приложении, вам нужно сделать это...
- Запустите запрос списка дисков, чтобы получить все папки
- Выполните итерацию массива результатов и изучите свойство parent, чтобы построить иерархию в памяти
- Выполните второй запрос списка дисков, чтобы получить все не папки (например, файлы)
- Для каждого возвращенного файла поместите его в дерево в памяти
Если вы просто хотите проверить, существует ли файл A в папке B, подход зависит от того, гарантированно ли имя "папка B" будет уникальным.
Если он уникален, просто выполните запрос FilesList для title='file-A', затем выполните Get Files для каждого из его родителей и посмотрите, не называются ли какие-либо из них папкой-B.
Если папка "B" может существовать в папках "C" и "Папке D", то это более сложно, и вам нужно будет построить иерархию в памяти из шагов 1 и 2 выше.
Вы не говорите, создаются ли эти файлы и папки вашим приложением или пользователем с помощью Google Drive Webapp. Если ваше приложение является создателем этих файлов / папок, существует хитрость, с помощью которой вы можете ограничить поиск одним корнем. Скажи у тебя
MyDrive/app_root/folder-C/folder-B/file-A
вы можете сделать все папки-C, папка-B и файл-A потомками app_root
Таким образом, вы можете ограничить все ваши запросы, чтобы включить
and 'app_root_id' in parents
Никогда не будет работать так, за исключением очень маленьких деревьев. Вы должны переосмыслить весь свой алгоритм для облачного приложения (вы написали его как настольное приложение, где у вас есть машина), так как время его простоя истекает. Необходимо заранее отразить дерево (задачи и хранилище данных) не только для того, чтобы избежать тайм-аутов, но и для того, чтобы избежать ограничений скорости диска и как-то синхронизировать его (зарегистрируйтесь для отправки и т. Д.). Нелегко. Я сделал просмотрщик дерева дисков раньше.
Простой способ проверить, существует ли файл по определенному пути, это: drive_service.files(). List(q="THE_ID_OF_SPECIFIC_PATH" в родителях и title= "файл" "). Execute()
Чтобы пройти все папки и файлы:
import sys, os
import socket
import googleDriveAccess
import logging
logging.basicConfig()
FOLDER_TYPE = 'application/vnd.google-apps.folder'
def getlist(ds, q, **kwargs):
result = None
npt = ''
while not npt is None:
if npt != '': kwargs['pageToken'] = npt
entries = ds.files().list(q=q, **kwargs).execute()
if result is None: result = entries
else: result['items'] += entries['items']
npt = entries.get('nextPageToken')
return result
def uenc(u):
if isinstance(u, unicode): return u.encode('utf-8')
else: return u
def walk(ds, folderId, folderName, outf, depth):
spc = ' ' * depth
outf.write('%s+%s\n%s %s\n' % (spc, uenc(folderId), spc, uenc(folderName)))
q = "'%s' in parents and mimeType='%s'" % (folderId, FOLDER_TYPE)
entries = getlist(ds, q, **{'maxResults': 200})
for folder in entries['items']:
walk(ds, folder['id'], folder['title'], outf, depth + 1)
q = "'%s' in parents and mimeType!='%s'" % (folderId, FOLDER_TYPE)
entries = getlist(ds, q, **{'maxResults': 200})
for f in entries['items']:
outf.write('%s -%s\n%s %s\n' % (spc, uenc(f['id']), spc, uenc(f['title'])))
def main(basedir):
da = googleDriveAccess.DAClient(basedir) # clientId=None, script=False
f = open(os.path.join(basedir, 'hierarchy.txt'), 'wb')
walk(da.drive_service, 'root', u'root', f, 0)
f.close()
if __name__ == '__main__':
logging.getLogger().setLevel(getattr(logging, 'INFO'))
try:
main(os.path.dirname(__file__))
except (socket.gaierror, ), e:
sys.stderr.write('socket.gaierror')
использование googleDriveAccess github.com/HatsuneMiku/googleDriveAccess
Недавно я столкнулся с этой проблемой, потому что мне нужно было проверить наличие множества файлов на Google Диске внутри указанной папки и ее дочерних элементов.
Я создал несколько классов, чтобы справиться с этим:
driveServiceFetcher: класс, который обрабатывает запросы от Google, такие как получение ВСЕХ папок на Диске или получение всех файлов в указанных папках. Ничто не сильно отличается от того, что у вас здесь (и на основе некоторых решений, которые я прочитал в этой теме)
from fileinput import filename
import os.path
from typing import final
from urllib import response
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pyparsing import opAssoc
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
class driveServiceFetcher:
def __init__(self) -> None:
self._credentials = self.getDriveCredentials()
self.service = build('drive', 'v3', credentials=self._credentials)
def getDriveCredentials(self):
"""Shows basic usage of the Drive v3 API.
Prints the names and ids of the first 10 files the user has access to.
"""
creds = None
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.json', 'w') as token:
token.write(creds.to_json())
return creds
def get_all_folders_in_drive(self):
driveFolders = []
try:
page_token = None
max_allowed_page_size = 1000
foldersQuery = "trashed = false and mimeType = 'application/vnd.google-apps.folder'"
while True:
results = self.service.files().list(
pageSize=max_allowed_page_size,
fields="nextPageToken, files(id, name, parents)",
includeItemsFromAllDrives=False, supportsAllDrives=False,
corpora='user',
##driveId=DRIVE_ID,
pageToken=page_token,
q=foldersQuery).execute()
folders = results.get('files', [])
page_token = results.get('nextPageToken', None)
for folder in folders:
driveFolders.append(folder)
if page_token is None:
break
except:
pass
finally:
return driveFolders
def get_all_files_in_folders(self, parentFoldersIds):
"""
Return a dictionary of file IDs mapped to file names for the specified parent folders.
"""
files = []
page_token = None
max_allowed_page_size = 1000
parentsQuery = buildAllParentsQuery(parentFoldersIds)
filesQuery = f"mimeType != 'application/vnd.google-apps.folder' and trashed = false and ({parentsQuery})"
while True:
results = self.service.files().list(
pageSize=max_allowed_page_size,
fields="nextPageToken, files(id, name, mimeType, parents)",
includeItemsFromAllDrives=False, supportsAllDrives=False,
# corpora='drive',
# driveId=DRIVE_ID,
pageToken=page_token,
q=filesQuery).execute()
fetchedFiles = results.get('files', [])
page_token = results.get('nextPageToken', None)
for fetchedFile in fetchedFiles:
files.append(fetchedFile)
if page_token is None:
break
return files
def buildAllParentsQuery(parentIds):
return ' in parents or '.join('"{0}"'.format(f) for f in parentIds) + ' in parents'
if __name__ == '__main__':
pass
driveExplorer : строит иерархию GoogleDrive, поэтому мы можем получить все папки один раз, а затем нам не нужно постоянно запрашивать. Я использую это, чтобы получить все дочерние узлы рекурсивно для любой папки, которую я хочу, таким образом, я могу получить доступ к их идентификаторам и использовать их в моем serviceFetcher.get_all_files_in_folders()
import driveNode
class driveExplorer:
def __init__(self, rootId, folderNodes:list[driveNode.driveNode]):
##ConstructNodes
self.rootId = rootId
self._flatHierarchy = folderNodes
self._createTree()
self._assignRootNode()
if self.rootNode == None:
raise DriveExplorerException
self._optimizeFlattenedHierarchy()
def _createTree(self):
for n in self._flatHierarchy:
parent = self.getNodeWithId(n.parentId)
if parent != None:
parent.children.append(n)
n.parent = parent
def _assignRootNode(self):
self.rootNode = self.getNodeWithId(self.rootId)
def _optimizeFlattenedHierarchy(self):
print(f"Files in flat hierarchy PRE optimization: {len(self._flatHierarchy)}")
rootChildren = []
rootChildren.append(self.rootNode)
rootChildren.extend(self.getAllChildrenNodes(self.rootNode))
self._flatHierarchy = rootChildren
print(f"Files in flat hierarchy POST optimization: {len(self._flatHierarchy)}")
def getAllChildrenNodes(self, node:driveNode.driveNode):
nodes = []
for n in node.children:
nodes.append(n)
nodes.extend(self.getAllChildrenNodes(n))
return nodes
def getNodeWithId(self, nodeId):
for n in self._flatHierarchy:
if n.id == nodeId:
return n
return None
##TODO: Delete? Unused
def getNodesWithParent(self, parentId):
nodes = list[driveNode.driveNode]
for n in self._flatHierarchy:
if n.parentId == parentId:
nodes.append(n)
return nodes
def getNodesWithName(self, nodeName):
nodes = []
for n in self._flatHierarchy:
if n.name == nodeName:
nodes.append(n)
return nodes
def getDirectory(self, node:driveNode.driveNode):
directory = [node]
currentNode = node
while currentNode.id != self.rootId:
if currentNode.parent == None:
break
directory.insert(0, currentNode.parent)
currentNode = currentNode.parent
return directory
class DriveExplorerException(Exception):
pass
def folderListToNodeList(folders):
nodes = []
for folder in folders:
parents = folder.get('parents', None)
if parents is not None:
parent = parents[0]
else:
parent = None
newNode = driveNode.driveNode(folder["name"],
folder["id"],
parent)
nodes.append(newNode)
return nodes
if __name__ == '__main__':
pass
driveNode : класс с информацией о файле Google Диска: имя, идентификатор, parentId, массив дочерних элементов (узлов) и родитель (узел)
class driveNode:
def __init__(self, name, id, parentId) -> None:
self.name = name
self.id = id
self.parentId = parentId
self.children = []
self.parent = None
if __name__ == '__main__':
pass
Таким образом, я могу искать файлы (рекурсивно), делая что-то вроде этого:
##ROOT_NODE_ID, the drive explorer requires a folder to be the "parent node"
driveFetcher = driveServiceFetcher()
allFolders = driveFetcher.get_all_folders_in_drive()
explorer = driveExplorer(ROOT_NODE_ID, folderListToNodeList(allFolders))
...
nodesToCrawl = explorer.getAllChildrenNodes(folder)
idsToCrawl = []
for n in nodesToCrawl:
idsToCrawl.append(n.id)
files = driveFetcher.get_all_files_in_folders(idsToCrawl)
Я согласен с @pinoyyid - Google диск не является типичной древовидной структурой.
НО, для печати структуры папок я бы все же подумал об использовании библиотеки визуализации дерева (например, treelib).
Ниже приведено полное решение для рекурсивной печати файловой системы Google Диска.
from treelib import Node, Tree
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)
### Helper functions ###
def get_children(root_folder_id):
str = "\'" + root_folder_id + "\'" + " in parents and trashed=false"
file_list = drive.ListFile({'q': str}).GetList()
return file_list
def get_folder_id(root_folder_id, root_folder_title):
file_list = get_children(root_folder_id)
for file in file_list:
if(file['title'] == root_folder_title):
return file['id']
def add_children_to_tree(tree, file_list, parent_id):
for file in file_list:
tree.create_node(file['title'], file['id'], parent=parent_id)
print('parent: %s, title: %s, id: %s' % (parent_id, file['title'], file['id']))
### Recursion over all children ###
def populate_tree_recursively(tree,parent_id):
children = get_children(parent_id)
add_children_to_tree(tree, children, parent_id)
if(len(children) > 0):
for child in children:
populate_tree_recursively(tree, child['id'])
### Create tree and start populating from root ###
def main():
root_folder_title = "your-root-folder"
root_folder_id = get_folder_id("root", root_folder_title)
tree = Tree()
tree.create_node(root_folder_title, root_folder_id)
populate_tree_recursively(tree, root_folder_id)
tree.show()
if __name__ == "__main__":
main()