Writing Kafka Data into Hudi


1. Configure the pom.xml file (Spark 2.4.5 and Hudi 0.5.3 built for Scala 2.12; all artifacts must share the same Scala version)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-client</artifactId>
  <version>0.5.3</version>
</dependency>

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark-bundle_2.12</artifactId>
  <version>0.5.3</version>
</dependency>

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-common</artifactId>
  <version>0.5.3</version>
</dependency>

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-hadoop-mr-bundle</artifactId>
  <version>0.5.3</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>2.4.5</version>
</dependency>

2. Main method entry point

Message format (one canal-style record per Kafka message):
database:test,table:t_stu,eventType:INSERT,data:id_10-name_zhangsan&id_11-name_lisi
database:test,table:t_stu,eventType:UPDATE,data:id_8-name_xixi id_8-name_new&id_9-name_xixi id_9-name_new
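
The parsing logic in the program below just splits on ",", ":", "&", "-", "_" and spaces. A standalone sketch (illustrative only; ParseDemo is not part of the streaming job) of how the UPDATE sample above decomposes:

object ParseDemo {
  def main(args: Array[String]): Unit = {
    val line = "database:test,table:t_stu,eventType:UPDATE," +
      "data:id_8-name_xixi id_8-name_new&id_9-name_xixi id_9-name_new"

    val fields = line.split(",")
    val database = fields(0).split(":")(1)   // "test"
    val table = fields(1).split(":")(1)      // "t_stu"
    val eventType = fields(2).split(":")(1)  // "UPDATE"

    // "&" separates individual records; inside a record, a space separates
    // the before image from the after image (INSERT has only an after image)
    fields(3).split(":")(1).split("&").foreach { statement =>
      val images = statement.split(" ")
      val (before, after) =
        if (images.length > 1) (images(0), images(1)) else ("", images(0))
      val id = after.split("-")(0).split("_")(1)
      println(s"$database.$table $eventType id=$id before=$before after=$after")
    }
  }
}

The full streaming program follows.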
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

import scala.collection.mutable.ArrayBuffer

object KafkaToHudi {
  def main(args: Array[String]): Unit = {
    // Hudi requires Kryo serialization
    val sparkConf = new SparkConf().setAppName("KafkaToHudi").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("/checkPoint")
    val topicsSet = Array("canalDemo")
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "slave1:9092,slave2:9092,slave3:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "test",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

    val streams = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

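    // Parse each canal-style message into one tuple per record:
    // (database, table, eventType, beforeData, afterData, id, ts, uuid)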
    val message: DStream[(String, String, String, String, String, String, String, String)] = streams.map(_.value()).flatMap(line => {
      val commitTime = System.currentTimeMillis().toString
      val datas = line.split(",")
      val database = datas(0).split(":")(1)
      val table = datas(1).split(":")(1)
      val eventType = datas(2).split(":")(1)
      val updateStatement = datas(3).split(":")(1).split("&")
      val tuples: Array[(String, String, String)] = updateStatement.map(statement => {
        var beforeData = ""
        var afterData = ""
        var id = ""
        val dataArr = statement.split(" ")
        if (dataArr.length > 1) {
          // UPDATE: "<before image> <after image>"
          id = dataArr(1).split("-")(0).split("_")(1)
          beforeData = dataArr(0)
          afterData = dataArr(1)
        } else {
          // INSERT: only the after image is present
          id = dataArr(0).split("-")(0).split("_")(1)
          afterData = dataArr(0)
        }
        (beforeData, afterData, id)
      })
      tuples.map(x => {
        val uuid = commitTime + "/" + x._3
        (database, table, eventType, x._1, x._2, x._3, commitTime, uuid)
      })
    })
    val spark = SparkSession.builder()
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // HDFS access settings; the user and namenode address are environment-specific
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkContext = spark.sparkContext
    sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://master")
    sparkContext.hadoopConfiguration.set("dfs.nameservices", "master")

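    // Row schema matching the tuple fields produced by the flatMap above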
    val schema = StructType(Array(
      StructField("database", StringType, false),
      StructField("table", StringType, false),
      StructField("eventType", StringType, false),
      StructField("beforeData", StringType, false),
      StructField("afterData", StringType, false),
      StructField("id", StringType, false),
      StructField("ts", StringType, false),
      StructField("uuid", StringType, false)))

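    // For each micro-batch: convert the RDD of tuples to a DataFrame and upsert it into the Hudi table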
    message.foreachRDD((rdd) => {
      val res: RDD[Row] = rdd.map { row =>
        val buffer = ArrayBuffer.empty[Any]
        buffer.append(row._1)
        buffer.append(row._2)
        buffer.append(row._3)
        buffer.append(row._4)
        buffer.append(row._5)
        buffer.append(row._6)
        buffer.append(row._7)
        buffer.append(row._8)
        Row.fromSeq(buffer)
      }
      val df = spark.createDataFrame(res, schema)
      df.show()
      if (!df.isEmpty) {
        df.write.format("org.apache.hudi")
          .option(PRECOMBINE_FIELD_OPT_KEY, "ts")   // commit-time column used to pick the latest record
          .option(RECORDKEY_FIELD_OPT_KEY, "uuid")  // uuid is the unique record key
          .option(PARTITIONPATH_FIELD_OPT_KEY, "")  // no partition column specified
          .option(TABLE_NAME, "testTable")
          .mode(SaveMode.Append)
          .save("/sparkHudi")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
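
Once a few batches have been committed, the data can be read back through the Hudi DataSource to verify the write. A minimal sketch, assuming the same dependencies and the /sparkHudi base path used above; ReadHudiTable and the load glob are illustrative and may need adjusting to the actual partition layout on HDFS:

import org.apache.spark.sql.SparkSession

object ReadHudiTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadHudiTable")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Snapshot query of the table written by KafkaToHudi;
    // the trailing glob depends on how the data is laid out under the base path.
    val df = spark.read
      .format("org.apache.hudi")
      .load("/sparkHudi/*")

    df.select("uuid", "ts", "database", "table", "eventType", "afterData").show(false)
    spark.stop()
  }
}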

If you run into problems, feel free to discuss.