TPC-DS Benchmarking with spark-sql-perf

This guide explains how to generate TPC-DS data and run TPC-DS benchmarks using the KubedAI/spark-sql-perf framework (a fork of databricks/spark-sql-perf).

We use the KubedAI fork because it adds two features not present in the upstream Databricks repository:

  • Apache Iceberg support — allows generating and registering TPC-DS tables in Iceberg format, which is needed for Comet Iceberg benchmarking.

  • TPC-DS v4.0 queries — adds the full set of TPC-DS v4.0 queries alongside the existing v1.4 and v2.4 sets.

The spark-sql-perf approach uses the TPC-DS dsdgen tool to generate data directly through Spark, which handles partitioning and writing to Parquet format automatically.

Prerequisites

  • Java 17 (for Spark 3.5+)

  • Apache Spark 3.5.x

  • SBT (Scala Build Tool)

  • C compiler toolchain (gcc, make, flex, bison, byacc)

Step 1: Build tpcds-kit

The dsdgen tool from databricks/tpcds-kit is required for data generation. This fork of the official TPC-DS toolkit is modified to write generated rows to stdout, which lets Spark ingest the data directly instead of staging flat files on disk first.

Linux (Ubuntu/Debian):

sudo apt-get install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX

Linux (CentOS/RHEL/Amazon Linux):

sudo yum install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX

macOS:

xcode-select --install
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=MACOS

Verify the build succeeded:

ls -l dsdgen
./dsdgen -help    # prints the option summary if the build worked

Step 2: Build spark-sql-perf

git clone https://github.com/KubedAI/spark-sql-perf.git
cd spark-sql-perf
git checkout support-iceberg-tpcds-v4.0
sbt package

This produces a JAR file at target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar (the exact version may vary). Note the path to this JAR for later use.

Step 3: Install and Start Spark

If you do not already have Spark installed:

export SPARK_VERSION=3.5.6
wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz
tar xzf spark-$SPARK_VERSION-bin-hadoop3.tgz
sudo mv spark-$SPARK_VERSION-bin-hadoop3 /opt
export SPARK_HOME=/opt/spark-$SPARK_VERSION-bin-hadoop3
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
mkdir -p /tmp/spark-events    # default directory for Spark event logs

Start Spark in standalone mode:

$SPARK_HOME/sbin/start-master.sh
export SPARK_MASTER=spark://$(hostname):7077
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER

The master web UI (http://localhost:8080 by default) should now show the worker in the ALIVE state.

Step 4: Generate TPC-DS Data

Launch spark-shell with the spark-sql-perf JAR loaded:

$SPARK_HOME/bin/spark-shell \
    --master $SPARK_MASTER \
    --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.cores=8 \
    --conf spark.executor.memory=16g

In the Spark shell, run the following to generate data. Adjust scaleFactor and paths as needed:

import com.databricks.spark.sql.perf.tpcds.TPCDSTables

val tpcdsKit = "/path/to/tpcds-kit/tools"
val scaleFactor = "100"    // 100 GB
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val numPartitions = 32     // adjust based on cluster size

val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = tpcdsKit, scaleFactor = scaleFactor)
tables.genData(
  location = dataDir,
  format = format,
  overwrite = true,                      // replace any existing data at the location
  partitionTables = true,                // create the partitioned fact tables
  clusterByPartitionColumns = true,      // shuffle by partition columns to avoid many small files
  filterOutNullPartitionValues = false,  // keep the partition with a NULL key value
  numPartitions = numPartitions          // dsdgen parallelism, i.e. number of input tasks
)
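Before exiting, you can optionally sanity-check one of the fact tables by reading the generated files back (this reuses the dataDir value defined above; the expected count is approximate and assumes SF100):

```scala
// Read the raw Parquet output for one table and count rows.
// For scaleFactor = "100", store_sales should contain roughly 288 million rows.
spark.read.parquet(s"$dataDir/store_sales").count()
```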

Data generation for SF100 typically takes 20-60 minutes depending on hardware. When complete, exit the shell:

:quit

Verify the data was generated:

ls /path/to/tpcds-data/

You should see directories for each TPC-DS table (store_sales, catalog_sales, web_sales, customer, date_dim, etc.).

Set the TPCDS_DATA environment variable:

export TPCDS_DATA=/path/to/tpcds-data

Step 5: Run TPC-DS Benchmarks

Register Tables

Launch spark-shell with the spark-sql-perf JAR (same as Step 4) and register the generated data as tables:

import com.databricks.spark.sql.perf.tpcds.{TPCDS, TPCDSTables}

val scaleFactor = "100"
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val databaseName = "tpcds"

// Create database and register tables
sql(s"CREATE DATABASE IF NOT EXISTS $databaseName")
val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = "", scaleFactor = scaleFactor) // dsdgen is not needed just to register existing data
tables.createExternalTables(
  location = dataDir,
  format = format,
  databaseName = databaseName,
  overwrite = true,
  discoverPartitions = true
)

sql(s"USE $databaseName")
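A quick way to confirm registration worked is to list the tables and probe one of them from the same shell; TPC-DS defines 24 tables, so all of them should appear:

```scala
// All 24 TPC-DS tables should be listed
sql("SHOW TABLES").show(30, false)

// Spot-check that a fact table is readable
sql("SELECT count(*) FROM store_sales").show()
```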

Run Spark Baseline

val tpcds = new TPCDS(spark.sqlContext)

// Choose a query set: tpcds1_4Queries, tpcds2_4Queries, or tpcds4_0Queries
val queries = tpcds.tpcds2_4Queries

val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/spark",
  tags = Map("engine" -> "spark", "scale_factor" -> "100"),
  forkThread = true
)

experiment.waitForFinish(86400)  // timeout in seconds (24 hours)

Results are saved as JSON to the resultLocation path.
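While the experiment is running, the returned handle can also report progress from the same shell. These helpers come from spark-sql-perf's ExperimentStatus; if your build differs, fall back to reading the result JSON as described under View Results:

```scala
// Human-readable status summary of the running experiment
experiment.html

// DataFrame of per-query results completed so far
experiment.getCurrentResults.show()
```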

Run with Comet

Build Comet from source and launch spark-shell with both the Comet and spark-sql-perf JARs (run the following from the Comet repository root):

make release
export COMET_JAR=$(pwd)/spark/target/comet-spark-spark3.5_2.12-*.jar

$SPARK_HOME/bin/spark-shell \
    --master $SPARK_MASTER \
    --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar,$COMET_JAR \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.cores=8 \
    --conf spark.executor.memory=8g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=8g \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.columnar.shuffle.enabled=true

Then register tables and run the benchmark the same way as the Spark baseline, changing the tags and result location:

val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/comet",
  tags = Map("engine" -> "comet", "scale_factor" -> "100"),
  forkThread = true
)

experiment.waitForFinish(86400)
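To verify that Comet is actually handling the work rather than silently falling back to Spark, explain a query once the tables are registered; Comet physical operators should appear in the plan:

```scala
// With Comet active, the physical plan should contain Comet operators
// (e.g. CometScan / Comet exec nodes) in place of the stock Spark ones
sql("SELECT count(*) FROM store_sales").explain()
```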

View Results

Results are saved as JSON under the resultLocation. You can query them directly in Spark:

import org.apache.spark.sql.functions._

// Each experiment run is one JSON record with a "results" array of
// per-query measurements; explode it to get one row per query
val results = spark.read.json("/path/to/results/spark")
  .select(explode(col("results")).as("result"))
  .select("result.*")

results.select("name", "parsingTime", "analysisTime", "optimizationTime", "planningTime", "executionTime")
  .withColumn("totalTime", (col("parsingTime") + col("analysisTime") +
    col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)  // ms -> seconds
  .orderBy("name")
  .show(200, false)

Alternative: Command-Line Data Generation

You can also generate TPC-DS data without the Spark shell using spark-submit:

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --class com.databricks.spark.sql.perf.tpcds.GenTPCDSData \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.cores=8 \
    --conf spark.executor.memory=16g \
    /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
    -d /path/to/tpcds-kit/tools \
    -s 100 \
    -l /path/to/tpcds-data \
    -f parquet

Troubleshooting

dsdgen not found

Ensure tpcds-kit/tools/dsdgen exists and is executable. The dsdgenDir parameter in the Spark shell (or -d flag in the CLI) must point to the directory containing the dsdgen binary, not the binary itself.

Out of memory during data generation

For large scale factors (SF1000+), increase executor memory and the number of partitions:

--conf spark.executor.memory=32g

And in the Spark shell, use a higher numPartitions value (e.g., 200+).