读取文件
这里是以txt文件为例,实现WordCount,其他文件类型同理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| /**
 * Batch WordCount entry point: reads a text file, tokenizes every line with
 * {@link MyFlatMapper}, groups the resulting tuples by word and sums the counts,
 * then prints the aggregated result.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if any of the Flink calls below fails
 */
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

    // Each element of this data set is one line of the input file.
    final DataSource<String> lines = batchEnv.readTextFile(
            "/Users/gaolei/Documents/DemoProjects/flink-start/src/main/resources/hello.txt");

    // Tuple field 0 is the word (grouping key), field 1 is the count to be summed.
    lines.flatMap(new MyFlatMapper())
            .groupBy(0)
            .sum(1)
            .print();
}
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) { String[] words = s.split(" "); for (String word : words) { collector.collect(new Tuple2<String, Integer>(word, 1)); } } }
|
实现自定义数据源
需要自己写一个类,实现SourceFunction接口的run方法和cancel方法。注意⚠️:SourceFunction的泛型类型必须要写上,不然会报错。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| /**
 * Streaming entry point: registers {@link MySourceFunction} as a custom source,
 * prints every {@code SensorReading} it emits, and starts the job.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if building or executing the job fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

    // Parameterized (not raw) so the element type is carried through the pipeline
    // and no unchecked warnings are produced.
    DataStreamSource<SensorReading> dataStreamSource = env.addSource(new MySourceFunction());
    dataStreamSource.print();

    env.execute();
}
public static class MySourceFunction implements SourceFunction<SensorReading> { private boolean running = true;
public void run(SourceContext ctx) throws Exception { HashMap<String, Double> sensorMap = new HashMap<String, Double>(10); for (int i = 0; i < 10; i++) { sensorMap.put("sensor_" + (i + 1), 60 + new Random().nextGaussian() * 20); } while (running) { for (String sensor : sensorMap.keySet()) { double newtemp = sensorMap.get(sensor) + new Random().nextGaussian(); sensorMap.put(sensor, newtemp); ctx.collect(new SensorReading(sensor, System.currentTimeMillis(), newtemp)); } Thread.sleep(10000); } }
public void cancel() { running = false; } }
|