AWS Athena: How to Execute Queries in Parallel and Control the Flow of the Code Till Their Completion Using the Boto3 SDK

Rajas Walavalkar
8 min read · May 18, 2020


Introduction to Athena

AWS Athena can be used to analyze data in Amazon S3 using standard SQL queries. This makes it a powerful service, as it removes the overhead of creating a schema and loading data into it. You can simply land the data in Amazon S3, create a catalog over the files using AWS Glue or a plain Athena CREATE EXTERNAL TABLE query, and then use Athena queries to extract and analyse the data from S3.
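For instance, a catalog table over a hypothetical CSV dataset could be created with a statement along the lines of the sketch below (the database, table name, columns and S3 location are placeholders); the statement can be run from the Athena console or submitted programmatically exactly like any other query in this post.

'''A hypothetical DDL for a CSV dataset stored in S3 - adjust the table name, columns and LOCATION path to your own data'''
create_table_query = """
CREATE EXTERNAL TABLE IF NOT EXISTS my_database.sales_data (
    order_id string,
    order_date string,
    amount double
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://<your-bucket>/sales-data/'
TBLPROPERTIES ('skip.header.line.count'='1')
"""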

Athena queries can be fired from the AWS Athena Console or by using the Boto3 Python SDK. In this blog we are going to focus on accessing Athena programmatically using the Boto3 Python SDK.

What was the Challenge?

A major challenge I faced while using the Boto3 client library was submitting a list of queries to Athena that had dependencies on each other. The reason is that when we submit a query to Athena through an API call, the query is only initiated, and in return we receive a query execution id for it. Thus it becomes difficult to control the flow of the code until the query has completed successfully.
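To make this concrete, here is a minimal sketch of a single query submission (the S3 output path is a placeholder). The start_query_execution call returns as soon as the query is accepted, and the response contains only a query execution id rather than the query results, so the script has no direct way of knowing when the query has finished.

import boto3

client = boto3.client('athena', region_name='us-east-1')

# Returns immediately after the query is submitted - it does not wait for the results
response = client.start_query_execution(
    QueryString='SELECT 1',
    ResultConfiguration={'OutputLocation': 's3://<s3-bucket-path>/'}
)
print(response['QueryExecutionId'])  # only an execution id, not the data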

Let’s see how to control this flow with the implementation of the code below.

A key takeaway from this post is how to control the flow of the code so that the queries which are independent of each other are executed in parallel, and the dependent queries are executed only after the successful completion of that first set of queries.

Requirements

  • An EC2 instance (Amazon Linux 2 AMI 2.0.20200406.0 x86_64) within a default public subnet (or within a private subnet, with an Internet Gateway, NAT Gateway and Bastion Host configured)
  • An IAM role with Athena and S3 access policies defined, attached to the EC2 instance created above
  • Python 2.7+ set up on the instance, along with the Boto3 package installed using pip

Note: The above requirements for EC2 and IAM roles are necessary only if you plan to access Athena from an EC2 instance. Alternatively, you can access Athena from your local machine by setting up the AWS CLI and configuring an Access Key Id and Secret Access Key that have the appropriate permissions for S3 and Athena.
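If you take the local-machine route, Boto3 automatically picks up the credentials configured through aws configure; alternatively they can be passed explicitly when creating the client, as in this small sketch (the key values shown are placeholders):

import boto3

# Uses the credentials of the default profile configured via the AWS CLI
athena = boto3.client('athena', region_name='us-east-1')

# Or pass the credentials explicitly (placeholder values)
athena = boto3.client(
    'athena',
    region_name='us-east-1',
    aws_access_key_id='<access-key-id>',
    aws_secret_access_key='<secret-access-key>'
)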

The entire code is split into two major functions. The first function (run_query) executes the queries in parallel, and the second function (check_query_status) waits till all the queries are completed. Using these two functions, the flow of the code can be controlled.

Note: You can get the entire list of Athena functions from the Boto3 documentation for Athena here.

1st Function — Submitting Queries

This function accepts a list of queries and executes them on Athena using the Boto3 SDK. It returns the responses to the caller.

Define the global variables in the code

  • Required Parameters : region_name, s3_output_path
  • Optional Parameters : work_group, database

Note: For every API call to Athena, the region name and the S3 output path are required parameters; refer to the Boto3 documentation mentioned above for a complete list of arguments.

import boto3
import time
import copy

'''These parameters are required - Define the Region for the Athena Database and the S3 path for the query outputs'''
region_name = 'us-east-1'
s3_output_path = '<s3-bucket-path>'

'''These parameters are optional - Define the Athena Workgroup to execute the queries and the Athena Database'''
work_group = 'primary'
database = '<database-name>'

'''The function run_query requires 3 parameters and 2 optional keyword parameters (work_group and database)'''
def run_query(query, s3_output_path, region_name, *args, **kwargs):
    query_list = []
    response_list = []
    if type(query) is list:
        query_list = query
    else:
        query_list.append(query)
    work_group = kwargs.get('work_group', None)
    database = kwargs.get('database', None)
    OutputLocation = {'OutputLocation': s3_output_path}
    client = boto3.client('athena', region_name=region_name)
    for single_query in query_list:
        if (work_group is None) and (database is None):
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation
            )
        if (work_group is None) and (database is not None):
            DatabaseName = {'Database': database}
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                QueryExecutionContext=DatabaseName
            )
        if (work_group is not None) and (database is None):
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                WorkGroup=work_group
            )
        if (work_group is not None) and (database is not None):
            DatabaseName = {'Database': database}
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                WorkGroup=work_group,
                QueryExecutionContext=DatabaseName
            )
        print('\nExecution ID: ' + response['QueryExecutionId'])
        response_list.append(response)
    return response_list

'''Define the list of queries which are to be executed in parallel'''
query_list = ['<Query1>', '<Query2>', '<Query3>', '<Query4>', '<Query5>']

'''Call the function defined above and pass the list of queries to be executed along with the required parameters'''
response_list = run_query(query_list, s3_output_path, region_name, database=database)

Pseudo Code for the above snippet

  • Define the global parameters
  • Define the run_query() function, which accepts the list of queries to be executed in parallel along with a few optional arguments.
  • The function submits all the queries and returns a list of the responses (response_list) obtained from the Athena API calls. These responses include the query execution id of every independent query submitted to Athena.
  • These query execution ids can be extracted from the return value by the caller using the following code snippet:
query_executing_ids = []
for resp in response_list:
    query_executing_ids.append(resp['QueryExecutionId'])
    print("\nQuery Execution Id for the Query Submitted : " + str(resp['QueryExecutionId']))

2nd Function — Wait till all the Queries are Completed

Once we have extracted all the query execution ids into a list, we obtain the status of each query using these ids. The major challenge here is to hold the control of the code until the execution of the above list of queries is completed. This is achieved by defining our second function, check_query_status(), and passing the list of query execution ids to it.

Arguments passed to this function are as follows:

  • query_executing_ids : The list of query execution ids whose statuses are to be checked
  • region_name : The region defined for the Athena database and tables
  • time_to_check : The time (in seconds) for which the function waits in the loop before it checks the statuses of the submitted queries again

Note: The value of time_to_check should be chosen based on the queries you submit. If they are heavy and need more time to complete, set the value high (120 to 180 seconds); if they complete in a few seconds, a value between 5 and 10 seconds is enough. If the value is set too low, the function will continuously make API calls to get the status of the queries, which can throttle Athena, so choose the value of this argument wisely.

'''The function check_query_status requires 3 parameters'''
def check_query_status(query_executing_ids, region_name, time_to_check):
    i = 0
    response_array = []
    query_executing_ids_copy = copy.copy(query_executing_ids)
    client = boto3.client('athena', region_name=region_name)
    while True:
        q_id = query_executing_ids_copy[i]
        time.sleep(time_to_check)
        response = client.get_query_execution(QueryExecutionId=q_id)
        status_resp = response['QueryExecution']['Status']['State']
        if status_resp == 'SUCCEEDED':
            print('\nQuery ID ' + q_id + ' SUCCEEDED')
            total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
            total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
            output_location_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes, 'OutputLocation': output_location_path}
            response_array.append(response_dictionary)
            query_executing_ids_copy.remove(q_id)
            if len(query_executing_ids_copy) == 0:
                break
        if status_resp == 'FAILED' or status_resp == 'CANCELLED':
            if status_resp == 'FAILED':
                print('\nQuery ID ' + q_id + ' FAILED')
                status_failed_resp = response['QueryExecution']['Status']['StateChangeReason']
                total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
                total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
                response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'ErrorMessage': status_failed_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes}
            elif status_resp == 'CANCELLED':
                print('\nQuery ID ' + q_id + ' CANCELLED')
                total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
                total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
                output_location_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
                response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes, 'OutputLocation': output_location_path}
            response_array.append(response_dictionary)
            query_executing_ids_copy.remove(q_id)
            if len(query_executing_ids_copy) == 0:
                break
        i = i + 1
        if i > len(query_executing_ids_copy) - 1:
            i = 0
    return response_array

If you go through the above function definition, you can see that it handles three different terminal status values for an Athena query. These status values are as follows:

  • SUCCEEDED : When the query completes successfully without any errors
  • FAILED : When the submitted query encounters an error during execution. In this case the response also provides the error message, which is captured and returned to the caller along with the execution id
  • CANCELLED : When the submitted query is cancelled by a user, the function returns this status value (a short sketch of cancelling a query programmatically follows the note below)

Note: The FAILED status of a query can be caused by a syntax error or by any error encountered during its execution (such as a resource-exceeded error or an internal service error).
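A query ends up in the CANCELLED state when someone stops it, either from the console or programmatically. As a quick sketch, a running query can be cancelled with the stop_query_execution call (the execution id below is a placeholder):

client = boto3.client('athena', region_name=region_name)

# Stops a query that is still queued or running; it will then report the CANCELLED state
client.stop_query_execution(QueryExecutionId='<query-execution-id>')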

The response list returned from the function can then be used to extract the status of each id (by iterating over it in a for loop). This can be done using the following code snippet.

'''Check the status of the queries with the arguments. The third argument is time_to_check (in seconds)'''
print('\nWAIT TILL THE QUERIES ARE EXECUTED\n')
response_list = check_query_status(query_executing_ids, region_name, 5)

'''Loop through the response obtained to get the status of each query execution id'''
for resp in response_list:
    status = resp['QueryStatus']
    if (status == 'SUCCEEDED') or (status == 'CANCELLED'):
        print('\nQueryExecutionId = ' + str(resp['QueryExecutionId']) + '\nStatus = ' + str(resp['QueryStatus']) + '\nTotalExecutionTimeInMillis = ' + str(resp['TotalExecutionTimeInMillis']) + '\nDataScannedInBytes = ' + str(resp['DataScannedInBytes']) + '\nOutputLocation = ' + str(resp['OutputLocation']))
    elif status == 'FAILED':
        print('\nQueryExecutionId = ' + str(resp['QueryExecutionId']) + '\nStatus = ' + str(resp['QueryStatus']) + '\nErrorMessage = ' + str(resp['ErrorMessage']) + '\nTotalExecutionTimeInMillis = ' + str(resp['TotalExecutionTimeInMillis']) + '\nDataScannedInBytes = ' + str(resp['DataScannedInBytes']))

To give an idea of the Output, refer to the image below. Here you can see the status of each query passed to the check_query_status() function.

Output of the Statuses obtained for the Query Execution Ids

For a query with a SUCCEEDED or CANCELLED status, the function response provides:

  • S3 path of the Output CSV result of the query
  • Total Execution Time for the query
  • Total Bytes Scanned by the query

Note: For a CANCELLED query, the execution time and bytes scanned reflect the work done up to the point at which the execution was cancelled.

For a query with a FAILED status, the function response provides:

  • Error Message for the failure (in the above example there was a syntax error in the query: a column used in the query could not be resolved)
  • Total Execution Time for the query
  • Total Bytes Scanned by the query (in the above example it shows zero bytes scanned because it was a syntax error; had it been an Out Of Resource error, a non-zero value would be shown)

Depending on the statuses of the above queries, you can call the run_query() function again and pass a new list of queries to be executed in parallel, then wait for and extract the statuses of these queries by calling check_query_status() again. In this way you can design the entire logic of running queries in parallel.
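As a rough sketch of that chaining pattern, reusing the two functions defined above (the query strings are placeholders), the dependent batch is submitted only if every query in the first batch succeeded:

independent_queries = ['<Query1>', '<Query2>', '<Query3>']
dependent_queries = ['<Query4>', '<Query5>']

# Batch 1: independent queries submitted in parallel
responses = run_query(independent_queries, s3_output_path, region_name, database=database)
batch1_ids = [resp['QueryExecutionId'] for resp in responses]
batch1_statuses = check_query_status(batch1_ids, region_name, 5)

# Batch 2: submitted only if every query in batch 1 succeeded
if all(item['QueryStatus'] == 'SUCCEEDED' for item in batch1_statuses):
    responses = run_query(dependent_queries, s3_output_path, region_name, database=database)
    batch2_ids = [resp['QueryExecutionId'] for resp in responses]
    batch2_statuses = check_query_status(batch2_ids, region_name, 5)
else:
    print('One or more queries in the first batch did not succeed - skipping the dependent batch')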

Note: The number of queries that can be passed in a list to the run_query() function depends on the query concurrency limit of your AWS account. You can increase this limit by raising a request to the AWS Support team, but if you exceed it you will get a ThrottlingException error.
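If you expect to brush against that limit, one option is to wrap the submission in a small retry helper that backs off when a throttling error is raised. The sketch below is one possible approach and not part of the original script; note that the exact error code surfaced by Athena may be ThrottlingException or TooManyRequestsException, so both are checked.

from botocore.exceptions import ClientError

def submit_with_retry(queries, s3_output_path, region_name, retries=3, wait_seconds=30, **kwargs):
    '''Retries run_query a few times when Athena reports it is being throttled'''
    for attempt in range(retries):
        try:
            return run_query(queries, s3_output_path, region_name, **kwargs)
        except ClientError as err:
            error_code = err.response['Error']['Code']
            if error_code in ('ThrottlingException', 'TooManyRequestsException'):
                print('Throttled by Athena, retrying in ' + str(wait_seconds) + ' seconds...')
                time.sleep(wait_seconds)
            else:
                raise
    raise Exception('Queries could not be submitted after ' + str(retries) + ' retries')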

Conclusion

Thus we saw that by using the Boto3 client library for Athena and some intermediate Python logic, we can easily automate and control the execution of queries in Athena. This can be powerful back-end logic while developing reporting scripts for business requirements over an Athena warehouse or S3 data lake.

The most beautiful thing about Athena is its fire-and-forget mechanism, through which a tremendous level of parallelism can be achieved. Another important feature of Athena is that it automatically saves the output CSVs to a specific S3 location. This enables the integration of the outputs saved in S3 with other AWS services such as Amazon QuickSight, Amazon Redshift and Amazon EMR, and also with custom user applications or other BI tools.
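For instance, once a query has SUCCEEDED, the OutputLocation returned by check_query_status() points directly at the result CSV in S3, which can be pulled down with the S3 client. This is a minimal sketch and assumes the result fits comfortably in memory:

'''Download the result CSV of the first successful query'''
s3 = boto3.client('s3', region_name=region_name)

succeeded = [item for item in response_list if item['QueryStatus'] == 'SUCCEEDED']
if succeeded:
    # OutputLocation looks like s3://<bucket>/<prefix>/<query-execution-id>.csv
    output_location = succeeded[0]['OutputLocation']
    bucket, key = output_location.replace('s3://', '').split('/', 1)
    result_csv = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
    print(result_csv[:500])  # preview the first few hundred characters of the result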

I hope this was helpful for developers and data engineers. Please let me know your thoughts about this approach or any suggestions in the comments section. And in case of any questions or doubts about AWS Athena, please feel free to reach out to me.

By Rajas Walavalkar
