PHP - Многопоточность и пулы

Я использую Pool объект в PHP pthreadи сделал следующий тестовый скрипт, чтобы увидеть, как пул должен работать. Я подумал, что пул должен делать, чтобы получить заданное количество задач, открыть максимум x количество работников, и назначить им задачи, и как только работник завершит выполнение задачи, если доступно больше задач, назначьте этому работнику новую задачу.

Приведенный ниже пример и приведенное выше предположение:

class Work extends Threaded {
    public $id;

    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        if ($this->id == 0) {
            sleep(3);
            echo $this->id . " is ready\n";
            return;
        } else {
            echo $this->id . " is ready\n";
            return;
        }
    }
}

$pool = new Pool(2, 'Worker', []);
for ($i=0; $i<4; $i++) $pool->submit(new Work($i));
while ($pool->collect());
$pool->shutdown();

Я ожидал, что этот скрипт выведет следующую информацию:

1 готово
2 готово
3 готово
0 готово

потому что, по сути, есть 2 работника, и из-за sleep Когда первый рабочий сталкивается, задание 1,2,3 должно быть выполнено вторым рабочим.

Вместо этого я получаю вывод:

1 готово
3 готово
0 готово
2 готово

Понятно, что работник 1 получает задание 0, а задание 2 - на ходу, таким образом, работник 2, после окончания заданий 1 и 3, просто ждет, вместо того, чтобы принять работу 2 у работника 1.

Это ошибка? Или это должно работать так?

Моя версия PHP:

PHP 7.2.14 (cli) (built: Jan  9 2019 22:23:26) ( ZTS MSVC15 (Visual C++ 2017) x64 )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.2.0, Copyright (c) 1998-2018 Zend Technologies

3 ответа

По какой-то причине мой Docker потерял самообладание теперь, когда я обновил Windows до 1809, поэтому публикация не прошла тестирование. (Так что извините, нет выхода, чтобы дать атм)


Модифицированный существующий код, который я использую в проекте с вашим счетчиком + сон.

$pool = new Pool(2);
foreach ([0,1,2,3] as $count) {
    $pool->submit(
        new class ($count) extends Threaded
        {
            private $count;

            public function __construct(int $count)
            {
                $this->count= $count;
            }

            public function run()
            {
                if ($this->count== 0) {
                    sleep(3);
                    echo $this->count . " is ready\n";
                } else {
                    echo $this->count . " is ready\n";
                }
            }
        }
    );
}

while ($pool->collect());

$pool->shutdown();

Я использую анонимный класс (new class ($count) extends Threaded) как submit() пары.

На сервере это работает отлично, используя экземпляр Docker с PHP ZTS 7.2.13 на Alpine 3.8

Позвольте мне ответить: из того, что я знаю о pthreads в php, пул похож на число обрабатывающих php.exe, которые могут быть запущены одновременно.

Итак, в вашем случае вы определяете два пула с помощью new Pool(2, 'Worker', []);

Итак, давайте сделаем абстрактное объяснение по этому поводу. Есть 2 бассейна, назовите его как PoolA а также PoolB,

Цикл от 0 до 3, каждый цикл отправляет задачу в пул.

Есть 4 задачи от 0 до 3, давайте называть их по task0, task1, task2, task3,

Когда цикл происходит, с моей точки зрения, это должно быть в очереди, как это

PoolA -> submit task0
PoolB -> submit task1
PoolA -> submit task2
PoolB -> submit task3

Но из class Work это будет задача 0, ... до задачи 3.

Ситуация / Состояние

Вы определяете некоторую логику в run() =>, когда параметр (в данном случае $id из конструктора) равен 0, тогда sleep(3),

Из этой ситуации PoolA это представить task0 который содержит параметр ($id) равен 0, PoolA будет ждать 3 секунды. PoolA также представить task2,

С другой стороны, PoolB Отправить task1 а также task3 из этой ситуации не нужно ждать 3 секунды.

Так когда while($pool->collect()); выполняется, возможная очередь, которая, скорее всего, произойдет

task1    (PoolB)
task3    (PoolB)
task0    (PoolA)  ->>>> PoolA delayed because from task0 needs to sleep for 3 seconds
task2    (PoolA)

Так что я думаю, что это правильно, когда выходы

1 готово
3 готово
0 готово
2 готово

Есть вопросы.

Почему задерживается только PoolA, даже если задерживается PoolA, почему task2 не отправляется в PoolB или почему task1 или task3 не отправляются в PoolA??

Ну, я тоже не понимаю. У меня есть задача, похожая на вашу, после многих экспериментов, я не уверен, что Pool & Threaded является multi-threading or multiprocessing,

Эхо отдельных потоков может быть обманчивым. Я часто обнаруживаю, что кажется, будто они исполняют свои обязанности еще до того, как их вызывают. Я бы рекомендовал избегать эха из внутренних потоков, если только вам не важен порядок, поскольку он все еще может быть полезен для проверки конкретных обстоятельств и т. д.

Ниже приведен некоторый код, который должен решить любые вопросы о том, когда код выполняется, поскольку этот код сортирует результаты по фактическому времени их выполнения. (Это также хороший пример того, как получить результаты из пула потоков.)

      <?php
class Work extends Threaded {
    public $id;
    public $data;
    private $complete = false;
    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        $temp = array();
        if ($this->id == 0) {
            echo "<pre>".$this->id . " started (from inside threaded)";
            $temp[] = array(microtime(true), $this->id . " started");
            sleep(3);
        }
        echo "<pre>".$this->id . " is ready (from inside threaded)";
        $temp[] = array(microtime(true), $this->id . " is ready");
        $this->data = (array) $temp; // note: it's important to cast as array, otherwise you will get a volitile
        $this->complete = true;
    }

    public function isDone() {
        return $this->complete;
    }
}

// we create a custom pool, to pass on our results
class ExamplePool extends Pool {
    public $dataAr = array(); // used to return data after we're done
    private $numTasks = 0; // counter used to know when we're done
    private $numCompleted = 0; // keep track of how many threads finished
    /**
     * override the submit function from the parent
     * to keep track of our jobs
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        parent::submit($task);
    }
    /**
     * used to wait until all workers are done
     */
    public function process() {
        // Run this loop as long as we have
        // jobs in the pool
        while ($this->numCompleted < $this->numTasks) {
            $this->collect(function (Work $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    //this is how you get your completed data back out [accessed by $pool->process()]
                    $this->dataAr = array_merge($this->dataAr, $task->data);
                    $this->numCompleted++;
                }
                return $task->isDone();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->dataAr;
    }
}

$pool = new ExamplePool(4);
for($i=0; $i<4; $i++) { 
    $pool->submit(new Work($i));
}
$retArr = $pool->process();
usort($retArr, 'sortResultsByTime'); // sort the results by time

// echo out the sorted results
echo "<br><br>";
for($i=0;$i<count($retArr);$i++){
    echo number_format($retArr[$i][0], 4, ".", "").' '.$retArr[$i][1]."\n";
}

function sortResultsByTime($a, $b) {
    return $a[0] > $b[0];
}
?>

Обратите внимание, что приведенный выше код дает мне это:

      0 started (from inside threaded)
0 is ready (from inside threaded)
1 is ready (from inside threaded)
2 is ready (from inside threaded)
3 is ready (from inside threaded)

1609458117.8764 0 started
1609458117.8776 1 is ready
1609458117.8789 2 is ready
1609458117.8802 3 is ready
1609458120.8765 0 is ready

И, как и ожидалось, материал, отраженный внутри потоков, кажется странным, однако, если вы сохраните результаты и отсортируете их по времени их выполнения, вы увидите, что он действует так, как ожидалось.

Другие вопросы по тегам