欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

Apache Flink简介

程序员文章站 2022-07-14 13:31:31
...

Flink简介

what

针对无限和有限数据流进行有状态计算的分布式执行引擎框架。集群部署,随意扩容;内存计算,速度快。

流处理应用的基本组件

  • 状态
  • 时间
    • 事件时间
    • 处理时间

应用场景

  • 事件驱动型应用
  • 数据分析应用
  • 数据管道应用

Flink优势

  • 处理高吞吐量的事件流
  • 处理随时产生的事件,始终保持低延迟(sub-second)
  • 高效、易于使用的k/v结构的state
  • 真正的流处理框架。一次处理一个事件,每个事件都有自己的时间窗口。
  • 丰富的编程模型可以很容易地实现复杂的语义。对比微批处理,在事件流上进行推理更容易。
  • 使用事件时间,可以很容易地处理乱序事件等流缺陷

Flink示例

/**
 * Counts the number of rides per driver.
 *
 * <p>Every ride event is turned into a {@code (driverId, 1)} pair; the pairs are keyed by
 * driver id and the second tuple field is summed per key.
 */
public class RideCount {

    /**
     * Builds the pipeline, prints the running per-driver counts and executes the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Streaming execution environment for this job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event source: the data generator producing ride events.
        DataStream<TaxiRide> rideEvents = env.addSource(new TaxiRideGenerator());

        // Map each single event to a different data model: (driverId, 1).
        DataStream<Tuple2<Long, Long>> driverOnes = rideEvents.map(new DriverOneMapper());

        // Partition by the first tuple field (the driver id), then aggregate the field at
        // zero-based position 1 to obtain the ride count for each driver.
        DataStream<Tuple2<Long, Long>> countsPerDriver = driverOnes
                .keyBy(pair -> pair.f0)
                .sum(1);

        // Any of the intermediate streams could be printed as well.
        countsPerDriver.print();

        // Submit the job.
        env.execute("Ride Count");
    }

    /**
     * Maps a ride event to {@code (driverId, 1L)}.
     *
     * <p>The constant {@code 1L} is used because this example counts rides: each ride event
     * adds one. To aggregate something else, such as total distance or total fare, emit that
     * per-ride amount instead.
     */
    private static class DriverOneMapper implements MapFunction<TaxiRide, Tuple2<Long, Long>> {

        @Override
        public Tuple2<Long, Long> map(TaxiRide ride) {
            return Tuple2.of(ride.driverId, 1L);
        }
    }
}
/**
 * Fatigue-driving alert.
 *
 * <p>Emits the stored START event of a ride when the event-time timer registered two hours
 * after the ride's start time fires before the matching END event has been processed.
 */
public class LongRidesSolution {

    /**
     * Main method.
     *
     * @param args command-line arguments (not used)
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // Streaming execution environment for this job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Start the data generator, key the events by ride id and process each key with
        // keyed state, which is what the matching logic relies on.
        DataStream<TaxiRide> longRides = env
                .addSource(new TaxiRideGenerator())
                .keyBy((TaxiRide ride) -> ride.rideId)
                .process(new MatchFunction());

        longRides.print();

        env.execute("Long Taxi Rides");
    }

    /**
     * Matches the START and END events of a ride and reports rides whose timer fires first.
     */
    private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {

        /** Timer delay after the ride's start time, in seconds (two hours). */
        private static final long TIMEOUT_SECONDS = 120L * 60L;

        /** Holds the first event (START or END) seen for the current key. */
        private ValueState<TaxiRide> rideState;

        @Override
        public void open(Configuration config) {
            // Value state can only be bound on a keyed stream; one key may own several
            // value states as long as their names differ.
            rideState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("ride event", TaxiRide.class));
        }

        /**
         * Handles one ride event.
         *
         * @param ride the incoming event
         * @param context gives access to the timer service
         * @param out result collector (nothing is emitted from here)
         */
        @Override
        public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
            TaxiRide firstEvent = rideState.value();

            if (firstEvent != null) {
                // State already holds the other event of this ride.
                if (!ride.isStart) {
                    // This is the END event, so the stored one is the START event that owns
                    // a timer. The timer has not fired yet and can safely be deleted.
                    context.timerService().deleteEventTimeTimer(getTimerTime(firstEvent));
                }
                // Both events have been seen: clear the state.
                rideState.clear();
                return;
            }

            // Nothing stored yet: remember this event.
            rideState.update(ride);
            if (ride.isStart) {
                // Register the timer for the START event.
                context.timerService().registerEventTimeTimer(getTimerTime(ride));
            }
        }

        /**
         * Called when the timer fires: the ride started two hours ago and its END event has
         * not been processed. Emits the stored event and clears the state.
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
            TaxiRide overdueRide = rideState.value();
            out.collect(overdueRide);
            rideState.clear();
        }

        /** Returns the timer timestamp in epoch milliseconds: start time plus the timeout. */
        private long getTimerTime(TaxiRide ride) {
            return ride.startTime.plusSeconds(TIMEOUT_SECONDS).toEpochMilli();
        }
    }

}
/**
 * Finds, for every hour, the driver who collected the most tips.
 */
public class HourlyTipsSolution {

    /**
     * Main method.
     *
     * @param args command-line arguments (not used)
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // Streaming execution environment for this job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Start the data generator.
        DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());

        DataStream<Tuple3<Long, Long, Float>> hourlyTips = sumTipsPerDriverAndHour(fares);
        DataStream<Tuple3<Long, Long, Float>> hourlyMax = selectHourlyMax(hourlyTips);

        // Exercise: explore how the following alternative behaves. In what ways is it the
        // same as, and different from, the windowAll-based solution used here?
        //
        //     hourlyTips.keyBy(t -> t.f0).maxBy(2)

        hourlyMax.print();

        // Execute the transformation pipeline.
        env.execute("Hourly Tips (java)");
    }

    /**
     * Computes the tips per hour for each driver: key by driver, window, then process the
     * window contents. Keys are processed in parallel.
     */
    private static DataStream<Tuple3<Long, Long, Float>> sumTipsPerDriverAndHour(DataStream<TaxiFare> fares) {
        return fares
                .keyBy((TaxiFare fare) -> fare.driverId)
                // Tumbling one-hour windows, aligned to the full hour (e.g. 1:00:00 - 2:00:00).
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new AddTips());
    }

    /**
     * Aggregates the per-driver window results of all keys that fall into the same hour and
     * keeps the element with the largest value in tuple field 2. This step cannot run in
     * parallel.
     */
    private static DataStream<Tuple3<Long, Long, Float>> selectHourlyMax(
            DataStream<Tuple3<Long, Long, Float>> hourlyTips) {
        return hourlyTips
                .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
                .maxBy(2);
    }

    /**
     * Sums the tips of all fares in a window and emits the sum in a tuple together with the
     * window's end timestamp and the key: {@code (windowEnd, key, sumOfTips)}.
     */
    public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        @Override
        public void process(Long key, Context context, Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            float total = 0F;
            for (TaxiFare fare : fares) {
                total += fare.tip;
            }

            long windowEnd = context.window().getEnd();
            out.collect(Tuple3.of(windowEnd, key, total));
        }
    }
}
/**
 * Rides and fares: joins each ride START event with the fare that has the same ride id.
 */
public class RidesAndFaresExample {

    /**
     * Main method.
     *
     * @param args command-line arguments (not used)
     * @throws Exception which occurs during job execution.
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment, including Web UI and REST endpoint.
        // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
        // using the State Processor API.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(buildConfiguration());
        env.setParallelism(2);

        env.enableCheckpointing(10000L);
        // The checkpointing mode is left at its default (exactly once); it could be set
        // explicitly with setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE).
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Only START events of rides, keyed by ride id.
        DataStream<TaxiRide> rides = env
                .addSource(new TaxiRideGenerator())
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy((TaxiRide ride) -> ride.rideId);

        // Fares, keyed by ride id as well.
        DataStream<TaxiFare> fares = env
                .addSource(new TaxiFareGenerator())
                .keyBy((TaxiFare fare) -> fare.rideId);

        // Set a UID on the stateful flatmap operator so we can read its state using the
        // State Processor API.
        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction())
                .uid("enrichment");

        enrichedRides.print();

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    /** Creates the configuration with the state backend, savepoint and checkpoint settings. */
    private static Configuration buildConfiguration() {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "filesystem");
        conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
        conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
        return conf;
    }

    /**
     * Pairs a ride with its fare. Whichever of the two arrives first for a key is kept in
     * state until its counterpart arrives; then the pair is emitted and the state cleared.
     */
    public static class EnrichmentFunction
            extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {

        // keyed, managed state
        private ValueState<TaxiRide> rideState;

        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<TaxiRide> rideDescriptor =
                    new ValueStateDescriptor<>("saved ride", TaxiRide.class);
            ValueStateDescriptor<TaxiFare> fareDescriptor =
                    new ValueStateDescriptor<>("saved fare", TaxiFare.class);
            rideState = getRuntimeContext().getState(rideDescriptor);
            fareState = getRuntimeContext().getState(fareDescriptor);
        }

        /**
         * Handles a ride: emits it with the saved fare if one exists, otherwise saves the ride.
         */
        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare savedFare = fareState.value();
            if (savedFare == null) {
                // The fare has not arrived yet: keep the ride until it does.
                rideState.update(ride);
                return;
            }
            fareState.clear();
            out.collect(Tuple2.of(ride, savedFare));
        }

        /**
         * Handles a fare: emits it with the saved ride if one exists, otherwise saves the fare.
         */
        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide savedRide = rideState.value();
            if (savedRide == null) {
                // The ride has not arrived yet: keep the fare until it does.
                fareState.update(fare);
                return;
            }
            rideState.clear();
            out.collect(Tuple2.of(savedRide, fare));
        }
    }
}

 

可前往Flink官方文档,获取更多信息。