Как обрабатывать ограничения скорости API с помощью диспетчера контекста?
Я пытаюсь написать менеджер контекста для обработки GitHub
исключение ограничения скорости. По сути, я хочу, чтобы он прослушивал ошибку и, когда она возникает, динамически вытягивал время сброса (все сделано через GitHub
API), и подождите столько времени. В этот момент я бы хотел возобновить программу и делать это столько раз, сколько необходимо для завершения работы.
Вот что у меня так далеко:
@contextlib.contextmanager
def api_rate_manager(api_obj: g3.GitHub):
# Check for the API ratelimit being exhausted. Limited to 5k
# requests per hour.
try:
yield
except GitHubError as e:
if 'rate limit exceeded' in e.msg.lower():
info = g3.rate_limit()['resources']['core']
reset = mu.convert_unix_timestamp(info.get('reset'))
delta = reset - datetime.now()
sleep(
delta.seconds + 1) # Add a second to account for milliseconds
В настоящее время он будет корректно перехватывать ошибку и ждать, но затем просто выйдет из программы (что имеет смысл) вместо того, чтобы вернуться назад, чтобы продолжить. Я знаю, что мог бы поставить проверку в коде, чтобы увидеть, каков был оставшийся лимит, и подождать, пока он достигнет 0, но я хотел попрактиковаться с менеджерами контекста.
Это будет использоваться следующим образом:
with api_rate_manager(gh):
for commit_iter in commit_iters:
handler: gu.EtagHandler = commit_iter.etag_handler
for commit in commit_iter:
if not commit:
continue
commit.refresh()
author_data: dict = commit.commit.author
data = {
'sha': commit.sha,
'author': author_data.get('name'),
'author_email': author_data.get('email'),
'create_date': author_data.get('date'),
'additions': commit.additions,
'deletions': commit.deletions,
'total': commit.total
}
mu.add_etl_fields(data)
writer.writerow(data)
has_data = True
etag: str = commit_iter.get_etag()
if etag:
logger.info(f'Etag for {commit_iter.name}: {etag}')
handler.store_in_db(etag=etag)
1 ответ
Хотя менеджер контекста использует yield
(как в генераторе), он генерирует только один раз. Смотрите документацию contextlib.
В результате, если исключение перехвачено в вашем диспетчере контекста, выполнение возобновляется после yield
и выходит. Вы можете изменить порядок менеджера контекста и вашей основной итерации. Смотрите мой пример ниже.
Диспетчер контекста:
import contextlib
from time import sleep
@contextlib.contextmanager
def api_rate_manager():
try:
yield
except KeyError as e:
print('sleeping')
sleep(3)
Тестовый пример 1:
a = {0:0,1:2,2:4,3:6,5:10}
with api_rate_manager():
for i in range(8):
print(a[i])
Выходы:
0
2
4
6
sleeping
Контрольный пример 2:
for i in range(8):
with api_rate_manager():
print(a[i])
Выходы:
0
2
4
6
sleeping
10
sleeping
sleeping