Search This Blog

AWS Glue Job Scripts

AWS Glue Job Scripts

Glue job1

#
# Glue job to fecth 2 files from S3 source location and join them into one data frame
# Remove the columns
# Write the DataFrame to S3 target location

# import
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())


# cerate DataFrames for Salels, customer
sales_DF = glueContext.create_dynamic_frame.from_catalog(
             database="retaildatabase",
             table_name="sales")
cust_DF = glueContext.create_dynamic_frame.from_catalog(
             database="retaildatabase",
             table_name="customers")

#
# join sales, customers dataframe
cust_sales_DF=Join.apply(sales_DF, cust_DF, 'customerid', 'customerid')


#
# drop customer id column
cust_sales_DF = cust_sales_DF.drop_fields(['`.customerid`'])

#
# write to S3 target location
glueContext.write_dynamic_frame.from_options(cust_sales_DF, connection_type = "s3", \
connection_options = {"path": "s3://sales-cust-data-lake/data/customersales"}, \
format = "json")


Glue job2:
# Script to load from S3 to Redshift
#
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="glue_db1", table_name="hk_city_data2", transformation_ctx="S3bucket_node1"
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3bucket_node1,
    mappings=[
        ("id", "long", "id", "long"),
        ("country", "string", "country", "string"),
        ("state", "string", "state", "string"),
        ("city", "string", "city", "string"),
        ("amount", "double", "amount", "double"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node Redshift Cluster
RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="glue_db1",
    table_name="dev_public_city_data",
    redshift_tmp_dir="s3://redshift_temp_bkt1/glue_job_temp_dir/",
    transformation_ctx="RedshiftCluster_node3",
)

job.commit()

No comments:

Post a Comment