Потоковая передача на fasthttp.RequestCtx
Я хотел бы позволить клиенту выполнять потоковое API и использовать fasthttp в качестве бэкэнда. Я не хочу выставлять fasthttp.RequestCtx
к клиенту и не хотят получать канал от функции клиента.
Вот код, который у меня есть (функция клиента worker
), когда я запускаю его, а затем делаю curl -iv http://localhost:8080
это застревает. На стороне сервера я вижу 5 WRITE: ...
печать. Отладка кажется, что сервер зависает в s.writer.Flush()
Есть идеи, как это реализовать?
package main
import (
"bufio"
"fmt"
"log"
"github.com/valyala/fasthttp"
)
var (
poem = []string{
"The Road goes ever on and on",
"Down from the door where it began.",
"Now far ahead the Road has gone,",
"And I must follow, if I can,",
"Pursuing it with eager feet,",
"Until it joins some larger way",
"Where many paths and errands meet.",
"And whither then? I cannot say.",
}
)
// Streamer is streaming API
type Streamer interface {
// Write writes bytes to streamer
Write(p []byte) (n int, err error)
// Flush flushes data to the client
Flush() error
// SetStatusCode sets the status code. *Must* be called before Write and Flush
SetStatusCode(statusCode int) error
// SetHeader sets a response header. *Must* be called before Write and Flush
SetHeader(key string, value interface{}) error
}
// HTTPStreamer implement Streamer with fasthttp
type HTTPStreamer struct {
ctx *fasthttp.RequestCtx
done chan bool
writer *bufio.Writer
}
// NewHTTPStreamer returns a new HTTPStreamer
func NewHTTPStreamer(ctx *fasthttp.RequestCtx) *HTTPStreamer {
return &HTTPStreamer{
ctx: ctx,
}
}
// Close closes the stream
func (s *HTTPStreamer) Close() {
if s.done != nil {
close(s.done)
}
}
// Write writes bytes to streamer
func (s *HTTPStreamer) Write(p []byte) (n int, err error) {
if s.writer == nil {
s.done = make(chan bool)
ready := make(chan bool)
go func() {
s.ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
s.writer = w
ready <- true // Signal that writer is set
<-s.done // Wait for stream to be closed
})
}()
<-ready // Wait until writer is set
}
fmt.Printf("WRITE: %s", string(p))
return s.writer.Write(p)
}
// Flush flushes data to the client
func (s *HTTPStreamer) Flush() error {
if s.writer != nil {
return s.writer.Flush()
}
return nil
}
// SetStatusCode sets the status code. *Must* be called before Write and Flush
func (s *HTTPStreamer) SetStatusCode(statusCode int) error {
if s.writer != nil {
return fmt.Errorf("Streaming started - can't set status")
}
s.ctx.SetStatusCode(statusCode)
return nil
}
// SetHeader sets a response header. *Must* be called before Write and Flush
// value can be string or []byte
func (s *HTTPStreamer) SetHeader(key string, value interface{}) error {
if s.writer != nil {
return fmt.Errorf("Streaming started - can't set header")
}
switch v := value.(type) {
case string:
s.ctx.Response.Header.Set(key, v)
case []byte:
s.ctx.Response.Header.SetBytesV(key, v)
default:
return fmt.Errorf("Unsupported header value type - %T", value)
}
return nil
}
func worker(s Streamer) {
s.SetStatusCode(201)
s.SetHeader("X-T", "VALUE")
for _, line := range poem {
s.Write([]byte(line + "\n"))
s.Flush()
}
}
func handler(ctx *fasthttp.RequestCtx) {
s := NewHTTPStreamer(ctx)
worker(s)
s.Close()
}
func main() {
srv := &fasthttp.Server{
Handler: handler,
}
if err := srv.ListenAndServe(":8080"); err != nil {
log.Fatal(err)
}
}