Apache Flink简介

  • 状态
  • 时间
    • 事件时间
    • 处理时间


  • 事件驱动型应用
  • 数据分析应用
  • 数据管道应用


  • 处理高吞吐量的事件流
  • 处理随时产生的事件,始终保持低延迟(sub-second)
  • 高效、易于使用的k/v结构的state
  • 真正的流处理框架。一次处理一个事件,每个事件都有自己的时间窗口。
  • 丰富的编程模型可以很容易地实现复杂的语义。对比微批处理,在事件流上进行推理更容易。
  • 使用事件时间(event time),可以很容易地处理乱序事件等数据流中的不完美情况


// Ride count per driver.
// NOTE(review): this listing looks abridged - the closing braces of the map function,
// of main and of the class are not present in this excerpt.
public class RideCount {
    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator
        // Source of the ride events.
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // map each ride to a tuple of (driverId, 1)
        // Map each individual event to a different data model.
        DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {

            public Tuple2<Long, Long> map(TaxiRide ride) {
                // Why 1L: this example counts rides per driver, so every ride event contributes 1.
                // To aggregate something else (e.g. total distance or total amount), replace 1L
                // with the per-ride distance or the per-ride amount.
                return Tuple2.of(ride.driverId, 1L);

        // partition the stream by the driverId
        // Partition the stream, using the first field of Tuple2<Long, Long> as the key.
        KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);

        // count the rides for each driver
        // The argument selects which tuple field is aggregated; positionToSum is 0-based.
        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);

        // we could, in fact, print out any or all of these streams

        // run the cleansing pipeline
        env.execute("Ride Count");
// Fatigued-driving warning (rides that run too long).
// NOTE(review): this listing looks abridged - closing braces, the Javadoc delimiters and the
// statements that the step comments below refer to are not present in this excerpt.
public class LongRidesSolution {

     * Main method.
     * @throws Exception which occurs during job execution.
    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());

        // The processing relies on state that is scoped to the key partition.
        DataStream<TaxiRide> longRides = rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction());


        env.execute("Long Taxi Rides");

    private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> {

        private ValueState<TaxiRide> rideState;

        public void open(Configuration config) {
            // ValueState can only be bound to a keyed (partitioned) stream.
            // Each key partition may hold several ValueStates, distinguished by name.
            ValueStateDescriptor<TaxiRide> stateDescriptor = new ValueStateDescriptor<>("ride event", TaxiRide.class);
            rideState = getRuntimeContext().getState(stateDescriptor);

        public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception {
            // ride: the incoming event
            // out: collector for the results

            // Read the current value of the ValueState.
            TaxiRide previousRideEvent = rideState.value();

            // The ValueState holds no value yet: update it.
            if (previousRideEvent == null) {
                // Update the ValueState.
                if (ride.isStart) {
                    // Register a timer.
            // The ValueState already holds a value: process it.
            else {
                if (!ride.isStart) {
                    // it's an END event, so event saved was the START event and has a timer
                    // the timer hasn't fired yet, and we can safely kill the timer
                    // Delete the timer.
                // both events have now been seen, we can clear the state
                // Clear the ValueState.

        public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception {
            // Invoked when the timer fires.
            // out: collector for the results
            // if we get here, we know that the ride started two hours ago, and the END hasn't been processed

        private long getTimerTime(TaxiRide ride) {
            // Timer time is the ride's start time plus 120 minutes, as epoch milliseconds.
            return ride.startTime.plusSeconds(120 * 60).toEpochMilli();

// The driver who earns the most tips in each hour.
// NOTE(review): this listing looks abridged - closing braces, the Javadoc delimiters and the
// remainder of the hourlyMax expression are not present in this excerpt.
public class HourlyTipsSolution {

     * Main method.
     * @throws Exception which occurs during job execution.
    public static void main(String[] args) throws Exception {

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());

        // compute tips per hour for each driver
        // Partition by key -> window -> process the data inside each window.
        // The key partitions are processed in parallel.
        DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares.keyBy((TaxiFare fare) -> fare.driverId)
                // Tumbling window, e.g. hourly and aligned to the full hour, such as 1:00:00 - 2:00:00.
                .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());

        // Aggregate over the windows produced by all key partitions for the same window range;
        // this step cannot run in parallel.
        DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips

//		You should explore how this alternative behaves. In what ways is the same as,
//		and different from, the solution above (using a windowAll)?

// 		DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
// 			.keyBy(t -> t.f0)
// 			.maxBy(2);


        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");

     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
    public static class AddTips extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {

        public void process(Long key, Context context, Iterable<TaxiFare> fares,
                Collector<Tuple3<Long, Long, Float>> out) {
            // Sum the tips of every fare that fell into this key's window.
            float sumOfTips = 0F;
            for (TaxiFare f : fares) {
                sumOfTips += f.tip;

                    new Date(context.window().getStart()) + Tuple3.of(context.window().getEnd(), key, sumOfTips)
            // Emit (window end timestamp, key, summed tips).
            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
// Rides and fares.
// NOTE(review): this listing looks abridged - closing braces, the Javadoc delimiters, the end
// of the enrichedRides expression and the else-branch bodies are not present in this excerpt.
public class RidesAndFaresExample {

     * Main method.
     * @throws Exception which occurs during job execution.
    public static void main(String[] args) throws Exception {

        // Set up streaming execution environment, including Web UI and REST endpoint.
        // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
        // using the State Processor API.

        Configuration conf = new Configuration();
        conf.setString("state.backend", "filesystem");
        conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
        conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        CheckpointConfig config = env.getCheckpointConfig();
        // The default mode is exactly-once.
        // config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Keep only START events of the ride stream, keyed by rideId.
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator()).filter((TaxiRide ride) -> ride.isStart)
                .keyBy((TaxiRide ride) -> ride.rideId);

        // The fare stream is keyed by rideId as well.
        DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator()).keyBy((TaxiFare fare) -> fare.rideId);

        // Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides.connect(fares).flatMap(new EnrichmentFunction())


        env.execute("Join Rides with Fares (java RichCoFlatMap)");

    public static class EnrichmentFunction
            extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {

        // keyed, managed state
        private ValueState<TaxiRide> rideState;

        private ValueState<TaxiFare> fareState;

        public void open(Configuration config) {
            // Obtain the two state handles, registered as "saved ride" and "saved fare".
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));

        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            //System.out.println("ride " + ride);
            // If a fare is already stored, emit the (ride, fare) pair.
            TaxiFare fare = fareState.value();
            if (fare != null) {
                out.collect(Tuple2.of(ride, fare));
            else {

        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            //System.out.println("fare " + fare);
            // If a ride is already stored, emit the (ride, fare) pair.
            TaxiRide ride = rideState.value();
            if (ride != null) {
                out.collect(Tuple2.of(ride, fare));
            else {

