好了,看到这里的话,Apache Flink 的基础概念应该都了解得差不多了。今天我们就来看一下平时用到的 Stream API 中各式各样的算子都有哪些,然后写点 Demo 试一下。
📒 我也是边学边实现一些Demo,这样呢可以方便自己理解,形成体系以后也应该能帮到别人快速学习吧。
这就是地址了👉 https://github.com/geekibli/flink-study 欢迎star!
下面的Demo都是可以直接运行的。如果是通过socket获取数据的话,请确认先开启端口。我用的是Mac系统,可以使用以下命令:nc -lk 9999
提供一个全局获取环境的方法
我们写一个静态方法getEnv()来统一获取执行环境,不然每次都要重复调用StreamExecutionEnvironment.getExecutionEnvironment(),挺麻烦的;
1 2 3
| private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); }
|
POJO类
1 2 3 4 5
/**
 * One sensor measurement: sensor id, timestamp and temperature.
 *
 * <p>The class exposes a public no-argument constructor and a getter/setter pair for every
 * field so that it can be created, read and keyed by field name by the demos that use it.
 * The field name {@code sersorId} is kept as-is because other code refers to it by that
 * exact name (for example {@code keyBy("sersorId")} and {@code getSersorId()}).
 */
public class SensorReading implements Serializable {

    private static final long serialVersionUID = 1L;

    private String sersorId;
    private double timestamp;
    private double newtemp;

    /** Creates an empty reading; fields are populated through the setters. */
    public SensorReading() {
    }

    /**
     * Creates a fully populated reading.
     *
     * @param sersorId  identifier of the sensor
     * @param timestamp timestamp of the measurement
     * @param newtemp   measured temperature
     */
    public SensorReading(String sersorId, double timestamp, double newtemp) {
        this.sersorId = sersorId;
        this.timestamp = timestamp;
        this.newtemp = newtemp;
    }

    /** @return identifier of the sensor */
    public String getSersorId() {
        return sersorId;
    }

    /** @param sersorId identifier of the sensor */
    public void setSersorId(String sersorId) {
        this.sersorId = sersorId;
    }

    /** @return timestamp of the measurement */
    public double getTimestamp() {
        return timestamp;
    }

    /** @param timestamp timestamp of the measurement */
    public void setTimestamp(double timestamp) {
        this.timestamp = timestamp;
    }

    /** @return measured temperature */
    public double getNewtemp() {
        return newtemp;
    }

    /** @param newtemp measured temperature */
    public void setNewtemp(double newtemp) {
        this.newtemp = newtemp;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SensorReading)) {
            return false;
        }
        SensorReading that = (SensorReading) other;
        boolean sameId = (sersorId == null) ? that.sersorId == null : sersorId.equals(that.sersorId);
        return sameId
                && Double.compare(timestamp, that.timestamp) == 0
                && Double.compare(newtemp, that.newtemp) == 0;
    }

    @Override
    public int hashCode() {
        int result = (sersorId == null) ? 0 : sersorId.hashCode();
        result = 31 * result + Double.hashCode(timestamp);
        result = 31 * result + Double.hashCode(newtemp);
        return result;
    }

    /** Readable form so that printed stream output shows the field values. */
    @Override
    public String toString() {
        return "SensorReading(sersorId=" + sersorId
                + ", timestamp=" + timestamp
                + ", newtemp=" + newtemp + ")";
    }
}
|
map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void mapTest() throws Exception { StreamExecutionEnvironment env = getEnv(); ArrayList<Integer> nums = Lists.newArrayList(); nums.add(1); nums.add(2); nums.add(3); DataStreamSource<Integer> source = env.fromCollection(nums); SingleOutputStreamOperator<Integer> map = source.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer integer) throws Exception { return integer * integer; } }); map.print(); env.execute(); }
|
keyBy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void keyByTest() throws Exception { StreamExecutionEnvironment env = getEnv(); DataStreamSource<Tuple2<String, Integer>> source = env.fromElements( new Tuple2<String, Integer>("age", 1), new Tuple2<String, Integer>("name", 2), new Tuple2<String, Integer>("name", 3), new Tuple2<String, Integer>("name", 3));
source.map( new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { Integer f1 = stringIntegerTuple2.f1; stringIntegerTuple2.setField(f1 + 10, 1); return stringIntegerTuple2; } }) .keyBy(1) .print(); env.execute(); }
|
reduce
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void reduceTest() throws Exception { StreamExecutionEnvironment env = getEnv(); env.fromElements( Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 5L)) .keyBy(1) .reduce(new ReduceFunction<Tuple2<Long, Long>>() { @Override public Tuple2<Long, Long> reduce(Tuple2<Long, Long> longLongTuple2, Tuple2<Long, Long> t1) throws Exception { return new Tuple2<Long, Long>(t1.f0, longLongTuple2.f1 + t1.f1); } }) .print(); env.execute(); }
|
还有一个栗子🌰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void reduce() throws Exception { StreamExecutionEnvironment env = getEnv(); DataStreamSource<String> dataSource = env.socketTextStream("localhost", 9999);
DataStream<SensorReading> dataStream = dataSource.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] strings = s.split(","); return new SensorReading(strings[0], new Long(strings[1]), new Double(strings[2])); } });
DataStream<SensorReading> sersorId = dataStream.keyBy("sersorId") .reduce(new ReduceFunction<SensorReading>() { @Override public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception { String id = t1.getSersorId(); Double time = t1.getTimestamp(); return new SensorReading(id, time, Math.max(sensorReading.getNewtemp(), t1.getNewtemp())); } }); sersorId.print(); env.execute(); }
|
split|select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public static void splitTest() throws Exception { StreamExecutionEnvironment env = getEnv(); DataStreamSource<String> dataSource = env.socketTextStream("localhost", 9999);
DataStream<SensorReading> dataStream = dataSource.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] strings = s.split(","); return new SensorReading(strings[0], new Double(strings[1]), new Double(strings[2])); } });
SplitStream<SensorReading> split = dataStream.split(new OutputSelector<SensorReading>() { @Override public Iterable<String> select(SensorReading value) { return (value.getNewtemp() > 30) ? Collections.singleton("high") : Collections.singleton("low"); } });
DataStream<SensorReading> low = split.select("low"); DataStream<SensorReading> high = split.select("high"); DataStream<SensorReading> all = split.select("high", "low");
DataStream<Tuple2<String, Double>> highStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getSersorId(), sensorReading.getNewtemp()); } });
ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = highStream.connect(low);
SingleOutputStreamOperator<Object> resultStream = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() { @Override public Object map1(Tuple2<String, Double> stringDoubleTuple2) throws Exception { return new Tuple3<>(stringDoubleTuple2.f0, stringDoubleTuple2.f0, "high temp warning"); }
@Override public Object map2(SensorReading sensorReading) throws Exception { return new Tuple2<>(sensorReading.getSersorId(), "normal temp"); } });
resultStream.print(); env.execute(); }
|
connect | coMap
connect 和 coMap 的用法已经包含在上面的 splitTest 示例中:highStream.connect(low) 之后调用 map 并传入 CoMapFunction。
union
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void unionTest() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> firstStream = env.socketTextStream("localhost", 9999); DataStreamSource<String> secondStream = env.socketTextStream("localhost", 7777);
DataStream<String> unionStream = firstStream.union(secondStream); DataStream<SensorReading> dataStream = unionStream.map(new MapFunction<String, SensorReading>() { @Override public SensorReading map(String s) throws Exception { String[] strings = s.split(","); return new SensorReading(strings[0], new Double(strings[1]), new Double(strings[2])); } }); dataStream.print(); env.execute(); }
|
// TODO 不断学习 不断补充