
这是一个计算每个驾驶员乘坐次数的示例。
关键类TaxiRide 出租车乘车事件抽象类DataGenerator 模拟数据生成器Tuple 数据类型
元组是包含固定数量的各种类型的字段的复合类型。Java API 提供从Tuple1最高到Tuple25. 元组的每个字段都可以是任意 Flink 类型,包括更多元组,从而产生嵌套元组。tuple.f4可以使用字段名称 as或使用通用 getter 方法 直接访问元组的字段tuple.getField(int position)。字段索引从 0 开始。请注意,这与 Scala 元组相反,但它更符合 Java 的一般索引。
RideCountExample 代码
public class RideCountExample {
public static void main(String[] args) throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 增加源数据
DataStream rides = env.addSource(new TaxiRideGenerator());
// 聚合源数据成一个元素数据流
DataStream> tuples = rides.map(new MapFunction>() {
@Override
public Tuple2 map(TaxiRide ride) {
return Tuple2.of(ride.driverId, 1L);
}
});
// 根据driverId对数据源进行分区
KeyedStream, Long> keyedByDriverId = tuples.keyBy(t -> t.f0);
// 聚合计算司机数量 sum(1) 中的参数是分区号
DataStream> rideCounts = keyedByDriverId.sum(1);
// we could, in fact, print out any or all of these streams
rideCounts.print();
// run the cleansing pipeline
env.execute("Ride Count");
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)