Как использовать сигнальный протокол для шифрования / дешифрования?
Я занимался реверс-инжинирингом сети WhatsApp. Я создал собственный клиент на Python для подключения к WhatsApp с помощью websocket, создания QR-кода и сохранения учетных данных для входа в файл. Когда я попытался использовать его, я понял, что сквозное шифрование WhatsApp реально. Я использовал этот модуль Python для генерации двух ключей протокола сигналов:
#!/usr/bin/python3
import websockets as WSocket
import asyncio, json, requests
import base64, random
import qrcode
from io import BytesIO
from tkinter import *
import signal_protocol
wsurl = "wss://web.whatsapp.com/ws"
async def whatsapp(uri):
async with WSocket.connect(uri, extra_headers={"Origin": "https://web.whatsapp.com"}) as ws:
keys = signal_protocol.curve.KeyPair.generate()
pubkey = base64.b64encode(keys.public_key().serialize()[1:]).decode()
# ^ I removed the first byte, bc it is every time \x05
# (it is the key type)
browserid = "8TDLL9cktZvZvMiicMrubw=="
await ws.send('1633259071.--1,["admin","init",[2,2138,13],["Python","-","x86_64"],"{}",true]'.format(browserid))
data = await ws.recv()
print(data)
lst = data.split(',')
idk = lst[0]
del lst[0]
parsed = json.loads(','.join(lst))
auth = parsed["ref"]
qrtext = '{},{},{}'.format(auth, pubkey, browserid)
print(qrtext)
#await ws.send('1633276651.--1,["admin","Conn","reref"]') #retry
code = qrcode.make(qrtext) #.show()
root = Tk()
w, h = root.winfo_screenwidth(), root.winfo_screenheight()
m = min(w, h) // 2
root.resizable(False, False)
root.geometry(f'{m}x{m}+{w//2-m//2}+{h//2-m//2}')
buffer = BytesIO()
code.resize((m, m)).save(buffer, format="png")
buffer.seek(0)
img = PhotoImage(data=buffer.read())
lable = Button(root, image=img).place(x=0, y=0)
root.attributes('-topmost', True)
root.overrideredirect(True)
root.update()
res = await ws.recv()
print(res)
with open("token", "wt") as o:
o.write(res)
o.write('\n')
o.write(browserid)
o.write(',')
o.write(base64.b64encode(keys.private_key().serialize()).decode())
o.close()
root.destroy()
if __name__ == '__main__':
asyncio.run(whatsapp(wsurl))
Моя проблема в том, как я могу расшифровать шифрованный текст, который отправляет мне обратно в WhatsApp. Вот мой клиент:
#!/usr/bin/python3
import websockets as WSocket
import asyncio, json, requests
import base64, random, hmac, hashlib
import signal_protocol
wsurl = "wss://web.whatsapp.com/ws"
def parse(token):
lines = token.strip().split('\n')
chunks = lines[0].strip().split(',')
del chunks[0]
jdata = json.loads(','.join(chunks))
print(jdata[0])
plus = lines[1].strip().split(',')
jdata[1]["browserID"] = plus[0]
jdata[1]["key"] = signal_protocol.curve.PrivateKey.deserialize(base64.b64decode(plus[1]))
return jdata[1]
def processdata(data):
print(data)
async def whatsapp(uri, token):
async with WSocket.connect(uri, extra_headers={"Origin": "https://web.whatsapp.com"}) as ws:
print("Logging in...")
await ws.send('1633521509.--0,["admin","init",[2,2138,13],["Python","-","x86_64"],"{}",true]'.format(token["browserID"]))
print(await ws.recv())
await ws.send('1633521518.--1,["admin","login","{}","{}","{}","takeover"]'.format(token["clientToken"],
token["serverToken"],
token["browserID"])
)
data = await ws.recv()
print(data)
chunks = data.split(',')
del chunks[0]
jdata = json.loads(','.join(chunks))
if type(jdata) == list and jdata[0] == "Cmd" and jdata[1]["type"] == "challenge":
print("Solve challange...")
await ws.send('1633521518.--1,["admin","login","{}","{}","{}","takeover"]'.format(token["clientToken"],
token["serverToken"],
token["browserID"])
)
#print(jdata)
resolve = base64.b64encode(hmac.new(token["key"].serialize(), base64.b64decode(jdata[1]["challenge"]), hashlib.sha256).digest())
ready = '1633521544.--2,["admin","challenge","{}","{}","{}"]'.format(resolve.decode(), token["serverToken"], token["browserID"])
print(ready)
await ws.send(ready)
elif type(jdata) == list and jdata[0] == "Conn" or jdata["status"] == 200:
print("OK")
else:
raise ValueError("Invalid token")
processdata(data)
# 1633415961.--2,["admin","challenge","nJOSG/8l6ee33hVSyqhWwGTlEnGDxiSXwGGK9+zAB+w=","1@LV3dAbOqdpPZPnitEw1xI67X7fBTFHUnINQ8ZUdzpmUPWm0pKkJIN/sgxRSog1jqIt+f3AfePp5k1A==","8TDLL9cktZvZvMiicMrubw=="]
# 1633415954.--1,["admin","login","u5a7azVDJzsr7Vpmpkga4sf5LbSpRW7jqdx6/j9GBGs=","1@LV3dAbOqdpPZPnitEw1xI67X7fBTFHUnINQ8ZUdzpmUPWm0pKkJIN/sgxRSog1jqIt+f3AfePp5k1A==","8TDLL9cktZvZvMiicMrubw==","takeover"]
while True:
data = await ws.recv()
processdata(data)
if __name__ == '__main__':
with open("token", "rt") as o:
token = o.read()
o.close()
asyncio.run(whatsapp(wsurl, parse(token)))
1 ответ
У меня есть проект с открытым исходным кодом, который делает именно это, но теперь, к сожалению, WhatsApp принудительно обновился, что сильно изменилось, и больше не поддерживает это.
Вот проект: реверс-инжиниринг WhatsApp Web
РЕДАКТИРОВАТЬ:
Так что я ошибся с сигнальным протоколом, потому что WhatsApp использовала curve25519.
При получении _login (источник ) сгенерируйте закрытый ключ и получите открытый ключ из закрытого ключа:
self.loginInfo["privateKey"] = curve25519.Private();
self.loginInfo["publicKey"] = self.loginInfo["privateKey"].get_public();
Когда получен Conn (источник ), получите «секрет» из полученного объекта json и сгенерируйте общий секрет из полученного секрета и вашего закрытого ключа:
self.connInfo["secret"] = base64.b64decode(jsonObj[1]["secret"]);
self.connInfo["sharedSecret"] = self.loginInfo["privateKey"].get_shared_key(curve25519.Public(self.connInfo["secret"][:32]), lambda a: a);
sse = self.connInfo["sharedSecretExpanded"] = HKDF(self.connInfo["sharedSecret"], 80);
ГдеHKDF
есть (источник ):
def HmacSha256(key, sign):
return hmac.new(key, sign, hashlib.sha256).digest();
def HKDF(key, length, appInfo=""): # implements RFC 5869, some parts from https://github.com/MirkoDziadzka/pyhkdf
key = HmacSha256("\0"*32, key);
keyStream = "";
keyBlock = "";
blockIndex = 1;
while len(keyStream) < length:
keyBlock = hmac.new(key, msg=keyBlock+appInfo+chr(blockIndex), digestmod=hashlib.sha256).digest();
blockIndex += 1;
keyStream += keyBlock;
return keyStream[:length];
Затем используйте это, чтобы получить еще 2 ключа (источник ):
keysEncrypted = sse[64:] + self.connInfo["secret"][64:];
keysDecrypted = AESDecrypt(sse[:32], keysEncrypted);
self.loginInfo["key"]["encKey"] = keysDecrypted[:32];
self.loginInfo["key"]["macKey"] = keysDecrypted[32:64];
ГдеAESDecrypt
есть (источник ):
def AESUnpad(s):
return s[:-ord(s[len(s)-1:])];
def AESDecrypt(key, ciphertext): # from https://stackoverflow.com/a/20868265
iv = ciphertext[:AES.block_size];
cipher = AES.new(key, AES.MODE_CBC, iv);
plaintext = cipher.decrypt(ciphertext[AES.block_size:]);
return AESUnpad(plaintext);
Когда получено зашифрованное сообщение (источник ), используйтеself.loginInfo["key"]["encKey"]
чтобы расшифровать его:
decryptedMessage = AESDecrypt(self.loginInfo["key"]["encKey"], messageContent[32:]);