Cosign Signature API (json, не SOAP) не проверяет созданную им подпись

Я пытаюсь подписать крошечный буфер, используя Signature API (JSON), в качестве POC для интеграции в мой проект. Я использую сервер учетной записи разработчика CoSign Trial. Подпись создана успешно, но при попытке проверить буфер ответ содержит следующую (не очень полезную) ошибку:

{u'ErrData': {u'Code': -24,
          u'InnerCode': -1878850959,
          u'Message': u'Failed to verify buffer.',
          u'Module': u'VerifyBuffer'},
 u'Success': False}

Я следил за документацией с этой страницы:

http://docs.arx.com/CoSign_APIs/doc_v7.1/Default.htm#doc_7.1/Verify Buffer.htm#_Toc398808255%3FTocPath%3DCoSign%2520Signature%7CAPI%7CSigning%2520and%2520Verifying%7C_____4

и я прилагаю свой код в Python (имя пользователя и пароль стерты)

import urllib2
import hashlib
import xml.dom.minidom
import base64
import pprint
import json


class Client:
    SIGN_URL = 'https://prime.cosigntrial.com:8081/sapiws/SignBuffer'
    VERIFY_URL = 'https://prime.cosigntrial.com:8081/sapiws/VerifyBuffer'
    GET_CERTS_URL = 'https://prime.cosigntrial.com:8081/sapiws/UserCertificatesGet'
    HEADERS = {'Content-Type': 'application/json; charset=utf-8'}

    def __init__(self, username, password):
        self._username = username
        self._password = password

    def signBytes(self, bytes):
        obj = dict(
            Username=self._username, Password=self._password,
            BufferToSign=base64.b64encode(bytes))
        return self._transaction(obj)

    def verifyBytes(self, bytes, signature):
        obj = dict(
            BufferToSign=base64.b64encode(bytes), Signature=signature)
        return self._transaction(obj, url=self.VERIFY_URL)

    def getCertificates(self):
        return self._transaction(dict(Username=self._username, Password=self._password), url=self.GET_CERTS_URL)

    def signUsingHash(self, bytes):
        digest = hashlib.sha512(bytes).digest()
        obj = dict(
            Username=self._username, Password=self._password,
            BufferToSign=base64.b64encode(digest),
            BufferHash=True, HashAlg="Sha512")
        return self._transaction(obj)

    def verifyUsingHash(self, bytes, signature):
        digest = hashlib.sha512(bytes).digest()
        obj = dict(
            BufferToSign=base64.b64encode(digest), Signature=signature,
            BufferHash=True)
        return self._transaction(obj, url=self.VERIFY_URL)

    def _transaction(self, message, url=SIGN_URL):
        print "Y"*100
        pprint.pprint(message)
        request = urllib2.Request(url, json.dumps(message))
        connection = urllib2.urlopen(request)
        try:
            return json.loads(connection.read())
        finally:
            connection.close()


client = Client(username="", password="")
value = "ABCDEFG"
response1 = client.signBytes(value)
print "X"*100
print response1
verified1 = client.verifyBytes(value, response1[u'Data']    [u'Signature'])
print "Z"*100
pprint.pprint(verified1)

Я также попытался использовать аргумент BufferHash и сам вычислил хэш, но возникает та же ошибка.

3 ответа

Спасибо Almog и ALX, ваши ответы помогли. К сожалению, я не могу высказать ваши ответы, у меня недостаточно репутации stackru.

Мне удалось обойти некоторые другие детали, например, создать подпись pkcs#1 и проверить ее. Я выкладываю здесь полностью очищенный исходный код для дальнейшего использования другими. Мой вывод заключается в том, что пример использования исходного кода с использованием REST API действительно помог бы, независимо от языка, поэтому я предлагаю написать один или даже использовать мой, где-нибудь на странице примеров на сайте документации.

В очередной раз благодарим за помощь.

import urllib2
import hashlib
import base64
import json
import string


class Client:
    _HEADERS = {'Content-Type': 'application/json; charset=utf-8'}
    _ALGORITHMS = dict(
        Sha256=lambda x: hashlib.sha256(x).digest(),
        Sha384=lambda x: hashlib.sha385(x).digest(),
        Sha512=lambda x: hashlib.sha512(x).digest())
    _PKCS1_FLAGS = 0x80000
    _ALGORITHM_FLAGS = dict(
        Sha256=0x04000,
        Sha384=0x08000,
        Sha512=0x10000)

    def __init__(self, username, password, hostname="prime.cosigntrial.com"):
        self._username = username
        self._password = password
        self._signURL = 'https://%s:8081/sapiws/SignBuffer' % hostname
        self._verifyURL = 'https://%s:8081/sapiws/VerifyBuffer' % hostname
        self._getCertificatesURL = 'https://%s:8081/sapiws/UserCertificatesGet' % hostname
        self._serverInfoURL = 'https://%s:8081/sapiws/ServerInfoGet' % hostname

    def signBytes(self, bytes, algorithm="Sha512", pkcs1=False):
        assert algorithm in self._ALGORITHMS, "'%s' is not Sha256|384|512" % algorithm
        parameters = dict(
            Username=self._username, Password=self._password,
            BufferToSign=base64.b64encode(bytes), HashAlg=algorithm)
        if pkcs1:
            parameters['Flags'] = self._PKCS1_FLAGS
        result = self._transaction(self._signURL, parameters)
        if not result[u'Success']:
            raise Exception("Sign buffer failed (%s)" % result)
        return result[u'Data'][u'Signature']

    def verifyBytes(self, bytes, signature, pkcs1Certificate=None, pkcs1Algorithm="Sha512"):
        assert self._isBase64(signature), "Signature must be in base64 format"
        parameters = dict(
            BufferToSign=base64.b64encode(bytes), Signature=self._removeCRLF(signature))
        if pkcs1Certificate is not None:
            assert self._isBase64(pkcs1Certificate), "Certificate must be in base 64 format"
            assert pkcs1Algorithm in self._ALGORITHM_FLAGS, "'%s' is no Sha256|384|512" % pkcs1Algorithm
            parameters['Flags'] = self._PKCS1_FLAGS | self._ALGORITHM_FLAGS[pkcs1Algorithm]
            parameters['Certificate'] = self._removeCRLF(pkcs1Certificate)
        result = self._transaction(self._verifyURL, parameters)
        if not result[u'Success']:
            raise Exception("Verify buffer failed (%s)" % result)
        return result[u'Data']

    def signUsingHash(self, bytes, algorithm="Sha512", pkcs1=False):
        assert algorithm in self._ALGORITHMS, "'%s' is not Sha256|384|512" % algorithm
        digest = self._ALGORITHMS[algorithm](bytes)
        parameters = dict(
            Username=self._username, Password=self._password,
            BufferToSign=base64.b64encode(digest),
            BufferHash=True, HashAlg=algorithm)
        if pkcs1:
            parameters['Flags'] = self._PKCS1_FLAGS
        result = self._transaction(self._signURL, parameters)
        if not result[u'Success']:
            raise Exception("Sign buffer failed (%s)" % result)
        return result[u'Data'][u'Signature']

    def verifyUsingHash(self, bytes, signature, algorithm="Sha512", pkcs1Certificate=None):
        assert self._isBase64(signature), "Signature must be in base64 format"
        assert algorithm in self._ALGORITHMS, "'%s' is not Sha256|384|512" % algorithm
        digest = self._ALGORITHMS[algorithm](bytes)
        parameters = dict(
            BufferToSign=base64.b64encode(digest), Signature=self._removeCRLF(signature),
            BufferHash=True)
        if pkcs1Certificate is not None:
            assert self._isBase64(pkcs1Certificate), "Certificate must be in base 64 format"
            parameters['Flags'] = self._PKCS1_FLAGS
            parameters['Certificate'] = self._removeCRLF(pkcs1Certificate)
        result = self._transaction(self._verifyURL, parameters)
        if not result[u'Success']:
            raise Exception("Verify buffer failed (%s)" % result)
        return result[u'Data']

    def getCertificates(self):
        result = self._transaction(
            self._getCertificatesURL,
            dict(Username=self._username, Password=self._password))
        if not result[u'Success']:
            raise Exception("Getting certificates failed (%s)" % result)
        return result[u'Data'][u'Certificates']

    def serverInfo(self):
        result = self._transaction(self._serverInfoURL, {})
        if not result[u'Success']:
            raise Exception("Getting server info failed (%s)" % result)
        return result[u'Data']

    def _transaction(self, url, parameters):
        request = urllib2.Request(url, json.dumps(parameters), self._HEADERS)
        connection = urllib2.urlopen(request)
        try:
            return json.loads(connection.read())
        finally:
            connection.close()

    def _isBase64(self, data):
        valid = string.lowercase + string.uppercase + string.digits + "+/=\r\n"
        return [x for x in data if x not in valid] == []

    def _removeCRLF(self, s):
        return s.replace("\r\n", "")


if __name__ == "__main__":
    import pprint

    USERNAME = ""  # fill this
    PASSWORD = ""  # fill this
    VALUE = "ABCDEFG"

    client = Client(username=USERNAME, password=PASSWORD)

    print "SERVER INFO:"
    pprint.pprint(client.serverInfo())

    print "USER CERTIFICATES:"
    certificates = client.getCertificates()
    pprint.pprint(certificates)
    firstCertificate = certificates[0]['Certificate']

    print "Signing bytes using pkcs#7:"
    signedWith7 = client.signBytes(VALUE)
    print "Signing bytes using pkcs#1:"
    signedWith1 = client.signBytes(VALUE, pkcs1=True)
    print "Signing hash using pkcs#7:"
    signedHashWith7 = client.signUsingHash(VALUE)
    print "Signing hash using pkcs#1:"
    signedHashWith1 = client.signUsingHash(VALUE, pkcs1=True)

    assert signedWith1 == signedHashWith1, \
        "Expected signature from hash to be equals to be identical"

    print "Verify bytes using pkcs#7:"
    result = client.verifyBytes(VALUE, signedWith7)
    assert result[u'IsValid'], "Expected to be valid"
    pprint.pprint(result)
    print "Verify bytes using pkcs#1:"
    result = client.verifyBytes(VALUE, signedWith1, pkcs1Certificate=firstCertificate)
    assert result[u'IsValid'], "Expected to be valid"
    pprint.pprint(result)
    print "Verify hash using pkcs#7:"
    result = client.verifyUsingHash(VALUE, signedWith7)
    assert result[u'IsValid'], "Expected to be valid"
    pprint.pprint(result)
    print "Verify hash, using hash signature, using pkcs#7:"
    result = client.verifyUsingHash(VALUE, signedHashWith7)
    assert result[u'IsValid'], "Expected to be valid"
    pprint.pprint(result)
    print "Verify hash using pkcs#1:"
    result = client.verifyUsingHash(VALUE, signedWith1, pkcs1Certificate=firstCertificate)
    assert result[u'IsValid'], "Expected to be valid"
    pprint.pprint(result)

    print "All tests passed"

Подпись, возвращаемая функцией signBuffer, содержит символы \r\n. Удалите эти символы из подписи перед вызовом verifyBuffer, чтобы устранить проблему.

Все, что вам нужно сделать, чтобы ваш код работал правильно, это:

  1. Передайте свои предопределенные заголовки запроса самому запросу.

    urllib2.Request(url, json.dumps(message), self.HEADERS)
    
  2. Обратите внимание, что возвращенные данные подписи в формате base64 содержат избыточные символы \r\n (это известная проблема, которая будет исправлена ​​в более позднем выпуске). Просто удалите эти символы перед передачей данных в операцию проверки. Например-

    client.verifyBytes(value, response1[u'Data'][u'Signature'].replace("\r\n", ""))
    
Другие вопросы по тегам