Потоковая передача на fasthttp.RequestCtx

Я хотел бы позволить клиенту выполнять потоковое API и использовать fasthttp в качестве бэкэнда. Я не хочу выставлять fasthttp.RequestCtx к клиенту и не хотят получать канал от функции клиента.

Вот код, который у меня есть (функция клиента worker), когда я запускаю его, а затем делаю curl -iv http://localhost:8080 это застревает. На стороне сервера я вижу 5 WRITE: ... печать. Отладка кажется, что сервер зависает в s.writer.Flush()

Есть идеи, как это реализовать?

package main

import (
    "bufio"
    "fmt"
    "log"

    "github.com/valyala/fasthttp"
)

var (
    poem = []string{
        "The Road goes ever on and on",
        "Down from the door where it began.",
        "Now far ahead the Road has gone,",
        "And I must follow, if I can,",
        "Pursuing it with eager feet,",
        "Until it joins some larger way",
        "Where many paths and errands meet.",
        "And whither then? I cannot say.",
    }
)

// Streamer is streaming API
type Streamer interface {
    // Write writes bytes to streamer
    Write(p []byte) (n int, err error)
    // Flush flushes data to the client
    Flush() error
    // SetStatusCode sets the status code. *Must* be called before Write and Flush
    SetStatusCode(statusCode int) error
    // SetHeader sets a response header. *Must* be called before Write and Flush
    SetHeader(key string, value interface{}) error
}

// HTTPStreamer implement Streamer with fasthttp
type HTTPStreamer struct {
    ctx    *fasthttp.RequestCtx
    done   chan bool
    writer *bufio.Writer
}

// NewHTTPStreamer returns a new HTTPStreamer
func NewHTTPStreamer(ctx *fasthttp.RequestCtx) *HTTPStreamer {
    return &HTTPStreamer{
        ctx: ctx,
    }
}

// Close closes the stream
func (s *HTTPStreamer) Close() {
    if s.done != nil {
        close(s.done)
    }
}

// Write writes bytes to streamer
func (s *HTTPStreamer) Write(p []byte) (n int, err error) {
    if s.writer == nil {
        s.done = make(chan bool)
        ready := make(chan bool)
        go func() {
            s.ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
                s.writer = w
                ready <- true // Signal that writer is set
                <-s.done      // Wait for stream to be closed
            })
        }()

        <-ready // Wait until writer is set
    }

    fmt.Printf("WRITE: %s", string(p))
    return s.writer.Write(p)
}

// Flush flushes data to the client
func (s *HTTPStreamer) Flush() error {
    if s.writer != nil {
        return s.writer.Flush()
    }

    return nil
}

// SetStatusCode sets the status code. *Must* be called before Write and Flush
func (s *HTTPStreamer) SetStatusCode(statusCode int) error {
    if s.writer != nil {
        return fmt.Errorf("Streaming started - can't set status")
    }

    s.ctx.SetStatusCode(statusCode)
    return nil
}

// SetHeader sets a response header. *Must* be called before Write and Flush
// value can be string or []byte
func (s *HTTPStreamer) SetHeader(key string, value interface{}) error {
    if s.writer != nil {
        return fmt.Errorf("Streaming started - can't set header")
    }

    switch v := value.(type) {
    case string:
        s.ctx.Response.Header.Set(key, v)
    case []byte:
        s.ctx.Response.Header.SetBytesV(key, v)
    default:
        return fmt.Errorf("Unsupported header value type - %T", value)
    }

    return nil
}

func worker(s Streamer) {
    s.SetStatusCode(201)
    s.SetHeader("X-T", "VALUE")
    for _, line := range poem {
        s.Write([]byte(line + "\n"))
        s.Flush()
    }
}

func handler(ctx *fasthttp.RequestCtx) {
    s := NewHTTPStreamer(ctx)
    worker(s)
    s.Close()
}

func main() {
    srv := &fasthttp.Server{
        Handler: handler,
    }

    if err := srv.ListenAndServe(":8080"); err != nil {
        log.Fatal(err)
    }
}

0 ответов

Другие вопросы по тегам