Supported Spark Data Sources

File Formats

Parquet

When spark.comet.scan.enabled is enabled, Parquet scans are performed natively by Comet if all data types in the schema are supported. When this option is not enabled, the scan falls back to Spark. In that case, enabling spark.comet.convert.parquet.enabled will immediately convert the scanned data into Arrow format, allowing subsequent operators to execute natively, though the extra conversion adds overhead.
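For example, a minimal sketch of setting these options on a Spark session with Comet installed (the path is a placeholder, and setting these options at runtime rather than via --conf at startup is an assumption here):

// Perform Parquet scans natively when the schema is supported
spark.conf.set("spark.comet.scan.enabled", "true")

// If the scan falls back to Spark, convert scanned data to Arrow so that
// downstream operators can still execute natively (at some conversion cost)
spark.conf.set("spark.comet.convert.parquet.enabled", "true")

spark.read.parquet("/path/to/data").show()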

CSV

Comet does not provide a native CSV scan, but when spark.comet.convert.csv.enabled is enabled, scanned data is immediately converted into Arrow format, allowing subsequent operators to execute natively.

JSON

Comet does not provide a native JSON scan, but when spark.comet.convert.json.enabled is enabled, scanned data is immediately converted into Arrow format, allowing subsequent operators to execute natively.
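A similar sketch enables both conversion options so that CSV and JSON scans feed Arrow data to native operators (the paths are placeholders, and runtime settability is again an assumption):

// Comet has no native CSV or JSON scan; these options convert scanned rows
// to Arrow format so that subsequent operators can execute natively
spark.conf.set("spark.comet.convert.csv.enabled", "true")
spark.conf.set("spark.comet.convert.json.enabled", "true")

val csvDf = spark.read.option("header", "true").csv("/path/to/data.csv")
val jsonDf = spark.read.json("/path/to/data.json")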

Data Catalogs

Apache Iceberg

See the dedicated Comet and Iceberg Guide.

Supported Storages

Comet supports most standard storage systems, such as the local file system and object storage.

HDFS

The Apache DataFusion Comet native reader can seamlessly scan files from remote HDFS for supported formats.

Using the experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports nested type processing. This reader is currently experimental.

To build Comet with the native DataFusion reader and remote HDFS support, a JDK must be installed.

Example: build Comet for Spark 3.5. Provide the JDK path in JAVA_HOME and the JVM linker path in RUSTFLAGS; the linker path can vary depending on the system, but the linker library is typically part of the installed JDK.

export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"

Start Comet with the experimental reader and HDFS support as described above, adding the following parameters:

--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true

Query a struct type from remote HDFS:

val df = spark.read.parquet("hdfs://namenode:9000/user/data")
df.printSchema()
df.explain()
df.show(false)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- personal_info: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.7.0 initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan:  (1)


(1) CometNativeScan: 
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]


25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info    |
+---+----------+-----------------+
|2  |Jane      |{Jane, Smith, 34}|
|1  |John      |{John, Doe, 28}  |
+---+----------+-----------------+

Verify that the physical plan uses CometNativeScan.

More on HDFS Reader

Local HDFS development

  • Configure the local machine network. Add the cluster hostnames to /etc/hosts:

127.0.0.1	localhost   namenode datanode1 datanode2 datanode3
::1             localhost namenode datanode1 datanode2 datanode3
  • Start a local HDFS cluster with 3 datanodes; the namenode URL is namenode:9000

docker compose -f kube/local/hdfs-docker-compose.yml up
  • Check that the local namenode is up and running at http://localhost:9870/dfshealth.html#tab-overview

  • Build the project with HDFS support

JAVA_HOME="/opt/homebrew/opt/openjdk@11" make release PROFILES="-Pspark-3.5" COMET_FEATURES=hdfs RUSTFLAGS="-L /opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home/lib/server"
  • Run local test

    withSQLConf(
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      "fs.defaultFS" -> "hdfs://namenode:9000",
      "dfs.client.use.datanode.hostname" -> "true") {
      val df = spark.read.parquet("/tmp/2")
      df.show(false)
      df.explain("extended")
    }

Alternatively, use spark-shell with HDFS support as described above.

S3

DataFusion Comet has multiple Parquet scan implementations that use different approaches to read data from S3.

native_comet

The default native_comet Parquet scan implementation reads data from S3 using the Hadoop-AWS module, which is identical to the approach commonly used with vanilla Spark. AWS credential configuration and other Hadoop S3A configurations work the same way as in vanilla Spark.
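As a minimal sketch, reading from S3 with this scan looks the same as in vanilla Spark; the bucket name and keys below are placeholders, and the hadoop-aws module and its AWS SDK dependency are assumed to be on the classpath:

// Standard Hadoop S3A configuration, exactly as with vanilla Spark
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "my-access-key")
hadoopConf.set("fs.s3a.secret.key", "my-secret-key")

spark.read.parquet("s3a://my-bucket/path/to/data").show()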

native_datafusion

The native_datafusion Parquet scan implementation completely offloads data loading to native code. It uses the object_store crate to read data from S3 and supports configuring S3 access using standard Hadoop S3A configurations by translating them to the object_store crate’s format.

This implementation maintains compatibility with existing Hadoop S3A configurations, so existing code will continue to work as long as the configurations are supported and can be translated without loss of functionality.

Supported Credential Providers

AWS credential providers can be configured using the fs.s3a.aws.credentials.provider configuration. The following table shows the supported credential providers and their configuration options:

| Credential provider | Description | Supported Options |
| --- | --- | --- |
| org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | Access S3 using access key and secret key | fs.s3a.access.key, fs.s3a.secret.key |
| org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider | Access S3 using temporary credentials | fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.session.token |
| org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider | Access S3 using AWS STS assume role | fs.s3a.assumed.role.arn, fs.s3a.assumed.role.session.name (optional), fs.s3a.assumed.role.credentials.provider (optional) |
| org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider | Access S3 using EC2 instance profile or ECS task credentials (tries ECS first, then IMDS) | None (auto-detected) |
| org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider<br>com.amazonaws.auth.AnonymousAWSCredentials<br>software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider | Access S3 without authentication (public buckets only) | None |
| com.amazonaws.auth.EnvironmentVariableCredentialsProvider<br>software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider | Load credentials from environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN) | None |
| com.amazonaws.auth.InstanceProfileCredentialsProvider<br>software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider | Access S3 using EC2 instance metadata service (IMDS) | None |
| com.amazonaws.auth.ContainerCredentialsProvider<br>software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider<br>com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper | Access S3 using ECS task credentials | None |
| com.amazonaws.auth.WebIdentityTokenCredentialsProvider<br>software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider | Authenticate using web identity token file | None |

Multiple credential providers can be specified in a comma-separated list using the fs.s3a.aws.credentials.provider configuration, just as Hadoop AWS supports. If fs.s3a.aws.credentials.provider is not configured, Hadoop S3A’s default credential provider chain will be used. All configuration options also support bucket-specific overrides using the pattern fs.s3a.bucket.{bucket-name}.{option}.
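For example, a sketch combining a provider chain with a bucket-specific override (the bucket name is a placeholder):

val hadoopConf = spark.sparkContext.hadoopConfiguration

// Try environment variables first, then fall back to simple credentials
hadoopConf.set(
  "fs.s3a.aws.credentials.provider",
  "com.amazonaws.auth.EnvironmentVariableCredentialsProvider," +
    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

// Bucket-specific override: read a public bucket anonymously
hadoopConf.set(
  "fs.s3a.bucket.public-data.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")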

Additional S3 Configuration Options

Beyond credential providers, the native_datafusion implementation supports additional S3 configuration options:

| Option | Description |
| --- | --- |
| fs.s3a.endpoint | The endpoint of the S3 service |
| fs.s3a.endpoint.region | The AWS region for the S3 service. If not specified, the region will be auto-detected. |
| fs.s3a.path.style.access | Whether to use path-style access for the S3 service (true/false, defaults to virtual-hosted style) |
| fs.s3a.requester.pays.enabled | Whether to enable requester pays for S3 requests (true/false) |

All configuration options support bucket-specific overrides using the pattern fs.s3a.bucket.{bucket-name}.{option}.
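For example, these options can point the reader at an S3-compatible store such as MinIO; the endpoint, region, and bucket below are placeholders:

val hadoopConf = spark.sparkContext.hadoopConfiguration

// Custom S3-compatible endpoint; many such stores require path-style access
hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000")
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.endpoint.region", "us-east-1")

spark.read.parquet("s3a://my-bucket/path/to/data").show()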

Examples

The following examples demonstrate how to configure S3 access with the native_datafusion Parquet scan implementation using different authentication methods.

Example 1: Simple Credentials

This example shows how to access a private S3 bucket using an access key and secret key. The fs.s3a.aws.credentials.provider configuration can be omitted since org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider is included in Hadoop S3A’s default credential provider chain.

$SPARK_HOME/bin/spark-shell \
...
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.s3a.access.key=my-access-key \
--conf spark.hadoop.fs.s3a.secret.key=my-secret-key
...

Example 2: Assume Role with Web Identity Token

This example demonstrates using an assumed role credential to access a private S3 bucket, where the base credential for assuming the role is provided by a web identity token credentials provider.

$SPARK_HOME/bin/spark-shell \
...
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider \
--conf spark.hadoop.fs.s3a.assumed.role.arn=arn:aws:iam::123456789012:role/my-role \
--conf spark.hadoop.fs.s3a.assumed.role.session.name=my-session \
--conf spark.hadoop.fs.s3a.assumed.role.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
...

Limitations

The S3 support of native_datafusion has the following limitations:

  1. Partial Hadoop S3A configuration support: Not all Hadoop S3A configurations are currently supported. Only the configurations listed in the tables above are translated and applied to the underlying object_store crate.

  2. Custom credential providers: Custom implementations of AWS credential providers are not supported. The implementation only supports the standard credential providers listed in the table above. We are planning to add support for custom credential providers through a JNI-based adapter that will allow calling Java credential providers from native code. See issue #1829 for more details.

native_iceberg_compat

The native_iceberg_compat Parquet scan implementation does not support reading data from S3 yet, but we are working on it.