Как сделать так, чтобы действующий на одном процессе актер отправлял сообщение другому действующему на отдельном процессе?
Я хочу, чтобы субъекты, работающие в разных процессах (или узлах), отправляли сообщения другим субъектам, работающим в разных процессах (или узлах), и все это при сохранении отказоустойчивости и балансировки нагрузки. В настоящее время я пытаюсь использовать функцию шардинга Akka.Cluster для достижения этой цели.
Тем не менее, я не уверен, как это сделать...
У меня есть следующий код, который отражает мой начальный узел:
let configurePort port =
let config = Configuration.parse ("""
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """ + port.ToString() + """
}
}
cluster {
auto-down-unreachable-after = 5s
seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
}
persistence {
journal.plugin = "akka.persistence.journal.inmem"
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
}
}
""")
config.WithFallback(ClusterSingletonManager.DefaultConfig())
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2551)
let shardRegion1 = spawnSharded id system1 "shardRegion1" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system2 = System.create "cluster-system" (configurePort 2552)
let shardRegion2 = spawnSharded id system2 "shardRegion2" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system3 = System.create "cluster-system" (configurePort 2553)
let shardRegion3 = spawnSharded id system3 "shardRegion3" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(3000)
// NOTE: Even thou we sent all messages through single shard region,
// some of them will be executed on the second and third one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-1", "entity-2", "hello world 2")
shardRegion1 <! ("shard-2", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")
System.Threading.Thread.Sleep(1000)
let printShards shardRegion =
async {
let! (reply:AskResult<ShardRegionStats>) = (retype shardRegion) <? GetShardRegionStats.Instance
let (stats: ShardRegionStats) = reply.Value
for kv in stats.Stats do
printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
} |> Async.RunSynchronously
let printNodes() =
printfn "\nShards active on node 'localhost:2551':"
printShards shardRegion1
printfn "\nShards active on node 'localhost:2552':"
printShards shardRegion2
printfn "\nShards active on node 'localhost:2553':"
printShards shardRegion3
printNodes()
Вывод выглядит примерно так:
Shards active on node 'localhost:2551':
Shard 'shard-1' has 2 entities on it
Shard 'shard-2' has 2 entities on it
Осколки, активные на узле localhost:2552:
Затем у меня есть отдельный процесс, который выполняет следующий код:
let configurePort port =
let config = Configuration.parse ("""
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = "0"
}
}
cluster {
auto-down-unreachable-after = 5s
seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
}
persistence {
journal.plugin = "akka.persistence.journal.inmem"
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
}
}
""")
config.WithFallback(ClusterSingletonManager.DefaultConfig())
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
// spawn two separate systems with shard regions on each of them
let system1 = System.create "cluster-system" (configurePort 2554)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system2 = System.create "cluster-system" (configurePort 2555)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
System.Threading.Thread.Sleep(1000)
let system3 = System.create "cluster-system" (configurePort 2556)
let shardRegion3 = spawnSharded id system3 "printer" <| props (actorOf2 consumer)
Моя кластерная система (запущенная в отдельном процессе) распознает новые узлы, которые присоединяются:
> [INFO][3/15/2017 9:12:13 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52953] is JOINING, roles []
[INFO][3/15/2017 9:12:14 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52956] is JOINING, roles []
[INFO][3/15/2017 9:12:15 PM][Thread 0054][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Node [akka.tcp://cluster-system@localhost:52961] is JOINING, roles []
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52953] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52956] to [Up]
[INFO][3/15/2017 9:12:18 PM][Thread 0055][[akka://cluster-system/system/cluster/core/daemon#2086121649]] Leader is moving node [akka.tcp://cluster-system@localhost:52961] to [Up]
Заключение:
В заключение я хочу, чтобы участники, работающие в различных процессах (или узлах), отправляли сообщения другим субъектам, работающим в разных процессах (или узлах), поддерживая отказоустойчивость и балансировку нагрузки. В настоящее время я пытаюсь использовать функцию шардинга Akka.Cluster для достижения этой цели.
Приложение:
open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif
#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r @"C:\Users\Snimrod\Documents\Visual Studio 2015\Projects\Temp\packages\Akka.FSharp.1.1.3\lib\net45\Akka.FSharp.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"
open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence
open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion
1 ответ
Чтобы обеспечить согласованное представление о шардах и их расположении, постоянный бэкэнд Akka.Cluster.Sharding должен указывать на базу данных, которая видна всем процессам. В вашей конфигурации вы используете akka.persistence.journal.inmem
хранилище данных в памяти (используется только для тестирования и разработки). Это не будет видно из других процессов.
Вам необходимо настроить постоянный бэкэнд, чтобы осколки были видны между узлами, расположенными на разных машинах / процессах. Вы можете сделать это, например, используя Akka.Persistence.SqlServer или любой другой плагин. Это самая базовая конфигурация для вашего постоянного интерфейса, используемая только с помощью шардинга:
akka.persistence {
journal {
plugin = "akka.persistence.journal.sql-server"
sql-server {
connection-string = "<connection-string>"
auto-initialize = on
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.sql-server"
sql-server {
connection-string = "<connection-string>"
auto-initialize = on
}
}
}
Для чего-то более практичного, пожалуйста, обратитесь к этой статье.
Также имейте в виду, что плагины Akka.Cluster.Sharding и Akka.Persistence доступны только в предварительном режиме (поэтому вам нужно установить пакет с флагом -pre).