Определите темы, которые заканчиваются в порядке

У меня есть следующий многопоточный код

class My_Thread extends Thread {

    public function run() {

        /* ... */
        // The time it takes to execut the code is different for each thread
    }
}

// Create a array
$threads = array();


/* *** STEP 1 *** */
//Initiate Miltiple Threads
foreach ( range("A", "B") as $i ) {
    $threads[] = new My_Thread($i);
}


/* *** STEP 2 *** */
// Start The Threads
foreach ($threads as $thread) {
    $thread->start(); // Thread A starts before thread B, and it takes more time to finish
}


/* *** STEP 3 *** */
// Process the threads
foreach ($threads as $thread) {
    if ($thread->join()) {
        /* ... Do Something ... */
    }
}

Чтобы объяснить код быстро:

Шаг 1: я создаю два потока, A и B

Шаг 2: Поток A запускается первым, и для его завершения требуется больше времени, чем для потока B.

Шаг 3: Затем я жду завершения каждого потока, начиная с потока А.

Теперь проблема в шаге 3. Когда я перебираю потоки, мне нужно дождаться завершения потока A, чтобы выполнить дальнейшую обработку, но поток B ожидает в состоянии ожидания, потому что для его завершения требуется более короткое время, и не будет обрабатываться, если поток A не будет обработан на шаге 3. Не гарантируется, что поток A займет больше времени, поэтому мне нужно написать общее решение.

Как я могу гарантировать, что шаг 3 обрабатывает тот поток, который закончен первым? Другими словами, есть ли что-то вроде этого псевдокода?

/* *** STEP 3 *** */
// Do the following for all threads in the $threads array, FIRST COME FIRST SERVE
// If the thread finished STEP 2, then immediately process it.

Благодарю.

1 ответ

Решение

Прежде всего, Thread представляет контекст выполнения.

Что вам нужно сделать, так это продумать контекст и данные отдельно...

<?php
class Test extends Thread {

    public function __construct(Volatile $queue, $value) {
        $this->queue = $queue;
        $this->value = $value;
    }

    public function run() {
        $data = strlen(
            file_get_contents("http://www.google.co.uk/?q={$this->value}"));

        usleep(mt_rand(10000, 20000));

        $this->queue->synchronized(function($queue, $value, $data) {
            $queue[] = (array) [
                $value => $data
            ];
            $queue->notify();
        }, $this->queue, $this->value, $data);
    }

    private $queue;
    private $value;
}

$chars = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"];
$queue = new Volatile();
$tests = [];

for ($test = 0; $test < 10; $test++) {
    $tests[$test] = new Test($queue, $chars[$test]);
    $tests[$test]->start();
}

$test = 0;

while (($next = $queue->synchronized(function() use($queue, &$test) {
    /* guard infinite loop */
    if (++$test > 10)
        return false;

    /* predicated wait for data */
    while (!count($queue))
        $queue->wait();

    /* return next item */
    return $queue->shift();
}))) {
    var_dump($next);
}

foreach ($tests as $thread)
    $thread->join();
?>

Приведенный выше код предназначен для pthreads v3, PHP7, которая является лучшей доступной версией и рекомендуемой версией для использования в новых проектах.

Кишки раствора содержатся в Test::run и цикл while в основном контексте.

$data = strlen(
    file_get_contents("http://www.google.co.uk/?q={$this->value}"));

usleep(mt_rand(10000, 20000));

Это было предназначено для получения мусора из Google, так случилось, что время отклика настолько согласованно, что мне пришлось добавить usleep Только для того, чтобы вы могли видеть, что порядок не имеет значения, если все сделано правильно.

Вы никогда не должны использовать usleep в реальном мире многопоточный код.

$this->queue->synchronized(function($queue, $value, $data) {
    $queue[] = (array) [
        $value => $data
    ];
    $queue->notify();
}, $this->queue, $this->value, $data);

Сгенерировав некоторые данные, каждый Test синхронизируется с очередью, добавляет в нее некоторые данные и отправляет уведомление в любой контекст, ожидающий в данный момент.

Между тем, это происходит:

$test = 0;

while (($next = $queue->synchronized(function() use($queue, &$test) {
    /* guard infinite loop */
    if (++$test > 10)
        return false;

    /* predicated wait for data */
    while (!count($queue))
        $queue->wait();

    /* return next item */
    return $queue->shift();
}))) {
    var_dump($next);
}

Основной контекст синхронизируется с очередью, в то время как в синхронизированном блоке он защищает от бесконечных циклов (поскольку мы знаем, сколько данных поступает), тогда, если в очереди нет данных, он будет ждать, пока некоторые из них станут доступными. Наконец, возвращаем первый элемент в очереди в основной контекст.

Приведенный выше код выведет что-то вроде:

array(1) {
  ["I"]=>
  int(188965)
}
array(1) {
  ["B"]=>
  int(188977)
}
array(1) {
  ["C"]=>
  int(188921)
}
array(1) {
  ["F"]=>
  int(188962)
}
array(1) {
  ["J"]=>
  int(188954)
}
array(1) {
  ["A"]=>
  int(188912)
}
array(1) {
  ["E"]=>
  int(188929)
}
array(1) {
  ["G"]=>
  int(188941)
}
array(1) {
  ["D"]=>
  int(188946)
}
array(1) {
  ["H"]=>
  int(188929)
}

Ключевым моментом здесь является то, что контекст и данные являются отдельными проблемами.

Другие вопросы по тегам