欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

初识Flink之Windows下Flink流处理(Streaming)案例开发

程序员文章站 2022-06-16 17:26:49
...

       本次示例在Windows系统下,使用idea2019.3进行开发,jdk版本1.8,flink版本1.6.1.

  • 新建一个maven项目,没有flink-quickstart-java模板,可以选择添加:Add Archetype  填写如下内容:
  • 初识Flink之Windows下Flink流处理(Streaming)案例开发
  • 初识Flink之Windows下Flink流处理(Streaming)案例开发
  • 填写项目对应的信息。
  • 初识Flink之Windows下Flink流处理(Streaming)案例开发
  • 创建StreamingJob内容如下:
  •  public static void main(String[] args) throws Exception {
            //获取需要的端口号分析:
            // 通过Socket模拟产生单词,使用Flink程序对数据进行汇总计算。
            int port;
            try {
                ParameterTool parameterTool = ParameterTool.fromArgs(args);
                port = parameterTool.getInt("port");
            } catch (Exception e) {
                System.err.println("No port set. use default port 9000--Java");
                port = 9000;
            }
            //获取Flink的运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //主机名不能填写错误,这里使用localhost或者通过ipconfig/all 查到的主机名
            String hostname = "localhost";
            String delimiter = "\n";
            //连接Socket获取输入的数据
            DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
            // a a c        // a 1        // a 1        // c 1
            DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
    
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                    String[] splits = value.split("\\\\s");
                    for (String word : splits) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            }).keyBy("word")
         //指定时间窗口大小为2s,指定时间间隔为2s
                    .timeWindow(Time.seconds(2), Time.seconds(2))
          //在这里使用sum或者reduce都可以
                    .sum("count");
      
    //                .reduce(new ReduceFunction<WordWithCount>() {
    //            @Override
    //            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
    //                return new WordWithCount(a.word, a.count + b.count);
    //            }
    //        });
      
            //把数据打印到控制台并且设置并行度
            windowCounts.print().setParallelism(1);
            //这一行代码一定要实现,否则程序不执行
            env.execute("Socket window count");
        }
    
        public static class WordWithCount {
            public String word;
            public long count;
    
            public WordWithCount() {
            }
    
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return "WordWithCount{" + "word='" + word + '\'' +
                        ", count=" + count +
                        '}';
            }
    
        }

     

  • 在Windows系统下需要下载一个nc,打开cmd窗口执行nc -l -p 9000
  • nc下载地址:https://pan.baidu.com/s/1YvG49wL8JmMMCEO_c6-uww 
  • 提取码:l03b
  • 解压后nc.exe 放入c盘users 对应的用户目录下,打开cmd即可执行:
  • 初识Flink之Windows下Flink流处理(Streaming)案例开发
  • 在idea 执行程序:结果如下:
  • 初识Flink之Windows下Flink流处理(Streaming)案例开发
  • 项目地址:https://gitee.com/linghongkang90/flink/tree/master/flinkDemo