Start reading from specified offsets

In Flink 1.3.0 and above, setStartFromSpecificOffsets can be used to start reading from specified offsets, as in the sketch below and the full example at the end of this section.
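A minimal sketch of the call; here `consumer` stands for an already-constructed FlinkKafkaConsumer09, and the topic name, partition, and offset are placeholders:

Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
// read partition 0 of "myTopic" starting at offset 23 (placeholder values)
offsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
consumer.setStartFromSpecificOffsets(offsets);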

Fault Tolerance

Fault tolerance only requires enabling checkpointing: with checkpointing on, the Flink Kafka Consumer snapshots its Kafka offsets as part of each checkpoint and, after a failure, resumes from the offsets stored in the last completed checkpoint.
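A minimal sketch of enabling it (the 10-second interval is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// snapshot state (including Kafka offsets) every 10 seconds
env.enableCheckpointing(10000);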

Offset commit behavior

If checkpointing is disabled, the Flink Kafka Consumer relies on the periodic automatic offset committing of the internal Kafka clients. Auto-commit is enabled via the enable.auto.commit property (auto.commit.enable for Kafka 0.8) together with auto.commit.interval.ms.
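A sketch of the relevant client properties, assuming Kafka 0.9+ key names; the address, group id, and interval are placeholders:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");
// periodic auto-commit by the internal Kafka client;
// only takes effect when Flink checkpointing is disabled
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");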

If checkpointing is enabled, the Flink Kafka Consumer instead commits offsets automatically when a checkpoint completes. This behavior can be toggled with setCommitOffsetsOnCheckpoints(boolean); when it is enabled (the default), the periodic commit settings mentioned above are ignored.
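For example, to turn the on-checkpoint commit off (the example below leaves it enabled):

// skip committing offsets back to Kafka/ZooKeeper on checkpoints;
// failure recovery is unaffected, since Flink restores from the offsets
// stored in the checkpoint itself rather than the committed ones
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);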

Example

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Matrix42 on 2017/7/11.
 */
public class ReadText {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint interval (ms)
        env.enableCheckpointing(10000);
        // state backend that stores the checkpoint data
        MemoryStateBackend backend = new MemoryStateBackend();
        env.setStateBackend(backend);
        DeserializationSchema<String> valueDeserializer = new SimpleStringSchema();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node2:9093");
        properties.setProperty("group.id", "balance");
        // fallback when a requested offset is out of range,
        // and the default start position when no offset is available
        properties.setProperty("auto.offset.reset", "earliest");
        // for Kafka 0.8, zookeeper.connect must also be set
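        // e.g. (placeholder host:port for the ZooKeeper ensemble):
        // properties.setProperty("zookeeper.connect", "node2:2181");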

        /*
        // alternatively, load the properties from a file
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("p.properties")));
        properties.load(reader);
        */

        FlinkKafkaConsumerBase<String> kafkaConsumer =
                new FlinkKafkaConsumer09<>("balance", valueDeserializer, properties);

        // commit offsets when a checkpoint completes
        // (before 0.9 committed to ZooKeeper, 0.9 and later to Kafka; enabled by default)
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        // start partitions 0-3 of topic "balance" at offset 1
        Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
        for (int partition = 0; partition < 4; partition++) {
            specificStartupOffsets.put(new KafkaTopicPartition("balance", partition), 1L);
        }
        kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);

        env.addSource(kafkaConsumer).print();

        env.execute();
    }
}