Flink Streaming API: Sink


Sink

Flink has no equivalent of Spark's foreach method that lets users write output logic inline; all output to external systems must go through a Sink. A job's final output is registered in a form like this:

  stream.addSink(new MySink(xxxx)) 

Flink ships official sinks for a number of frameworks; for anything else, users implement their own sink, as sketched below.
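A user-defined sink only needs to implement SinkFunction. A minimal sketch (the class name MySink echoes the pattern above; the println body is a placeholder for real output logic):

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Minimal custom sink: print each element to stdout.
class MySink extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    println("sink received: " + value)
  }
}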


1 Kafka

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Add the sink in the main method:

package com.imau.edu.sinkTest

import java.util.Properties

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}


object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    //    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // transform

    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString  // convert to a String so it serializes easily for output
        }
      )

    // sink
    dataStream.addSink(new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
    dataStream.print()

    env.execute("kafka sink test")
  }
}
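The producer above gives at-least-once delivery by default. FlinkKafkaProducer011 can also run with exactly-once semantics on Kafka 0.11+ via transactions; a hedged sketch of the alternative constructor (checkpointing must be enabled on the environment for this to take effect):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic

// Sketch: transactional, exactly-once producer for the same "sinkTest" topic.
dataStream.addSink(new FlinkKafkaProducer011[String](
  "sinkTest",
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  properties,
  Semantic.EXACTLY_ONCE
))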


2 Redis

pom.xml

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Define a RedisMapper class that specifies the command to run when saving data to Redis:

package com.imau.edu.sinkTest

import com.imau.edu.Flink_StreamAPI.SensorReading

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("F:\\IDEA-DATA\\Flink_Demo\\source.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
        }
      )

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // sink
    dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

    env.execute("redis sink test")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading]{

  // the Redis command used when writing data
  override def getCommandDescription: RedisCommandDescription = {
    // store sensor id and temperature in a hash: HSET key field value
    new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
  }

  // the value written to Redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString

  // the key written to Redis (for HSET this is the field inside the sensor_temperature hash)
  override def getKeyFromData(t: SensorReading): String = t.id
}
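Once the job is running, each reading becomes one field of the sensor_temperature hash; because HSET overwrites an existing field, the hash always holds the latest temperature per sensor, which you can verify with HGETALL sensor_temperature in redis-cli.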


3 Elasticsearch  

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Full code:

package com.imau.edu.sinkTest

import java.util

import com.imau.edu.Flink_StreamAPI.SensorReading


import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
        }
      )

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // build an ElasticsearchSink via its Builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          println("saving data: " + element)
          // wrap the fields in a Map (the request source can be a Map or a JSON string)
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", element.id)
          json.put("temperature", element.temperature.toString)
          json.put("ts", element.timestamp.toString)

          // create the index request that carries the data
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("readingdata")
            .source(json)

          // hand the request to the indexer, which sends it to ES
          indexer.add(indexRequest)
          println("data saved.")
        }
      }
    )
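    // Optional tuning before build() (assumption: the default batching is fine
    // in production, but for a local test you may want every record flushed
    // to ES immediately):
    // esSinkBuilder.setBulkFlushMaxActions(1)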

    // sink
    dataStream.addSink( esSinkBuilder.build() )

    env.execute("es sink test")
  }
}
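With Elasticsearch running locally, the written documents can be inspected with a search against the index, e.g. curl "localhost:9200/sensor/_search?pretty".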


4 Custom JDBC Sink

pom.xml

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

Full code:

package com.imau.edu.sinkTest


import java.sql.{Connection, DriverManager, PreparedStatement}

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        }
      )

    // sink
    dataStream.addSink( new MyJdbcSink() )

    env.execute("jdbc sink test")
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // initialization: create the connection and the prepared statements once per task
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }


  // called for every record: run the SQL against the connection
  override def invoke(value: SensorReading): Unit = {
    // try the update first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // if the update matched no row, insert instead
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // clean up when the task shuts down
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

The sink is registered in main with dataStream.addSink(new MyJdbcSink()), which writes each reading to MySQL.
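The update-then-insert pair above is portable SQL. On MySQL specifically, the same upsert can be done in one statement, provided the sensor column has a UNIQUE or primary-key index; a sketch of what open() and invoke() would use instead (table layout assumed from the statements above):

// MySQL-specific sketch; assumes e.g.
//   CREATE TABLE temperatures (sensor VARCHAR(32) PRIMARY KEY, temp DOUBLE);
var upsertStmt: PreparedStatement = _  // declared like insertStmt/updateStmt above

// in open():
upsertStmt = conn.prepareStatement(
  "INSERT INTO temperatures (sensor, temp) VALUES (?, ?) " +
    "ON DUPLICATE KEY UPDATE temp = VALUES(temp)")

// in invoke():
upsertStmt.setString(1, value.id)
upsertStmt.setDouble(2, value.temperature)
upsertStmt.execute()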