Как параметризация этой функции заставила мой проект Zio перестать работать?
Я начал писать сервер, который использует zio-http для пересылки сообщений из темы Pulsar в WebSocket. Он работал нормально, но я понял, что снова создаю потребителя, когда сокет закрывался, поэтому я реорганизовал этот код, чтобы принять потребителя в качестве параметра. Теперь сообщения не пересылаются. На самом деле, они даже не попадают вQueue
, что еще больше меня озадачивает. Я использую Zio всего несколько дней, поэтому пока не могу понять, что я сделал не так.
Вот код сокета до изменения:
package com.example.dashback
import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.PulsarService
import org.apache.pulsar.client.api.{Message, Schema}
def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))
object WebSocketHandler:
private def createSocket(pulsar: PulsarService) = {
val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)
val socket = Http.collectZIO[WebSocketChannelEvent] {
case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
for
c <- consumer
s = ZStream.fromQueue(c.queue)
f <- s.foreach { msg =>
channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
}
yield s
case ChannelEvent(channel, ChannelUnregistered) =>
for
_ <- ZIO.log("WebSocket closed; closing consumer")
c <- consumer
_ <- ZIO.attempt(c.consumer.close())
yield ()
case e =>
for
_ <- ZIO.log(e.toString)
yield ()
}
socket.toSocketApp.toResponse
}
val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
case r @ Method.GET -> !! / "ws" =>
for
pulsar <- ZIO.service[PulsarService]
s <- createSocket(pulsar)
yield s
}
А вот код после изменения, где я создаю потребителя вне обработчика сообщений WebSocket и передаю его в качестве аргумента, чтобы один и тот же использовался как при подключении, так и при отключении:
package com.example.dashback
import zio.*
import zio.stream.{ZSink, ZStream}
import zhttp.http.*
import zhttp.socket.*
import zhttp.service.ChannelEvent
import zhttp.service.ChannelEvent.UserEvent.HandshakeComplete
import zhttp.service.ChannelEvent.{ChannelUnregistered, UserEventTriggered}
import com.example.dashback.pulsar.{PulsarConsumer, PulsarService}
import org.apache.pulsar.client.api.{Consumer, Message, Schema}
def log(s: String) = println(s"\u001b[38;5;226;48;5;19m${s}\u001b[0m")
def zlog(s: String) = ZIO.succeed(log(s))
object WebSocketHandler:
private def createSocket(pulsar: PulsarService) = {
val consumerName = s"dashboard-events-${scala.util.Random.nextInt(Int.MaxValue).toHexString}"
val consumer = pulsar.consumer[String](consumerName, Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)
def socket(c: PulsarConsumer[String]) = Http.collectZIO[WebSocketChannelEvent] {
case ChannelEvent(channel, UserEventTriggered(HandshakeComplete)) =>
for
_ <- zlog("We get here")
_ <- ZStream.fromQueue(c.queue).foreach { msg =>
channel.writeAndFlush(WebSocketFrame.text(msg.getValue))
}.fork
_ <- channel.writeAndFlush(WebSocketFrame.text("Hello\n"))
yield ()
case ChannelEvent(channel, ChannelUnregistered) =>
for
_ <- ZIO.log("WebSocket closed; closing consumer")
_ <- ZIO.attempt(c.consumer.close())
}
yield ()
case e =>
for
_ <- ZIO.log(e.toString)
yield ()
}
for
c <- consumer
s <- socket(c).toSocketApp.toResponse
yield s
}
val app: Http[PulsarService & JwtService, Throwable, Request, Response] = Http.collectZIO[Request] {
case r @ Method.GET -> !! / "ws" =>
for
pulsar <- ZIO.service[PulsarService]
s <- createSocket(pulsar)
yield s
}
Для справки, вот мой сервис Pulsar:
package com.example.dashback.pulsar
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.jdk.CollectionConverters.*
import zio.*
import zio.stream.ZStream
import org.apache.pulsar.client.api.{Consumer, Message, MessageListener, PulsarClient, Schema, SubscriptionType}
import com.example.dashback.{log, zlog}
case class PulsarConsumer[T](consumer: Consumer[T], queue: Queue[Message[T]])
class PulsarServiceImpl extends PulsarService:
private val client = PulsarClient.builder()
.serviceUrl("pulsar://pulsar.example.com:6650")
.build()
def consumer[T](subscriptionName: String, topics: Seq[String], schema: Schema[T]): Task[PulsarConsumer[T]] =
def createConsumer =
ZIO.attempt {
client.newConsumer(schema)
.topics(topics.asJava)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe()
}
def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Message[T]] =
ZIO.async { cb =>
consumer.receiveAsync().thenAccept { msg =>
cb {
for
_ <- zlog("Offering message to queue")
_ <- queue.offer(msg)
_ <- zlog("Acknowledging message")
_ <- ZIO.attempt(consumer.acknowledge(msg))
_ <- zlog("Acknowledged")
v <- ZIO.attempt(msg)
yield v
}
}
}
for
q <- Queue.unbounded[Message[T]]
c <- createConsumer
_ <- receive(c, q).forever.fork
yield PulsarConsumer(c, q)
object PulsarServiceImpl:
val layer = ZLayer.succeed(new PulsarServiceImpl)
«Предложение сообщения в очередь» не отображается в журнале после изменения. Тем не менее, я получаю вручную отправленное «Hello» в сокете.
Я также пробовал альтернативную версиюreceive
функция, а "Помещение тестового сообщения в очередь" даже в логе не появляется.
def receive(consumer: Consumer[T], queue: Queue[Message[T]]): Task[Unit] =
for
_ <- zlog("[Pulsar] Putting test message in the queue")
_ <- queue.offer(MessageImpl.create(new MessageMetadata, ByteBuffer.wrap("Test message".getBytes), schema, topics.head))
_ <- zlog("[Pulsar] Attempting to receive a message")
_ <- ZIO.async[Any, Throwable, Message[T]] { cb =>
log("[Pulsar] Calling receiveAsync")
consumer.receiveAsync().thenAccept { msg =>
log("A message was received")
cb {
for
_ <- zlog("Offering message to queue")
_ <- queue.offer(msg)
_ <- zlog("Acknowledging message")
_ <- ZIO.attempt(consumer.acknowledge(msg))
_ <- zlog("Acknowledged")
v <- ZIO.attempt(msg)
yield v
}
}
}
yield ()
Обновление: я прокомментировал части WebSocket и напрямую создал потребителя в своемrun
метод, и это работает нормально, поэтому проблема не в этом. Я получаю как вставленное вручную тестовое сообщение, так и любые сообщения, которые я публикую в теме Pulsar.
def run = for
_ <- ZIO.log("Dashback starting")
c <- (new PulsarServiceImpl).consumer("test", Seq("persistent://example/00000000-0000-4000-8000-000000000001/events"), Schema.STRING)
_ <- ZStream.fromQueue(c.queue).foreach { msg =>
zlog(msg.getValue)
}
yield ()