Simple Spark examples using Ammonite

Dependencies

Ammonite takes care of dependency management, compilation, and launching with the correct classpath: each script declares the libraries it needs inline via import $ivy, and Ammonite resolves them from Maven Central on first run.
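
For reference, a minimal sketch of the import $ivy syntax (the Guava coordinate below is only an illustration): a double colon tells Ammonite to append the Scala binary version to the artifact name, while a single colon resolves a plain Java artifact as-is.

// Double colon: resolves to spark-sql_2.13 (or _2.12, matching your Ammonite build).
import $ivy.`org.apache.spark::spark-sql:3.5.0`
// Single colon: resolves the plain Java artifact com.google.guava:guava unchanged.
import $ivy.`com.google.guava:guava:33.0.0-jre`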

Simple example

Copy-paste the following snippet into a file called simple.sc, and run it with Ammonite as shown below.

// simple.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .getOrCreate()

  spark.sql("select 1").show()
}

Launch command (on Java 17+ the --add-exports flag is needed because Spark accesses JDK internals such as sun.nio.ch.DirectBuffer):

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED' amm simple.sc

Example using the Hive metastore

// test.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`org.apache.spark::spark-hive:3.5.0`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .enableHiveSupport()
              .getOrCreate()

  spark.sql("select 1").show()
}

Launch command:

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED' amm test.sc
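
With Hive support enabled, tables persist across runs; in local mode Spark creates a Derby-backed metastore_db directory and a spark-warehouse directory under the working directory. As a sketch (the table name people is only an illustration), the main method above could be extended with:

  // Create a persistent table in the local metastore, insert a row, read it back.
  // Re-running the script will find the table already there.
  spark.sql("CREATE TABLE IF NOT EXISTS people (name STRING, age INT)")
  spark.sql("INSERT INTO people VALUES ('Alice', 30)")
  spark.sql("SELECT * FROM people").show()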

Example setting the warehouse path

// test-path.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .config("spark.sql.warehouse.dir", "/opt/warehouse")
              .getOrCreate()

  spark.sql("""
    CREATE TABLE hello
    USING csv
    OPTIONS (header=true)
    LOCATION 'store/'
    AS (select 1 as col)
  """)
}

Launch command:

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED' amm test-path.sc
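
Note that spark.sql.warehouse.dir only sets the default location for managed tables; the hello table above pins its data under the relative path store/ via the explicit LOCATION clause. A sketch of reading it back in the same session:

  // 'hello' is external (explicit LOCATION), so its files live under store/;
  // a managed table created without a LOCATION clause would land under /opt/warehouse.
  spark.sql("SELECT * FROM hello").show()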

Example using Delta tables

// delta.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`io.delta::delta-spark:3.0.0`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
              .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
              .getOrCreate()

  spark.sql("""
    CREATE TABLE hello
    USING delta
    LOCATION 'store3/'
    AS (select 1 as col)
  """).show(false)
}

Launch command:

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED' amm delta.sc
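
Delta tables keep a transaction log under their directory (store3/_delta_log), which enables reading by path and time travel. A sketch, assuming at least one commit exists:

  // Read the table by path, then again pinned to its first version.
  spark.read.format("delta").load("store3/").show()
  spark.read.format("delta").option("versionAsOf", 0).load("store3/").show()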

Example using Iceberg tables

// iceberg.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`org.apache.iceberg::iceberg-spark-runtime-3.5:1.4.3`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
              .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
              .config("spark.sql.catalog.local.type", "hadoop")
              .config("spark.sql.catalog.local.warehouse", "warehouse")
              .getOrCreate()

  spark.sql("CREATE TABLE local.db.table (id bigint, data string) USING iceberg").show()
}

Launch command:

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED' amm iceberg.sc
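
Once the table exists, Iceberg exposes its internals (snapshots, manifests, history) as queryable metadata tables. A sketch of inserting a row and inspecting the resulting snapshot, which could be appended to main above:

  // Each INSERT commits a new snapshot to the table.
  spark.sql("INSERT INTO local.db.table VALUES (1, 'a')")
  // Iceberg metadata table listing one row per committed snapshot.
  spark.sql("SELECT * FROM local.db.table.snapshots").show()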

Example using a bloom filter on the data column and bucket partitioning on the id column (hash(id) mod 16)

// iceberg2.sc
import $ivy.`org.apache.spark::spark-core:3.5.0`
import $ivy.`org.apache.spark::spark-sql:3.5.0`
import $ivy.`org.apache.iceberg::iceberg-spark-runtime-3.5:1.4.3`

import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
  val spark = SparkSession
              .builder()
              .appName("Spark SQL basic example")
              .master("local[*]")
              .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
              .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
              .config("spark.sql.catalog.local.type", "hadoop")
              .config("spark.sql.catalog.local.warehouse", "warehouse")
              .getOrCreate()

  spark.sql("DROP TABLE IF EXISTS local.db.table")
  spark.sql("""
    CREATE TABLE local.db.table (id bigint, data string) 
    USING iceberg
    PARTITIONED BY (bucket(16, id))
    TBLPROPERTIES (
      'write.format.default'='parquet',
      'write.parquet.bloom-filter-enabled.column.data'='true',
      'write.target-file-size-bytes'='536870912'
    )
    """).show()
}

Launch command:

JAVA_OPTS='--add-exports java.base/sun.nio.ch=ALL-UNNAMED' amm iceberg2.sc
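
The bloom filter only pays off once the parquet files contain data. A sketch that loads some rows and then runs the kind of point lookup on the data column the filter is meant to accelerate (the probed literal is only an illustration), appended to main above:

  // Spread 100k rows across the 16 id-buckets, with random strings in 'data'.
  spark.sql("INSERT INTO local.db.table SELECT id, uuid() FROM range(100000)")
  // Point lookup on 'data': parquet row groups whose bloom filter rules the
  // value out can be skipped entirely.
  spark.sql("SELECT * FROM local.db.table WHERE data = 'no-such-value'").show()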