AWS Athena: How to Execute Queries in Parallel and Control the Flow of the Code Until Their Completion Using the Boto3 SDK

Introduction to Athena

What was the Challenge?

Requirements

  • An EC2 instance (Amazon Linux 2 AMI 2.0.20200406.0 x86_64) in a default public subnet (or in a private subnet, with an Internet Gateway, NAT Gateway and Bastion Host configured)
  • An IAM Role with Athena and S3 access policies attached to the EC2 instance created above
  • Python 2.7+ set up on the instance, with the Boto3 package installed using pip (a quick way to verify the setup is shown below)
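Before moving on, it can help to confirm that Boto3 is importable and that the instance role's credentials are being picked up. The snippet below is a minimal sketch, assuming the EC2 instance profile described above is attached; sts.get_caller_identity() simply echoes back the account and role ARN that the later Athena calls will run under.

import boto3

# Confirm that Boto3 is installed and report its version
print('boto3 version: ' + boto3.__version__)

# Confirm that the instance-profile credentials are visible to Boto3.
# get_caller_identity() requires no extra IAM permissions.
sts = boto3.client('sts', region_name='us-east-1')
identity = sts.get_caller_identity()
print('Account : ' + identity['Account'])
print('Role ARN: ' + identity['Arn'])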

1st Function — Submitting Queries

  • Required Parameters : query (a single query string or a list of query strings), s3_output_path, region_name
  • Optional Parameters : work_group, database
import boto3
import time
import copy

'''These parameters are required - define the region for the Athena database and the S3 path for the query outputs'''
region_name = 'us-east-1'
s3_output_path = '<s3-bucket-path>'

'''These parameters are optional - define the Athena workgroup used to execute the queries and the Athena database'''
work_group = 'primary'
database = '<database-name>'

'''The function run_query requires 3 parameters and accepts 2 optional keyword arguments (work_group and database)'''
def run_query(query, s3_output_path, region_name, *args, **kwargs):
    query_list = []
    response_list = []
    if type(query) is list:
        query_list = query
    else:
        query_list.append(query)
    work_group = kwargs.get('work_group', None)
    database = kwargs.get('database', None)
    OutputLocation = {}
    OutputLocation['OutputLocation'] = s3_output_path
    client = boto3.client('athena', region_name=region_name)
    for single_query in query_list:
        if (work_group is None) and (database is None):
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation
            )
        if (work_group is None) and (database is not None):
            DatabaseName = {}
            DatabaseName['Database'] = database
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                QueryExecutionContext=DatabaseName
            )
        if (work_group is not None) and (database is None):
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                WorkGroup=work_group
            )
        if (work_group is not None) and (database is not None):
            DatabaseName = {}
            DatabaseName['Database'] = database
            response = client.start_query_execution(
                QueryString=single_query,
                ResultConfiguration=OutputLocation,
                WorkGroup=work_group,
                QueryExecutionContext=DatabaseName
            )
        print('\nExecution ID: ' + response['QueryExecutionId'])
        response_list.append(response)
    return response_list

'''Define the list of queries which are to be executed in parallel'''
query_list = ['<Query1>', '<Query2>', '<Query3>', '<Query4>', '<Query5>']

'''Call the function defined above and pass the list of queries to be executed along with the required parameters'''
response_list = run_query(query_list, s3_output_path, region_name, database=database)
  • Define the global parameters
  • Define the run_query() function, which accepts a list of queries to be executed in parallel along with some optional arguments
  • This function submits all the queries and returns a list of the responses (response_list) obtained from the Athena API calls. Each response includes the query execution id of the corresponding query submitted to Athena
  • These query execution ids can be extracted from the return value by the caller using the following code snippet
query_executing_ids = []
for resp in response_list:
    query_executing_ids.append(resp['QueryExecutionId'])
    print("\nQuery Execution Id for the Query Submitted : " + str(resp['QueryExecutionId']))

2nd Function — Wait till all the Queries are Completed

  • query_executing_ids : The list of query execution ids whose statuses are to be checked
  • region_name : The region defined for the Athena database and tables
  • time_to_check : The time (in seconds) that the function waits in the loop before checking the status of a submitted query again
'''The function check_query_status requires 3 parameters'''
def check_query_status(query_executing_ids, region_name, time_to_check):
    i = 0
    response_array = []
    query_executing_ids_copy = copy.copy(query_executing_ids)
    while True:
        q_id = query_executing_ids_copy[i]
        time.sleep(time_to_check)
        client = boto3.client('athena', region_name=region_name)
        response = client.get_query_execution(QueryExecutionId=q_id)
        status_resp = response['QueryExecution']['Status']['State']
        if status_resp == 'SUCCEEDED':
            print('\nQuery ID ' + q_id + ' SUCCEEDED')
            total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
            total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
            output_location_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes, 'OutputLocation': output_location_path}
            response_array.append(response_dictionary)
            query_executing_ids_copy.remove(q_id)
            if len(query_executing_ids_copy) == 0:
                break
        if status_resp == 'FAILED' or status_resp == 'CANCELLED':
            if status_resp == 'FAILED':
                print('\nQuery ID ' + q_id + ' FAILED')
                status_failed_resp = response['QueryExecution']['Status']['StateChangeReason']
                total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
                total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
                response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'ErrorMessage': status_failed_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes}
            elif status_resp == 'CANCELLED':
                print('\nQuery ID ' + q_id + ' CANCELLED')
                total_execution_time_ms = response['QueryExecution']['Statistics']['TotalExecutionTimeInMillis']
                total_data_scanned_bytes = response['QueryExecution']['Statistics']['DataScannedInBytes']
                output_location_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
                response_dictionary = {'QueryExecutionId': q_id, 'QueryStatus': status_resp, 'TotalExecutionTimeInMillis': total_execution_time_ms, 'DataScannedInBytes': total_data_scanned_bytes, 'OutputLocation': output_location_path}
            response_array.append(response_dictionary)
            query_executing_ids_copy.remove(q_id)
            if len(query_executing_ids_copy) == 0:
                break
        i = i + 1
        if i > len(query_executing_ids_copy) - 1:
            i = 0
    return response_array
  • SUCCEEDED : The query completed successfully without any errors
  • FAILED : The query encountered an error during execution. In this case the response also provides the error message, which is captured and returned to the caller along with the execution id
  • CANCELLED : The query was cancelled by a user, and the function returns this status value
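Note that the loop above calls get_query_execution once per id. The Athena client in Boto3 also exposes batch_get_query_execution, which returns the details of up to 50 query execution ids in a single call. The sketch below is a minimal alternative status check built on that API, assuming the same query_executing_ids and region_name defined earlier; the helper name snapshot_query_states is illustrative, and the function only reports the current state of each query rather than waiting for completion.

import boto3

def snapshot_query_states(query_executing_ids, region_name):
    '''Return a dict mapping each query execution id to its current state
    (QUEUED, RUNNING, SUCCEEDED, FAILED or CANCELLED) using one API call.'''
    client = boto3.client('athena', region_name=region_name)
    # batch_get_query_execution accepts at most 50 ids per call
    result = client.batch_get_query_execution(QueryExecutionIds=query_executing_ids[:50])
    states = {}
    for execution in result['QueryExecutions']:
        states[execution['QueryExecutionId']] = execution['Status']['State']
    return states

print(snapshot_query_states(query_executing_ids, region_name))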
'''Check the status of the queries with the arguments. The third argument is the time_to_check (in seconds)'''
print('\nWAIT TILL THE QUERIES ARE EXECUTED\n')
response_list = check_query_status(query_executing_ids, region_name, 5)

'''Loop through the response obtained to get the status of each query execution id'''
for resp in response_list:
    status = resp['QueryStatus']
    if (status == 'SUCCEEDED') or (status == 'CANCELLED'):
        print('\nQueryExecutionId = ' + str(resp['QueryExecutionId']) + '\nStatus = ' + str(resp['QueryStatus']) + '\nTotalExecutionTimeInMillis = ' + str(resp['TotalExecutionTimeInMillis']) + '\nDataScannedInBytes = ' + str(resp['DataScannedInBytes']) + '\nOutputLocation = ' + str(resp['OutputLocation']))
    elif status == 'FAILED':
        print('\nQueryExecutionId = ' + str(resp['QueryExecutionId']) + '\nStatus = ' + str(resp['QueryStatus']) + '\nErrorMessage = ' + str(resp['ErrorMessage']) + '\nTotalExecutionTimeInMillis = ' + str(resp['TotalExecutionTimeInMillis']) + '\nDataScannedInBytes = ' + str(resp['DataScannedInBytes']))
Output of the statuses obtained for the query execution ids

For a query that SUCCEEDED, the response contains:
  • S3 path of the output CSV result of the query
  • Total execution time for the query
  • Total bytes scanned by the query

For a query that FAILED, the response contains:
  • Error message for the failure (in the above example there was a syntax error in the query, which could not resolve a column used in the query)
  • Total execution time for the query
  • Total bytes scanned by the query (in the above example it shows zero bytes scanned because it was a syntax error; an Out Of Resource error would have shown a non-zero value for bytes scanned)
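Once a query has SUCCEEDED, its OutputLocation value points at the CSV file Athena wrote to S3. As a small follow-on, the sketch below downloads that file with Boto3; the helper name read_query_result and the s3:// path parsing are illustrative assumptions and not part of the original flow.

import boto3

def read_query_result(output_location, region_name):
    '''Download the result CSV that Athena wrote to the given s3:// path
    and return its lines (the first line is the CSV header).'''
    # Split "s3://bucket/key" into bucket and key
    bucket, key = output_location.replace('s3://', '', 1).split('/', 1)
    s3 = boto3.client('s3', region_name=region_name)
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    return body.decode('utf-8').splitlines()

for resp in response_list:
    if resp['QueryStatus'] == 'SUCCEEDED':
        lines = read_query_result(resp['OutputLocation'], region_name)
        print(resp['QueryExecutionId'] + ' returned ' + str(len(lines) - 1) + ' rows')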

Conclusion
