欢迎您访问程序员文章站!本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Flink1.11.2-pg-source&sink

程序员文章站 2022-07-14 14:19:38
...

Flink1.11.2-pg-source&sink

[CSVSource_ToPg.java]

package com.flink.source.pg_jdbc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * Demo job that reads a comma-separated text file and writes every line to
 * PostgreSQL through {@link PgsqlSink}.
 *
 * <p>Each input line is split on {@code ","} and its first three fields are
 * stored in a map under the keys {@code "id"}, {@code "ts"} and {@code "vc"},
 * which are the keys {@link PgsqlSink} reads.
 */
public class CSVSource_ToPg {
    /** Input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "D:\\code\\flink111\\src\\main\\resources\\input\\water_sensor.txt";

    /**
     * Builds and runs the file-to-PostgreSQL pipeline.
     *
     * @param args optional; {@code args[0]} overrides the input file path. When it is
     *             absent the original hard-coded path is used, so existing launch
     *             configurations keep working.
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dsFile = env.readTextFile(inputPath);
        // The element type stays the raw Map because PgsqlSink is declared as
        // RichSinkFunction<Map>; a parameterized Map here would not match addSink.
        dsFile.map(new MapFunction<String, Map>() {
            @Override
            public Map map(String s) throws Exception {
                // A line with fewer than three fields throws
                // ArrayIndexOutOfBoundsException and fails the job.
                String[] strings = s.split(",");
                HashMap<String, String> map = new HashMap<>();
                map.put("id",strings[0]);
                map.put("ts",strings[1]);
                map.put("vc",strings[2]);
                return map;
            }
        })
      .addSink(new PgsqlSink());
        env.execute("txt write to the psql demo");
    }

    /**
     * Splits a line into lower-cased word tokens and emits {@code (token, 1)} for
     * every non-empty token. Not used by {@link #main(String[])}; kept because it
     * is a public member of this class.
     */
    public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String,Integer>> out){
            // "\\W+" splits on runs of non-word characters.
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token:tokens){
                if (token.length()>0){
                    out.collect(new Tuple2<String, Integer>(token,1));
                }
            }
        }
    }

}

[PgsqlSink.java]

package com.flink.source.pg_jdbc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;

/**
 * Sink that inserts each incoming record into the {@code water_sensor} table
 * over JDBC.
 *
 * <p>Records are maps with the keys {@code "id"}, {@code "ts"} and {@code "vc"}.
 * {@code ts} is parsed as a {@code long} and {@code vc} as an {@code int} before
 * being bound to the insert statement.
 */
public class PgsqlSink extends RichSinkFunction<Map> {
   private static final long serialVersionUID = 1L;

   // Created in open(); transient so the JDBC handles are never part of the
   // serialized form of this function.
   private transient Connection connection;

   private transient PreparedStatement preparedStatement;


   /**
    * Loads the PostgreSQL driver, opens the connection and prepares the insert
    * statement that is reused for every record.
    *
    * @param parameters configuration passed through to the superclass
    * @throws Exception if the driver cannot be loaded or the database cannot be reached
    */
   @Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       // JDBC connection settings
       String USERNAME = "test_user";
       String PASSWORD = "aaaaaa";
       String driverClass = "org.postgresql.Driver";
       String URL = "jdbc:postgresql://192.168.1.163:5432/test_db2";
       // Load the JDBC driver
       Class.forName(driverClass);
       // Open the database connection
       connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
       String sql = "insert into water_sensor(id, ts,vc) values (?,?,?)";
       preparedStatement = connection.prepareStatement(sql);
   }

   /**
    * Inserts one record.
    *
    * <p>Failures are propagated instead of being printed and ignored, so a record
    * that cannot be parsed or written fails the job rather than being dropped
    * silently.
    *
    * @param value   map holding the {@code "id"}, {@code "ts"} and {@code "vc"} entries
    * @param context sink context (unused)
    * @throws Exception if a field is missing or not numeric, or the insert fails
    */
   @Override
   public void invoke(Map value, Context context) throws Exception {
       String id = (String) value.get("id");
       long ts = Long.parseLong(value.get("ts").toString());
       int vc = Integer.parseInt(value.get("vc").toString());

       preparedStatement.setString(1, id);
       preparedStatement.setLong(2, ts);
       preparedStatement.setInt(3, vc);
       preparedStatement.executeUpdate();
   }


   /**
    * Releases the statement and then the connection. The nested {@code finally}
    * blocks make sure the connection is still closed when closing the statement
    * throws.
    *
    * @throws Exception if closing a JDBC resource or the superclass fails
    */
   @Override
   public void close() throws Exception {
       try {
           if (preparedStatement != null) {
               preparedStatement.close();
           }
       } finally {
           try {
               if (connection != null) {
                   connection.close();
               }
           } finally {
               super.close();
           }
       }
   }

}

[FromPg_Print.java]

package com.flink.source.pg_jdbc;

import com.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job that reads {@code WaterSensor} records through {@link PsqlSource}
 * and prints each one.
 */
public class FromPg_Print {
   /**
    * Builds the source-to-print pipeline and runs it.
    *
    * @param args command-line arguments (unused)
    * @throws Exception if the job fails while executing
    */
   public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
       environment.getConfig().disableSysoutLogging();

       // Attach the JDBC-backed source and print every record it emits.
       final DataStream<WaterSensor> sensors = environment.addSource(new PsqlSource());
       sensors.print();

       environment.execute("PostGreSQL Source to Flink demo");
   }
}

[PsqlSource.java]

package com.flink.source.pg_jdbc;


import com.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * Source that runs {@code SELECT * FROM public.water_sensor} once and emits one
 * {@code WaterSensor} per returned row.
 */
public class PsqlSource extends RichSourceFunction<WaterSensor> {


   private static final long serialVersionUID = 1L;

   // Created in open(); transient so the JDBC handles are never part of the
   // serialized form of this function.
   private transient Connection connection;

   private transient PreparedStatement preparedStatement;

   // Cleared by cancel(), which may be called from a different thread than run(),
   // hence volatile.
   private volatile boolean running = true;

   /**
    * Loads the PostgreSQL driver, opens the connection and prepares the query.
    *
    * @param parameters configuration passed through to the superclass
    * @throws Exception if the driver cannot be loaded or the database cannot be reached
    */
   @Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       String USERNAME = "test_user";
       String PASSWORD = "aaaaaa";
       String driverClass = "org.postgresql.Driver";
       String URL = "jdbc:postgresql://192.168.1.163:5432/test_db2";
       Class.forName(driverClass);
       connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
       String sql = " SELECT * FROM  public.water_sensor ";
       preparedStatement = connection.prepareStatement(sql);
   }

   /**
    * Executes the query and emits every row until the result set is exhausted or
    * the source is cancelled.
    *
    * <p>Failures are propagated instead of being printed and ignored, so a broken
    * query or an unparsable row fails the job rather than ending the stream early
    * as if it had completed normally.
    *
    * @param sourceContext context used to emit records
    * @throws Exception if the query fails or a {@code ts}/{@code vc} value cannot be parsed
    */
   @Override
   public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
       // try-with-resources closes the ResultSet on every exit path.
       try (ResultSet resultSet = preparedStatement.executeQuery()) {
           while (running && resultSet.next()) {
               WaterSensor waterSensor = new WaterSensor();
               waterSensor.setId(resultSet.getString("id"));
               waterSensor.setTs(Long.valueOf(resultSet.getString("ts")));
               waterSensor.setVc(Integer.valueOf(resultSet.getString("vc")));
               sourceContext.collect(waterSensor);
           }
       }
   }

   /** Asks the read loop in {@link #run} to stop before fetching the next row. */
   @Override
   public void cancel() {
       running = false;
   }

   /**
    * Releases the statement and then the connection. The nested {@code finally}
    * blocks make sure the connection is still closed when closing the statement
    * throws.
    *
    * @throws Exception if closing a JDBC resource or the superclass fails
    */
   @Override
   public void close() throws Exception {
       try {
           if (preparedStatement != null) {
               preparedStatement.close();
           }
       } finally {
           try {
               if (connection != null) {
                   connection.close();
               }
           } finally {
               super.close();
           }
       }
   }
}

[water_sensor.txt]

sensor_1,1549044122,1
sensor_1,1549044123,2
sensor_1,1549044124,3
sensor_1,1549044125,4
sensor_2,1549044123,2
sensor_3,1549044124,3
sensor_4,1549044125,4
sensor_5,1549044126,5
sensor_6,1549044127,6
sensor_7,1549044128,7
-- public.water_sensor definition

-- Drop table

-- DROP TABLE public.water_sensor;

-- Target table for the Flink demo: PgsqlSink inserts (id, ts, vc) rows into it
-- and PsqlSource reads them back with SELECT *.
-- All columns are nullable and the table declares no primary key or index.
CREATE TABLE public.water_sensor (
	-- Sensor identifier; first field of each input line (e.g. "sensor_1").
	id varchar NULL,
	-- Second field of each input line, bound as a Java long.
	-- NOTE(review): sample values look like epoch seconds; confirm the unit.
	ts int8 NULL,
	-- Third field of each input line, bound as a Java int.
	vc int4 NULL
);
相关标签: Flink1.11.2 flink