Deploy Apache Iceberg Data Lake on Amazon S3 using AWS Glue Spark job
INTRODUCTION
A data lake is a centralized repository that you can use to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data and then run different types of analytics for better business insights. On AWS cloud the go-to service for creating a Data Lake is to use Amazon S3 service and then we leverage other services to perform analytics on the data lake
Currently there has been spike in a new type of usage pattern for data lake which is more like a transactional system rather then historic analytics. In transactional system the most important types of operations are Updates, Inserts & Deletes which was difficult and not so feasible in traditional S3 data lake structure. Thus there has been rise of a new concept of Open Table Formats — which actually help in performing CRUD operation on a data lake. One of the open table format is Apache Iceberg format.
What is Apache Iceberg Format?
Apache Iceberg is an open table format designed for huge, petabyte-scale tables. The function of a table format is to determine how you manage, organize and track all of the files that make up a table. You can think of it as an abstraction layer between your physical data files (written in Parquet or ORC etc.) and how they are structured to form a table.
Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time.
Some of the important features of Apache Iceberg are as follows
- Transactional consistency between multiple applications where files can be added, removed or modified atomically, with full read isolation and multiple concurrent writes
- Full schema evolution to track changes to a table over time
- Time travel to query historical data and verify changes between updates
- Partition layout and evolution enabling updates to partition schemes as queries and data volumes change without relying on hidden partitions or physical directories
- Rollback to prior versions to quickly correct issues and return tables to a known good state
- Advanced planning and filtering capabilities for high performance on large data volumes
Seems INTERESTING ! Right? So lets see how can leverage Apache Iceberg format in AWS Glue Jobs
Create Apache Iceberg Data Lake using AWS Glue
To get started with this, there some pre-requisite activities that are needed here.
- Access to AWS services — AWS Glue, S3 & Athena
- Glue IAM service role which has access to Amazon S3
- An existing Glue Table which you can create by running just a simple crawler which will be used as a source for the Glue Job. In our example we will be using Covid table data
- An existing Glue database which we will be leveraging as a destination database to create a Iceberg table
Once you have the above pre-requisites completed, then we can start to write the Glue Spark job
1. Define some of the important spark and python libraries which are required in the job
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
catalog_nm = "glue_catalog"
#The Glue Database Name which has the source table
in_database="<glue-database-input>"
#The input Glue Table which we will be using as a source for the Iceberg data
in_table_name="covid_19_data"
#The Glue Database Name which will be used to create an output Icerberg table
database_op='database_ib'
#The Glue Table Name which will be used as a destination for Icerberg table
table_op='covid_dataset_iceberg'
#The S3 path which will be use to store the Iceberg files as output
s3_output_path='s3://<your-destination-bucket-name>/iceberg-output/'
table = str(catalog_nm)+'.`'+str(database_op)+'`.'+str(table_op)
print("\nINPUT Database : " + str(in_database))
print("\nINPUT Table : " + str(in_table_name))
print("\nOUPUT IcerBerg Database : " + str(database_op))
print("\nOUPUT IcerBerg Table : " + str(table))
print("\nOUPUT IcerBerg S3 Path : " + str(s3_output_path))
The above code snippet, will import all the required python and spark packages for our Glue script. Along with that we also need to define some important parameters and variables which will be used through out the script
In line with the script we need to define a important job parameter in the glue which will indicate the Glue job executer to leverage the Iceberg table format as output for the data. For this you need to define a parameter named as
--datalake-formats : iceberg
2. Define Spark and Glue context for the Iceberg Table format
In this step, we will be defining the spark iceberg object which will defined with the with Glue Catalog as a spark Catalog, S3 path as a Spark Data warehouse.
def create_spark_iceberg(catalog_nm:str = "glue_catalog"):
"""
Function to initialize a session with iceberg by default
:param catalog_nm:
:return spark:
"""
from pyspark.sql import SparkSession
# You can set this as a variable if required
warehouse_path = s3_output_path
spark = SparkSession.builder \
.config(f"spark.sql.catalog.{catalog_nm}",
"org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.{catalog_nm}.warehouse",
warehouse_path) \
.config(f"spark.sql.catalog.{catalog_nm}.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog") \
.config(f"spark.sql.catalog.{catalog_nm}.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO") \
.config(f"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
return spark
ibspark = create_spark_iceberg(catalog_nm)
ibsc = ibspark.sparkContext
ibglueContext = GlueContext(ibsc)
ibjob = Job(ibglueContext)
ibjob.init(args["JOB_NAME"], args)
Once you define the Spark object, then we need to set the Spark Context and set the Glue Context. Once you have all the context set, then we can start to play with the source data and Iceberg table
3. Read the source Glue table and write into a destination Glue Table in Iceberg format
In this step, we will read the data from the existing source Glue Table which we have defined in our previous steps. Then we convert the glue dynamic frame into a spark DataFrame. The Spark DataFrame is then registered as a temp table/view so that we can leverage Spark SQL on it
#Read the Glue inout table from thr Catalog using a Glue DynamicFrame
InputDynamicFrameTable = ibglueContext.create_dynamic_frame.from_catalog(database=in_database, table_name=in_table_name)
#Convert the Glue DynamicFrame into a Spark DataFrame
InputDynamicFrameTable_DF = InputDynamicFrameTable.toDF()
#Register the Spark DataFrame as TempView
InputDynamicFrameTable_DF.createOrReplaceTempView("InputDataFrameTable")
ibspark.sql("select * from InputDataFrameTable LIMIT 10").show()
#Filter the source table with country as 'Australia'
colname_df = ibspark.sql("SELECT * FROM InputDataFrameTable WHERE country='Australia'")
colname_df.createOrReplaceTempView("OutputDataFrameTable")
#Write the filtered Data into an ICEBERG table format in Glue destination table
ib_Write_SQL = f"""
CREATE OR REPLACE TABLE {catalog_nm}.{database_op}.{table_op}
USING iceberg
TBLPROPERTIES ("format-version"="2", "write_compression"="gzip")
AS SELECT * FROM OutputDataFrameTable;
"""
#Run the Spark SQL query
ibspark.sql(ib_Write_SQL)
In our example, we will be filtering the table on a certain column and then write the data into a specific Glue database and table which we had defined earlier.
4. Let’s now Run the Glue Job and see the output in S3 and Query it through Athena
At this stage, you can save the Glue and then run the Glue job. Once the Glue job completes the execution then we can go ahead and check the Glue Database and Table.
The screenshot above provides detail on the table which we created though the Glue job. The Table format defined is Iceberg format and also has a defined S3 path where the output table files are stored in the Data Lake
Let’s take a look at the S3 destination and how the folder structure looks like
There are by default data and metadata folders created and which stores important data for the table. The data folder, as name suggests it stored the data files in the Iceberg format and the metadata folder only stores the metadata information about the table
Now lets start querying the Iceberg table using Athena. Once you query the table you can actually see the table has only filtered data for Australia which we had added in the Glue script.
Lets run a Update query on the table and see whether we are able to update the actual records in the Iceberg table. For that lets update Australia value in the country column to AUS.
UPDATE "database_ib"."covid_dataset_iceberg" SET country = 'AUS';
You can see that the query ran successfully and without any errors. Lets now actually see whether we can see the country column updated by again firing SELECT query.
In the above screenshot, you can actually see that all the records are updated with the new value of country which is AUS
CONCLUSION
Thus through this blog we can actually see how we can use Apache Iceberg table format through AWS Glue and then leverage other AWS services for manipulating the data. Iceberg format comes with a number of advantages from transactional query standpoint as well.
In this post, we introduced the Apache Iceberg framework and how it helps resolve some of the challenges we have in a modern data lake. I hope this post provides some useful information for you to get started with Apache Iceberg in your data lake solution.
REFERENCE:
- Iceberg Format and Versions — https://iceberg.apache.org/spec/#format-versioning
- Iceberg and AWS — https://aws.amazon.com/what-is/apache-iceberg/
- Apache Iceberg with Athena — https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html
- AWS Glue with Iceberg — https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
- Creating Iceberg using Athena — https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html