Нужна помощь в понимании того, как работает libchan

Я пытаюсь использовать библиотеку libchan для отправки сообщений между компьютерами с использованием транспорта, аналогичного каналу go.

Из того, что я собрал, грубая идея заключается в следующем:

  1. У вас есть клиент SPDY, который отправляет сериализованный объект команды по адресу через tcp. Этот объект команды содержит канал libchan, называемый Pipe что ответ отправлен.
  2. Когда сервер получает входящее соединение, он ожидает объект команды. Когда он получает один, он отправляет ответ через Pipe содержится в объекте.

Вот моя точка замешательства. Чтобы канал сохранялся между двумя машинами, они должны были бы делиться памятью или, по крайней мере, делиться абстракцией, которая связывает их две. Судя по тому, как я начал работать с кодовой базой libchan, я понятия не имею, как это возможно.

Вот фрагмент из примера в репо:

// client

    receiver, remoteSender := libchan.Pipe()
    command := &RemoteCommand{
        Cmd:        os.Args[1],
        Args:       os.Args[2:],
        Stdin:      os.Stdin,
        Stdout:     os.Stdout,
        Stderr:     os.Stderr,
        StatusChan: remoteSender,
    }

    err = sender.Send(command)
    if err != nil {
        log.Fatal(err)
    }
    err = receiver.Receive(response)
    if err != nil {
        log.Fatal(err)
    }

    os.Exit(response.Status)

и сервер:

// server
t := spdy.NewTransport(p)

        go func() {
            for {
                receiver, err := t.WaitReceiveChannel()
                if err != nil {
                    log.Print("receiver error")
                    log.Print(err)
                    break
                }
                log.Print("about to spawn receive proc")
                go func() {
                    for {
                        command := &RemoteReceivedCommand{}
                        err := receiver.Receive(command)
                        returnResult := &CommandResponse{}
                        if res != nil {
                            if exiterr, ok := res.(*exec.ExitError); ok {
                                returnResult.Status = exiterr.Sys(). 
                              (syscall.WaitStatus).ExitStatus()
                            } else {
                                log.Print("res")
                                log.Print(res)
                                returnResult.Status = 10
                            }
                        }
                        err = command.StatusChan.Send(returnResult)

Дело в том, что я пытаюсь отточить это здесь:

libchan.Pipe()

По словам источника, это возвращает канал. Одна ссылка хранится на клиенте, а другая отправляется на сервер. Этот канал затем используется для передачи значений от последнего к первому. Как это на самом деле работает на практике?

Полный код для клиента и сервера

1 ответ

Во-первых, хорошо знать, что все, что выполняет Pipe(), - это создание канала и возврат пары отправитель / получатель в памяти.

От inmem.go:

// Pipe returns an inmemory Sender/Receiver pair.
func Pipe() (Receiver, Sender) {
    c := make(chan interface{})
    return pReceiver(c), pSender(c)
}

Тогда вы можете посмотреть в inmem_test.go для простого сквозного примера.

Эта структура является эквивалентом RemoteCommand из демо.

type InMemMessage struct {
    Data   string
    Stream io.ReadWriteCloser
    Ret    Sender
}

В TestInmemRetPipe(), простой клиент и сервер созданы.

Клиент создает локальную пару отправитель / получатель, используя Pipe(), а сервер просто использует libchan.Sender интерфейс в InMemMessage структура.

Обратите внимание, что клиент и сервер являются функциями, которые получают отправителя или получателя в качестве аргумента соответственно. Подробнее об этом в следующем фрагменте кода.

func TestInmemRetPipe(t *testing.T) {
    client := func(t *testing.T, w Sender) {
        ret, retPipe := Pipe()
        message := &InMemMessage{Data: "hello", Ret: retPipe}

        err := w.Send(message)
        if err != nil {
            t.Fatal(err)
        }
        msg := &InMemMessage{}
        err = ret.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "this better not crash" {
            t.Fatalf("%#v", msg)
        }

    }
    server := func(t *testing.T, r Receiver) {
        msg := &InMemMessage{}
        err := r.Receive(msg)
        if err != nil {
            t.Fatal(err)
        }

        if msg.Data != "hello" {
            t.Fatalf("Wrong message:\n\tExpected: %s\n\tActual: %s", "hello", msg.Data)
        }
        if msg.Ret == nil {
            t.Fatal("Message Ret is nil")
        }

        message := &InMemMessage{Data: "this better not crash"}
        if err := msg.Ret.Send(message); err != nil {
            t.Fatal(err)
        }
    }
    SpawnPipeTestRoutines(t, client, server)

}

SpawnPipeTestRoutines() выполняет функции клиента и сервера. В этой функции создается другой воздух отправителя / получателя через Pipe(),

В демонстрационном приложении функция, выполняемая здесь Pipe() (т. е. облегчение связи между экземплярами клиента и сервера) вместо этого обрабатывается через сетевую связь.

func SpawnPipeTestRoutines(t *testing.T, s SendTestRoutine, r ReceiveTestRoutine) {
    end1 := make(chan bool)
    end2 := make(chan bool)

    receiver, sender := Pipe()

    go func() {
        defer close(end1)
        s(t, sender)
        err := sender.Close()
        if err != nil {
            t.Fatalf("Error closing sender: %s", err)
        }
    }()

    go func() {
        defer close(end2)
        r(t, receiver)
    }()
    ...

В демонстрационном приложении связь облегчается с помощью вызовов Transport.NewSendChannel () на клиенте и Transport.WaitReceiveChannel (), которые возвращают libchan.Sender а также libchan.Receiver соответственно. Эти экземпляры libchan обрабатывают "трубу" через сеть.

С client.go:

sender, err := transport.NewSendChannel()
...
err = sender.Send(command)

С server.go:

receiver, err := t.WaitReceiveChannel()
...
err := receiver.Receive(command)

В обоих случаях предварительная конфигурация транспорта выполняется заранее (т. Е. Привязка к сокетам, использование TLS и т. Д.).

Вероятно, также стоит отметить, что используемая библиотека spdy является частью дистрибутива libchan, следовательно, она предоставляет примитивы libchan.

Другие вопросы по тегам