Map

输入一个元素,输出一个元素

DataStream → DataStream

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

输入一个元素,输出零个或多个元素

DataStream → DataStream

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

把流中的每一个元素用一个返回类型为boolean的方法计算,返回true的保留

DataStream → DataStream

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

逻辑上把一个流分割成互斥的分区,每个分区都包含具有相同key的元素.内部由hash partitioning实现.下面的转换返回一个KeyedDataStream

DataStream → KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce

在一个有键的数据流上滚动的reduce,在最近被reduce处理过的值上合并当前的元素并且提交新值

KeyedStream → DataStream

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Fold

在一个有键的数据流上以一个初始值滚动的fold,在最近被flod处理过的值上合并当前的元素并且提交新值

KeyedStream → DataStream

当输入为1,2,3,4,5的时候,输出"start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

Aggregations

在一个有键的数据流上以一个初始值滚动的聚合,min和minBy的不同是:min返回最小值,minBy包含最小值的元素(max和maxBy同理)

KeyedStream → DataStream

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window

窗口可以定义在已经分区的KeyedStream上.窗口根据每个特性对每个键中的数据进行分组(例如最近5秒内到达的数据)

KeyedStream → WindowedStream

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); 
// Last 5 seconds of data

WindowAll

一个可以定义在规则的DataStream上的窗口.窗口根据一些特征将所有流分组(例如最近5秒内到达的数据)

警告:在很多情况这是一个不能并发的transformation,所有的记录会被聚集在一个任务中进行WindowAll操作

DataStream → AllWindowedStream

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

对一个窗口整体应用一个函数

WindowedStream → DataStream

AllWindowedStream → DataStream

注意:如果你使用WindowAll,你需要用AllWindowFunction

下面是一个手动计算一个窗口内所有元素的和的函数

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

Window Reduce

在一个窗口上应用一个reduce函数,返回一个reduce处理过的值

WindowedStream → DataStream

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
                                Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Window Fold

在一个窗口上应用一个fold函数,返回一个fold处理过的值

WindowedStream → DataStream

下面的例子输入1,2,3,4,5,输出字符串在一个窗口上应用一个reduce函数,返回一个reduce处理过的值

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});

Aggregations on windows

聚合一个窗口的内容,min和minBy的不同是:min返回最小值,minBy包含最小值的元素(max和maxBy同理)

WindowedStream → DataStream

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

Union

联合两个或以上数据流,产生一个包含所有流中元素的数据流

注意:如果你联合同一个流你会得到一个包含两个相同元素的流

DataStream* → DataStream

dataStream.union(otherStream1, otherStream2, ...);

Window Join

在给定的key和相同的窗口内Join两个流

DataStream,DataStream → DataStream

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Window CoGroup

在给定的key和相同的窗口内Cogroup两个流

DataStream,DataStream → DataStream

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

Connect

Connect两个数据流,保留他们的类型,Connect允许两个流共享state

DataStream,DataStream → ConnectedStreams

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

Similar to map and flatMap on a connected data stream

ConnectedStreams → DataStream

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

Split

根据规则把一个流拆分成两个或更多的流

DataStream → SplitStream

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select

从SplitStream中选择一个或多个流

SplitStream → DataStream

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Iterate

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.

DataStream → IterativeStream → DataStream

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
        return value <= 0;
    }
});

Extract Timestamps

为了使用窗口从记录中提取时间戳

DataStream → DataStream

stream.assignTimestamps (new TimeStampExtractor() {...});