Consuming from Kafka Starting at a Specified Offset in Flink
Setting a specific starting offset
In Flink 1.3.0 and later, setStartFromSpecificOffsets can be used to start reading from a specified offset for each partition.
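As a minimal sketch (the topic name my-topic and the offsets here are placeholders; kafkaConsumer is a FlinkKafkaConsumer09 built as in the full example further below):

Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L); // partition 0 starts at offset 23
specificStartupOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L); // partition 1 starts at offset 31
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);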
Fault Tolerance
Enabling checkpointing is all that is required: with checkpointing on, the Flink Kafka Consumer snapshots the Kafka offsets as part of its checkpointed state, so the job can resume from the last consistent position after a failure.
Offset commit behavior
If checkpointing is disabled, the Flink Kafka Consumer relies on the internally used Kafka client's automatic periodic offset commits, enabled via enable.auto.commit (auto.commit.enable for Kafka 0.8) and auto.commit.interval.ms.
If checkpointing is enabled, the Flink Kafka Consumer commits offsets automatically when a checkpoint completes. This behavior can be turned on or off with setCommitOffsetsOnCheckpoints(boolean); when it is on, the periodic commit settings mentioned above are ignored.
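A rough sketch of those client properties for a Kafka 0.9+ consumer (the commit interval value below is only an illustrative choice):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node2:9093");
properties.setProperty("group.id", "balance");
// let the internal Kafka client commit offsets periodically
// (only takes effect when Flink checkpointing is disabled)
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "5000");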
Example
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Created by Matrix42 on 2017/7/11.
*/
public class ReadText {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint interval in milliseconds
        env.enableCheckpointing(10000);
        // where checkpoint state is stored
        MemoryStateBackend backend = new MemoryStateBackend();
        env.setStateBackend(backend);

        DeserializationSchema<String> valueDeserializer = new SimpleStringSchema();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node2:9093");
        properties.setProperty("group.id", "balance");
        // fallback used when a specified offset is out of range; also the default start position
        properties.setProperty("auto.offset.reset", "earliest");
        // for Kafka 0.8, zookeeper.connect must be set as well
        /*
        // alternatively, load the properties from a file
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(new FileInputStream("p.properties")));
        properties.load(reader);
        */

        FlinkKafkaConsumerBase<String> kafkaConsumer =
                new FlinkKafkaConsumer09<>("balance", valueDeserializer, properties);

        // commit offsets automatically when a checkpoint completes
        // before 0.9 offsets are committed to ZooKeeper, from 0.9 on to Kafka; enabled by default
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

        Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
        KafkaTopicPartition topicPartition1 = new KafkaTopicPartition("balance", 0);
        specificStartupOffsets.put(topicPartition1, 1L);
        KafkaTopicPartition topicPartition2 = new KafkaTopicPartition("balance", 1);
        specificStartupOffsets.put(topicPartition2, 1L);
        KafkaTopicPartition topicPartition3 = new KafkaTopicPartition("balance", 2);
        specificStartupOffsets.put(topicPartition3, 1L);
        KafkaTopicPartition topicPartition4 = new KafkaTopicPartition("balance", 3);
        specificStartupOffsets.put(topicPartition4, 1L);
        kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);

        env.addSource(kafkaConsumer).print();
        env.execute();
    }
}