Как уменьшить время обработки каждой партии с помощью Spark Streaming?

Моя цель состоит в том, чтобы извлечь данные из Kafka с помощью Spark Streaming, преобразовать данные и сохранить их в корзину S3 в виде файлов Parquet, а также использовать папки на основе даты (разделенные данные для более быстрых запросов в Афинах). Моя основная проблема заключается в том, что в процессе увеличивается количество активных пакетов, и я просто хочу иметь только один активный пакет. У меня проблема с задержкой, я пробовал разные конфигурации и размеры кластеров, чтобы решить каждую ванну за меньшее время, чем общая продолжительность ванны. Например, если у меня есть партия в 5 минут, я хочу разрешить эту партию менее чем за 5 минут и иметь такое поведение для всех партий в течение времени. Другими словами, я хочу:

1) Решать каждую партию быстрее, чем общее время.

2) Сохранить поведение предыдущего пункта для всех партий в течение времени и без ошибок.

3) Использовать как можно меньше ресурсов, затрачивая меньше денег.

Что я имею?

В Kafka у нас почти 200000 сообщений в секунду, и каждое сообщение имеет около 45 полей в Protobuffer, и я конвертирую каждое поле в строку. Что касается вывода, я генерирую около 1 терабайта файлов Parquet в час в Amazon S3. Наша тема Кафка имеет 60 разделов.

Что я пробовал?

  • Интервал разных партий. (1 секунда, 10 секунд, 1 минута, 5 минут, 10 минут и т. Д.)
  • Увеличьте кластер, вертикально и горизонтально. (Я использую Amazon EMR)
  • Различные конфигурации отправки спарка (Макс. Скорость на раздел, противодавление, количество исполнителей и т. Д.)

Наконец, кто-то может порекомендовать мне конфигурацию кластера для решения этой проблемы? С точки зрения количества узлов и вида машин. Кроме того, конфигурация spark-submit в соответствии с этим кластером. Кроме того, я не знаю, могу ли я выполнить оптимизацию в коде Scala (я вставил код ниже), или как найти причину этой проблемы.


Дополнительная информация:

Это одна из команд, которые я пробовал:

spark-submit --deploy-mode cluster --основная пряжа

--conf spark.dynamicAllocation.enabled = false

--num-исполнителей 30

--executor-cores 2

--exe-память 3G

--conf "spark.sql.parquet.mergeSchema = false"

--conf "spark.debug.maxToStringFields = 100"

--packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.3.0

--class "RequestsDstream" RequestsDataLake-assembly-0.1.jar

Это код:

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Calendar
import com.tap.proto.ProtoMessages
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, date_format}


object SQLContextSingleton {
  @transient private var instance: SQLContext = null
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

object RequestsDstream {

  def message_proto(value:Array[Byte]): Map[String, String] = {
    try {
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val requests_proto = ProtoMessages.SspRequest.parseFrom(value)    
      val json = Map(
        "version" -> (requests_proto.getVersion().toString),
        "adunit" -> (requests_proto.getAdunit().toString),
        "adunit_original" -> (requests_proto.getAdunitOriginal().toString),
        "brand" -> (requests_proto.getBrand().toString),
        "country" -> (requests_proto.getCountry().toString),
        "device_connection_type" -> (requests_proto.getDeviceConnectionType().toString),
        "device_density" -> (requests_proto.getDeviceDensity().toString),
        "device_height" -> (requests_proto.getDeviceHeight().toString),
        "device_id" -> (requests_proto.getDeviceId().toString),
        "device_type" -> (requests_proto.getDeviceType().toString),
        "device_width" -> (requests_proto.getDeviceWidth().toString),
        "domain" -> (requests_proto.getDomain().toString),
        "endpoint" -> (requests_proto.getEndpoint().toString),
        "endpoint_type" -> (requests_proto.getEndpointType().toString),
        "endpoint_version" -> (requests_proto.getEndpointVersion().toString),
        "external_dfp_id" -> (requests_proto.getExternalDfpId().toString),
        "id_req" -> (requests_proto.getIdReq().toString),
        "ip" -> (requests_proto.getIp().toString),
        "lang" -> (requests_proto.getLang().toString),
        "lat" -> (requests_proto.getLat().toString),
        "lon" -> (requests_proto.getLon().toString),
        "model" -> (requests_proto.getModel().toString),
        "ncc" -> (requests_proto.getNcc().toString),
        "noc" -> (requests_proto.getNoc().toString),
        "non" -> (requests_proto.getNon().toString),
        "os" -> (requests_proto.getOs().toString),
        "osv" -> (requests_proto.getOsv().toString),
        "scc" -> (requests_proto.getScc().toString),
        "sim_operator_code" -> (requests_proto.getSimOperatorCode().toString),
        "size" -> (requests_proto.getSize().toString),
        "soc" -> (requests_proto.getSoc().toString),
        "son" -> (requests_proto.getSon().toString),
        "source" -> (requests_proto.getSource().toString),
        "ts" ->  (dateFormat.format(new Date(requests_proto.getTs()))).toString,
        "user_agent" -> (requests_proto.getUserAgent().toString),
        "status" -> (requests_proto.getStatus().toString),
        "delivery_network" -> (requests_proto.getDeliveryNetwork().toString),
        "delivery_time" -> (requests_proto.getDeliveryTime().toString),
        "delivery_status" -> (requests_proto.getDeliveryStatus().toString),
        "delivery_network_key" -> (requests_proto.getDeliveryNetworkKey().toString),
        "delivery_price" -> (requests_proto.getDeliveryPrice().toString),
        "device_id_original" -> (requests_proto.getDeviceIdOriginal().toString),
        "tracking_limited" -> (requests_proto.getTrackingLimited().toString),
        "from_cache" -> (requests_proto.getFromCache().toString)
      )

      return json

    }catch{
      case e:Exception=>            
        print(e)          

        return Map("error" -> "error")
    }
  }

  def main(args: Array[String]){    

    val conf = new SparkConf().setAppName("Requests DStream EMR 10 minutes")
    val ssc = new StreamingContext(conf, Seconds(60*10))
    val sc = ssc.sparkContext
    ssc.checkpoint("/home/data/checkpoint")
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val date = format.format(Calendar.getInstance().getTime())
    val group_id = "Request DStream EMR " + date
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "ip.internal:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> group_id,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)    
    )

    val topics = Array("ssp.requests")
    val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[String, Array[Byte]](topics, kafkaParams)
    )

    val events = stream
    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    events.foreachRDD(rdd=>{
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //val rdd2 = rdd.repartition(2)
      val current =   rdd.map(record => (record.key, message_proto(record.value)))

      val myDataFrame = current.toDF()    
      val query = myDataFrame.select(col("_2").as("value_udf"))
        .select(col("value_udf")("version").as("version"), col("value_udf")("adunit").as("adunit"), col("value_udf")("adunit_original").as("adunit_original"),
          col("value_udf")("brand").as("brand"), col("value_udf")("country").as("country"), col("value_udf")("device_connection_type").as("device_connection_type"),
          col("value_udf")("device_density").as("device_density"), col("value_udf")("device_height").as("device_height"),
          col("value_udf")("device_id").as("device_id"),  col("value_udf")("device_type").as("device_type"),  col("value_udf")("device_width").as("device_width"),
          col("value_udf")("domain").as("domain"),  col("value_udf")("endpoint").as("endpoint"), col("value_udf")("endpoint_type").as("endpoint_type"),
          col("value_udf")("endpoint_version").as("endpoint_version"),  col("value_udf")("external_dfp_id").as("external_dfp_id"),
          col("value_udf")("id_req").as("id_req"),  col("value_udf")("ip").as("ip"), col("value_udf")("lang").as("lang"),  col("value_udf")("lat").as("lat"),
          col("value_udf")("lon").as("lon"),  col("value_udf")("model").as("model"),  col("value_udf")("ncc").as("ncc"),  col("value_udf")("noc").as("noc"),
          col("value_udf")("non").as("non"), col("value_udf")("os").as("os"), col("value_udf")("osv").as("osv"), col("value_udf")("scc").as("scc"),
          col("value_udf")("sim_operator_code").as("sim_operator_code"),  col("value_udf")("size").as("size"),  col("value_udf")("soc").as("soc"),
          col("value_udf")("son").as("son"),  col("value_udf")("source").as("source"),  col("value_udf")("ts").as("ts").cast("timestamp"), col("value_udf")("user_agent").as("user_agent"),
          col("value_udf")("status").as("status"), col("value_udf")("delivery_network").as("delivery_network"),  col("value_udf")("delivery_time").as("delivery_time"),
          col("value_udf")("delivery_status").as("delivery_status"),  col("value_udf")("delivery_network_key").as("delivery_network_key"),
          col("value_udf")("delivery_price").as("delivery_price"),  col("value_udf")("device_id_original").as("device_id_original"),
          col("value_udf")("tracking_limited").as("tracking_limited"),  col("value_udf")("from_cache").as("from_cache"),
          date_format(col("value_udf")("ts"), "yyyy").as("date").as("year"),
          date_format(col("value_udf")("ts"), "MM").as("date").as("month"),
          date_format(col("value_udf")("ts"), "dd").as("date").as("day"),
          date_format(col("value_udf")("ts"), "HH").as("date").as("hour"))

      try {
        query.write.partitionBy("year", "month", "day", "hour")
          .mode(SaveMode.Append).parquet("s3a://tap-datalake/ssp.requests")
        events.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }catch{
        case e:Exception=>              
          print(e)
      }
    })

    ssc.start()

    ssc.awaitTermination()

    ssc.stop()

  }
}

Потоковая статистика

Завершенные партии

Детали для запроса

Исполнители

Среда

введите описание изображения здесь

введите описание изображения здесь

введите описание изображения здесь

0 ответов

Другие вопросы по тегам