Flink版本:1.14
Hudi版本:0.8.0

背景

测试发现当使用MOR(Merge On Read)表的时候写入速度很慢

问题原因

对环境和代码进行分析,发现有2个问题

  1. 测试环境使用的是虚拟机,IO速度很慢
  2. Hudi代码有问题

下面对代码问题进行分析

问题分析

hudi有个参数write.batch.size.MB,用来配置写数据时的缓存大小,默认是128M,当缓存的数据超过阈值的时候就会向文件系统写数据。

StreamWriteFunction的bufferRecord方法:

private void bufferRecord(I value) {
    final String bucketID = getBucketID(value);

    DataBucket bucket =
            this.buckets.computeIfAbsent(
                    bucketID,
                    k -> new DataBucket(this.config.getDouble(HudiOptions.WRITE_BATCH_SIZE)));
    boolean needFlush = bucket.detector.detect(value);
    if (needFlush) {
        flushBucket(bucket);
        bucket.reset();
    }
    bucket.records.add((HoodieRecord<?>) value);
}

问题出在写数据的时候,所有数据被放在一个List中,而这个List还是ArrayList,可能是为了GC友好的原因,Hudi在每写入一条数据的时候会将它从列表中移除,问题就出现在这。

当用户将write.batch.size.MB配置的比较大或者每条数据大小比较小的时候,列表中就会缓存大量数据,当调用remove方法的时候就会非常耗时,使原来O(n)复杂度的操作变为O(n^2)。

@Override
protected byte[] serializeRecords() throws IOException {
  Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
  GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutputStream output = new DataOutputStream(baos);

  // 1. Write out the log block version
  output.writeInt(HoodieLogBlock.version);

  // 2. Write total number of records
  output.writeInt(records.size());

  // 3. Write the records
  Iterator<IndexedRecord> itr = records.iterator();
  while (itr.hasNext()) {
    IndexedRecord s = itr.next();
    ByteArrayOutputStream temp = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
    encoderCache.set(encoder);
    try {
      // Encode the record into bytes
      writer.write(s, encoder);
      encoder.flush();

      // Get the size of the bytes
      int size = temp.toByteArray().length;
      // Write the record size
      output.writeInt(size);
      // Write the content
      output.write(temp.toByteArray());
      //当数据量很大的时候remove操作很费时
      itr.remove();
    } catch (IOException e) {
      throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
    }
  }
  output.close();
  return baos.toByteArray();
}

解决问题

解决办法也非常简单将itr.remove();这行代码移除即可。因为这个List是另一个方法的局部变量,当数据写完的时候就没有引用了,JVM会自动处理。

题外话

本想把这个问题提交到社区发现有人已经在一个feature中顺便把这个问题解决了,可能是使用的时候发现了这个问题吧。

github

[HUDI-431] Adding support for Parquet in MOR LogBlocks