PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities.
PySpark is used to run applications in parallel on a distributed cluster (multiple nodes).
Q2. What are Spark features?
1. In-memory computation
2. Distributed processing using parallelize
3. Can be used with many cluster managers (Standalone, YARN, Mesos, Kubernetes, etc.)
4. Fault-tolerant
5. Immutable
6. Lazy evaluation
7. Cache & persistence
8. Built-in optimization when using DataFrames
9. Supports ANSI SQL
Q3. What is SparkSession?
Since Spark 2.0, SparkSession has been the entry point to PySpark for working with RDDs and DataFrames.
Prior to 2.0, SparkContext was the entry point.
Q. Create spark session?
#
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Colab") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()
spark.version
O/p:
3.2.1
Q. Get SparkContext Configurations?
#
configurations = spark.sparkContext.getConf().getAll()
for item in configurations:
    print(item)
Q. Get Specific configuration?
#
print(spark.sparkContext.getConf().get("spark.master"))
O/p:
local
Q. Lazy evaluation in PySpark?
Transformations are not evaluated when they are defined; instead, they are queued with other transformations until an Action is called.
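A minimal sketch (using the spark session created above) showing that nothing executes until an action is called:
#
rdd = spark.sparkContext.parallelize(range(10))
squared = rdd.map(lambda x: x * x)            # transformation - only recorded, not run
evens = squared.filter(lambda x: x % 2 == 0)  # transformation - still nothing runs
print(evens.collect())                        # action - triggers the actual computation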
Q. Benefits of Spark vs MapReduce?
- Spark has in memory processing
- Spark is 10-100 times faster than MapReduce.
- Spark works well for data sets that fit in memory (smaller than the available RAM)
- Hadoop is more cost-effective for processing massive data sets.
Q. Cluster Manager Types?
1. Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
2. Apache Mesos – Mesos is a cluster manager that can also run Hadoop MapReduce and PySpark applications.
3. Hadoop YARN – the resource manager in Hadoop 2. (Most used)
4. Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.
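The cluster manager is selected through the master URL when building the session; a short sketch (the master URL, host names and app name below are placeholders):
#
from pyspark.sql import SparkSession
# local[*]               - run locally on all cores
# spark://host:7077      - Spark Standalone
# yarn                   - Hadoop YARN
# mesos://host:5050      - Apache Mesos
# k8s://https://host:443 - Kubernetes
spark_yarn = SparkSession.builder.master("yarn").appName("ClusterApp").getOrCreate()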
Q. Which languages does Spark integrate with?
- Python
- Scala
- R
- Java
2. RDD
Q. What is RDD?
RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects.
Immutable meaning once you create an RDD you cannot change it.
Q. In how many ways can an RDD be created?
Two ways:
1. Parallelizing an existing collection in your driver program.
#Create RDD from parallelize
data = [1,2,3,4,5]
rdd=spark.sparkContext.parallelize(data)
rdd
2. Referencing a dataset in an external storage system.
# Create RDD from an external dataset (read as multiple lines)
rdd_external = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/text1.txt")
rdd_external.collect()
Q. coalesce vs repartition?
repartition() shuffles data across all nodes, also called a full shuffle.
coalesce() shuffles data from the minimum number of nodes.
Example:
If data is in 4 partitions, coalesce(2) moves data from only 2 partitions.
repartition() - can increase or decrease the number of RDD, DataFrame, Dataset partitions.
coalesce() - only decreases the number of partitions.
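A quick sketch (using the spark session created earlier) comparing the two:
#
rdd = spark.sparkContext.parallelize(range(20), 4)
print(rdd.getNumPartitions())                  # 4
print(rdd.repartition(8).getNumPartitions())   # 8 - full shuffle, can increase partitions
print(rdd.coalesce(2).getNumPartitions())      # 2 - merges existing partitions, no full shuffle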
Q. RDD transformation vs RDD action?
RDD transformations – Transformations are lazy operations; instead of updating an RDD, they return a new RDD. (Eg: filter(), map())
RDD actions – Actions trigger execution and return values. (Eg: max(), min(), sum())
Q. What is narrow transformation?
In narrow transformations, no data movement occurs (the computation happens within the same partition).
Examples: map(), mapPartition(), flatMap(), filter(), union()
Q. What is wider transformation?
Wider transformations compute results from data across many partitions, which means data movement (a shuffle) happens.
Examples: groupByKey(), aggregateByKey(), aggregate(), join(), repartition()
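A small sketch contrasting a narrow transformation with a wide one:
#
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)], 2)
narrow = rdd.map(lambda kv: (kv[0], kv[1] * 10))   # narrow - stays within the same partition
wide = rdd.groupByKey().mapValues(list)            # wide - shuffles data across partitions
print(narrow.collect())
print(wide.collect())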
Q. RDD transformations?
map(func)
flatMap(func)
filter(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Q. RDD Actions?
reduce(func)
collect()
count()
first()
max()
min()
take()
takeSample()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
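A few of the actions above in a quick sketch:
#
rdd = spark.sparkContext.parallelize([3, 1, 4, 1, 5, 9])
print(rdd.count())                      # 6
print(rdd.first())                      # 3
print(rdd.take(3))                      # [3, 1, 4]
print(rdd.reduce(lambda a, b: a + b))   # 23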
Q. map() transformation?
map() transformation is used to apply any complex operation, like adding a column, updating a column, etc.
# The output of a map transformation always has the same number of records as the input
rdd3 = rdd2.map(lambda x: (x,1))
In the above code, each element is paired with the value 1, i.e. a new "column" with value 1 is added.
Q. select() vs collect() in PySpark?
select() - transformation (returns a new DataFrame with the selected columns)
collect() - action
Returns all data from the RDD/DataFrame as an array (list of Row objects) to the driver.
Be careful when you use this action on a huge RDD/DataFrame
with millions or billions of records: you may run out of memory on the driver.
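A minimal illustration (the DataFrame and column names here are made up for the example):
#
df_cities = spark.createDataFrame([("Hyd", 200000), ("Bng", 300000)], ["city", "population"])
selected = df_cities.select("city")   # transformation - returns a new DataFrame, nothing runs yet
rows = selected.collect()             # action - brings the rows to the driver as a list of Row objects
print(rows)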
Q. count vs countApprox vs countApproxDistinct?
count() – Return the count of elements in the dataset.
countApprox() – Return an approximate count of elements in the dataset; this method returns an incomplete result if execution time exceeds the given timeout.
countApproxDistinct() – Return an approximate number of distinct elements in the dataset.
countByValue() – Return a map where each key is a unique value in the dataset and each value is the number of times it occurs.
#
rdd.count()
rdd.countApprox(1)
rdd.countApproxDistinct()
rdd.countByValue()
Q. RDD min, max?
rdd.max() # returns max value
rdd.min() # returns min value
rdd.first() # returns first value
rdd.top(2) # returns top 2 values
Q. Types of RDD?
PairRDDFunctions or PairRDD: – an RDD of key-value pairs; this is the most used RDD type.
ShuffledRDD: – created when PySpark shuffling is triggered by transformations like groupByKey(), reduceByKey(), join() on RDDs.
DoubleRDD: – an RDD of numeric (double) values, which supports numeric operations such as sum(), mean() and stdev().
SequenceFileRDD: – created by reading Hadoop SequenceFile data.
HadoopRDD: – created by reading data from HDFS or other Hadoop-supported storage using Hadoop InputFormats.
ParallelCollectionRDD: – created by parallelizing an existing collection (e.g. sparkContext.parallelize()).
# PairRDD, aggregate elements by key Example
rdd = spark.sparkContext.parallelize(
['Bengaluru Hyderabad Chennai','Bengaluru Hyderabad Pune','Hyderabad Mumbai Mysore Coiambattore']
)
wordsRdd = rdd.flatMap(lambda x: x.split(" "))
pairRDD = wordsRdd.map(lambda f: (f,1))
pairRDD1 = pairRDD.reduceByKey(lambda a,b: a+b)
pairRDD1.collect()
O/p:
[('Bengaluru', 2),
('Hyderabad', 3),
('Chennai', 1),
('Pune', 1),
('Mumbai', 1),
('Mysore', 1),
('Coiambattore', 1)]
Q. Shuffle operations in PySpark?
Shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions.
Shuffle involves copying data across executors and machines, making the shuffle a complex and costly operation.
It is an expensive operation since it involves disk I/O, data serialization, and network I/O.
To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it.
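A small sketch of a shuffle-triggering operation, plus the setting that controls the number of shuffle partitions for DataFrame/SQL operations:
#
pair_rdd = spark.sparkContext.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(pair_rdd.reduceByKey(lambda a, b: a + b, numPartitions=4).collect())  # reduceByKey shuffles data into 4 partitions
spark.conf.set("spark.sql.shuffle.partitions", "8")  # shuffle partitions for DataFrame/SQL (default 200)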
Q. What is RDD Persistence?
Persisting means caching a dataset in memory across operations. (One of the most important capabilities in Spark.)
When a dataset/RDD is persisted, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset.
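A minimal persistence sketch (the data below is just a placeholder):
#
from pyspark import StorageLevel
nums = spark.sparkContext.parallelize(range(1000))
nums.persist(StorageLevel.MEMORY_AND_DISK)   # or simply nums.cache() for MEMORY_ONLY
print(nums.count())   # first action computes and caches the partitions
print(nums.sum())     # later actions reuse the cached data
nums.unpersist()      # release the storage when no longer needed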
Q. Save RDD to a location?
rdd1.saveAsTextFile("content/sample_data/text1.txt")
Q. Count the number of words in a file using an RDD?
#
from operator import add
rdd = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/text1.txt")
rdd.collect()
#
rdd1 = rdd.flatMap(lambda x: x.split(' '))
rdd2 = rdd1.map(lambda x: (x, 1))
rdd3 = rdd2.reduceByKey(lambda a,b: a+b)
rdd5 = rdd3.map(lambda x: (x[1]))
reres = rdd5.reduce(add)
reres
O/p:
40 (includes spaces, '//' )
3. PySpark DataFrame
Q. Create DataFrame from RDD?
# create RDD
data = [("HYd", "200000"), ("Bng", "300000"), ("Chennai", "200000")]
rdd = spark.sparkContext.parallelize(data)
# create data frame from RDD
columns = ["Cities","it_Population"]
df1 = rdd.toDF(columns)
# Show data frame
df1.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
| HYd| 200000|
| Bng| 300000|
|Chennai| 200000|
+-------+-------------+
#
df2 = spark.createDataFrame(rdd).toDF(*columns)
df2.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
| HYd| 200000|
| Bng| 300000|
|Chennai| 200000|
+-------+-------------+
Q. Create DataFrame from Schema?
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("John","Cena","33","M",50000),
("Michael","Corneol","44","M",44000),
("Robert","Duvval","42","M",67000),
("Anne","Adams","92","F",34000),
("Peter","Parker","57","M",100000)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("emp_id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
O/p:
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- emp_id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
+---------+--------+------+------+------+
|firstname|lastname|emp_id|gender|salary|
+---------+--------+------+------+------+
|John |Cena |33 |M |50000 |
|Michael |Corneol |44 |M |44000 |
|Robert |Duvval |42 |M |67000 |
|Anne |Adams |92 |F |34000 |
|Peter |Parker |57 |M |100000|
+---------+--------+------+------+------+
Q. Create DataFrame from a source?
# read from CSV
california_df = spark.read.csv("sample_data/california_housing_train.csv", \
header=True, inferSchema=True)
california_df.show(3) # shows 3 rows
O/p:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
| -114.31| 34.19| 15.0| 5612.0| 1283.0| 1015.0| 472.0| 1.4936| 66900.0|
| -114.47| 34.4| 19.0| 7650.0| 1901.0| 1129.0| 463.0| 1.82| 80100.0|
| -114.56| 33.69| 17.0| 720.0| 174.0| 333.0| 117.0| 1.6509| 85700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows
Q. Convert a PySpark DF to a pandas DF?
#
pandasDF = pysparkDF.toPandas()
print(pandasDF)
Q. DataFrame show?
To show contents of DF.
df.show() # Display full DF
df.show(truncate=False) #Display full column contents
# Display 2 rows and full column contents
df.show(2,truncate=False)
# Display 3 rows with column length 10 characters
df.show(3,truncate=10)
# Display DataFrame rows & columns vertically
df.show(n=3,truncate=10,vertical=True)
Q. PySpark StructType & StructField?
PySpark StructType & StructField classes are used to specify the schema of a DataFrame
and to create complex columns like nested struct, array and map columns.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
#Using PySpark StructType & StructField with DataFrame
data = [("James","","Watt","36636","M",30000),
("Michael","Corleone","","40288","M",40000),
("Robert","","Duvval","42114","M",40000),
("Maria","smith","Sharapova","39192","F",4000)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
Q. PySpark Row?
The PySpark Row class is available by importing pyspark.sql.Row and represents a record/row in a DataFrame.
from pyspark.sql import Row
# row in RDD
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())
#row in DataFrame
from pyspark.sql import Row
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
df=spark.createDataFrame(data)
df.printSchema()
df.show()
Q. Alias example?
#####################
#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
df.lname.alias("last_name")
).show()
#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") \
).show()
Q. Access columns?
# access column from DF
data=[("Rao",43),("Ram",44)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()
Q. Sort a DF asc and desc?
######## asc & desc ########
#############################
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()
Q. Convert data type using cast?
####### cast ########
# Used to convert the data Type.
#############################
df.select(df.fname, df.id.cast("int")).printSchema()
Q. Get values within a range from a DF?
# 4.4 between() –
# Returns a Boolean expression when a column's value is between the lower and upper bound.
#############################
df.filter(df.id.between(100,300)).show()
Q. Contains usage in DF?
# contains #############################
#############################
df.filter(df.fname.contains("John")).show()
Q. startswith and endswith usage?
# #startswith, endswith()
#############################
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()
Q. check nulls in column?
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()
Q. Like usage?
#like , rlike
df.select(df.fname,df.lname,df.id) \
.filter(df.fname.like("%om")).show()
Q. Select usage on columns?
# select usage on columns
df.select("fname","lname").show()
df.select(df.fname,df.lname).show()
df.select(df["fname"],df["lname"]).show()
#By using col() function
from pyspark.sql.functions import col
df.select(col("fname"),col("lname")).show()
#Select columns by regular expression
df.select(df.colRegex("`^.*name*`")).show()
Q. Select all columns?
# Select All columns from List
df.select(*columns).show()
# Select All columns
df.select([col for col in df.columns]).show()
df.select("*").show()
Q. Select columns by index?
### Select Columns by Index
#Selects first 3 columns and top 3 rows
df.select(df.columns[:2]).show(2)
#Selects columns 2 to 4 and top 3 rows
df.select(df.columns[2:4]).show(3)
Q. PySpark withColumn() usage?
PySpark withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.
#Change DataType using PySpark withColumn()
df.withColumn("salary",col("salary").cast("Integer")).show()
#2. Update The Value of an Existing Column
df.withColumn("sal",col("sal")*100).show()
#3. Create a Column from an Existing
df.withColumn("New_sal",col("sal") / 100).show()
# 4. Add a New Column using withColumn()
from pyspark.sql.functions import *
df.withColumn("City", lit("Bengaluru")) \
.withColumn("Date_column",current_date()) \
.show()
# 5. Rename Column Name
df.withColumnRenamed("gender","sex") \
.show(truncate=False)
# 6. Drop Column From PySpark DataFrame
df.drop("Date_column") \
.show()
#2. rename multiple columns in DF
df = df.withColumnRenamed("fname","First_name").withColumnRenamed("lname","last_name")
# rename nested columns (extract them from the struct, then drop it)
df4 = df.withColumn("fname",col("name.firstname")) \
.withColumn("mname",col("name.middlename")) \
.withColumn("lname",col("name.lastname")) \
.drop("name")
#7. Using toDF() – To change all columns in a PySpark DataFrame
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*newColumns).printSchema()
Q. PySpark filter?
#2. DataFrame filter() with Column Condition
df.filter(df.gender == "F").show(truncate=False)
# 3. DataFrame filter() with SQL Expression
#Using SQL Expression
df.filter("gender == 'M'").show()
#For not equal
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()
#4. PySpark Filter with Multiple Conditions
df.filter( (df.state == "OH") & (df.gender == "M") ) \
.show(truncate=False)
#4. PySpark Filter with Multiple Conditions
df.filter( (df.gender == "F") & (df.last_name == "Adams") ) \
.show(truncate=False)
# like - SQL LIKE pattern
df.filter(df.last_name.like("%da%")).show()
Q. Distinct usage?
# 1. Get Distinct Rows (By Comparing All Columns)
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
# Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
# 2. PySpark Distinct of Selected Multiple Columns
dropDisDF = df.dropDuplicates(["department","salary"])
Q. PySpark orderBy() and sort()?
# sort
df.sort("last_name","age").show(truncate=False)
# order by
df.orderBy("sal","age").show(truncate=False)
# Sort by Ascending (ASC)
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)
Q. Group by usage?
#
df.groupBy("department").count().show(truncate=False)
df.groupBy("department").min("salary").show(truncate=False)
df.groupBy("department").sum("salary").show(truncate=False)
#GroupBy on multiple columns
df.groupBy("department","state") \
.sum("salary","bonus") \
.show()
Q. Aggregate functions?
#
df.select(collect_list("salary")).show(truncate=False)
df.select(collect_set("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(approx_count_distinct("sal")).collect()[0][0]
df.select(avg("sal")).collect()[0][0]
df.select(collect_list("sal")).show(truncate=False)
df.select(countDistinct("gender", "sal")).show(truncate=False)
df.select(count("sal")).collect()[0]
df.select(first("sal")).show(truncate=False)
df.select(last("sal")).show(truncate=False)
df.select(max("sal")).show(truncate=False)
df.select(min("sal")).show(truncate=False)
df.select(mean("sal")).show(truncate=False)
df.select(sum("sal")).show(truncate=False)
Q. Joins?
# join
empDF.join(deptDF,empDF.dept_id == deptDF.dept_id,"inner") \
.show(truncate=False)
empDF.join(deptDF,empDF.dept_id == deptDF.dept_id,"fullouter") \
.show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id,"leftsemi") \
.show(truncate=False)
empDF.join(deptDF,empDF.dept_id == deptDF.dept_id,"leftsemi") \
.show(truncate=False)
Q. Union?
#
unionDF = emp_df1.union(emp_df2)
unionDF.show(truncate=False)
# Merge without duplicates
unionDF2 = emp_df1.union(emp_df2).distinct()
unionDF2.show(truncate=False)
# merge different column dfs
merged_df = df1.unionByName(df2, allowMissingColumns=True)
Q. Get 1st column, 1st row from DataFrame.
df1.select(df1.columns[:1]).show(1, truncate=False)
Q. Benefits of Parquet files?
Uses low storage space.
Stores data in columnar format hence reading data is efficient.
Q. Add new columns in dataset using PySpark.
from pyspark.sql.functions import *
df.withColumn("City", lit("Bangalore")) \
.withColumn("Date_column",current_date()) \
.show()
Q. Replace a string column value?
from pyspark.sql.functions import regexp_replace
df.withColumn('address', regexp_replace('address', 'res', 'Residence')) \
.show(truncate=False)
Q. Convert a timestamp string to DateType.
from pyspark.sql.functions import to_timestamp, to_date, current_timestamp
new_df = df.withColumn("timestamp", to_timestamp("input_timestamp"))
df.withColumn("date_type", to_date(current_timestamp()))
Q. Flatten a nested array?
#
from pyspark.sql.functions import flatten
df.select(flatten(df.col_name)).show()
Q. Case usage example in PySPark?
from pyspark.sql.functions import *
df2 = df.withColumn("new_Residence", when(df.Residence == "v","Village")
.when(df.Residence == "twn","Town")
.when(df.Residence == "cty","City")
.when(df.Residence.isNull() ,"")
.otherwise(df.Residence))
Q. Create a DataFrame from a source.
#
df = spark.read.csv("sample_data/california_housing_train.csv", \
header=True, inferSchema=True)
Q. Write DataFrame to a location in pyspark in parquet format.
df.write.parquet("sample_data/california_housing_train.parquet")
Q. Null Usage?
#Replace 0 for null for all integer columns
emp_df2.na.fill(value=0).show()
#Replace 0 for null on only bonus column
df.na.fill(value=0,subset=["bonus"]).show()
# replace NULL/None values with an empty string or
# any constant values String on all DataFrame String columns.
df.na.fill("").show() # not working
df.na.fill("unknown",["state"]).show()
Q. Pivot usage?
#
df.show()
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000 |USA |
|Carrots|1500 |USA |
|Beans |1600 |USA |
|Orange |2000 |USA |
|Orange |2000 |USA |
|Banana |400 |China |
|Carrots|1200 |China |
|Beans |1500 |China |
|Orange |4000 |China |
|Banana |2000 |Canada |
|Carrots|2000 |Canada |
|Beans |2000 |Mexico |
+-------+------+-------+
#
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show(truncate=False)
O/p:
+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null |4000 |null |4000|
|Beans |null |1500 |2000 |1600|
|Banana |2000 |400 |null |1000|
|Carrots|2000 |1200 |null |1500|
+-------+------+-----+------+----+
Q. Write to disk?
#partitionBy()
df.write.option("header",True) \
.partitionBy("housing_median_age") \
.mode("overwrite") \
.csv("sample_data/california_housing_train_partiton")
Q. Partitioning?
#partitionBy() multiple columns
df.write.option("header",True) \
.partitionBy("housing_median_age","latitude") \
.mode("overwrite") \
.csv("sample_data/california_housing_train_partiton_test")
##
#Use repartition() and partitionBy() together
df.repartition(4) \
.write.option("header",True) \
.partitionBy("latitude") \
.mode("overwrite") \
.csv("sample_data/california_housing_train_partiton_test_repartiton_2")
# 7. Data Skew – Control Number of Records per Partition File
#partitionBy() control number of partitions
df.write.option("header",True) \
.option("maxRecordsPerFile", 2) \
.partitionBy("latitude") \
.mode("overwrite") \
.csv("sample_data/california_housing_train_partiton_test_repartiton_2")
#read specific partition
dfSinglePart=spark.read.option("header",True) \
.csv("sample_data/california_housing_train_partiton_test_repartiton_2/latitude=32.56")
dfSinglePart.printSchema()
dfSinglePart.show()
#9. PySpark SQL – Read Partition Data
dfSinglePart=spark.read.option("header",True) \
.csv("sample_data/california_housing_train_partiton_test_repartiton_2")
dfSinglePart.createOrReplaceTempView("ZIPCODE")
spark.sql("select * from ZIPCODE where latitude='32.56'") \
.show()
Q. PySpark Window Functions?
PySpark Window functions are used to calculate results such as rank, row number, etc. over a range of input rows.
PySpark SQL supports three kinds of window functions:
1. ranking functions
2. analytic functions
3. aggregate functions
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James |Sales |3000 |
|Michael |Sales |4600 |
|Robert |Sales |4100 |
|Maria |Finance |3000 |
|James |Sales |3000 |
|Scott |Finance |3300 |
|Jen |Finance |3900 |
|Jeff |Marketing |3000 |
|Kumar |Marketing |2000 |
|Saif |Sales |4100 |
+-------------+----------+------+
# row_number() window function is used to give the sequential row number
# starting from 1 to the result of each window partition
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number",row_number().over(windowSpec)) \
.show(truncate=False)
+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria |Finance |3000 |1 |
|Scott |Finance |3300 |2 |
|Jen |Finance |3900 |3 |
|Kumar |Marketing |2000 |1 |
|Jeff |Marketing |3000 |2 |
|James |Sales |3000 |1 |
|James |Sales |3000 |2 |
|Robert |Sales |4100 |3 |
|Saif |Sales |4100 |4 |
|Michael |Sales |4600 |5 |
+-------------+----------+------+----------+
# rank() window function is used to provide a rank to the result within a window partition.
# This function leaves gaps in rank when there are ties.
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
.show()
# dense_rank() window function is used to get the rank of rows within a window partition without any gaps.
# This is similar to rank(), the difference being that rank() leaves gaps when there are ties.
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
.show()
# percent_rank Window Function
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
.show()
Q. Read csv files?
df2 = spark.read.option("header",True) \
.csv("sample_data/california_housing_test.csv") \
.show(5, truncate=False)
Q. Read multiple csv files?
# 1.2 Read Multiple CSV Files
# df = spark.read.csv("path1,path2,path3")
###########################################
df = spark.read.options(header=True) \
.csv(["sample_data/california_housing_test.csv", "sample_data/mnist_test.csv"]) \
.show(5, truncate=False)
###########################################
# 1.3 Read all CSV Files in a Directory
# df = spark.read.csv("Folder path")
###########################################
df = spark.read.options(header=True) \
.csv(["sample_data/"]) \
.show(5, truncate=False)
###########################################
# 2.1 delimiter
#
###########################################
df3 = spark.read.options(delimiter=',') \
.csv(["sample_data/california_housing_test.csv"]) \
.show(5, truncate =False)
###########################################
# 2.2 inferSchema
#
###########################################
df4 = spark.read.options(inferSchema='True',delimiter=',') \
.csv(["sample_data/california_housing_test.csv"]) \
.show(5, truncate =False)
###########################################
# 2.3 header
# This option is used to read the first line of the CSV file as column names.
# By default the value of this option is False.
###########################################
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
.csv("sample_data/california_housing_test.csv") \
.show(5, truncate =False)
Q. Write DF to a CSV file?
###########################################
# 5. Write PySpark DataFrame to CSV file
#
###########################################
df3 = spark.read.options(header='True') \
.csv("sample_data/california_housing_test.csv")
df3.write.options(header='True', delimiter=',') \
.csv("My_sample_data/")
###########################################
# 5.2 Saving modes
# PySpark DataFrameWriter also has a method mode() to specify saving mode.
# overwrite – mode is used to overwrite the existing file.
# append – To add the data to the existing file.
# ignore – Ignores write operation when the file already exists.
# error – This is the default option; when the file already exists, it returns an error.
###########################################
df3.write.mode('overwrite').csv("My_sample_data/")
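The other saving modes follow the same pattern (a sketch using the same placeholder path):
df3.write.mode('append').csv("My_sample_data/")   # add data to the existing files
df3.write.mode('ignore').csv("My_sample_data/")   # silently skip the write if the path exists
df3.write.mode('error').csv("My_sample_data/")    # default: raise an error if the path exists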
Q. Work on Parquet files?
# Write DataFrame to Parquet file format
df.write.parquet("My_sample_data/people.parquet")
# Pyspark Read Parquet file into DataFrame
###########################################
#
parquetDF = spark.read.parquet("My_sample_data/people.parquet")
parquetDF.count()
# Append or Overwrite an existing Parquet file
#
df.write.mode('append').parquet("My_sample_data/people.parquet")
parquetDF = spark.read.parquet("My_sample_data/people.parquet")
parquetDF.count()
# Executing SQL queries DataFrame
#
parquetDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show()
# Creating a table on Parquet file
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"My_sample_data/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()
# Create Parquet partition file
#
df.write.partitionBy("gender","salary").mode("overwrite").parquet("My_sample_data/people2.parquet")
# Retrieving from a partitioned Parquet file
#
parDF2=spark.read.parquet("My_sample_data/people2.parquet/gender=M")
parDF2.show(truncate=False)
# Creating a table on Partitioned Parquet file
#
# spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"people2.parquet/gender=F\")")
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"My_sample_data/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()
Q. Work on JSON files?
# Read JSON file into dataframe
df = spark.read.json("sample_data/anscombe.json")
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
.json("sample_data/anscombe.json")
# Read multiple files
df2 = spark.read.json(
['sample_data/anscombe.json','sample_data/anscombe_1.json'])
# Read all JSON files from a folder
df3 = spark.read.json("sample_data/*.json")
df3.show(5, truncate=False)
# Read JSON file using PySpark SQL
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'sample_data/anscombe.json')")
spark.sql("select * from zipcode").show(5, truncate=False)
# Write PySpark DataFrame to JSON file
df3.write.json("My_sample_data/anscombe.json")
# PySpark Saving modes
# PySpark DataFrameWriter also has a method mode() to specify SaveMode;
# the argument to this method takes either overwrite, append, ignore, or errorifexists.
#
df3.write.mode('Overwrite').json("My_sample_data/anscombe.json")
Q. Bring non null values from DataFrames using coalesce?
#
joined_df = df1.join(df2, df1.citY_1 == df2.citY_2, 'fullouter')
joined_df.show(truncate=False)
O/p:
+-------+--------------+------+--------------+
|citY_1 |Pop_in_million|citY_2|Pop_in_million|
+-------+--------------+------+--------------+
|bng |2 |null |null |
|chennai|3 |null |null |
|null |null |coiam |5 |
|hyd |1 |hyd |1 |
|null |null |mysore|4 |
+-------+--------------+------+--------------+
from pyspark.sql import Row, Window, functions as F
joined_df = joined_df.withColumn('new_city_names', F.coalesce(df1.citY_1, df2.citY_2))
joined_df.show()
O/p:
+-------+--------------+------+--------------+--------------+
| citY_1|Pop_in_million|citY_2|Pop_in_million|new_city_names|
+-------+--------------+------+--------------+--------------+
| bng| 2| null| null| bng|
|chennai| 3| null| null| chennai|
| null| null| coiam| 5| coiam|
| hyd| 1| hyd| 1| hyd|
| null| null|mysore| 4| mysore|
+-------+--------------+------+--------------+--------------+