Подписка на Консул Ключ-Значение
Я пытаюсь использовать API-интерфейс Consul Key-Value для передачи конфигурации JSON в приложение, работающее в одном из моих контейнеров. Он прекрасно работает, но API не предоставляет никаких функций pub-sub (просто чтение / запись / удаление) для ключа.
До сих пор мне удавалось искусственно компенсировать, используя функцию Watch для перезапуска контейнера при каждом изменении ключа, но это оставляет меня с необходимостью
- Внешняя настройка Consul (либо загрузите новую конфигурацию, либо используйте CLI, чтобы указать ключ для наблюдения и скрипт оболочки для запуска)
- Перезапустите мое приложение в середине - хороший способ потенциально потерять данные.
Есть ли какой-либо программный способ (кроме опроса каждые x секунд) получать уведомления о смене ключа в Консуле, или я должен просто отказаться от этого решения и перейти на Redis?
2 ответа
После быстрого взгляда на источники консула, кажется, что "часы" - это простая периодическая тяга. Вы можете реализовать нечто подобное в своем коде для достижения этой цели.
Периодический прогон наблюдателя: https://github.com/hashicorp/consul/blob/master/watch/plan.go#L46
Обработчик ключей: https://github.com/hashicorp/consul/blob/master/watch/funcs.go#L29
Как вы заметили, Consul предлагает функцию наблюдения на стороне сервера, однако для этого требуется настройка на стороне сервера и последующий перезапуск. Вы можете реализовать это на стороне клиента, долго опрашивая дерево KV. Обратите внимание, что Consul вернет все дерево, если какой-либо ключ или значение в корне изменится. В следующей открытой проблеме утверждается, что она дает возможность возвращать только определенные элементы поддерева, которые были изменены, что значительно снизит нагрузку и увеличит производительность в случае просмотра пути KV, содержащего множество элементов: https://github .com/hashicorp/consul/issues/6366 — возможно, однажды команда Консула доберется до реализации этого для магазина KV.
При реализации наблюдения Consul KV на стороне клиента важно придерживаться рекомендаций Consul по блокировке вызовов, указанных здесь: https://developer.hashicorp.com/consul/api-docs/features/blocking
Для этого вы можете использовать API-интерфейс KV get с определенным индексом и recurse=True. Вот пример кода Python.
Обратите внимание, что это не реализует функцию ограничения скорости, рекомендованную Consul. Для реализации этого можно легко использовать PerformanceLimiter Anyio: https://anyio.readthedocs.io/en/stable/synchronization.html#capacity-limiters
Приведенный ниже код требует установки пакетов:aiohttp
иpy-consul
import asyncio
import consul.aio
from typing import Tuple
async def poll_consul_kv_path_callback(
client: consul.aio.Consul, path: str, current_index: int
) -> Tuple[bool, int]:
# the index checks and rate limiting functionalities are made following Consul's recommendation here:
# https://developer.hashicorp.com/consul/api-docs/features/blocking
new_index = await _poll_path_by_index(client, path, current_index)
update_occurred = True
if new_index <= 0:
print(f"invalid consul index {new_index} on {client} {path}")
current_index = 0
if new_index < current_index:
# expect to see this due to items in the watched KV path with the highest index being removed.
print(
f"consul index reset on poll of {client} {path} from index {current_index} to {new_index}"
)
current_index = 0
elif new_index == current_index:
print(
f"poll of {client} {path} returned with no change in index {current_index}"
)
update_occurred = False
else:
current_index = new_index
# there's been an update to the watched path
print(f"update to watched path has occurred")
return update_occurred, current_index
async def _poll_path_by_index(client: consul.aio.Consul, path: str, index: int) -> int:
"""
specify an index for the path. If the index matches the max current ModifyIndex within the path, then the below
call to get will wait until the path has been updated, causing the index to be incremented and the API call to
return. This allows users of this API to 'watch' a path.
"""
print(f"polling path {path} with consul client {client} with index {index}")
try:
# the consul api requires trailing '/'. For example, polling 'foo/bar' will actually poll 'foo/'
index_ret, _ = await client.kv.get(
path + "/",
separator="/",
recurse=True,
index=index,
wait="30s",
)
except TimeoutError as ex:
print(f"timed out waiting for poll on {client} {path} at index {index}: {ex}")
except Exception as ex:
print(
f"unexpected error occurred polling {client} on {path} at index {index}: {ex}"
)
else:
index = int(index_ret)
return index
async def main():
consul_host = "your-consul-hostname"
consul_kv_path_to_watch = "kv/path/to/watch"
client = consul.aio.Consul(host=consul_host)
current_index = 1
while True:
# todo must implement a rate limiter here to ensure the frequency of polls to the Consul server are rate limited
# if frequent updates occur
update_occurred, current_index = await poll_consul_kv_path_callback(
client, consul_kv_path_to_watch, current_index
)
if update_occurred:
print("do your work on a consul update here")
if __name__ == "__main__":
asyncio.run(main())