Ним межпотоковая передача сообщений: Как избежать глобального TChannel?
У меня есть следующий простой пример проблемы связи между потоками: я хочу запускать произвольные алгоритмы "в любое время" в фоновом потоке. Алгоритм в любое время выполняет некоторое вычисление типа результата T
постепенно, т. е. время от времени дает новые, более точные результаты. На языке Nim они, вероятно, лучше всего представлены итератором. В главном потоке я теперь хочу обернуть такие итераторы каждый в свой собственный поток с возможностью запрашивать у потоков такие вещи, как "доступно ли новое значение" или "каков текущий результат вычисления".
Поскольку я не знаком с концепциями параллелизма Nim, у меня возникли проблемы с реализацией необходимого межпоточного взаимодействия. Моя идея состояла в том, чтобы использовать TChannel
для общения. Согласно этому сообщению на форуме, TChannel
не может использоваться в сочетании с spawn
но требует использовать createThread
, Мне удалось получить следующее для компиляции и запуска:
import os, threadpool
proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =
type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]
# I think I have to wrap the iterator to pass it to createThread
proc threadFunc(args: Args) {.thread.} =
echo "Thread is starting"
let iter = args.iter
var channel = args.channel[]
for i in iter():
echo "Sending ", i
channel.send(i)
var thread: TThread[Args]
var channel: TChannel[T]
channel.open()
let args = (f, channel.addr)
createThread(thread, threadFunc, args)
result = channel
# example use in some main thread:
iterator test(): int {.closure.} =
sleep(500)
yield 1
sleep(500)
yield 2
var channel = spawnBackgroundJob[int](test)
for i in 0 .. 10:
sleep(200)
echo channel.peek()
echo "Finished"
К сожалению, это не имеет ожидаемого поведения, то есть я никогда не получаю ничего в основном потоке. Мне сказали на IRC, что проблема в том, что я не использую глобальные переменные. Но даже после долгих раздумий я не вижу точно, почему это не удается, и есть ли способ решить это. Проблема в том, что я не могу просто сделать переменные thread
а также channel
глобальный, так как они зависят от типа T
, Я также хочу избежать ограничения на запуск только одного алгоритма в любое время (или другого фиксированного числа N). Мне также сказали, что этот подход не имеет общего смысла, так что, может быть, мне просто не хватает того, что эта проблема имеет совершенно другое решение?
1 ответ
Причина:
Вы используете два разных канала в send и recv.
Назначение объекта в Nim - это глубокая копия, это другой объект.
var channel = args.channel[]
а также
result = channel
Чтобы объяснить это, см. Фрагмент кода ниже:
type
A = object
x: int
y: int
var a,b: A
var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation
a = c[]
b = c[]
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000
a.x = 1
a.y = 2
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000
b.x = 3
b.y = 4
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
Решение для передачи канала в и из процесса:
Чтобы передать канал в качестве параметра и возвращаемого значения, обратитесь к ответу Jehan's на форуме Nim.
вставьте ответ Jehan's здесь для быстрого ознакомления и сделайте так, чтобы компиляция проходила в Nim 0.11.2
type SharedChannel[T] = ptr TChannel[T]
proc newSharedChannel[T](): SharedChannel[T] =
result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
open(result[])
proc close[T](ch: var SharedChannel[T]) =
close(ch[])
deallocShared(ch)
ch = nil
proc send[T](ch: SharedChannel[T], content: T) =
ch[].send(content)
proc recv[T](ch: SharedChannel[T]): T =
result = ch[].recv
proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
let (mainChannel, responseChannel) = ch
while true:
let s = mainChannel.recv
if s == nil:
break
echo s
responseChannel.send(true)
responseChannel.send(false)
proc main() =
var
mainChannel = newSharedChannel[string]()
responseChannel = newSharedChannel[bool]()
th: TThread[(SharedChannel[string], SharedChannel[bool])]
createThread(th, someThread, (mainChannel, responseChannel))
for i in 0..2:
echo("main thread send: " & $i)
mainChannel.send($i)
if not responseChannel.recv:
break
mainChannel.send(nil)
joinThread(th)
close(mainChannel)
close(responseChannel)
main()
Выход:
main thread send: 0
0
main thread send: 1
1
main thread send: 2
2
Еще один шаг, решение этого вопроса:
import os, threadpool, macros
template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
block:
proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
echo "Thread is starting"
for i in iter:
echo "Sending ", i
channel[].send(i)
channel[].open()
var thread: TThread[ptr TChannel[t]]
createThread(thread, threadFunc, chan)
#joinThread(thread)
# example use in some main thread:
iterator testJob(): int =
yield 0
sleep(500)
yield 1
sleep(500)
yield 2
var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())
for i in 1 .. 10:
sleep(200)
echo channel[].peek()
channel[].close()