欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink流处理-DataStream常用Source及Sink

程序员文章站 2022-06-17 09:36:03
...

环境

flink-1.9.0

一、需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

二、初始化执行环境读取数据文件

数据文件word

how	are	you
world	and	that
hello	world
jack	and
app	storm	storm	what
spark	spark

初始化流处理执行环境

 /**
  * 初始化流处理执行环境
  */
 private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

三、常用Source

1.socketTextStream
从socket套接字中获取数据

/**
 * 从socket套接字中获取数据
 */
public static void socketTextStream() throws Exception {
    // 从本地socket套接字中读取数据
    DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
    // 打印输入的内容
    dataStream.print();
    // 执行任务
    env.execute();
}

打开CMD,运行netcat,输入nc -l -p 9999后运行该程序,等待连接完成后输入一些内容,可以看到在控制台中输出了socket中输入的内容
Flink流处理-DataStream常用Source及Sink
2.readTextFile(path)
从文本文件中读取数据作为数据源

/**
 * 从文本文件中读取数据作为数据源
 * 注意:文件可以是本地文件,也可以是hdfs中的文件,只需要指定对应的路径即可
 */
public static void readTextFile() throws Exception {
    // 从本地文本文件中读取数据
    DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
    // 从hdfs文件系统中读取数据
    //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");

    // 将文本中每行单词切分成单个单词并收集
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String[] words = s.split("\t");
            for(String word: words){
                collector.collect(word);
            }
        }
    }).print();
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink
3.generateSequence(from, to)
将序列作为数据源

/**
 * 从生成序列中读取数据作为数据源
 */
public static void generateSequence() throws Exception {
    // 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
    env.setParallelism(1);
    // 生成1-10的序列并输出
    env.generateSequence(1, 10).print();
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink
4.fromCollection(Seq)
将集合中的数据当作数据源

/**
 * 从Java.util.Collection集合中读取数据作为数据源
 */
public static void fromCollection() throws Exception {
    ArrayList<String> list = new ArrayList<>(5);
    list.add("flink");
    list.add("scala");
    list.add("spark");
    list.add("hadoop");
    list.add("hive");
    env.fromCollection(list).print();
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink
5.fromElements(elements: _*)
将一堆元素作为数据源

/**
 * 从Java.util.Collection集合中读取数据作为数据源
 */
public static void fromElements() throws Exception {
    env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink
6.addSource
自定义数据源
MySQL表中的数据

1	hello	1
2	hi	3
3	flink	1
4	scala	1
5	spark	1
6	hadoop	1
7	hive	1

Mysql自定义数据源类

package cn.myclass.stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * 自定义mysql数据源
 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
 * 的open以及close方法更加合理的利用资源读取数据。
 * @author Yang
 */
public class MysqlDataSource extends RichSourceFunction<String> {

    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        //创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "select word from word";
        // 获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 读取数据时执行此方法,从查询结果中依次获得单词
     * @param sourceContext 数据源上下文对象
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // 执行查询获得结果
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()){
            // 将结果添加到收集器中
            sourceContext.collect(resultSet.getString("word"));
        }
    }

    /**
     * 取消任务时执行
     */
    @Override
    public void cancel() {
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

调用方法

/**
 * 从自定义数据源中读取数据
 */
public static void addSource() throws Exception {
    // 添加自定义数据源并打印读取的数据
    env.addSource(new MysqlDataSource()).print();
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink

四、常用Sink

1.print
测试时常用的方法,将结果直接输出到标准输出设备(控制台)。略

2.writeAsText
保存为文本文件

 /**
  * 将结果写入到文本文件中
  */
 public static void writeAsText() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 设置并行度为1,将结果写入到一个文件中
     env.setParallelism(1);
     // 将结果写入到hdfs中
     //dataStream.writeAsText("hdfs://master:9000/words.txt");
     // 将结果写到本地文本文件中
     dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
     // 执行任务
     env.execute();
 }

运行结果
Flink流处理-DataStream常用Source及Sink

3.writeAsCsv
保存为csv文件

/**
 * 将结果写入到csv文件中
 * 注意:将结果写入到csv只支持元组类型的数据,所以在这里将结果转化成了元组并无实际意义
 */
public static void writeAsCsv() throws Exception {
    // 从本地文本文件中读取数据
    DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
    // 设置并行度
    env.setParallelism(1);
    // 将单词转化成元组
    dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split("\t");
            for(String word: words){
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
    // 执行任务
    env.execute();
}

运行结果
Flink流处理-DataStream常用Source及Sink
4.writeToSocket
输出到套接字

 /**
  * 将结果写入到socket套接字中
  */
 public static void writeToSocket() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 将结果写入到socket套接字中,以简单字符串类型发送
     dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
     // 执行任务
     env.execute();
 }

打开CMD,运行netcat,输入nc -lp 9999等待接收运行结果后执行该方法,可以看到nc中输出的结果
Flink流处理-DataStream常用Source及Sink
5.addSink
自定义Mysql数据沉槽类

package cn.myclass.stream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * 自定义Mysql数据沉槽
 * @author Yang
 */
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // 创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "insert into word(word,count) values(?,?)";
        // 预编译语句并获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 每条结果执行的方法
     * @param tuple2 元组数据
     * @param context 上下文
     */
    @Override
    public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
        // 设置sql语句中的第一个和第二个值
        preparedStatement.setString(1, tuple2.f0);
        preparedStatement.setInt(2, tuple2.f1);
        // 执行插入
        preparedStatement.executeUpdate();
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

调用方法

 /**
  * 将结果写入自定义数据沉槽
  */
 public static void addSink() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 将单词形成元组并设置次数为1
     dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                 @Override
                 public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                     String[] words = s.split("\t");
                     for(String word: words){
                         collector.collect(new Tuple2<>(word, 1));
                     }
                 }
                 // 写入MySql中
             }).addSink(new MysqlDataSink());
     // 执行任务
     env.execute();
 }

运行结果
Flink流处理-DataStream常用Source及Sink

五、完整代码

自定义Mysql数据源

package cn.myclass.stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * 自定义mysql数据源
 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
 * 的open以及close方法更加合理的利用资源读取数据。
 * @author Yang
 */
public class MysqlDataSource extends RichSourceFunction<String> {

    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        //创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "select word from word";
        // 获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 读取数据时执行此方法,从查询结果中依次获得单词
     * @param sourceContext 数据源上下文对象
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // 执行查询获得结果
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()){
            // 将结果添加到收集器中
            sourceContext.collect(resultSet.getString("word"));
        }
    }

    /**
     * 取消任务时执行
     */
    @Override
    public void cancel() {
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

自定义Mysql数据沉槽

package cn.myclass.stream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * 自定义Mysql数据沉槽
 * @author Yang
 */
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // 创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "insert into word(word,count) values(?,?)";
        // 预编译语句并获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 每条结果执行的方法
     * @param tuple2 元组数据
     * @param context 上下文
     */
    @Override
    public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
        // 设置sql语句中的第一个和第二个值
        preparedStatement.setString(1, tuple2.f0);
        preparedStatement.setInt(2, tuple2.f1);
        // 执行插入
        preparedStatement.executeUpdate();
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

主类

package cn.myclass.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

/**
 * Flink流处理常用数据源及数据沉槽
 * @author Yang
 */
public class DataStreamSourceAndSink {

    /**
     * 初始化流处理执行环境
     */
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * 从socket套接字中获取数据作为数据源
     */
    public static void socketTextStream() throws Exception {
        // 从本地socket套接字中读取数据
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
        // 打印输入的内容
        dataStream.print();
        // 执行任务
        env.execute();
    }

    /**
     * 从文本文件中读取数据作为数据源
     * 文本文件可以是本地文件,也可以是hdfs中的文件,只需要指定路径即可
     */
    public static void readTextFile() throws Exception {
        // 从本地文本文件中读取数据
        DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 从hdfs文件系统中读取数据
        //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");

        // 将文本中每行单词切分成单个单词并收集
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split("\t");
                for(String word: words){
                    collector.collect(word);
                }
            }
        }).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从生成序列中读取数据作为数据源
     */
    public static void generateSequence() throws Exception {
        // 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
        env.setParallelism(1);
        // 生成1-10的序列并输出
        env.generateSequence(1, 10).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从Java.util.Collection集合中读取数据作为数据源
     */
    public static void fromCollection() throws Exception {
        ArrayList<String> list = new ArrayList<>(5);
        list.add("flink");
        list.add("scala");
        list.add("spark");
        list.add("hadoop");
        list.add("hive");
        env.fromCollection(list).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从Java.util.Collection集合中读取数据作为数据源
     */
    public static void fromElements() throws Exception {
        env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
        // 执行任务
        env.execute();
    }

    /**
     * 从自定义数据源中读取数据
     */
    public static void addSource() throws Exception {
        // 添加自定义数据源并打印读取的数据
        env.addSource(new MysqlDataSource()).print();
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到文本文件中
     */
    public static void writeAsText() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 设置并行度为1,将结果写入到一个文件中
        env.setParallelism(1);
        // 将结果写入到hdfs中
        //dataStream.writeAsText("hdfs://master:9000/words.txt");
        // 将结果写到本地文本文件中
        dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到csv文件中
     * 注意:将结果写入到csv只支持元组类型的数据
     */
    public static void writeAsCsv() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 设置并行度
        env.setParallelism(1);
        // 将单词转化成元组
        dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split("\t");
                for(String word: words){
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到socket套接字中
     */
    public static void writeToSocket() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 将结果写入到socket套接字中,以简单字符串类型发送
        dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入自定义数据沉槽
     */
    public static void addSink() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 将单词形成元组并初始化次数为1
        dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split("\t");
                        for(String word: words){
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                    // 写入MySql中
                }).addSink(new MysqlDataSink());
        // 执行任务
        env.execute();
    }


    public static void main(String[] args) throws Exception {
        socketTextStream();
        readTextFile();
        generateSequence();
        fromCollection();
        fromElements();
        addSource();
        writeAsText();
        writeAsCsv();
        writeToSocket();
        addSink();
    }
}

如有错误,望指正!