
Comet native Iceberg scan duplicates rows when splitting a single-row-group Parquet file into multiple byte-range tasks #4590

@and124578963


Describe the bug

Comet native Iceberg scan can return duplicate rows when an Iceberg FileScanTask byte range splits a Parquet file that contains a single row group.
The issue appears when:

  • the Iceberg table has a Parquet data file with one row group;
  • the file is planned into multiple byte-range scan tasks, for example because split-size is smaller than the row group size;
  • Comet uses CometIcebergNativeScanExec.
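To picture the second condition, the Java sketch below carves a file into fixed-size byte ranges, roughly what a planner does when it only knows the file length and the requested split-size. It is a hypothetical illustration, not Iceberg's planning code: `FixedSizeSplits`, `planSplits`, and `ByteRange` are made-up names, and the 200-byte file length is an assumed example value.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Iceberg's code): split a file of fileLength bytes
 * into consecutive byte ranges of at most splitSize bytes each.
 */
public class FixedSizeSplits {

    /** A half-open byte range [start, end) handed to one scan task. */
    record ByteRange(long start, long end) {}

    static List<ByteRange> planSplits(long fileLength, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<ByteRange> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            // The final range is truncated at the end of the file.
            splits.add(new ByteRange(start, Math.min(start + splitSize, fileLength)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Assumed 200-byte file with split-size 64: yields 4 ranges, the last one [192, 200).
        for (ByteRange range : planSplits(200, 64)) {
            System.out.println(range);
        }
    }
}
```

With these assumed numbers the planner produces [0, 64), [64, 128), [128, 192) and [192, 200); any row group spanning several of these ranges is a candidate for being read more than once.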

Expected behavior: the row group should be read by exactly one split.
Actual behavior: the same row group is read by multiple split tasks, so matching rows are returned multiple times.
Vanilla Spark/Iceberg does not duplicate rows.

Steps to reproduce

Minimal Scala regression test:

```scala
// Assumes the enclosing Comet test suite imports java.io.File and CometConf.
test("native Iceberg scan does not duplicate a row group split by byte range") {
  assume(icebergAvailable, "Iceberg not available in classpath")
  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.split_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.split_cat.type" -> "hadoop",
      "spark.sql.catalog.split_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
      // Write a one-row Parquet file, so it contains a single row group.
      val dataPath = s"${warehouseDir.getAbsolutePath}/single_row_group_parquet"
      spark
        .sql("SELECT CAST(0 AS INT) AS id, repeat('x', 1024) AS payload")
        .coalesce(1)
        .write
        .mode("overwrite")
        .parquet(dataPath)
      spark.sql("""
        CREATE TABLE split_cat.db.single_row_group_split (
          id INT,
          payload STRING
        ) USING iceberg
      """)
      // Locate the single Parquet data file written above.
      val parquetFiles = new File(dataPath)
        .listFiles()
        .filter(file => file.getName.startsWith("part-") && file.getName.endsWith(".parquet"))
      assert(parquetFiles.length == 1)
      val sourceParquetFile = parquetFiles.head
      // Register the existing Parquet file in the Iceberg table without rewriting it.
      val catalog = spark.sessionState.catalogManager.catalog("split_cat")
      val ident =
        org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "single_row_group_split")
      val table = catalog
        .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
        .loadTable(ident)
        .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
        .table()
      val dataFile = org.apache.iceberg.DataFiles
        .builder(table.spec())
        .withPath(sourceParquetFile.getAbsolutePath)
        .withFormat(org.apache.iceberg.FileFormat.PARQUET)
        .withFileSizeInBytes(sourceParquetFile.length())
        .withRecordCount(1)
        .build()
      table.newAppend().appendFile(dataFile).commit()
      // A 64-byte split-size is smaller than the row group, so the scan is
      // planned as several byte-range tasks over this one file.
      val df = spark.read
        .format("iceberg")
        .option("split-size", "64")
        .option("file-open-cost", "64")
        .load("split_cat.db.single_row_group_split")
        .where("id = 0")
        .select("id")
      val rows = df.collect()
      // The file holds one row, so exactly one row must come back.
      assert(rows.length == 1, s"Expected 1 row, got ${rows.length}: ${rows.mkString(", ")}")
    }
  }
}
```

Actual behavior

The query returns the same row multiple times.

Example failure:

  Expected 1 row, got 4: [0], [0], [0], [0]

Expected behavior

The query should return exactly one row:

  [0]

Additional context

The same Parquet row group appears to be selected by multiple byte-range tasks.
This happens because the native iceberg-rust reader selects a row group for a split whenever their byte ranges overlap:

  row_group_start < split_end && split_start < row_group_end

For a single row group split into N byte ranges, all N ranges overlap the row group, so all N tasks read and emit it.
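A small Java sketch makes the overlap rule concrete. The predicate is the one quoted above; the class and method names and the byte offsets (a row group at [4, 250) and four 64-byte splits) are hypothetical values chosen for illustration, not measurements from the failing file.

```java
/**
 * Hypothetical sketch of overlap-based row group selection: a split reads a
 * row group whenever their half-open byte ranges intersect.
 */
public class OverlapSelection {

    /** The overlap test quoted in the issue. */
    static boolean overlaps(long rowGroupStart, long rowGroupEnd, long splitStart, long splitEnd) {
        return rowGroupStart < splitEnd && splitStart < rowGroupEnd;
    }

    /** Counts how many splits would read the row group [rgStart, rgEnd). */
    static int countReaders(long rgStart, long rgEnd, long[][] splits) {
        int readers = 0;
        for (long[] split : splits) {
            if (overlaps(rgStart, rgEnd, split[0], split[1])) {
                readers++;
            }
        }
        return readers;
    }

    public static void main(String[] args) {
        // Hypothetical layout: one row group at [4, 250), four 64-byte splits.
        long[][] splits = {{0, 64}, {64, 128}, {128, 192}, {192, 256}};
        // Every split intersects the row group, so all four tasks emit its rows.
        System.out.println(countReaders(4, 250, splits)); // prints 4
    }
}
```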

Parquet Java and vanilla Spark avoid this by assigning each row group to exactly one split, using row-group midpoint ownership semantics. In parquet-java, split filtering keeps a row group only when the split's range contains the row group's midpoint:

```java
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
  newRowGroups.add(rowGroup);
}
```
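For comparison, the sketch below applies the midpoint rule from the parquet-java excerpt to the same kind of hypothetical offsets. `startIndex` and `totalSize` mirror the names in that excerpt; the class and method names are invented, and treating each split as the half-open range [start, end) is an assumption about how the range filter's `contains` behaves.

```java
/**
 * Hypothetical sketch of midpoint ownership: a split keeps a row group only
 * if the row group's midpoint lies inside the split's range [start, end).
 */
public class MidpointSelection {

    static boolean ownsRowGroup(long startIndex, long totalSize, long splitStart, long splitEnd) {
        long midPoint = startIndex + totalSize / 2;
        return midPoint >= splitStart && midPoint < splitEnd;
    }

    /** Counts how many splits would keep the row group starting at startIndex. */
    static int countOwners(long startIndex, long totalSize, long[][] splits) {
        int owners = 0;
        for (long[] split : splits) {
            if (ownsRowGroup(startIndex, totalSize, split[0], split[1])) {
                owners++;
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        long[][] splits = {{0, 64}, {64, 128}, {128, 192}, {192, 256}};
        // Row group at [4, 250): midpoint 4 + 246 / 2 = 127, which only [64, 128) contains.
        System.out.println(countOwners(4, 246, splits)); // prints 1
    }
}
```

Because the splits are contiguous and do not overlap, a midpoint inside the file falls in exactly one of them, so each row group is emitted by exactly one task regardless of how many splits its bytes span.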

Environment

  • Comet version: 0.16.0-SNAPSHOT (current local build)
  • Spark: 3.5.8
  • Iceberg catalog: Hadoop catalog
  • Native Iceberg scan enabled: spark.comet.scan.icebergNative.enabled=true
