Формат сериализации Джексона JSON от Python

Я пытаюсь создать инструмент, который должен выполнять операции администратора на сервисе Apache Pulsar. По какой-то причине они решили не использовать обычный JSON для параметров, находящихся в теле REST API, вместо этого они, кажется, используют сериализацию JSON Джексона. Инструмент, который я разрабатываю, написан на Python 3.6, и я ищу способы кодировать простые структуры данных в формат сериализации Jackson JSON или даже найти спецификацию для сериализованного формата. Существует ли такая документация или код Python? Поскольку типичные структуры данных, которые необходимо сериализовать, просты как Set<AuthActions>, с AuthActions будучи перечислением, было бы целесообразно вручную закодировать такие вещи непосредственно в сериализованный формат, если он известен.

Отредактировано на примере кода:

import asyncio
import aiohttp
import ssl
import os

async def go(loop):
    current_dir = os.path.abspath(os.path.dirname(__file__))
    sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
    sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
                               os.path.join(current_dir, 'super-key.pem'))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
                                 json={'actions': [0, 1]}, ssl=sslcontext) as resp:
             print(resp.status)
             print(await resp.text())
        async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
            print(resp.status)
            print(await resp.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()

1 ответ

Оказывается, это был только недостаток документации в интерфейсе администратора Apache Pulsar. Вот рабочий пример:

import asyncio
import aiohttp
import ssl
import os

async def go(loop):
    current_dir = os.path.abspath(os.path.dirname(__file__))
    sslcontext = ssl.create_default_context(cafile=os.path.join(current_dir, 'cacert.pem'))
    sslcontext.load_cert_chain(os.path.join(current_dir, 'super-cert.pem'),
                               os.path.join(current_dir, 'super-key.pem'))
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.post('https://localhost:8081/admin/namespaces/sample/standalone/ns1/permissions/testrole',
                                 json=[0, 1], ssl=sslcontext) as resp:
             print(resp.status)
             print(await resp.text())
        async with session.get('https://localhost:8081/admin/persistent/sample/standalone/ns1', ssl=sslcontext) as resp:
            print(resp.status)
            print(await resp.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
Другие вопросы по тегам