Как уменьшить время обработки каждой партии с помощью Spark Streaming?
Моя цель состоит в том, чтобы извлечь данные из Kafka с помощью Spark Streaming, преобразовать данные и сохранить их в корзину S3 в виде файлов Parquet, а также использовать папки на основе даты (разделенные данные для более быстрых запросов в Афинах). Моя основная проблема заключается в том, что в процессе увеличивается количество активных пакетов, и я просто хочу иметь только один активный пакет. У меня проблема с задержкой, я пробовал разные конфигурации и размеры кластеров, чтобы решить каждую ванну за меньшее время, чем общая продолжительность ванны. Например, если у меня есть партия в 5 минут, я хочу разрешить эту партию менее чем за 5 минут и иметь такое поведение для всех партий в течение времени. Другими словами, я хочу:
1) Решать каждую партию быстрее, чем общее время.
2) Сохранить поведение предыдущего пункта для всех партий в течение времени и без ошибок.
3) Использовать как можно меньше ресурсов, затрачивая меньше денег.
Что я имею?
В Kafka у нас почти 200000 сообщений в секунду, и каждое сообщение имеет около 45 полей в Protobuffer, и я конвертирую каждое поле в строку. Что касается вывода, я генерирую около 1 терабайта файлов Parquet в час в Amazon S3. Наша тема Кафка имеет 60 разделов.
Что я пробовал?
- Интервал разных партий. (1 секунда, 10 секунд, 1 минута, 5 минут, 10 минут и т. Д.)
- Увеличьте кластер, вертикально и горизонтально. (Я использую Amazon EMR)
- Различные конфигурации отправки спарка (Макс. Скорость на раздел, противодавление, количество исполнителей и т. Д.)
Наконец, кто-то может порекомендовать мне конфигурацию кластера для решения этой проблемы? С точки зрения количества узлов и вида машин. Кроме того, конфигурация spark-submit в соответствии с этим кластером. Кроме того, я не знаю, могу ли я выполнить оптимизацию в коде Scala (я вставил код ниже), или как найти причину этой проблемы.
Дополнительная информация:
Это одна из команд, которые я пробовал:
spark-submit --deploy-mode cluster --основная пряжа
--conf spark.dynamicAllocation.enabled = false
--num-исполнителей 30
--executor-cores 2
--exe-память 3G
--conf "spark.sql.parquet.mergeSchema = false"
--conf "spark.debug.maxToStringFields = 100"
--packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.3.0
--class "RequestsDstream" RequestsDataLake-assembly-0.1.jar
Это код:
import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Calendar
import com.tap.proto.ProtoMessages
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, date_format}
object SQLContextSingleton {
@transient private var instance: SQLContext = null
def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
object RequestsDstream {
def message_proto(value:Array[Byte]): Map[String, String] = {
try {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val requests_proto = ProtoMessages.SspRequest.parseFrom(value)
val json = Map(
"version" -> (requests_proto.getVersion().toString),
"adunit" -> (requests_proto.getAdunit().toString),
"adunit_original" -> (requests_proto.getAdunitOriginal().toString),
"brand" -> (requests_proto.getBrand().toString),
"country" -> (requests_proto.getCountry().toString),
"device_connection_type" -> (requests_proto.getDeviceConnectionType().toString),
"device_density" -> (requests_proto.getDeviceDensity().toString),
"device_height" -> (requests_proto.getDeviceHeight().toString),
"device_id" -> (requests_proto.getDeviceId().toString),
"device_type" -> (requests_proto.getDeviceType().toString),
"device_width" -> (requests_proto.getDeviceWidth().toString),
"domain" -> (requests_proto.getDomain().toString),
"endpoint" -> (requests_proto.getEndpoint().toString),
"endpoint_type" -> (requests_proto.getEndpointType().toString),
"endpoint_version" -> (requests_proto.getEndpointVersion().toString),
"external_dfp_id" -> (requests_proto.getExternalDfpId().toString),
"id_req" -> (requests_proto.getIdReq().toString),
"ip" -> (requests_proto.getIp().toString),
"lang" -> (requests_proto.getLang().toString),
"lat" -> (requests_proto.getLat().toString),
"lon" -> (requests_proto.getLon().toString),
"model" -> (requests_proto.getModel().toString),
"ncc" -> (requests_proto.getNcc().toString),
"noc" -> (requests_proto.getNoc().toString),
"non" -> (requests_proto.getNon().toString),
"os" -> (requests_proto.getOs().toString),
"osv" -> (requests_proto.getOsv().toString),
"scc" -> (requests_proto.getScc().toString),
"sim_operator_code" -> (requests_proto.getSimOperatorCode().toString),
"size" -> (requests_proto.getSize().toString),
"soc" -> (requests_proto.getSoc().toString),
"son" -> (requests_proto.getSon().toString),
"source" -> (requests_proto.getSource().toString),
"ts" -> (dateFormat.format(new Date(requests_proto.getTs()))).toString,
"user_agent" -> (requests_proto.getUserAgent().toString),
"status" -> (requests_proto.getStatus().toString),
"delivery_network" -> (requests_proto.getDeliveryNetwork().toString),
"delivery_time" -> (requests_proto.getDeliveryTime().toString),
"delivery_status" -> (requests_proto.getDeliveryStatus().toString),
"delivery_network_key" -> (requests_proto.getDeliveryNetworkKey().toString),
"delivery_price" -> (requests_proto.getDeliveryPrice().toString),
"device_id_original" -> (requests_proto.getDeviceIdOriginal().toString),
"tracking_limited" -> (requests_proto.getTrackingLimited().toString),
"from_cache" -> (requests_proto.getFromCache().toString)
)
return json
}catch{
case e:Exception=>
print(e)
return Map("error" -> "error")
}
}
def main(args: Array[String]){
val conf = new SparkConf().setAppName("Requests DStream EMR 10 minutes")
val ssc = new StreamingContext(conf, Seconds(60*10))
val sc = ssc.sparkContext
ssc.checkpoint("/home/data/checkpoint")
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = format.format(Calendar.getInstance().getTime())
val group_id = "Request DStream EMR " + date
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "ip.internal:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"group.id" -> group_id,
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("ssp.requests")
val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
ssc,
PreferConsistent,
Subscribe[String, Array[Byte]](topics, kafkaParams)
)
val events = stream
val sqlContext = SQLContextSingleton.getInstance(sc)
import sqlContext.implicits._
events.foreachRDD(rdd=>{
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//val rdd2 = rdd.repartition(2)
val current = rdd.map(record => (record.key, message_proto(record.value)))
val myDataFrame = current.toDF()
val query = myDataFrame.select(col("_2").as("value_udf"))
.select(col("value_udf")("version").as("version"), col("value_udf")("adunit").as("adunit"), col("value_udf")("adunit_original").as("adunit_original"),
col("value_udf")("brand").as("brand"), col("value_udf")("country").as("country"), col("value_udf")("device_connection_type").as("device_connection_type"),
col("value_udf")("device_density").as("device_density"), col("value_udf")("device_height").as("device_height"),
col("value_udf")("device_id").as("device_id"), col("value_udf")("device_type").as("device_type"), col("value_udf")("device_width").as("device_width"),
col("value_udf")("domain").as("domain"), col("value_udf")("endpoint").as("endpoint"), col("value_udf")("endpoint_type").as("endpoint_type"),
col("value_udf")("endpoint_version").as("endpoint_version"), col("value_udf")("external_dfp_id").as("external_dfp_id"),
col("value_udf")("id_req").as("id_req"), col("value_udf")("ip").as("ip"), col("value_udf")("lang").as("lang"), col("value_udf")("lat").as("lat"),
col("value_udf")("lon").as("lon"), col("value_udf")("model").as("model"), col("value_udf")("ncc").as("ncc"), col("value_udf")("noc").as("noc"),
col("value_udf")("non").as("non"), col("value_udf")("os").as("os"), col("value_udf")("osv").as("osv"), col("value_udf")("scc").as("scc"),
col("value_udf")("sim_operator_code").as("sim_operator_code"), col("value_udf")("size").as("size"), col("value_udf")("soc").as("soc"),
col("value_udf")("son").as("son"), col("value_udf")("source").as("source"), col("value_udf")("ts").as("ts").cast("timestamp"), col("value_udf")("user_agent").as("user_agent"),
col("value_udf")("status").as("status"), col("value_udf")("delivery_network").as("delivery_network"), col("value_udf")("delivery_time").as("delivery_time"),
col("value_udf")("delivery_status").as("delivery_status"), col("value_udf")("delivery_network_key").as("delivery_network_key"),
col("value_udf")("delivery_price").as("delivery_price"), col("value_udf")("device_id_original").as("device_id_original"),
col("value_udf")("tracking_limited").as("tracking_limited"), col("value_udf")("from_cache").as("from_cache"),
date_format(col("value_udf")("ts"), "yyyy").as("date").as("year"),
date_format(col("value_udf")("ts"), "MM").as("date").as("month"),
date_format(col("value_udf")("ts"), "dd").as("date").as("day"),
date_format(col("value_udf")("ts"), "HH").as("date").as("hour"))
try {
query.write.partitionBy("year", "month", "day", "hour")
.mode(SaveMode.Append).parquet("s3a://tap-datalake/ssp.requests")
events.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}catch{
case e:Exception=>
print(e)
}
})
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}