Installing Flink on macOS
1. Run `brew install apache-flink`

```shell
gaolei:/ gaolei$ brew install apache-flink
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 1 tap (homebrew/services).
No changes to formulae.

==> Downloading https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
🍺  /usr/local/Cellar/apache-flink/1.9.1: 166 files, 277MB, built in 15 minutes 29 seconds
```
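2. Run the Flink start script

```shell
cd /usr/local/Cellar/apache-flink/1.9.1/libexec/bin
./start-cluster.sh
```

Once the cluster is up, the Flink web UI is served at http://localhost:8081 by default.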
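You can also confirm from code that the cluster came up. The minimal Java sketch below tries a TCP connection to the web UI port. It assumes the default port 8081 (Flink's `rest.port`) has not been changed. The class name `FlinkUiProbe` is hypothetical. A successful connection only shows that something is listening on the port; it does not prove that the process is Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether anything accepts TCP connections on the
// Flink web UI / REST port (assumed to be the default 8081).
final class FlinkUiProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out or host unresolvable: treat as "not up"
            return false;
        }
    }

    public static void main(String[] args) {
        boolean up = isListening("localhost", 8081, 1000);
        System.out.println(up
                ? "Something is listening on localhost:8081 (Flink web UI?)"
                : "Nothing listening on localhost:8081; did start-cluster.sh succeed?");
    }
}
```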
WordCount batch processing demo
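Create a Maven project and add the dependencies
Make sure the versions match your own Flink installation (the Flink version, and the Scala suffix such as _2.11 or _2.12) 👇👇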
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.9.1</version>
        <!-- provided: expected to be supplied by the Flink cluster at runtime, so it is
             NOT on the runtime classpath when main() is launched from the IDE -->
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
</dependencies>
```
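Write the batch program

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // Batch (DataSet API) execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input file line by line
        String inputPath = "/Users/gaolei/Documents/DemoProjects/flink-start/src/main/resources/hello.txt";
        DataSource<String> dataSource = env.readTextFile(inputPath);

        // Split lines into (word, 1), group by the word (field 0), sum the counts (field 1)
        AggregateOperator<Tuple2<String, Integer>> result = dataSource.flatMap(new MyFlatMapper())
                .groupBy(0)
                .sum(1);

        // In the DataSet API, print() triggers execution itself; no env.execute() is needed
        result.print();
    }

    // Splits each line on spaces and emits a (word, 1) tuple for every word
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] words = s.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
```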
Prepare the input file (hello.txt under src/main/resources)

```text
hello spark
hello world
hello java
hello flink
how are you
what is your name
```
Output

```text
(is,1)
(what,1)
(you,1)
(flink,1)
(name,1)
(world,1)
(hello,4)
(your,1)
(are,1)
(java,1)
(how,1)
(spark,1)
```
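To see where these counts come from, here is a plain-Java sketch (no Flink involved) that mimics the `flatMap` → `groupBy(0)` → `sum(1)` pipeline on the same six lines. It is only an illustration of the semantics, not how Flink executes the job. The class `LocalWordCount` and its methods are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of flatMap -> groupBy(0) -> sum(1); not Flink code.
final class LocalWordCount {

    /** "flatMap": split each line on single spaces and emit a (word, 1) pair per word. */
    static List<Map.Entry<String, Integer>> tokenize(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    /** "groupBy(0).sum(1)": group the pairs by word and add up their counts. */
    static Map<String, Integer> groupAndSum(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello spark", "hello world", "hello java",
                "hello flink", "how are you", "what is your name");
        groupAndSum(tokenize(lines))
                .forEach((word, count) -> System.out.println("(" + word + "," + count + ")"));
    }
}
```

The counts agree with the Flink output above: hello appears 4 times and each of the other 11 words once. The order differs because `LinkedHashMap` keeps insertion order, whereas Flink's output order depends on how the groups are partitioned and is not guaranteed.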
Processing streaming data with Flink
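1. Start a socket server with `nc -lk <port>` to simulate a real-time data stream. Here it listens on port 7777 (`nc -lk 7777`), and each line you type becomes one record.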
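If `nc` is not available (for example on Windows), a rough Java stand-in can play the same role. This is a minimal sketch, not a full `nc` replacement. `LineServer` and `serveOnce` are hypothetical names, and the server handles only a single connection, whereas `-k` keeps `nc` listening after a client disconnects.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for `nc -l 7777`: serves ONE client, then exits
// (it does not emulate -k).
final class LineServer {

    /** Accepts one client and sends every line read from source, each terminated by '\n'. */
    static void serveOnce(ServerSocket server, BufferedReader source) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = source.readLine()) != null) {
                out.write(line + "\n");
                out.flush(); // send each typed line right away instead of buffering it
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 7777;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Waiting for a connection on port " + port + " ...");
            serveOnce(server, stdin);
        }
    }
}
```

Start this server before the Flink job, because `socketTextStream` connects to it as a client. Then type lines into the server's console.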
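2. Process the stream with Java code

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // Streaming (DataStream API) execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket opened with `nc -lk 7777`
        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);

        // Reuse the batch tokenizer, key by the word (field 0), keep a running sum of field 1
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        resultStream.print();

        // Unlike the DataSet job, a streaming job only starts when execute() is called
        env.execute();
    }
}
```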
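Unlike the batch job, which prints one final total per word, `keyBy(0).sum(1)` on an unbounded stream emits an updated running total every time a word arrives. The plain-Java simulation below illustrates that behaviour. It is not Flink's implementation: Flink keeps this per-key state in managed keyed state and spreads the keys across parallel subtasks. `RunningWordCount` and `onLine` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of keyBy(0).sum(1) on a stream: every incoming
// word emits the updated running total for that word. Not Flink code.
final class RunningWordCount {

    // Per-key state: word -> running total
    private final Map<String, Integer> state = new HashMap<>();

    /** Processes one incoming line and returns the updates a print() sink would show. */
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            int total = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        System.out.println(counter.onLine("hello world")); // prints [(hello,1), (world,1)]
        System.out.println(counter.onLine("hello flink")); // prints [(hello,2), (flink,1)]
    }
}
```

In the real job, `print()` also prefixes each line with the parallel subtask index (for example `3> (hello,2)`) when the parallelism is greater than 1. Lines from different subtasks may also interleave.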
3. On the first run I ran into the following error ❌
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream
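A common cause is the `<scope>provided</scope>` on flink-streaming-java in the pom above. Maven puts provided-scope dependencies on the compile classpath but not on the runtime classpath, so a main() launched from the IDE cannot find DataStream. Two things usually fix it: remove the scope for local runs, or enable the IDE run-configuration option that adds provided-scope dependencies to the classpath. For the fix, also see the references 👇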
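To check whether that is the problem, you can test in the same run configuration whether the class is visible at runtime. Below is a minimal diagnostic sketch. `ClasspathCheck` is a hypothetical helper; only `Class.forName` is standard JDK API.

```java
// Hypothetical diagnostic: reports whether a class can be loaded from the
// current runtime classpath without initializing it.
final class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or present but one of its own dependencies is missing
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.flink.streaming.api.datastream.DataStream";
        System.out.println(name + (isOnClasspath(name) ? " is" : " is NOT") + " on the runtime classpath");
    }
}
```

If it reports "is NOT" from the IDE but the project compiles, the provided-scope dependency is the likely culprit.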
References