Python3 Asyncio и почтовый запрос
Используя Python3.7, у меня есть два сценария py. server_execute.py должен принять почтовый запрос, а после его принятия он должен вызвать server_scripts.py, который должен запускать тестовые сценарии асинхронно.
Сценарий server_execute.py будет принимать запрос после публикации, но я не могу заставить его вызвать асинхронную функцию server_scripts.py для выполнения тестов. Я новичок в публикации запросов и асинхронного программирования... Любая помощь очень ценится, спасибо!
server_execute.py
#!/usr/bin/python3
import server_scripts
from aiohttp import web
import json
global e2e
async def handle(request):
response_obj = {'status': 'success'}
return web.Response(text=json.dumps(response_obj), status=200)
#in python command prompt enter the following for the post request:
# import requests
# response = requests.post("http://localhost:8080/e2e?name=222")
# response.json()
async def e2e(request):
try:
e2e = request.query['name']
print("Running e2e tests:", e2e)
server_scripts.runPyTest222("test")
print("Parallel script execution completed.")
response_obj = {'status': 'success', 'message': 'e2e successfully run'}
return web.Response(text=json.dumps(response_obj), status=200)
except Exception as e:
response_obj = {'status': 'failed', 'message': str(e)}
return web.Response(text=json.dumps(response_obj), status=500)
app = web.Application()
app.router.add_get('/', handle)
app.router.add_post('/e2e', e2e)
web.run_app(app)
server_scripts.py
#!/usr/bin/python3
import asyncio
import time
import datetime
from colorama import Fore
from colorama import Style
async def run(cmd):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]')
if stdout:
if proc.returncode == 0:
print(f'[stdout]\n{Fore.GREEN}{stdout.decode()}{Style.RESET_ALL}!')
else:
print(f'[stdout]\n{Fore.RED}{stdout.decode()}{Style.RESET_ALL}!')
if stderr:
print(f'[stderr]\n{Fore.RED}{stderr.decode()}{Style.RESET_ALL}!')
async def runPyTest222(filename):
print('started at this time:', time.strftime('%X'))
await asyncio.gather(
run('python3 -m pytest test_222_rlv_overview_005.py'),
run('python3 -m pytest test_222_ltv_overview_002.py'),
)
print('finished at', time.strftime('%X'))
if __name__ == '__main__':
a = datetime.datetime.now()
asyncio.run(runPyTest222("test"))
b = datetime.datetime.now()
print(b - a)