AWS Glue Job Scripts
Glue job 1
#
# Glue job to fetch two S3-backed tables (sales, customers) via the Glue Data Catalog and join them into one DynamicFrame
# Remove the duplicate customerid column produced by the join
# Write the joined DynamicFrame to the S3 target location
# import
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Job setup: resolve the JOB_NAME argument and initialise the Glue Job so this
# script follows the same init/commit lifecycle as the Redshift load job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Create DynamicFrames for the sales and customers catalog tables.
# No transformation_ctx is passed, so these reads are not tracked by job
# bookmarks and each run reads the full tables, exactly as before.
sales_DF = glueContext.create_dynamic_frame.from_catalog(
    database="retaildatabase",
    table_name="sales",
)
cust_DF = glueContext.create_dynamic_frame.from_catalog(
    database="retaildatabase",
    table_name="customers",
)

# Equality-join sales and customers on their shared customerid key.
cust_sales_DF = Join.apply(sales_DF, cust_DF, "customerid", "customerid")

# Drop the duplicate join-key column so only one customerid remains.
# Backticks quote the field name because it begins with a dot.
# NOTE(review): relies on Join.apply naming the clashing key `.customerid`;
# confirm against the joined frame's schema.
cust_sales_DF = cust_sales_DF.drop_fields(["`.customerid`"])

# Write the joined data to the S3 target location as JSON.
glueContext.write_dynamic_frame.from_options(
    cust_sales_DF,
    connection_type="s3",
    connection_options={"path": "s3://sales-cust-data-lake/data/customersales"},
    format="json",
)

# Mark the job run as successfully completed.
job.commit()
Glue job 2:
#
# Script to load from S3 to Redshift
#
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Column layout of the city table as (name, type) pairs. Every column is
# mapped onto itself, with the same name and type, on the Redshift side.
city_columns = [
    ("id", "long"),
    ("country", "string"),
    ("state", "string"),
    ("city", "string"),
    ("amount", "double"),
]
# ApplyMapping expects (source name, source type, target name, target type).
city_mappings = [(col, col_type, col, col_type) for col, col_type in city_columns]

# Resolve job arguments and bring up the Spark/Glue contexts and the Job.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session  # not used below; kept from the generated script
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Source: the hk_city_data2 table from the glue_db1 catalog database.
city_source_frame = glueContext.create_dynamic_frame.from_catalog(
    database="glue_db1",
    table_name="hk_city_data2",
    transformation_ctx="S3bucket_node1",
)

# Project and cast the source columns according to city_mappings.
city_mapped_frame = ApplyMapping.apply(
    frame=city_source_frame,
    mappings=city_mappings,
    transformation_ctx="ApplyMapping_node2",
)

# Sink: the dev_public_city_data catalog table, staged through an S3 temp dir.
# NOTE(review): S3 bucket naming rules do not allow underscores for newly
# created buckets; confirm that "redshift_temp_bkt1" actually exists.
redshift_write_result = glueContext.write_dynamic_frame.from_catalog(
    frame=city_mapped_frame,
    database="glue_db1",
    table_name="dev_public_city_data",
    redshift_tmp_dir="s3://redshift_temp_bkt1/glue_job_temp_dir/",
    transformation_ctx="RedshiftCluster_node3",
)

# Finalise the run (this also persists job-bookmark state when bookmarks are enabled).
job.commit()
No comments:
Post a Comment