Как писать и читать события в кластере Eventstore с помощью клиента.Net
Я пытаюсь записывать и читать события в кластере хранилища событий, используя клиент.Net. Поэтому я настроил 3 локальных док-контейнера для проверки записи и чтения в кластер. Я уже настроил кластер, и он говорит, что он жив (1 Master и 2 Slaves).
К сожалению, когда я пытаюсь записать или прочитать из.Net-клиента в хранилище событий, соединение открывается и закрывается все время, и в магазин не поступает никаких звонков. Как я могу подключиться к кластеру для записи и чтения событий?
Если я использую хранилище событий без кластера, методы работают правильно, и я могу создавать и использовать события. С кластером я получаю только следующие сообщения журнала:
Start connecting to Eventstore
Start reading events:
[04,06:49:19.771,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[04,06:49:19.771,INFO] Discovering attempt 1/10 successful: best candidate is [172.16.1.103:1112, n/a].
[10,06:49:20.396,INFO] ClientAPI TcpConnection closed [06:49:20.396: N172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}]:
[10,06:49:20.396,INFO] Received bytes: 0, Sent bytes: 0
[10,06:49:20.396,INFO] Send calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Receive calls: 0, callbacks: 0
[10,06:49:20.396,INFO] Close reason: [Success] Connection establishment timeout.
[10,06:49:20.397,DEBUG] TcpPackageConnection: connection [172.16.1.103:1112, L, {220a086e-3394-49c3-affa-b3d6fc385ea2}] was closed cleanly.
[10,06:49:22.564,INFO] Discovering attempt 1/10 failed: no candidate found.
[10,06:49:23.068,INFO] Discovering: found best choice [172.16.1.103:1112,n/a] (Master).
[10,06:49:23.068,INFO] Discovering attempt 2/10 successful: best candidate is [172.16.1.103:1112, n/a].
Если я запишу событие в кластерное хранилище событий с помощью веб-интерфейса, оно примет его, а затем событие можно будет найти в веб-интерфейсе хранилища событий.
Ниже приведены мои настройки на случай, если вам нужны подробности:
Кластер хранилища событий настраивается, как описано в документации к хранилищу событий. Кластер настроен так, чтобы не использовать DNS, установив discover-via-dns=false
, Вот мой .yaml
файл для настройки докера:
version: '3'
services:
eventstore1:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.101
EVENTSTORE_EXT_IP: 172.16.1.101
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.102:2113,172.16.1.103:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
ports:
- 1111:1111
- 1112:1112
- 2113:2113
- 2114:2114
networks:
app_net:
ipv4_address: 172.16.1.101
eventstore2:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.102
EVENTSTORE_EXT_IP: 172.16.1.102
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.103:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:4114/"
ports:
- 3111:1111
- 3112:1112
- 4113:2113
- 4114:2114
networks:
app_net:
ipv4_address: 172.16.1.102
eventstore3:
image: "eventstore/eventstore:latest"
environment:
EVENTSTORE_DISABLE_HTTP_CACHING: "True"
EVENTSTORE_RUN_PROJECTIONS: ALL
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_INT_IP: 172.16.1.103
EVENTSTORE_EXT_IP: 172.16.1.103
EVENTSTORE_INT_TCP_PORT: 1111
EVENTSTORE_EXT_TCP_PORT: 1112
EVENTSTORE_INT_HTTP_PORT: 2113
EVENTSTORE_EXT_HTTP_PORT: 2114
EVENTSTORE_DISCOVER_VIA_DNS: "False"
EVENTSTORE_GOSSIP_SEED: 172.16.1.101:2113,172.16.1.102:2113
EVENTSTORE_INT_HTTP_PREFIXES: "http://*:2113/"
EVENTSTORE_EXT_HTTP_PREFIXES: "http://*:2114/"
ports:
- 5111:1111
- 5112:1112
- 6113:2113
- 6114:2114
networks:
app_net:
ipv4_address: 172.16.1.103
networks:
app_net:
external: true
Для проверки соединения и возможности записи и чтения в кластере хранилища событий я создал небольшое приложение:
Основная программа:
class Program
{
const string STREAM = "MyTestStream";
private static ConnectionFactory _connectionFactory = new ConnectionFactory();
static void Main(string[] args)
{
var useCluster = true;
Console.WriteLine("Start connecting to Eventstore");
var writeEvents = new WriteEvents(_connectionFactory);
writeEvents.Write(useCluster, STREAM).Wait();
Console.WriteLine("Start reading events:");
var readEvents = new ReadEvents(_connectionFactory);
readEvents.Read(useCluster, STREAM).Wait();
Console.WriteLine("Check if events are written and press key to exit");
Console.ReadKey();
}
}
ConnectionFactory:
public class ConnectionFactory
{
public IEventStoreConnection SingleConnection()
{
return EventStoreConnection.Create(ConnectionSettings.Create(),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
}
public IEventStoreConnection ClusterConnection()
{
return EventStoreConnection.Create(
ConnectionSettings.Create().KeepReconnecting().UseConsoleLogger()
.SetGossipSeedEndPoints(
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2113),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 4113),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6113))
.WithConnectionTimeoutOf(TimeSpan.FromMilliseconds(500))
);
}
public IEventStoreConnection CreateConnection(bool useCluster)
{
if (useCluster)
{
return ClusterConnection();
}
return SingleConnection();
}
}
Соединение для кластера создается, как описано в документации к хранилищу событий, однако документация не на 100% соответствует последней используемой мной версии nuget.
версия nuget:
<PackageReference Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />
WriteEvents:
public class WriteEvents
{
private ConnectionFactory _connectionFactory;
public WriteEvents(ConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public async Task Write(bool useCluster, string streamName)
{
using (var conn = _connectionFactory.CreateConnection(useCluster))
{
await conn.ConnectAsync();
for (var x = 0; x < 100; x++)
{
await conn.AppendToStreamAsync(streamName,
ExpectedVersion.Any,
GetEventDataFor(x));
Console.WriteLine("event " + x + " written.");
}
}
}
private IEnumerable<EventData> GetEventDataFor(int i)
{
yield return new EventData(
Guid.NewGuid(),
"MyTestEvent",
true,
Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
);
}
}
ReadEvents:
public class ReadEvents
{
private ConnectionFactory _connectionFactory;
public ReadEvents(ConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
}
public async Task Read(bool useCluster, string streamName)
{
using (var conn = _connectionFactory.CreateConnection(useCluster))
{
await conn.ConnectAsync();
var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
foreach (var evt in slice.Events)
{
Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
}
}
}
}
1 ответ
Это не проблема, связанная с типом клиента. Вы получите то же самое поведение и с не NetCore. Когда клиент пытается подключиться к одному узлу в кластере, EventStore отвечает с IP-адресом, который не доступен. В некоторых сценариях, например, когда вы используете программный оркестратор, такой как Kubernetes или Swarm, ваши клиенты должны быть размещены в одной и той же оверлейной сети, чтобы иметь возможность подключаться к кластеру с помощью Tcp Client. При разработке вашего приложения / микросервиса вы можете использовать строку подключения к одноузловому EventStore, не размещенному в Kubernetes, Swarm или локальном движке Docker. При развертывании приложения в среде Kubernetes, Swarm или Docker-Compose вы можете установить строку подключения вашего приложения с семенами сплетен и использовать ips хостов вашего кластера.
Клиент C#, использующий соединение кластера
namespace TestClusterConnection
{
class Program
{
private const string Stream = "MyTestStream";
static void Main(string[] args)
{
try
{
var useCluster = true;
Console.WriteLine("Start connecting to Eventstore");
Write(useCluster, Stream).Wait();
Console.WriteLine("Start reading events:");
Read(useCluster, Stream).Wait();
Console.WriteLine("Check if events are written and press key to exit");
}
catch (Exception e)
{
Console.WriteLine(e.GetBaseException().Message);
}
Console.ReadKey();
}
public static async Task Read(bool useCluster, string streamName)
{
using (var conn = CreateConnection(useCluster))
{
await conn.ConnectAsync();
var slice = await conn.ReadStreamEventsForwardAsync(streamName, 0, 100, false);
foreach (var evt in slice.Events)
Console.WriteLine($"Received event. Type: '{evt.Event.EventType}', Data: '{Encoding.UTF8.GetString(evt.Event.Data)}'");
}
}
private static async Task Write(bool useCluster, string streamName)
{
using (var conn = CreateConnection(useCluster))
{
await conn.ConnectAsync();
for (var x = 0; x < 100; x++)
{
await conn.AppendToStreamAsync(streamName,
ExpectedVersion.Any,
GetEventDataFor(x));
Console.WriteLine("event " + x + " written.");
}
}
}
private static IEnumerable<EventData> GetEventDataFor(int i)
{
yield return new EventData(
Guid.NewGuid(),
"MyTestEvent",
true,
Encoding.ASCII.GetBytes("{'somedata' : " + i + "}"),
Encoding.ASCII.GetBytes("{'metadata' : " + i + "}")
);
}
private static IEventStoreConnection SingleConnection()
{
return EventStoreConnection.Create(ConnectionSettings.Create(),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
}
private static IEventStoreConnection ClusterConnection()
{
return EventStoreConnection.Create(
ConnectionSettings.Create().KeepRetrying().KeepReconnecting().UseConsoleLogger()
.SetGossipSeedEndPoints(
new IPEndPoint(IPAddress.Parse("172.19.0.2"), 2112),
new IPEndPoint(IPAddress.Parse("172.19.0.3"), 2112),
new IPEndPoint(IPAddress.Parse("172.19.0.4"), 2112))
.SetHeartbeatInterval(TimeSpan.FromSeconds(3))
.SetHeartbeatTimeout(TimeSpan.FromSeconds(6))
.WithConnectionTimeoutOf(TimeSpan.FromSeconds(10))
);
}
private static IEventStoreConnection CreateConnection(bool useCluster)
{
return useCluster ? ClusterConnection() : SingleConnection();
}
}
}
Dockerfile
FROM mono:4.6.2.16
ADD . /home/TestClusterConnection
CMD [ "mono", "home/TestClusterConnection/TestClusterConnection.exe" ]
докер-compose.yaml
version: '3.4'
services:
esclienttest:
image: testclient
build:
context: .
dockerfile: Dockerfile
depends_on:
- eventstore1
- eventstore2
- eventstore3
eventstore1:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore1
ports:
- 1113:1113
- 2112:2112
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.2
eventstore2:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore2
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.3
eventstore3:
image: eventstore/eventstore:release-4.1.0
hostname: eventstore3
environment:
EVENTSTORE_CLUSTER_DNS: eventstore1
EVENTSTORE_CLUSTER_SIZE: 3
EVENTSTORE_CLUSTER_GOSSIP_PORT: 2112
EVENTSTORE_EXT_IP_ADVERTISE_AS: 172.19.0.4
Это только для тестирования, и могут потребоваться дополнительные настройки сети.
Надеюсь это поможет.
Riccardo