Как проверить подпись JWT, созданную AWS Cognito в Python 3.6?
Вот мой сценарий
import urllib.request
import json
import time
from jose import jwk, jwt
from jose.utils import base64url_decode
import base64
region = '....'
userpool_id = '.....'
app_client_id = '...'
keys_url = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpool_id)
response = urllib.request.urlopen(keys_url)
keys = json.loads(response.read())['keys']
token = request.headers['Authorization']
print(token)
# get the kid from the headers prior to verification
headers = jwt.get_unverified_headers(request.headers['Authorization'])
kid = headers['kid']
print(kid)
# search for the kid in the downloaded public keys
key_index = -1
for i in range(len(keys)):
if kid == keys[i]['kid']:
key_index = i
break
if key_index == -1:
print('Public key not found in jwks.json')
return False
# construct the public key
public_key = jwk.construct(keys[key_index])
# get the last two sections of the token,
# message and signature (encoded in base64)
message, encoded_signature = str(token).rsplit('.', 1)
# decode the
print('>>encoded signature')
print(encoded_signature)
decoded_signature = base64.b64decode(encoded_signature)
if not public_key.verify(message, decoded_signature):
print('Signature verification failed')
return False
print('Signature successfully verified')
Я всегда заканчиваю тем, что проверка подписи не удалась, несмотря на то, что токен jwt генерируется действительным легитимным пулом Cognito. Я посмотрел на документацию, и она на самом деле не определяет весь процесс проверки.
2 ответа
Я вижу, вы используете Jose, и я использую PyJWT, но это решение может помочь вам. Большая часть кода снизу взята из проекта "api-gateway-authorizer-python". Обратите внимание, что это очень хрупкий код, который просто сломается, если что-то не получится, в итоге я не использовал лямбда-аутентификацию, а выбрал аутентификацию AWS_IAM для своего шлюза API с пулами идентификации, поэтому я никогда не заканчивал его.
В этом примере требуется, чтобы вы установили pyjwt и криптографию с pip в вашу рабочую директорию и загрузили все как файл.zip.
Я рекомендую вам посмотреть это видео, если вы хотите рассмотреть вариант аутентификации AWS_IAM: https://www.youtube.com/watch?v=VZqG7HjT2AQ
У них также есть решение с более сложной реализацией лямбда-авторизатора в github по адресу: https://github.com/awslabs/aws-serverless-auth-reference-app (они показывают ссылку в начале видео), но я не не знаю об их зависимостях.
from __future__ import print_function
from jwt.algorithms import RSAAlgorithm
import re
import jwt
import json
import sys
import urllib
region = 'your-region'
userpoolId = 'your-user-pool-id'
appClientId = 'your-app-client-id'
keysUrl = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpoolId)
def lambda_handler(event, context):
bearerToken = event['authorizationToken']
methodArn = event['methodArn']
print("Client token: " + bearerToken)
print("Method ARN: " + methodArn)
response = urllib.urlopen(keysUrl)
keys = json.loads(response.read())['keys']
jwtToken = bearerToken.split(' ')[-1]
header = jwt.get_unverified_header(jwtToken)
kid = header['kid']
jwkValue = findJwkValue(keys, kid)
publicKey = RSAAlgorithm.from_jwk(json.dumps(jwkValue))
decoded = decodeJwtToken(jwtToken, publicKey)
print('Decoded token: ' + json.dumps(decoded))
principalId = decoded['cognito:username']
methodArn = event['methodArn'].split(':')
apiGatewayArnTmp = methodArn[5].split('/')
awsAccountId = methodArn[4]
policy = AuthPolicy(principalId, awsAccountId)
policy.restApiId = apiGatewayArnTmp[0]
policy.region = methodArn[3]
policy.stage = apiGatewayArnTmp[1]
#policy.denyAllMethods()
policy.allowAllMethods()
# Finally, build the policy
authResponse = policy.build()
# new! -- add additional key-value pairs associated with the authenticated principal
# these are made available by APIGW like so: $context.authorizer.<key>
# additional context is cached
context = {
'key': 'value', # $context.authorizer.key -> value
'number': 1,
'bool': True
}
# context['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
# context['obj'] = {'foo':'bar'} <- also invalid
authResponse['context'] = context
return authResponse
def findJwkValue(keys, kid):
for key in keys:
if key['kid'] == kid:
return key
def decodeJwtToken(token, publicKey):
try:
decoded=jwt.decode(token, publicKey, algorithms=['RS256'], audience=appClientId)
return decoded
except Exception as e:
print(e)
raise
class HttpVerb:
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
PATCH = 'PATCH'
HEAD = 'HEAD'
DELETE = 'DELETE'
OPTIONS = 'OPTIONS'
ALL = '*'
class AuthPolicy(object):
# The AWS account id the policy will be generated for. This is used to create the method ARNs.
awsAccountId = ''
# The principal used for the policy, this should be a unique identifier for the end user.
principalId = ''
# The policy version used for the evaluation. This should always be '2012-10-17'
version = '2012-10-17'
# The regular expression used to validate resource paths for the policy
pathRegex = '^[/.a-zA-Z0-9-\*]+$'
'''Internal lists of allowed and denied methods.
These are lists of objects and each object has 2 properties: A resource
ARN and a nullable conditions statement. The build method processes these
lists and generates the approriate statements for the final policy.
'''
allowMethods = []
denyMethods = []
# The API Gateway API id. By default this is set to '*'
restApiId = '*'
# The region where the API is deployed. By default this is set to '*'
region = '*'
# The name of the stage used in the policy. By default this is set to '*'
stage = '*'
def __init__(self, principal, awsAccountId):
self.awsAccountId = awsAccountId
self.principalId = principal
self.allowMethods = []
self.denyMethods = []
def _addMethod(self, effect, verb, resource, conditions):
'''Adds a method to the internal lists of allowed or denied methods. Each object in
the internal list contains a resource ARN and a condition statement. The condition
statement can be null.'''
if verb != '*' and not hasattr(HttpVerb, verb):
raise NameError('Invalid HTTP verb ' + verb + '. Allowed verbs in HttpVerb class')
resourcePattern = re.compile(self.pathRegex)
if not resourcePattern.match(resource):
raise NameError('Invalid resource path: ' + resource + '. Path should match ' + self.pathRegex)
if resource[:1] == '/':
resource = resource[1:]
resourceArn = 'arn:aws:execute-api:{}:{}:{}/{}/{}/{}'.format(self.region, self.awsAccountId, self.restApiId, self.stage, verb, resource)
if effect.lower() == 'allow':
self.allowMethods.append({
'resourceArn': resourceArn,
'conditions': conditions
})
elif effect.lower() == 'deny':
self.denyMethods.append({
'resourceArn': resourceArn,
'conditions': conditions
})
def _getEmptyStatement(self, effect):
'''Returns an empty statement object prepopulated with the correct action and the
desired effect.'''
statement = {
'Action': 'execute-api:Invoke',
'Effect': effect[:1].upper() + effect[1:].lower(),
'Resource': []
}
return statement
def _getStatementForEffect(self, effect, methods):
'''This function loops over an array of objects containing a resourceArn and
conditions statement and generates the array of statements for the policy.'''
statements = []
if len(methods) > 0:
statement = self._getEmptyStatement(effect)
for curMethod in methods:
if curMethod['conditions'] is None or len(curMethod['conditions']) == 0:
statement['Resource'].append(curMethod['resourceArn'])
else:
conditionalStatement = self._getEmptyStatement(effect)
conditionalStatement['Resource'].append(curMethod['resourceArn'])
conditionalStatement['Condition'] = curMethod['conditions']
statements.append(conditionalStatement)
if statement['Resource']:
statements.append(statement)
return statements
def allowAllMethods(self):
'''Adds a '*' allow to the policy to authorize access to all methods of an API'''
self._addMethod('Allow', HttpVerb.ALL, '*', [])
def denyAllMethods(self):
'''Adds a '*' allow to the policy to deny access to all methods of an API'''
self._addMethod('Deny', HttpVerb.ALL, '*', [])
def allowMethod(self, verb, resource):
'''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods for the policy'''
self._addMethod('Allow', verb, resource, [])
def denyMethod(self, verb, resource):
'''Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods for the policy'''
self._addMethod('Deny', verb, resource, [])
def allowMethodWithConditions(self, verb, resource, conditions):
'''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods and includes a condition for the policy statement. More on AWS policy
conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
self._addMethod('Allow', verb, resource, conditions)
def denyMethodWithConditions(self, verb, resource, conditions):
'''Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods and includes a condition for the policy statement. More on AWS policy
conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
self._addMethod('Deny', verb, resource, conditions)
def build(self):
'''Generates the policy document based on the internal lists of allowed and denied
conditions. This will generate a policy with two main statements for the effect:
one statement for Allow and one statement for Deny.
Methods that includes conditions will have their own statement in the policy.'''
if ((self.allowMethods is None or len(self.allowMethods) == 0) and
(self.denyMethods is None or len(self.denyMethods) == 0)):
raise NameError('No statements defined for the policy')
policy = {
'principalId': self.principalId,
'policyDocument': {
'Version': self.version,
'Statement': []
}
}
policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Allow', self.allowMethods))
policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Deny', self.denyMethods))
return policy
Следующий класс проверяет токены Cognito. Вам необходимо установить jose и pydantic.
Реализация взята из этого репозитория , она содержит более подробную информацию, дополнительные функции, тесты и т. д.
import json
import logging
import os
import time
import urllib.request
from typing import Dict, List
from jose import jwk, jwt
from jose.utils import base64url_decode
from pydantic import BaseModel
class JWK(BaseModel):
"""A JSON Web Key (JWK) model that represents a cryptographic key.
The JWK specification:
https://datatracker.ietf.org/doc/html/rfc7517
"""
alg: str
e: str
kid: str
kty: str
n: str
use: str
class CognitoAuthenticator:
def __init__(self, pool_region: str, pool_id: str, client_id: str) -> None:
self.pool_region = pool_region
self.pool_id = pool_id
self.client_id = client_id
self.issuer = f"https://cognito-idp.{self.pool_region}.amazonaws.com/{self.pool_id}"
self.jwks = self.__get_jwks()
def __get_jwks(self) -> List[JWK]:
"""Returns a list of JSON Web Keys (JWKs) from the issuer. A JWK is a
public key used to verify a JSON Web Token (JWT).
Returns:
List of keys
Raises:
Exception when JWKS endpoint does not contain any keys
"""
file = urllib.request.urlopen(f"{self.issuer}/.well-known/jwks.json")
res = json.loads(file.read().decode("utf-8"))
if not res.get("keys"):
raise Exception("The JWKS endpoint does not contain any keys")
jwks = [JWK(**key) for key in res["keys"]]
return jwks
def verify_token(
self,
token: str,
) -> bool:
"""Verify a JSON Web Token (JWT).
For more details refer to:
https://docs.aws.amazon.com/cognito/latest/developerguide/amazon-cognito-user-pools-using-tokens-verifying-a-jwt.html
Args:
token: The token to verify
Returns:
True if valid, False otherwise
"""
try:
self._is_jwt(token)
self._get_verified_header(token)
self._get_verified_claims(token)
except CognitoError:
return False
return True
def _is_jwt(self, token: str) -> bool:
"""Validate a JSON Web Token (JWT).
A JSON Web Token (JWT) includes three sections: Header, Payload and
Signature. They are base64url encoded and are separated by dot (.)
characters. If JWT token does not conform to this structure, it is
considered invalid.
Args:
token: The token to validate
Returns:
True if valid
Raises:
CognitoError when invalid token
"""
try:
jwt.get_unverified_header(token)
jwt.get_unverified_claims(token)
except jwt.JWTError:
logging.info("Invalid JWT")
raise InvalidJWTError
return True
def _get_verified_header(self, token: str) -> Dict:
"""Verifies the signature of a a JSON Web Token (JWT) and returns its
decoded header.
Args:
token: The token to decode header from
Returns:
A dict representation of the token header
Raises:
CognitoError when unable to verify signature
"""
# extract key ID (kid) from token
headers = jwt.get_unverified_header(token)
kid = headers["kid"]
# find JSON Web Key (JWK) that matches kid from token
key = None
for k in self.jwks:
if k.kid == kid:
# construct a key object from found key data
key = jwk.construct(k.dict())
break
if not key:
logging.info(f"Unable to find a signing key that matches '{kid}'")
raise InvalidKidError
# get message and signature (base64 encoded)
message, encoded_signature = str(token).rsplit(".", 1)
signature = base64url_decode(encoded_signature.encode("utf-8"))
if not key.verify(message.encode("utf8"), signature):
logging.info("Signature verification failed")
raise SignatureError
# signature successfully verified
return headers
def _get_verified_claims(self, token: str) -> Dict:
"""Verifies the claims of a JSON Web Token (JWT) and returns its claims.
Args:
token: The token to decode claims from
Returns:
A dict representation of the token claims
Raises:
CognitoError when unable to verify claims
"""
claims = jwt.get_unverified_claims(token)
# verify expiration time
if claims["exp"] < time.time():
logging.info("Expired token")
raise TokenExpiredError
# verify issuer
if claims["iss"] != self.issuer:
logging.info("Invalid issuer claim")
raise InvalidIssuerError
# verify audience
# note: claims["client_id"] for access token, claims["aud"] otherwise
if claims["client_id"] != self.client_id:
logging.info("Invalid audience claim")
raise InvalidAudienceError
# verify token use
if claims["token_use"] != "access":
logging.info("Invalid token use claim")
raise InvalidTokenUseError
# claims successfully verified
return claims
class CognitoError(Exception):
pass
class InvalidJWTError(CognitoError):
pass
class InvalidKidError(CognitoError):
pass
class SignatureError(CognitoError):
pass
class TokenExpiredError(CognitoError):
pass
class InvalidIssuerError(CognitoError):
pass
class InvalidAudienceError(CognitoError):
pass
class InvalidTokenUseError(CognitoError):
pass
if __name__ == "__main__":
auth = CognitoAuthenticator(
pool_region=os.environ["AWS_COGNITO_REGION"],
pool_id=os.environ["AWS_USER_POOL_ID"],
client_id=os.environ["AWS_USER_POOL_CLIENT_ID"],
)
# note: if you are not using access token, see line 161
access_token = "my_access_token"
print(f"Token verified: {auth.verify_token(access_token)}")