Python Acme V2 - порядок повторного использования / проблема
Я использую python-acme для написания небольшого скрипта, который принимает домен и выполняет одно из двух:
- Используя запрос DNS01, если запрос не прошел, верните запись DNS, которую необходимо добавить в домен.
- Если запрос прошел проверку DNS01, верните информацию о сертификате.
Однако я сталкиваюсь с проблемами, и документация просто ужасна:P Я пытаюсь найти способ "повторно использовать" заказ / вызов. По сути, каждый раз, когда я запускаю скрипт до сих пор, токен проверки для записи DNS изменяется.
print("Validating Challenge...")
response, validation = challenge.response_and_validation(client_acme.net.key)
print("response %s" % response.to_partial_json())
print("validation %s" % validation)
print("-> Validation Domain: %s" % challenge.chall.validation_domain_name(domain))
print("-> Validation Value: %s" % challenge.chall.validation(client_acme.net.key))
# TODO - We are here, gotta actually get info on the DNS challange and attempt to validate.
print("Validation Completed!")
Вещи, которые я пробовал:
- Зарегистрируйте учетную запись с тем же ключом RSA. При регистрации возникают ошибки проверки.
- Бег
new_order
с той же КСО. По-прежнему возвращает другой ключ.
Полный код (во славе прототипа)
import json
import josepy as jose
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from acme import client
from acme import messages
from acme import challenges
from acme import crypto_util
import OpenSSL
from boto.s3.connection import S3Connection
from boto.s3.key import Key
"""
Generates an SSL certificate through Let's Encrypt provided a domain. If the
domain requires a DNS challenge to be passed, that information is passed back
to the user. Otherwise, the SSL certificate is generated and returned back to
the user.
Documentation for generating SSL via this method:
http://www.gilesthomas.com/2018/11/python-code-to-generate-lets-encrypt-certificates/
ACME Documentation:
https://kite.com/python/docs/acme.client.ClientV2.new_order
"""
print('Loading function')
DEBUG = True
DIRECTORY_URL = 'https://acme-staging-v02.api.letsencrypt.org/directory'
KEY_SIZE = 2048
CERT_PKEY_BITS = 2048
USER_AGENT = 'python-acme-example'
EMAIL_ADDRESS = 'REDACTED'
S3_BUCKET = 'REDACTED'
S3_KEY = 'REDACTED'
S3_SECRET = 'REDACTED'
# TODO - Load These From Event
PASSWORD = "swordfish"
SALT = "yourAppName"
def new_csr_comp(domain_name, pkey_pem=None):
"""Create certificate signing request."""
if pkey_pem is None:
# Create private key.
pkey = OpenSSL.crypto.PKey()
pkey.generate_key(OpenSSL.crypto.TYPE_RSA, CERT_PKEY_BITS)
pkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
pkey)
csr_pem = crypto_util.make_csr(pkey_pem, [domain_name])
return pkey_pem, csr_pem
def save_key(pk, filename):
pem = pk.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
return pem
def lambda_handler(event, context):
print("Formatting Domain...")
domain = event['domain']
domain = domain.lower()
print("Formatted! Domain is: %s" % domain)
print("Generating User Key...")
conn = S3Connection(S3_KEY, S3_SECRET)
bucket = conn.get_bucket(S3_BUCKET)
key_name = "%s.key" % domain
existing_account = False
print("-> Looking For Existing Key.")
rsa_key = bucket.get_key(key_name)
if rsa_key is None or DEBUG:
print("-> Key Not Found. Creating New One.")
rsa_key = rsa.generate_private_key(
public_exponent=65537,
key_size=KEY_SIZE,
backend=default_backend()
)
pem = save_key(rsa_key, key_name)
print(key_name)
k = Key(bucket)
k.key = key_name
k.set_contents_from_string(pem)
else:
print("-> Key File Found.")
existing_account = True
rsa_key = rsa_key.get_contents_as_string()
rsa_key = load_pem_private_key(rsa_key, password=None, backend=default_backend())
print(rsa_key)
print("-> Converted File To Usable Format.")
acc_key = jose.JWKRSA(
key=rsa_key
)
print("Generated!")
print("Registering With Let's Encrypt...")
print("-> Connecting to Let's Encrypt on {}".format(DIRECTORY_URL))
net = client.ClientNetwork(acc_key, user_agent=USER_AGENT)
directory = messages.Directory.from_json(net.get(DIRECTORY_URL).json())
client_acme = client.ClientV2(directory, net=net)
print("-> Registering")
email = (EMAIL_ADDRESS)
regr = None
# TODO - Use Existing Account
# if existing_account:
# regr = messages.NewRegistration(key=acc_key.public_key(), only_return_existing=True)
# else:
account_created = messages.NewRegistration.from_data(email=email, terms_of_service_agreed=True)
regr = client_acme.new_account(account_created)
print("Registered!")
print("Creating CSR...")
temp_pkey_pem, temp_csr_pem = new_csr_comp(domain)
key_name = "%s.pkey_pem" % domain
pkey_pem = bucket.get_key(key_name)
if pkey_pem is None:
print("-> Creating New PKEY")
k = Key(bucket)
k.key = key_name
k.set_contents_from_string(temp_pkey_pem)
pkey_pem = temp_pkey_pem
else:
print("-> Using Existing PKEY")
pkey_pem = pkey_pem.get_contents_as_string()
key_name = "%s.csr_pem" % domain
csr_pem = bucket.get_key(key_name)
if csr_pem is None:
print("-> Creating New CSR")
k = Key(bucket)
k.key = key_name
k.set_contents_from_string(temp_csr_pem)
csr_pem = temp_csr_pem
else:
print("-> Using Existing CSR")
csr_pem = csr_pem.get_contents_as_string()
print("Created!")
print("Requesting Challenges...")
orderr = client_acme.new_order(csr_pem)
print("Requested!")
print("Selecting DNS Challenge...")
challenge = None
authz_list = orderr.authorizations
for authz in authz_list:
for i in authz.body.challenges:
if isinstance(i.chall, challenges.DNS01):
challenge = i
else:
print("-> Other challenge found: %s" % i.chall)
if challenge is None:
raise Exception("Could not find a DNS challenge!")
print("Selected!")
print("Validating Challenge...")
response, validation = challenge.response_and_validation(client_acme.net.key)
print("response %s" % response.to_partial_json())
print("validation %s" % validation)
print("-> Validation Domain: %s" % challenge.chall.validation_domain_name(domain))
print("-> Validation Value: %s" % challenge.chall.validation(client_acme.net.key))
# TODO - We are here, gotta actually get info on the DNS challange and attempt to validate.
print("Validation Completed!")
print("Starting Challenge...")
client_acme.answer_challenge(challenge, response)
finalized_orderr = client_acme.poll_and_finalize(orderr)
fullchain_pem = finalized_orderr.fullchain_pem
print("-> PEM: %s" % fullchain_pem)
print("Challenge Completed!")
# TODO - We need to return the DNS challenge if it hasn't been completed yet.
return "done"
#raise Exception('Something went wrong')
1 ответ
Брайант
print("Starting Challenge...")
client_acme.answer_challenge(challenge, response)
finalized_orderr = client_acme.poll_and_finalize(orderr)
fullchain_pem = finalized_orderr.fullchain_pem
print("-> PEM: %s" % fullchain_pem)
print("Challenge Completed!")`enter code here`
Каждый раз, когда вы запускаете этот блок кода, он будет отправлять запрос на сервер LetsEncrypt. Если этот вызов будет успешным, он вернет ваши сертификаты и все будет в порядке, но если вызов недействителен, LetsEncrypt Server изменит ваши авторизацию ключа и значение TXT.
Надеюсь, это поможет.