Flink Batch Processing with DataSet

Flink supports not only real-time stream processing but also batch processing, and batch processing can be viewed as a special case of stream processing.

1. DataSet built-in data sources

File-based data sources

  • readTextFile(path) / TextInputFormat: reads the file line by line and returns each line as a String
  • readTextFileWithValue(path) / TextValueInputFormat: reads the file line by line and returns each line as a StringValue. StringValue is Flink's mutable, serializable wrapper around String, which can improve performance to some extent
  • readCsvFile(path) / CsvInputFormat: parses files whose fields are delimited by commas (or another character) and returns a DataSet of tuples or POJOs (a readCsvFile sketch follows this list)
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat: similar to readCsvFile, but returns primitive types instead of tuples
  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat: reads a Hadoop SequenceFile and returns the records as Tuple2<Key, Value>
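
As referenced above, a minimal readCsvFile sketch; the file path and the (id, name) column layout are assumptions for illustration:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object CsvSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // assumed file layout: "id,name" per line, e.g. "1,zhangsan"
    val users: DataSet[(Int, String)] = env.readCsvFile[(Int, String)]("D:\\users.csv")

    // print() executes the program eagerly, so no env.execute() call is needed here
    users.print()
  }
}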

Collection-based data sources (a short sketch follows the list):

fromCollection(Collection) 
fromCollection(Iterator, Class) 
fromElements(T ...) 
fromParallelCollection(SplittableIterator, Class) 
generateSequence(from, to)
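
A minimal sketch of fromElements and generateSequence (the values are arbitrary):

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object CollectionSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // fromElements: build a DataSet directly from a handful of values
    val words: DataSet[String] = env.fromElements("hello", "flink")

    // generateSequence: a DataSet[Long] covering the range [1, 10]
    val numbers: DataSet[Long] = env.generateSequence(1, 10)

    // print() is eager, so no env.execute() call is required here
    words.print()
    numbers.print()
  }
}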

Generic data sources (sketched after the list):

readFile(inputFormat, path) / FileInputFormat 
createInput(inputFormat) / InputFormat
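
A minimal sketch of both methods using the built-in TextInputFormat; the input path reuses the one from the file example below:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.Path

object GenericSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val inputPath = "D:\\count.txt"

    // readFile: pass an explicit FileInputFormat together with the path
    val viaReadFile: DataSet[String] =
      env.readFile(new TextInputFormat(new Path(inputPath)), inputPath)

    // createInput: hand any InputFormat directly to the environment
    val viaCreateInput: DataSet[String] =
      env.createInput(new TextInputFormat(new Path(inputPath)))

    viaReadFile.print()
    viaCreateInput.print()
  }
}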

1.1 File data source

The introductory word-count example reads from a file data source. If the input is a directory containing nested subdirectories, we can pass a configuration parameter so that the enumeration recurses into them.

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

object BatchOperate {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\count.txt"
    val outputPath = "D:\\data\\result2"

    // enable recursive enumeration of nested input directories
    val configuration: Configuration = new Configuration()
    configuration.setBoolean("recursive.file.enumeration", true)

    // get the batch entry point: ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath).withParameters(configuration)

    // import the implicit conversions
    import org.apache.flink.api.scala._
    val value: AggregateDataSet[(String, Int)] = text.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
    value.writeAsText(outputPath).setParallelism(1)
    env.execute("batch word count")
  }
}

1.2 Collection data source

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object DataSetSource {

  def main(args: Array[String]): Unit = {
    // get the batch entry point: ExecutionEnvironment
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    // create a DataSet from a local collection
    val myArray = Array("hello world", "spark flink")
    val collectionSet: DataSet[String] = environment.fromCollection(myArray)
    val result: AggregateDataSet[(String, Int)] = collectionSet.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
    // print() executes the program eagerly, so no environment.execute() call is needed
    result.setParallelism(1).print()
    // result.writeAsText("c:\\HELLO.TXT")
  }

}

1.3 Flink DataSet connectors

File system connectors
To read data from a file system, Flink has built-in support for the following file systems:

File system   Scheme       Notes
HDFS          hdfs://      Hadoop Distributed File System
S3            s3://        supported through the Hadoop file system implementation
MapR          maprfs://    requires the user to add the MapR jar
Alluxio       alluxio://   supported through the Hadoop file system implementation

Note: Flink can use any file system that implements the org.apache.hadoop.fs.FileSystem interface, for example S3, the Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP, and so on.

Flink is compatible with the Apache Hadoop MapReduce interfaces, which allows reusing code written for Hadoop MapReduce:
use Hadoop Writable data types
use any Hadoop InputFormat as a DataSource (via Flink's built-in HadoopInputFormat, sketched below)
use any Hadoop OutputFormat as a DataSink (via Flink's built-in HadoopOutputFormat)
use a Hadoop Mapper as a FlatMapFunction
use a Hadoop Reducer as a GroupReduceFunction
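
A minimal sketch of the InputFormat case, assuming the flink-hadoop-compatibility module is on the classpath; the HadoopInputs helper and the HDFS path used here are assumptions and may differ between Flink versions:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object HadoopInputSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // wrap the Hadoop mapred TextInputFormat; records arrive as (byte offset, line) pairs
    // the HDFS path is a placeholder
    val input: DataSet[(LongWritable, Text)] =
      env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat, classOf[LongWritable], classOf[Text], "hdfs://node01:8020/input"))

    input.map(pair => pair._2.toString).print()
  }
}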

2. DataSet operators

See the official Flink documentation for the complete operator reference.

2.1 DataSet transformation operators

Map: takes one element and returns one element; typically used for cleansing and conversion logic
FlatMap: takes one element and returns zero, one, or more elements
MapPartition: like Map, but processes a whole partition at a time (prefer MapPartition when the processing needs to obtain an external resource such as a database connection)
Filter: applies a predicate to each element and keeps only the elements that satisfy it
Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call and returning a new value
Aggregate: built-in aggregations such as sum, max, and min (a short sketch of several of these operators follows this list)
Distinct: returns the distinct elements of a data set, e.g. data.distinct()
Join: inner join
OuterJoin: outer join
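
A minimal sketch that combines a few of the operators above on arbitrary in-memory values:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object TransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val words: DataSet[String] = env.fromElements("hello", "hello", "flink", "spark")

    // Filter then Map: drop one word and tag each remaining word with a count of 1
    val tagged: DataSet[(String, Int)] = words
      .filter(w => w != "spark")
      .map(w => (w, 1))

    // Distinct on the word field
    tagged.distinct(0).print()

    // GroupBy plus sum (an Aggregate) to count occurrences
    tagged.groupBy(0).sum(1).print()
  }
}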

(1) Using mapPartition to write data to a database

Step 1: add the Maven coordinates of the MySQL connector jar

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>

Step 2: create the MySQL database and table

/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`flink_db` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `flink_db`;

/*Table structure for table `user` */

DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

Step 3: code

import java.sql.PreparedStatement
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MapPartition2MySql {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val sourceDataset: DataSet[String] = environment.fromElements("1 zhangsan", "2 lisi", "3 wangwu")
    // one connection per partition; the trailing print() triggers execution eagerly, so no execute() call is needed
    sourceDataset.mapPartition(part => {
      Class.forName("com.mysql.jdbc.Driver")
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_db", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into user (id,name) values(?,?)")
      // materialize the iterator so all inserts run before the connection is closed
      val results = part.map(x => {
        val fields = x.split(" ")
        statement.setInt(1, fields(0).toInt)
        statement.setString(2, fields(1))
        statement.execute()
      }).toList
      statement.close()
      conn.close()
      results
    }).print()
  }
}

(2) Join operations

The left outer join, right outer join, and full outer join operators let us join two DataSets on a key we specify.

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

object BatchDemoOuterJoinScala {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data1 = ListBuffer[Tuple2[Int,String]]()
    data1.append((1,"zs"))
    data1.append((2,"ls"))
    data1.append((3,"ww"))


    val data2 = ListBuffer[Tuple2[Int,String]]()
    data2.append((1,"beijing"))
    data2.append((2,"shanghai"))
    data2.append((4,"guangzhou"))

    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
      if(second==null){
        (first._1,first._2,"null")
      }else{
        (first._1,first._2,second._2)
      }
    }).print()

    println("===============================")

    text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
      if(first==null){
        (second._1,"null",second._2)
      }else{
        (first._1,first._2,second._2)
      }
    }).print()


    println("===============================")

    text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
      if(first==null){
        (second._1,"null",second._2)
      }else if(second==null){
        (first._1,first._2,"null")
      }else{
        (first._1,first._2,second._2)
      }
    }).print()

  }

}

2.2 DataSet partition operators

Rebalance: rebalances (repartitions) the data set to even out data skew
Hash-Partition: partitions the data set by the hash of the given key, using partitionByHash()
Range-Partition: partitions the data set by ranges of the given key, using partitionByRange()
Custom Partitioning: applies a user-defined partitioning rule; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0)

In Flink batch processing, the partitioning operators are rebalance, partitionByHash, partitionByRange, and partitionCustom. The example below demonstrates rebalance; a hash/range sketch follows it, and custom partitioning is covered afterwards.

object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("hello world","spark flink","hive sqoop")
    val filterSet: DataSet[String] = sourceDataSet.filter(x => x.contains("hello"))
      .rebalance()
    filterSet.print()
    environment.execute()
  }
}
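
A minimal sketch of partitionByHash and partitionByRange on a tuple data set (the input values are arbitrary):

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object HashRangePartitionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    import org.apache.flink.api.scala._

    val data: DataSet[(Int, String)] = env.fromElements((1, "zs"), (2, "ls"), (3, "ww"), (4, "zl"))

    // hash-partition on the first tuple field, then print each partition's element count
    data.partitionByHash(0)
      .mapPartition(part => Iterator(part.size))
      .print()

    // range-partition on the first tuple field
    data.partitionByRange(0)
      .mapPartition(part => Iterator(part.size))
      .print()
  }
}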

Implementing a custom partitioner
Step 1: define the partitioner as a Scala class

import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner2 extends Partitioner[String] {
  override def partition(word: String, num: Int): Int = {
    println("number of partitions: " + num)
    if (word.contains("hello")) {
      println("routing to partition 0")
      0
    } else {
      println("routing to partition 1")
      1
    }
  }
}

Step 2: use the partitioner in a job

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // set the parallelism; if unset, the number of CPU cores is used by default
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    // build the DataSet
    val sourceDataSet: DataSet[String] = environment.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
    // partition with the custom partitioner, using the whole string as the key
    val result: DataSet[String] = sourceDataSet.partitionCustom(new MyPartitioner2, x => x)
    val value: DataSet[String] = result.map(x => {
      println("key: " + x + ", thread: " + Thread.currentThread().getId)
      x
    })
    // print() executes the job eagerly, so no environment.execute() call is needed
    value.print()
  }
}

2.3 DataSet sink operators

1. writeAsText() / TextOutputFormat: writes elements line by line as strings, obtained by calling toString() on each element
2. writeAsFormattedText() / TextOutputFormat: writes elements line by line as strings, obtained by calling a user-defined format() method for each element
3. writeAsCsv(...) / CsvOutputFormat: writes tuples as comma-separated value files; the row and field delimiters are configurable, and each field value comes from the object's toString() method (see the sketch after this list)
4. print() / printToErr() / print(String msg) / printToErr(String msg): prints to standard out / standard error (note: avoid this in production jobs; sample the output or write to logs instead)
5. write() / FileOutputFormat: method and base class for custom file output
6. output() / OutputFormat: the most generic output method, for data sinks that are not file based (for example, storing the result in a database)
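
A minimal sketch of the writeAsText and writeAsCsv sinks (the output paths are placeholders):

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode

object SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val counts: DataSet[(String, Int)] = env.fromElements(("hello", 2), ("flink", 1))

    // one line per element, using each element's toString(); overwrite any existing output
    counts.writeAsText("D:\\data\\sink_text", WriteMode.OVERWRITE).setParallelism(1)

    // tuples as comma-separated values with the default row/field delimiters
    counts.writeAsCsv("D:\\data\\sink_csv").setParallelism(1)

    // file sinks are lazy, so an explicit execute() is required here
    env.execute("sink sketch")
  }
}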

3. Passing parameters to DataSet functions

DataSet jobs often need to pass parameters into user functions. We can pass them through the function's constructor, via the withParameters method, or globally via the ExecutionConfig.

3.1 Passing parameters through the constructor

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkParameter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    // the parameter is handed to the filter function through its constructor
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilterFunction("test"))
    // print() executes the job eagerly, so no env.execute() call is needed
    filterSet.print()
  }
}

class MyFilterFunction(parameter: String) extends FilterFunction[String] {
  override def filter(t: String): Boolean = {
    t.contains(parameter)
  }
}

3.2 Passing parameters with withParameters

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val env=ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world","abc test")
    val configuration = new Configuration()
    configuration.setString("parameterKey","test")
    // attach the Configuration to this operator; it is handed to the rich function's open() method
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter).withParameters(configuration)
    // print() executes the job eagerly, so no env.execute() call is needed
    filterSet.print()
  }
}

class MyFilter extends RichFilterFunction[String] {
  var value: String = ""
  override def open(parameters: Configuration): Unit = {
    value = parameters.getString("parameterKey", "defaultValue")
  }
  override def filter(t: String): Boolean = {
    t.contains(value)
  }
}

3.3 Passing global job parameters

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration()
    configuration.setString("parameterKey","test")

    val env=ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setGlobalJobParameters(configuration)
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world","abc test")

    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter)
    // print() executes the job eagerly, so no env.execute() call is needed
    filterSet.print()
  }
}
class MyFilter extends RichFilterFunction[String] {
  var value: String = ""
  override def open(parameters: Configuration): Unit = {
    // fetch the global job parameters that were registered on the ExecutionConfig
    val globalParams: ExecutionConfig.GlobalJobParameters = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    val globalConf: Configuration = globalParams.asInstanceOf[Configuration]
    value = globalConf.getString("parameterKey", "defaultValue")
  }
  override def filter(t: String): Boolean = {
    t.contains(value)
  }
}