Bucketing File Sink

创建一个默认的bucketing sink,sink到一个由时间作为切分的滚动的文件

DataStream<String> input = ...;
//只需要一个文件存储的路径
input.addSink(new BucketingSink<String>("/base/path"));

默认情况下bucketing sink会以元素到达的系统时间作为切分,并且使用"yyyy-MM-dd--HH"这样的形式来命名buckets(使用SimpleDateFormat 生成).当遇到一个新的日期的时候会生成一个新的bucket,例如,如果以分钟为粒度,你会每分钟得到一个bucket.每个bucket的目录都会包含几部分文件:每个并发实例都会创建一个它自己的part文件,当文件太大的时候会创建一个新的文件.

当bucket不活跃的时候,打开的part文件会被flush出去并且关闭,一个bucket在最近没有写入操作会被视为不活跃.默认没分钟检查一次不活跃的bucke,可以通过在BucketingSing上调用setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold()方法来设置检查行为.

可以通过在BucketingSing上调用setBucketer()来指定一个自定义bucketer.如果需要可以使用元素的属性或tuple来决定bucket的目录

默认的writer是StringWriter,会在元素上调用toString()方法然后写入part文件,每个元素一行.可以通过在BucketingSing上调用setWriter()来指定一个自定义writer.

如果你想写入Hadoop的SequenceFiles你可以使用SequenceFileWriter,并且SequenceFileWriter 可以使用压缩格式

每个part的默认大小为384M,可以通过setBatchSize()方法来指定part大小.

可以通过调用setBucketingListener()来设置一个Listener,当文件生成完毕时会调用,需要new一个BucketingListener重写onFinalFileGenerated方法,参数PartInfo part包含文件的各种信息,比如name,size,rowCount(行数)等

正则写入的文件后缀为processing,当checkpoint完成的时候后缀会变成pending,然后去掉

自定义writer继承StreamWriterBase,bucker需要实现Bucketer接口

案例

sink = new BucketingSink<Record>(path).setWriter(writer).setMetricName(getId())
            .setBucketer(new EventTimeBucketer(DEFAULT_BUCKET_FORMAT,
 timeColumnIdx)).setBucketingListener(new BucketingListener() {
        private static final long serialVersionUID = 85537235702524970L;

        @Override
        public void onFinalFileGenerated(PartInfo part) {
            logger.info("new output file generated: {}", part.getFileName());
            if (useMonitor) {
                dataset.getStorageConfiguration().put("path", part.getFileName());
                dataset.getStorageConfiguration().put("size", "" + part.getFileSize());
                dataset.setRowNumber(part.getRowCount());
                dataset.setSliceTime(part.getSliceTime());
                FlowOutputUpdateEvent event = 
                                    monitor.createEvent(flow.getName(), getId(), dataset);
                logger.info("send output event: {}", 
                                        JsonBuilder.getInstance().toJson(event));

                try {
                    monitor.sendEvent(event);
                } catch (Exception e) {
                    logger.error("send event to {} failed with exception {}",
                                                 monitorUrl, e);
                    throw new RuntimeException("send event to " + 
                                                monitorUrl + " failed");
                }
            }
        }
    });