# TPC-DS Benchmarking with spark-sql-perf
This guide explains how to generate TPC-DS data and run TPC-DS benchmarks using the KubedAI/spark-sql-perf framework (a fork of databricks/spark-sql-perf).
We use the KubedAI fork because it adds two features not present in the upstream Databricks repository:

- **Apache Iceberg support** — allows generating and registering TPC-DS tables in Iceberg format, which is needed for Comet Iceberg benchmarking.
- **TPC-DS v4.0 queries** — adds the full set of TPC-DS v4.0 queries alongside the existing v1.4 and v2.4 sets.
The spark-sql-perf approach uses the TPC-DS dsdgen tool to generate data directly through Spark, which handles
partitioning and writing to Parquet format automatically.
## Prerequisites
- Java 17 (for Spark 3.5+)
- Apache Spark 3.5.x
- SBT (Scala Build Tool)
- C compiler toolchain (`gcc`, `make`, `flex`, `bison`, `byacc`)
## Step 1: Build tpcds-kit
The `dsdgen` tool from databricks/tpcds-kit is required for data generation. This is a modified fork of the
official TPC-DS toolkit that outputs to stdout, allowing Spark to ingest the data directly.
Linux (Ubuntu/Debian):
```shell
sudo apt-get install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
```
Linux (CentOS/RHEL/Amazon Linux):
```shell
sudo yum install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
```
macOS:
```shell
xcode-select --install
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=MACOS
```
Verify the build succeeded:
```shell
ls -l dsdgen
```
## Step 2: Build spark-sql-perf
```shell
git clone https://github.com/KubedAI/spark-sql-perf.git
cd spark-sql-perf
git checkout support-iceberg-tpcds-v4.0
sbt package
```
This produces a JAR file at `target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar` (the exact version may
vary). Note the path to this JAR for later use.
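Because the artifact version in the JAR name can change between builds, it can be handy to locate the newest JAR programmatically instead of hard-coding the path. The `find_perf_jar` helper below is an illustrative sketch, not part of spark-sql-perf:

```python
import glob
import os

def find_perf_jar(repo_dir):
    """Return the newest spark-sql-perf JAR under target/, or None.

    Matches any Scala version and artifact version, so the sketch keeps
    working when the SNAPSHOT version changes.
    """
    pattern = os.path.join(repo_dir, "target", "scala-*", "spark-sql-perf_*.jar")
    jars = glob.glob(pattern)
    if not jars:
        return None
    # Prefer the most recently built artifact.
    return max(jars, key=os.path.getmtime)

if __name__ == "__main__":
    print(find_perf_jar("/path/to/spark-sql-perf"))
```

You could export its output as an environment variable and reference that in the `--jars` flags used in the later steps.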
## Step 3: Install and Start Spark
If you do not already have Spark installed:
```shell
export SPARK_VERSION=3.5.6
wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz
tar xzf spark-$SPARK_VERSION-bin-hadoop3.tgz
sudo mv spark-$SPARK_VERSION-bin-hadoop3 /opt
export SPARK_HOME=/opt/spark-$SPARK_VERSION-bin-hadoop3/
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
mkdir -p /tmp/spark-events
```
Start Spark in standalone mode:
```shell
$SPARK_HOME/sbin/start-master.sh
export SPARK_MASTER=spark://$(hostname):7077
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
```
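To confirm the master and worker came up, open the Spark master web UI at `http://<hostname>:8080` and check that a worker is registered. As a quick scripted alternative, a plain TCP probe of the standalone ports works too; this `port_open` helper is an illustrative sketch, not part of Spark:

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # 7077 is the default standalone master RPC port; 8080 is the web UI.
    print("master RPC up:", port_open("localhost", 7077))
    print("web UI up:", port_open("localhost", 8080))
```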
## Step 4: Generate TPC-DS Data
Launch spark-shell with the spark-sql-perf JAR loaded:
```shell
$SPARK_HOME/bin/spark-shell \
  --master $SPARK_MASTER \
  --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=16g
```
In the Spark shell, run the following to generate data. Adjust `scaleFactor` and paths as needed:
```scala
import com.databricks.spark.sql.perf.tpcds.TPCDSTables

val tpcdsKit = "/path/to/tpcds-kit/tools"
val scaleFactor = "100" // 100 GB
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val numPartitions = 32 // adjust based on cluster size

val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = tpcdsKit, scaleFactor = scaleFactor)

tables.genData(
  location = dataDir,
  format = format,
  overwrite = true,
  partitionTables = true,
  clusterByPartitionColumns = true,
  filterOutNullPartitionValues = false,
  numPartitions = numPartitions
)
```
Data generation for SF100 typically takes 20-60 minutes depending on hardware. When complete, exit the shell:
```
:quit
```
Verify the data was generated:
```shell
ls /path/to/tpcds-data/
```
You should see directories for each TPC-DS table (`store_sales`, `catalog_sales`, `web_sales`, `customer`,
`date_dim`, etc.).
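A complete run should produce all 24 tables defined by the TPC-DS schema. As a quick sanity check, the following sketch (the `missing_tables` helper is illustrative, not part of any of these tools) reports any table directories absent from the data directory:

```python
import os

# The 24 tables defined by the TPC-DS schema.
TPCDS_TABLES = [
    "call_center", "catalog_page", "catalog_returns", "catalog_sales",
    "customer", "customer_address", "customer_demographics", "date_dim",
    "household_demographics", "income_band", "inventory", "item",
    "promotion", "reason", "ship_mode", "store", "store_returns",
    "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
    "web_sales", "web_site",
]

def missing_tables(data_dir):
    """Return the TPC-DS table directories absent from data_dir."""
    return [t for t in TPCDS_TABLES
            if not os.path.isdir(os.path.join(data_dir, t))]

if __name__ == "__main__":
    missing = missing_tables("/path/to/tpcds-data")
    print("missing tables:", missing or "none")
```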
Set the `TPCDS_DATA` environment variable:

```shell
export TPCDS_DATA=/path/to/tpcds-data
```
## Step 5: Run TPC-DS Benchmarks
### Register Tables
Launch spark-shell with the spark-sql-perf JAR (same as Step 4) and register the generated data as tables:
```scala
import com.databricks.spark.sql.perf.tpcds.{TPCDS, TPCDSTables}

val scaleFactor = "100"
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val databaseName = "tpcds"

// Create database and register tables
sql(s"CREATE DATABASE IF NOT EXISTS $databaseName")

val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = "", scaleFactor = scaleFactor)

tables.createExternalTables(
  location = dataDir,
  format = format,
  databaseName = databaseName,
  overwrite = true,
  discoverPartitions = true
)

sql(s"USE $databaseName")
```
### Run Spark Baseline
```scala
val tpcds = new TPCDS(spark.sqlContext)

// Choose a query set: tpcds1_4Queries, tpcds2_4Queries, or tpcds4_0Queries
val queries = tpcds.tpcds2_4Queries

val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/spark",
  tags = Map("engine" -> "spark", "scale_factor" -> "100"),
  forkThread = true
)

// Block until the experiment completes; the timeout is in seconds (86400 = 24 hours).
experiment.waitForFinish(86400)
```
Results are saved as JSON to the `resultLocation` path.
### Run with Comet
Build Comet from source and launch spark-shell with both the Comet and spark-sql-perf JARs:
```shell
make release
export COMET_JAR=$(pwd)/spark/target/comet-spark-spark3.5_2.12-*.jar

$SPARK_HOME/bin/spark-shell \
  --master $SPARK_MASTER \
  --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar,$COMET_JAR \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=8g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=8g \
  --conf spark.executor.extraClassPath=$COMET_JAR \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.exec.all.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.columnar.shuffle.enabled=true
```
Then register tables and run the benchmark the same way as the Spark baseline, changing the tags and result location:
```scala
val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/comet",
  tags = Map("engine" -> "comet", "scale_factor" -> "100"),
  forkThread = true
)

experiment.waitForFinish(86400)
```
### View Results
Results are saved as JSON under the `resultLocation`. You can query them directly in Spark:

```scala
import org.apache.spark.sql.functions.col

val results = spark.read.json("/path/to/results/spark")

results.select("name", "parsingTime", "analysisTime", "optimizationTime", "planningTime", "executionTime")
  .withColumn("totalTime", (col("parsingTime") + col("analysisTime") +
    col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
  .orderBy("name")
  .show(200, false)
```
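The result files can also be post-processed outside Spark with plain Python. The sketch below assumes a layout where each JSON line is one experiment iteration holding a `results` array with per-query `name` and `executionTime` fields; adjust the extraction if your result files store per-query records at the top level. The `median_times` and `speedups` helper names are illustrative:

```python
import json
from statistics import median

def median_times(json_lines):
    """Map query name -> median executionTime (ms) across iterations.

    json_lines: iterable of JSON strings, one experiment iteration each.
    """
    times = {}
    for line in json_lines:
        for r in json.loads(line).get("results", []):
            if "executionTime" in r:
                times.setdefault(r["name"], []).append(r["executionTime"])
    return {name: median(vals) for name, vals in times.items()}

def speedups(spark_times, comet_times):
    """Per-query speedup = Spark time / Comet time, for queries present in both."""
    return {q: spark_times[q] / comet_times[q]
            for q in spark_times if q in comet_times and comet_times[q] > 0}
```

For example, feed each function the lines of the corresponding result files and sort the `speedups(...)` output in descending order to find the queries Comet accelerates most.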
## Alternative: Command-Line Data Generation
You can also generate TPC-DS data without the Spark shell using `spark-submit`:
```shell
$SPARK_HOME/bin/spark-submit \
  --master $SPARK_MASTER \
  --class com.databricks.spark.sql.perf.tpcds.GenTPCDSData \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=16g \
  /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
  -d /path/to/tpcds-kit/tools \
  -s 100 \
  -l /path/to/tpcds-data \
  -f parquet
```
## Troubleshooting
### `dsdgen` not found
Ensure `tpcds-kit/tools/dsdgen` exists and is executable. The `dsdgenDir` parameter in the Spark shell (or `-d`
flag in the CLI) must point to the directory containing the `dsdgen` binary, not the binary itself.
### Out of memory during data generation
For large scale factors (SF1000+), increase executor memory and the number of partitions:

```shell
--conf spark.executor.memory=32g
```

And in the Spark shell, use a higher `numPartitions` value (e.g., 200+).
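As a rough rule of thumb (an assumption for illustration, not a spark-sql-perf recommendation), size `numPartitions` so each generation task handles a few GB of raw data, which roughly matches this guide's 32 partitions at SF100, while never dropping below the total core count so every executor slot stays busy:

```python
def suggest_num_partitions(scale_factor_gb, total_cores, gb_per_partition=3.0):
    """Heuristic partition count: ~3 GB of raw data per task,
    never fewer partitions than available cores."""
    by_size = int(scale_factor_gb / gb_per_partition)
    return max(total_cores, by_size)

# Examples: SF100 on an 8-core worker, SF1000 on 64 cores, SF10 on 16 cores.
print(suggest_num_partitions(100, 8))    # 33
print(suggest_num_partitions(1000, 64))  # 333
print(suggest_num_partitions(10, 16))    # 16
```

Smaller partitions reduce per-task memory pressure during generation at the cost of more (and smaller) output files.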