Flink Rolling File Sink
Bucketing File Sink
创建一个默认的bucketing sink,sink到一个由时间作为切分的滚动的文件
DataStream<String> input = ...;
//只需要一个文件存储的路径
input.addSink(new BucketingSink<String>("/base/path"));
默认情况下bucketing sink会以元素到达的系统时间作为切分,并且使用"yyyy-MM-dd--HH"这样的形式来命名buckets(使用SimpleDateFormat 生成).当遇到一个新的日期的时候会生成一个新的bucket,例如,如果以分钟为粒度,你会每分钟得到一个bucket.每个bucket的目录都会包含几部分文件:每个并发实例都会创建一个它自己的part文件,当文件太大的时候会创建一个新的文件.
当bucket不活跃的时候,打开的part文件会被flush出去并且关闭,一个bucket在最近没有写入操作会被视为不活跃.默认没分钟检查一次不活跃的bucke,可以通过在BucketingSing上调用setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold()方法来设置检查行为.
可以通过在BucketingSing上调用setBucketer()来指定一个自定义bucketer.如果需要可以使用元素的属性或tuple来决定bucket的目录
默认的writer是StringWriter,会在元素上调用toString()方法然后写入part文件,每个元素一行.可以通过在BucketingSing上调用setWriter()来指定一个自定义writer.
如果你想写入Hadoop的SequenceFiles你可以使用SequenceFileWriter,并且SequenceFileWriter 可以使用压缩格式
每个part的默认大小为384M,可以通过setBatchSize()方法来指定part大小.
可以通过调用setBucketingListener()来设置一个Listener,当文件生成完毕时会调用,需要new一个BucketingListener重写onFinalFileGenerated方法,参数PartInfo part包含文件的各种信息,比如name,size,rowCount(行数)等
正则写入的文件后缀为processing,当checkpoint完成的时候后缀会变成pending,然后去掉
自定义writer继承StreamWriterBase,bucker需要实现Bucketer接口
案例
sink = new BucketingSink<Record>(path).setWriter(writer).setMetricName(getId())
.setBucketer(new EventTimeBucketer(DEFAULT_BUCKET_FORMAT,
timeColumnIdx)).setBucketingListener(new BucketingListener() {
private static final long serialVersionUID = 85537235702524970L;
@Override
public void onFinalFileGenerated(PartInfo part) {
logger.info("new output file generated: {}", part.getFileName());
if (useMonitor) {
dataset.getStorageConfiguration().put("path", part.getFileName());
dataset.getStorageConfiguration().put("size", "" + part.getFileSize());
dataset.setRowNumber(part.getRowCount());
dataset.setSliceTime(part.getSliceTime());
FlowOutputUpdateEvent event =
monitor.createEvent(flow.getName(), getId(), dataset);
logger.info("send output event: {}",
JsonBuilder.getInstance().toJson(event));
try {
monitor.sendEvent(event);
} catch (Exception e) {
logger.error("send event to {} failed with exception {}",
monitorUrl, e);
throw new RuntimeException("send event to " +
monitorUrl + " failed");
}
}
}
});