2022-04-20-Flink-45(四)
2024-04-09 20:50:41  阅读数 239

1. Transformation

map
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Shows three ways of passing a map function to a stream of {@code Event}s:
 * a named class, an anonymous class, and a lambda expression.
 */
public class mapTransformation {

    /** Named {@link MapFunction} that returns the {@code user} field of an event. */
    public static class myMap implements MapFunction<Event, String> {

        @Override
        public String map(Event event) throws Exception {
            return event.user;
        }
    }

    /**
     * Builds a two-element source with parallelism 1, attaches three map
     * pipelines to it (each followed by {@code print()}), then executes the
     * environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0001", 200000L));

        // 1. Named class implementing MapFunction: emits event.user.
        events.map(new myMap()).print();

        // 2. Anonymous class: emits event.url.
        MapFunction<Event, String> toUrl = new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.url;
            }
        };
        events.map(toUrl).print();

        // 3. Lambda expression: emits the timestamp as text. The original
        //    author notes that type erasure is not a problem for this case.
        events.map(e -> e.timestamp.toString()).print();

        env.execute();
    }
}
filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Shows three ways of passing a filter predicate to a stream of {@code Event}s:
 * a named class, an anonymous class, and a lambda expression.
 *
 * <p>Every predicate compares with the string literal as the receiver
 * ({@code "x".equals(event.user)}), so an event whose {@code user} is
 * {@code null} is filtered out instead of throwing a
 * {@link NullPointerException} inside the operator.
 */
public class filterTransformation {

    /**
     * Builds a three-element source, attaches three filter pipelines to it
     * (each followed by {@code print()}), then executes the environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)
        );

        // 1. Named class implementing FilterFunction.
        source.filter(new myFilter()).print();

        // 2. Anonymous class.
        source.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                // Literal first: a null user yields false rather than an NPE.
                return "小红".equals(event.user);
            }
        }).print();

        // 3. Lambda expression.
        source.filter(data -> "小黄".equals(data.user)).print();

        env.execute();
    }

    /** Named {@link FilterFunction} that keeps events whose {@code user} is "小米". */
    public static class myFilter implements FilterFunction<Event> {

        /**
         * @param s the event to test
         * @return {@code true} when {@code s.user} equals "小米"; {@code false}
         *         otherwise, including when {@code s.user} is {@code null}
         */
        @Override
        public boolean filter(Event s) throws Exception {
            return "小米".equals(s.user);
        }
    }
}
flatmap
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Shows flatMap on a stream of {@code Event}s, once with a named class and
 * once with a lambda whose output type is declared explicitly.
 */
public class flatmapTransformation {

    /**
     * Named {@link FlatMapFunction} that emits three strings per event:
     * its {@code user}, its {@code url}, and its {@code timestamp} as text.
     */
    public static class myflatMap implements FlatMapFunction<Event, String> {

        @Override
        public void flatMap(Event event, Collector<String> collector) throws Exception {
            String timestampText = event.timestamp.toString();
            collector.collect(event.user);
            collector.collect(event.url);
            collector.collect(timestampText);
        }
    }

    /**
     * Builds a two-element source, attaches two flatMap pipelines to it
     * (printed with the identifiers "1" and "2"), then executes the environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L));

        // 1. Named class implementing FlatMapFunction.
        events.flatMap(new myflatMap()).print("1");

        // 2. Anonymous class: not shown in this example.

        // 3. Lambda expression. Mind the type-erasure issue with lambdas: the
        //    output type is supplied through returns(...).
        events
                .flatMap((Event e, Collector<String> out) -> {
                    if (e.user.equals("小米")) {
                        out.collect(e.user);
                        out.collect(e.url);
                        out.collect(e.timestamp.toString());
                        return;
                    }
                    if (e.user.equals("小红")) {
                        out.collect("空");
                    }
                })
                .returns(TypeInformation.of(String.class))
                .print("2");
        // Alternative from the original: .returns(new TypeHint<String>() {})

        env.execute();
    }
}
keyBy

DataStream → KeyedStream:逻辑上将一个流拆分成互不相交的分区,每个分区包含具有相同 key 的元素,内部通过 hash 实现。

max&maxBy

区别:max 只把指定字段更新为当前最大值,其余字段保留该 key 第一条数据的值;maxBy 则返回最大值所在的整条数据。

/**
 * Keys a stream of {@code Event}s by {@code user} and applies
 * {@code max("timestamp")} and {@code maxBy("timestamp")} to it.
 */
public class keyByAggregation {

    /**
     * Builds a four-element source with parallelism 2, attaches one
     * {@code max} pipeline and one {@code maxBy} pipeline (printed with the
     * identifiers "max" and "maxBy"), then executes the environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L));

        // Key selector written as an anonymous class: the key is event.user.
        KeySelector<Event, String> userOf = new KeySelector<Event, String>() {
            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        };
        events.keyBy(userOf).max("timestamp").print("max");

        // Same key, written as a lambda, aggregated with maxBy instead of max.
        events.keyBy(e -> e.user).maxBy("timestamp").print("maxBy");

        env.execute();
    }
}
reduce
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Uses {@code reduce} twice: first to count events per user, then to pick
 * the user with the highest count so far.
 */
public class reduceAggregation {

    /**
     * Builds a four-element source with parallelism 1, attaches the two
     * reduce pipelines (each followed by {@code print()}), then executes the
     * environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L));

        // 1. Count per user: map each event to (user, 1), key by the user
        //    field, and sum the second field.
        SingleOutputStreamOperator<Tuple2<String, Integer>> countsPerUser = events
                .map(e -> Tuple2.of(e.user, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(pair -> pair.f0)
                .reduce((acc, next) -> Tuple2.of(acc.f0, acc.f1 + next.f1));

        countsPerUser.print();

        // 2. Most active user so far: send every count to the same constant
        //    key and keep the tuple with the larger count. When the counts
        //    are equal, the newer tuple is kept.
        countsPerUser
                .keyBy(pair -> "key")
                .reduce((best, candidate) -> {
                    if (best.f1 > candidate.f1) {
                        return best;
                    }
                    return candidate;
                })
                .print();

        env.execute();
    }
}
用户自定义UDF

函数类(Function Classes)
匿名函数(Lambda Functions)

富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  1. open() 方法是 rich function 的初始化方法,在算子(例如 map 或 filter)的处理方法第一次被调用之前,open() 会先被调用。
  2. close() 方法是生命周期中最后一个被调用的方法,用来做一些清理工作。
  3. getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度、任务的名字,以及 state 状态。
/**
 * Shows the lifecycle methods of a rich function: {@code open}, {@code map}
 * and {@code close}.
 */
public class richMap {

    /**
     * Builds a four-element source with parallelism 2, maps it through
     * {@link myRichMap}, prints the result, then executes the environment.
     *
     * @param args unused
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L)
        );
        source.map(new myRichMap()).print();

        env.execute();
    }

    /**
     * Rich map function that turns each event into {@code (user, 1)} and
     * writes a line to standard output from {@code open} and {@code close}.
     */
    public static class myRichMap extends RichMapFunction<Event, Tuple2<String, Integer>> {

        /** Writes "open" followed by the task name with subtasks. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open" + getRuntimeContext().getTaskNameWithSubtasks());
        }

        /**
         * @param event the incoming event
         * @return a tuple of the event's {@code user} and the constant 1
         */
        @Override
        public Tuple2<String, Integer> map(Event event) throws Exception {
            return Tuple2.of(event.user, 1);
        }

        /**
         * Writes "close" followed by the task name with subtasks. This is the
         * same identifier that {@code open} prints, so with parallelism above
         * 1 each close line can be matched to its open line.
         */
        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close" + getRuntimeContext().getTaskNameWithSubtasks());
        }
    }
}