Ним межпотоковая передача сообщений: Как избежать глобального TChannel?

У меня есть следующий простой пример проблемы связи между потоками: я хочу запускать произвольные алгоритмы "в любое время" в фоновом потоке. Алгоритм в любое время выполняет некоторое вычисление типа результата T постепенно, т. е. время от времени дает новые, более точные результаты. На языке Nim они, вероятно, лучше всего представлены итератором. В главном потоке я теперь хочу обернуть такие итераторы каждый в свой собственный поток с возможностью запрашивать у потоков такие вещи, как "доступно ли новое значение" или "каков текущий результат вычисления".

Поскольку я не знаком с концепциями параллелизма Nim, у меня возникли проблемы с реализацией необходимого межпоточного взаимодействия. Моя идея состояла в том, чтобы использовать TChannel для общения. Согласно этому сообщению на форуме, TChannel не может использоваться в сочетании с spawn но требует использовать createThread, Мне удалось получить следующее для компиляции и запуска:

import os, threadpool

proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =

  type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]

  # I think I have to wrap the iterator to pass it to createThread
  proc threadFunc(args: Args) {.thread.} =
    echo "Thread is starting"
    let iter = args.iter
    var channel = args.channel[]

    for i in iter():
      echo "Sending ", i
      channel.send(i)

  var thread: TThread[Args]
  var channel: TChannel[T]
  channel.open()

  let args = (f, channel.addr)
  createThread(thread, threadFunc, args)

  result = channel


# example use in some main thread:
iterator test(): int {.closure.} =
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel = spawnBackgroundJob[int](test)

for i in 0 .. 10:
  sleep(200)
  echo channel.peek()

echo "Finished"

К сожалению, это не имеет ожидаемого поведения, то есть я никогда не получаю ничего в основном потоке. Мне сказали на IRC, что проблема в том, что я не использую глобальные переменные. Но даже после долгих раздумий я не вижу точно, почему это не удается, и есть ли способ решить это. Проблема в том, что я не могу просто сделать переменные thread а также channel глобальный, так как они зависят от типа T, Я также хочу избежать ограничения на запуск только одного алгоритма в любое время (или другого фиксированного числа N). Мне также сказали, что этот подход не имеет общего смысла, так что, может быть, мне просто не хватает того, что эта проблема имеет совершенно другое решение?

1 ответ

Решение

Причина:

Вы используете два разных канала в send и recv.

Назначение объекта в Nim - это глубокая копия, это другой объект.

var channel = args.channel[]

а также

result = channel

Чтобы объяснить это, см. Фрагмент кода ниже:

type
  A = object
    x: int
    y: int

var a,b: A

var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation

a = c[]
b = c[]

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000

a.x = 1
a.y = 2

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000

b.x = 3
b.y = 4

echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400

Решение для передачи канала в и из процесса:

Чтобы передать канал в качестве параметра и возвращаемого значения, обратитесь к ответу Jehan's на форуме Nim.

вставьте ответ Jehan's здесь для быстрого ознакомления и сделайте так, чтобы компиляция проходила в Nim 0.11.2

type SharedChannel[T] = ptr TChannel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc send[T](ch: SharedChannel[T], content: T) =
  ch[].send(content)


proc recv[T](ch: SharedChannel[T]): T =
  result = ch[].recv

proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
  let (mainChannel, responseChannel) = ch
  while true:
    let s = mainChannel.recv
    if s == nil:
      break
    echo s
    responseChannel.send(true)
  responseChannel.send(false)

proc main() =
  var
    mainChannel = newSharedChannel[string]()
    responseChannel = newSharedChannel[bool]()
    th: TThread[(SharedChannel[string], SharedChannel[bool])]
  createThread(th, someThread, (mainChannel, responseChannel))
  for i in 0..2:
    echo("main thread send: " & $i)
    mainChannel.send($i)
    if not responseChannel.recv:
      break
  mainChannel.send(nil)
  joinThread(th)
  close(mainChannel)
  close(responseChannel)

main()

Выход:

main thread send: 0
0
main thread send: 1
1
main thread send: 2
2

Еще один шаг, решение этого вопроса:

import os, threadpool, macros

template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
  block:
    proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
      echo "Thread is starting"

      for i in iter:
        echo "Sending ", i
        channel[].send(i)

    channel[].open()

    var thread: TThread[ptr TChannel[t]]
    createThread(thread, threadFunc, chan)
    #joinThread(thread)


# example use in some main thread:
iterator testJob(): int =
  yield 0
  sleep(500)
  yield 1
  sleep(500)
  yield 2

var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())

for i in 1 .. 10:
  sleep(200)
  echo channel[].peek()

channel[].close()
Другие вопросы по тегам