Как параметризация этой функции заставила мой проект Zio перестать работать?

Я начал писать сервер, который использует zio-http для пересылки сообщений из темы Pulsar в WebSocket. Он работал нормально, но я понял, что снова создаю потребителя, когда сокет закрывался, поэтому я реорганизовал этот код, чтобы принять потребителя в качестве параметра. Теперь сообщения не пересылаются. На самом деле, они даже не попадают вQueue, что еще больше меня озадачивает. Я использую Zio всего несколько дней, поэтому пока не могу понять, что я сделал не так.

Вот код сокета до изменения:

      package com.example.dashback

import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.PulsarService
import org.apache.pulsar.client.api.{Message, Schema}

def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))

object WebSocketHandler:
    private def createSocket(pulsar: PulsarService) = {
        val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
        val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)

        val socket = Http.collectZIO[WebSocketChannelEvent] {
            case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
                for
                    c <- consumer
                    s = ZStream.fromQueue(c.queue)
                    f <- s.foreach { msg =>
                        channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
                    }
                yield s

            case ChannelEvent(channel, ChannelUnregistered) =>
                for
                    _ <- ZIO.log("WebSocket closed; closing consumer")
                    c <- consumer
                    _ <- ZIO.attempt(c.consumer.close())
                yield ()

            case e =>
                for
                    _ <- ZIO.log(e.toString)
                yield ()
        }

        socket.toSocketApp.toResponse
    }

    val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
        case r @ Method.GET -> !! / "ws" =>
            for
                pulsar <- ZIO.service[PulsarService]
                s <- createSocket(pulsar)
            yield s
    }

А вот код после изменения, где я создаю потребителя вне обработчика сообщений WebSocket и передаю его в качестве аргумента, чтобы один и тот же использовался как при подключении, так и при отключении:

      package com.example.dashback

import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.{PulsarConsumer, PulsarService}
import org.apache.pulsar.client.api.{Consumer, Message, Schema}

def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))

object WebSocketHandler:
    private def createSocket(pulsar: PulsarService) = {
        val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
        val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)

        def socket(c: PulsarConsumer[String]) = Http.collectZIO[WebSocketChannelEvent] {
            case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
                for
                    _ <- zlog("We get here")
                    _ <- ZStream.fromQueue(c.queue).foreach { msg =>
                        channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
                    }.fork
                    _ <- channel.writeAndFlush(WebSocketFrame.text("Hello\n"))
                yield ()

            case ChannelEvent(channel, ChannelUnregistered) =>
                for
                    _ <- ZIO.log("WebSocket closed; closing consumer")
                    _ <- ZIO.attempt(c.consumer.close())
                    }
                yield ()

            case e =>
                for
                    _ <- ZIO.log(e.toString)
                yield ()
        }

        for
            c <- consumer
            s <- socket(c).toSocketApp.toResponse
        yield s
    }

    val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
        case r @ Method.GET -> !! / "ws" =>
            for
                pulsar <- ZIO.service[PulsarService]
                s <- createSocket(pulsar)
            yield s
    }

Для справки, вот мой сервис Pulsar:

      package com.example.dashback.pulsar

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.jdk.CollectionConverters.*
import zio.*
import zio.stream.ZStream
import org.apache.pulsar.client.api.{Consumer, Message, MessageListener, PulsarClient, Schema, SubscriptionType}

import com.example.dashback.{log, zlog}

case class PulsarConsumer[T](consumer: Consumer[T], queue: Queue[Message[T]])

class PulsarServiceImpl extends PulsarService:
    private val client = PulsarClient.builder()
        .serviceUrl("pulsar://pulsar.example.com:6650")
        .build()

    def consumer[T](subscriptionName: String, topics: Seq[String], schema: Schema[T]): Task[PulsarConsumer[T]] =
        def createConsumer =
            ZIO.attempt {
                client.newConsumer(schema)
                    .topics(topics.asJava)
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscribe()
            }

        def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Message[T]] =
            ZIO.async { cb =>
                consumer.receiveAsync().thenAccept { msg =>
                    cb {
                        for
                            _ <- zlog("Offering message to queue")
                            _ <- queue.offer(msg)
                            _ <- zlog("Acknowledging message")
                            _ <- ZIO.attempt(consumer.acknowledge(msg))
                            _ <- zlog("Acknowledged")
                            v <- ZIO.attempt(msg)
                        yield v
                    }
                }
            }

        for
            q <- Queue.unbounded[Message[T]]
            c <- createConsumer
            _ <- receive(c, q).forever.fork
        yield PulsarConsumer(c, q)

object PulsarServiceImpl:
    val layer = ZLayer.succeed(new PulsarServiceImpl)

«Предложение сообщения в очередь» не отображается в журнале после изменения. Тем не менее, я получаю вручную отправленное «Hello» в сокете.

Я также пробовал альтернативную версиюreceiveфункция, а "Помещение тестового сообщения в очередь" даже в логе не появляется.

              def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Unit] =
            for
                _ <- zlog("[Pulsar] Putting test message in the queue")
                _ <- queue.offer(MessageImpl.create(new MessageMetadata, ByteBuffer.wrap("Test message".getBytes), schema, topics.head))

                _ <- zlog("[Pulsar] Attempting to receive a message")
                _ <- ZIO.async[Any, Throwable, Message[T]] { cb =>
                    log("[Pulsar] Calling receiveAsync")
                    consumer.receiveAsync().thenAccept { msg =>
                        log("A message was received")
                        cb {
                            for
                                _ <- zlog("Offering message to queue")
                                _ <- queue.offer(msg)
                                _ <- zlog("Acknowledging message")
                                _ <- ZIO.attempt(consumer.acknowledge(msg))
                                _ <- zlog("Acknowledged")
                                v <- ZIO.attempt(msg)
                            yield v
                        }
                    }
                }
            yield ()

Обновление: я прокомментировал части WebSocket и напрямую создал потребителя в своемrunметод, и это работает нормально, поэтому проблема не в этом. Я получаю как вставленное вручную тестовое сообщение, так и любые сообщения, которые я публикую в теме Pulsar.

          def run = for
        _ <- ZIO.log("Dashback starting")
        c <- (new PulsarServiceImpl).consumer("test", Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)
        _ <- ZStream.fromQueue(c.queue).foreach { msg =>
            zlog(msg.getValue)
        }
    yield ()

0 ответов

Другие вопросы по тегам