Spark Streaming Kinesis Integration: Ошибка при инициализации LeaseCoordinator в Worker

У меня были некоторые проблемы при запуске простой ванильной потоковой передачи с приложением Kinesis в Scala. Я придерживался основных указаний в некоторых руководствах, таких как Snowplow и WordCountASL.

Но я все еще не могу заставить его работать из-за этой ошибки Kinesis Worker:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    ... 4 more

Вот мой пример кода:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
  * Created by franco on 11/11/16.
  */
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {

    val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)

    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    val streams = shards

    val batchInterval = Milliseconds(2000)

    val kinesisCheckpointInterval = batchInterval

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores : Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2 ) + "]" )
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }


}

И моя политика IAM выглядит так:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account:stream/name"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:UpdateItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:region:account:table/name"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

Я не могу понять, что не так с этим приложением. Любое руководство по этому вопросу будет оценено.

2 ответа

Решение

Существуют и другие конструкторы для DStreams, которые позволяют передавать ключ доступа AWS и секретный ключ.

1-й и 5-й конструкторы в приведенной ниже ссылке, например, позволят вам передать их в конструктор (и он должен быть передан через вашу систему) против необходимости устанавливать системное свойство.

KinesisUtil Constructors

В конце концов я заставляю его работать, устанавливая учетные значения в свойствах системы.

System.setProperty("aws.accessKeyId","XXXXXX")
System.setProperty("aws.secretKey","XXXXXX")

И все же я не совсем "доволен" этим решением.

Считаете ли вы, что есть какие-либо проблемы, касающиеся этого подхода?

Другие вопросы по тегам