Формат сериализации Джексона JSON от Python
Я пытаюсь создать инструмент, который должен выполнять операции администратора на сервисе Apache Pulsar. По какой-то причине они решили не использовать обычный JSON для параметров, находящихся в теле REST API, вместо этого они, кажется, используют сериализацию JSON Джексона. Инструмент, который я разрабатываю, написан на Python 3.6, и я ищу способы кодировать простые структуры данных в формат сериализации Jackson JSON или даже найти спецификацию для сериализованного формата. Существует ли такая документация или код Python? Поскольку типичные структуры данных, которые необходимо сериализовать, просты как Set<AuthActions>
, с AuthActions
будучи перечислением, было бы целесообразно вручную закодировать такие вещи непосредственно в сериализованный формат, если он известен.
Отредактировано на примере кода:
import asyncio
import aiohttp
import ssl
import os
async def go(loop):
current_dir = os.path.abspath(os.path.dirname(__file__))
sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
os.path.join(current_dir, 'super-key.pem'))
async with aiohttp.ClientSession(loop=loop) as session:
async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
json={'actions': [0, 1]}, ssl=sslcontext) as resp:
print(resp.status)
print(await resp.text())
async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
print(resp.status)
print(await resp.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
1 ответ
Оказывается, это был только недостаток документации в интерфейсе администратора Apache Pulsar. Вот рабочий пример:
import asyncio
import aiohttp
import ssl
import os
async def go(loop):
current_dir = os.path.abspath(os.path.dirname(__file__))
sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
os.path.join(current_dir, 'super-key.pem'))
async with aiohttp.ClientSession(loop=loop) as session:
async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
json=[0, 1], ssl=sslcontext) as resp:
print(resp.status)
print(await resp.text())
async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
print(resp.status)
print(await resp.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()