Асинхронное вычисление блоков массива dask (Dask + FastAPI)
Я создаю приложение FastAPI, которое будет обслуживать фрагменты массива Dask. Я хотел бы использовать асинхронную функциональность FastAPI вместе с возможностью Dask-distribution работать асинхронно. Ниже приведен mcve, который демонстрирует, что я пытаюсь сделать как на серверной, так и на клиентской стороне приложения:
На стороне сервера:
import time
import dask.array as da
import numpy as np
import uvicorn
from dask.distributed import Client
from fastapi import FastAPI
app = FastAPI()
# create a dask array that we can serve
data = da.from_array(np.arange(0, 1e6, dtype=np.int), chunks=100)
async def _get_block(block_id):
"""return one block of the dask array as a list"""
block_data = data.blocks[block_id].compute()
return block_data.tolist()
@app.get("/")
async def get_root():
time.sleep(1)
return {"Hello": "World"}
@app.get("/{block_id}")
async def get_block(block_id: int):
time.sleep(1) # so we can test concurrency
my_list = await _get_block(block_id)
return {"block": my_list}
if __name__ == "__main__":
client = Client(n_workers=2)
print(client)
print(client.cluster.dashboard_link)
uvicorn.run(app, host="0.0.0.0", port=9000, log_level="debug")
Сторона клиента
import dask
import requests
from dask.distributed import Client
client = Client()
responses = [
dask.delayed(requests.get, pure=False)(f"http://127.0.0.1:9000/{i}") for i in range(10)
]
dask.compute(responses)
В этой настройке compute()
вызывать _get_block
является "блокирующим" и одновременно вычисляется только один фрагмент. Я пробовал различные комбинацииClient(asynchronous=True)
а также client.compute(dask.compute(responses)
) без каких-либо улучшений. Это возможноawait
вычисление массива dask?
1 ответ
Эта линия
block_data = data.blocks[block_id].compute()
это блокирующий звонок. Если бы вы вместо этого сделалиclient.compute(data.blocks[block_id])
, вы получите ожидаемое будущее, которое можно использовать вместе с IOLoop, если Dask использует тот же цикл.
Обратите внимание, что сервер приема очень хотел бы работать таким образом (он тоже стремится передавать данные по кускам для массивов и других типов данных).