Spark Streaming Kinesis Integration: Ошибка при инициализации LeaseCoordinator в Worker
У меня были некоторые проблемы при запуске простой ванильной потоковой передачи с приложением Kinesis в Scala. Я придерживался основных указаний в некоторых руководствах, таких как Snowplow и WordCountASL.
Но я все еще не могу заставить его работать из-за этой ошибки Kinesis Worker:
16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
... 4 more
Вот мой пример кода:
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
/**
* Created by franco on 11/11/16.
*/
object TestApp {
// === Configurations for Kinesis streams ===
val awsAccessKeyId = "XXXXXX"
val awsSecretKey = "XXXXXXX"
val kinesisStreamName = "MyStream"
val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
val appName = "MyAppName"
def main(args: Array[String]): Unit = {
val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)
val provider = new StaticCredentialsProvider(credentials)
val kinesisClient = new AmazonKinesisClient(provider)
kinesisClient.setEndpoint(kinesisEndpointUrl)
val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
val streams = shards
val batchInterval = Milliseconds(2000)
val kinesisCheckpointInterval = batchInterval
val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName
val cores : Int = Runtime.getRuntime.availableProcessors()
println("Available Cores : " + cores.toString)
val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2 ) + "]" )
val ssc = new StreamingContext(config, batchInterval)
// Create the Kinesis DStreams
val kinesisStreams = (0 until streams).map { i =>
KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
}
ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
}
}
И моя политика IAM выглядит так:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt123",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
"arn:aws:kinesis:region:account:stream/name"
]
},
{
"Sid": "Stmt456",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DeleteItem",
"dynamodb:DescribeTable",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Scan",
"dynamodb:UpdateItem"
],
"Resource": [
"arn:aws:dynamodb:region:account:table/name"
]
},
{
"Sid": "Stmt789",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": [
"*"
]
}
]
}
Я не могу понять, что не так с этим приложением. Любое руководство по этому вопросу будет оценено.
2 ответа
Существуют и другие конструкторы для DStreams, которые позволяют передавать ключ доступа AWS и секретный ключ.
1-й и 5-й конструкторы в приведенной ниже ссылке, например, позволят вам передать их в конструктор (и он должен быть передан через вашу систему) против необходимости устанавливать системное свойство.
В конце концов я заставляю его работать, устанавливая учетные значения в свойствах системы.
System.setProperty("aws.accessKeyId","XXXXXX")
System.setProperty("aws.secretKey","XXXXXX")
И все же я не совсем "доволен" этим решением.
Считаете ли вы, что есть какие-либо проблемы, касающиеся этого подхода?