PHP - Многопоточность и пулы
Я использую Pool
объект в PHP pthread
и сделал следующий тестовый скрипт, чтобы увидеть, как пул должен работать. Я подумал, что пул должен делать, чтобы получить заданное количество задач, открыть максимум x
количество работников, и назначить им задачи, и как только работник завершит выполнение задачи, если доступно больше задач, назначьте этому работнику новую задачу.
Приведенный ниже пример и приведенное выше предположение:
class Work extends Threaded {
public $id;
public function __construct($id) {
$this->id = $id;
}
public function run() {
if ($this->id == 0) {
sleep(3);
echo $this->id . " is ready\n";
return;
} else {
echo $this->id . " is ready\n";
return;
}
}
}
$pool = new Pool(2, 'Worker', []);
for ($i=0; $i<4; $i++) $pool->submit(new Work($i));
while ($pool->collect());
$pool->shutdown();
Я ожидал, что этот скрипт выведет следующую информацию:
1 готово
2 готово
3 готово
0 готово
потому что, по сути, есть 2 работника, и из-за sleep
Когда первый рабочий сталкивается, задание 1,2,3 должно быть выполнено вторым рабочим.
Вместо этого я получаю вывод:
1 готово
3 готово
0 готово
2 готово
Понятно, что работник 1 получает задание 0, а задание 2 - на ходу, таким образом, работник 2, после окончания заданий 1 и 3, просто ждет, вместо того, чтобы принять работу 2 у работника 1.
Это ошибка? Или это должно работать так?
Моя версия PHP:
PHP 7.2.14 (cli) (built: Jan 9 2019 22:23:26) ( ZTS MSVC15 (Visual C++ 2017) x64 )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.2.0, Copyright (c) 1998-2018 Zend Technologies
3 ответа
По какой-то причине мой Docker потерял самообладание теперь, когда я обновил Windows до 1809, поэтому публикация не прошла тестирование. (Так что извините, нет выхода, чтобы дать атм)
Модифицированный существующий код, который я использую в проекте с вашим счетчиком + сон.
$pool = new Pool(2);
foreach ([0,1,2,3] as $count) {
$pool->submit(
new class ($count) extends Threaded
{
private $count;
public function __construct(int $count)
{
$this->count= $count;
}
public function run()
{
if ($this->count== 0) {
sleep(3);
echo $this->count . " is ready\n";
} else {
echo $this->count . " is ready\n";
}
}
}
);
}
while ($pool->collect());
$pool->shutdown();
Я использую анонимный класс (new class ($count) extends Threaded
) как submit()
пары.
На сервере это работает отлично, используя экземпляр Docker с PHP ZTS 7.2.13 на Alpine 3.8
Позвольте мне ответить: из того, что я знаю о pthreads в php, пул похож на число обрабатывающих php.exe, которые могут быть запущены одновременно.
Итак, в вашем случае вы определяете два пула с помощью new Pool(2, 'Worker', []);
Итак, давайте сделаем абстрактное объяснение по этому поводу. Есть 2 бассейна, назовите его как PoolA
а также PoolB
,
Цикл от 0 до 3, каждый цикл отправляет задачу в пул.
Есть 4 задачи от 0 до 3, давайте называть их по task0
, task1
, task2
, task3
,
Когда цикл происходит, с моей точки зрения, это должно быть в очереди, как это
PoolA -> submit task0
PoolB -> submit task1
PoolA -> submit task2
PoolB -> submit task3
Но из class Work
это будет задача 0, ... до задачи 3.
Ситуация / Состояние
Вы определяете некоторую логику в run() =>, когда параметр (в данном случае $id из конструктора) равен 0, тогда sleep(3)
,
Из этой ситуации PoolA
это представить task0
который содержит параметр ($id) равен 0, PoolA
будет ждать 3 секунды. PoolA
также представить task2
,
С другой стороны, PoolB
Отправить task1
а также task3
из этой ситуации не нужно ждать 3 секунды.
Так когда while($pool->collect());
выполняется, возможная очередь, которая, скорее всего, произойдет
task1 (PoolB)
task3 (PoolB)
task0 (PoolA) ->>>> PoolA delayed because from task0 needs to sleep for 3 seconds
task2 (PoolA)
Так что я думаю, что это правильно, когда выходы
1 готово
3 готово
0 готово
2 готово
Есть вопросы.
Почему задерживается только PoolA, даже если задерживается PoolA, почему task2 не отправляется в PoolB или почему task1 или task3 не отправляются в PoolA??
Ну, я тоже не понимаю. У меня есть задача, похожая на вашу, после многих экспериментов, я не уверен, что Pool & Threaded
является multi-threading or multiprocessing
,
Эхо отдельных потоков может быть обманчивым. Я часто обнаруживаю, что кажется, будто они исполняют свои обязанности еще до того, как их вызывают. Я бы рекомендовал избегать эха из внутренних потоков, если только вам не важен порядок, поскольку он все еще может быть полезен для проверки конкретных обстоятельств и т. д.
Ниже приведен некоторый код, который должен решить любые вопросы о том, когда код выполняется, поскольку этот код сортирует результаты по фактическому времени их выполнения. (Это также хороший пример того, как получить результаты из пула потоков.)
<?php
class Work extends Threaded {
public $id;
public $data;
private $complete = false;
public function __construct($id) {
$this->id = $id;
}
public function run() {
$temp = array();
if ($this->id == 0) {
echo "<pre>".$this->id . " started (from inside threaded)";
$temp[] = array(microtime(true), $this->id . " started");
sleep(3);
}
echo "<pre>".$this->id . " is ready (from inside threaded)";
$temp[] = array(microtime(true), $this->id . " is ready");
$this->data = (array) $temp; // note: it's important to cast as array, otherwise you will get a volitile
$this->complete = true;
}
public function isDone() {
return $this->complete;
}
}
// we create a custom pool, to pass on our results
class ExamplePool extends Pool {
public $dataAr = array(); // used to return data after we're done
private $numTasks = 0; // counter used to know when we're done
private $numCompleted = 0; // keep track of how many threads finished
/**
* override the submit function from the parent
* to keep track of our jobs
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* used to wait until all workers are done
*/
public function process() {
// Run this loop as long as we have
// jobs in the pool
while ($this->numCompleted < $this->numTasks) {
$this->collect(function (Work $task) {
// If a task was marked as done, collect its results
if ($task->isDone()) {
//this is how you get your completed data back out [accessed by $pool->process()]
$this->dataAr = array_merge($this->dataAr, $task->data);
$this->numCompleted++;
}
return $task->isDone();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return $this->dataAr;
}
}
$pool = new ExamplePool(4);
for($i=0; $i<4; $i++) {
$pool->submit(new Work($i));
}
$retArr = $pool->process();
usort($retArr, 'sortResultsByTime'); // sort the results by time
// echo out the sorted results
echo "<br><br>";
for($i=0;$i<count($retArr);$i++){
echo number_format($retArr[$i][0], 4, ".", "").' '.$retArr[$i][1]."\n";
}
function sortResultsByTime($a, $b) {
return $a[0] > $b[0];
}
?>
Обратите внимание, что приведенный выше код дает мне это:
0 started (from inside threaded)
0 is ready (from inside threaded)
1 is ready (from inside threaded)
2 is ready (from inside threaded)
3 is ready (from inside threaded)
1609458117.8764 0 started
1609458117.8776 1 is ready
1609458117.8789 2 is ready
1609458117.8802 3 is ready
1609458120.8765 0 is ready
И, как и ожидалось, материал, отраженный внутри потоков, кажется странным, однако, если вы сохраните результаты и отсортируете их по времени их выполнения, вы увидите, что он действует так, как ожидалось.