PySpark DataFrame
Create DataFrame from RDD:
#
# create RDD
data = [("HYd", "200000"), ("Bng", "300000"), ("Chennai", "200000")]
rdd = spark.sparkContext.parallelize(data)
# create data frame from RDD
columns = ["Cities","it_Population"]
df1 = rdd.toDF(columns)
# Show data frame
df1.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
| HYd| 200000|
| Bng| 300000|
|Chennai| 200000|
+-------+-------------+
#
df2 = spark.createDataFrame(rdd).toDF(*columns)
df2.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
| HYd| 200000|
| Bng| 300000|
|Chennai| 200000|
+-------+-------------+
Create DataFrame from a Schema:
#
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("John","Cena","33","M",50000),
("Michael","Corneol","44","M",44000),
("Robert","Duvval","42","M",67000),
("Anne","Adams","92","F",34000),
("Peter","Parker","57","M",100000)
]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("emp_id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
O/p:
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- emp_id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
+---------+--------+------+------+------+
|firstname|lastname|emp_id|gender|salary|
+---------+--------+------+------+------+
|John |Cena |33 |M |50000 |
|Michael |Corneol |44 |M |44000 |
|Robert |Duvval |42 |M |67000 |
|Anne |Adams |92 |F |34000 |
|Peter |Parker |57 |M |100000|
+---------+--------+------+------+------+
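Schemas can also be nested. A minimal sketch (assuming the same SparkSession `spark` as above) of a StructType that holds another StructType for the name fields; `nested_schema` and `nested_df` are illustrative names:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# nested struct: full_name is itself a struct of first/last
nested_schema = StructType([
    StructField("full_name", StructType([
        StructField("firstname", StringType(), True),
        StructField("lastname", StringType(), True)
    ]), True),
    StructField("salary", IntegerType(), True)
])
nested_df = spark.createDataFrame([(("John", "Cena"), 50000)], nested_schema)
nested_df.printSchema()  # full_name appears as a struct with two string fields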
Create DataFrame from Data sources:
df_csv = spark.read.csv("sample_data/california_housing_train.csv")
df_txt = spark.read.text("sample_data/california_housing_train.txt")
df_json = spark.read.json("sample_data/california_housing_train.json")
df_parquet = spark.read.parquet("sample_data/california_housing_train.parquet")
# read from CSV
df = spark.read.csv("sample_data/california_housing_train.csv", \
header=True, inferSchema=True)
df.printSchema()
O/p:
root
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- housing_median_age: double (nullable = true)
|-- total_rooms: double (nullable = true)
|-- total_bedrooms: double (nullable = true)
|-- population: double (nullable = true)
|-- households: double (nullable = true)
|-- median_income: double (nullable = true)
|-- median_house_value: double (nullable = true)
# show 3 rows
df.show(3)
O/p:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
| -114.31| 34.19| 15.0| 5612.0| 1283.0| 1015.0| 472.0| 1.4936| 66900.0|
| -114.47| 34.4| 19.0| 7650.0| 1901.0| 1129.0| 463.0| 1.82| 80100.0|
| -114.56| 33.69| 17.0| 720.0| 174.0| 333.0| 117.0| 1.6509| 85700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
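inferSchema makes an extra pass over the file, so for large CSVs it is often cheaper to supply the schema explicitly. A sketch against the same file, assuming the column types shown above (housing_schema is an illustrative name):
from pyspark.sql.types import StructType, StructField, DoubleType
# all nine housing columns are doubles, per the inferred schema above
housing_schema = StructType([
    StructField(name, DoubleType(), True)
    for name in ["longitude", "latitude", "housing_median_age", "total_rooms",
                 "total_bedrooms", "population", "households",
                 "median_income", "median_house_value"]
])
df = spark.read.csv("sample_data/california_housing_train.csv",
                    header=True, schema=housing_schema)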
Write DataFrame to a location:
#
df2.write.parquet("sample_data/california_housing_train.parquet")
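Note that write.parquet raises an error if the target path already exists; passing a save mode avoids that. A minimal sketch (the output path here is illustrative):
# overwrite existing data at the path; other modes: "append", "ignore", "error"
df2.write.mode("overwrite").parquet("sample_data/cities.parquet")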
Convert Spark DF to Pandas DF:
#
# df here is the employee DataFrame built earlier from data2
pandasDF = df.toPandas()
print(pandasDF)
O/p:
firstname lastname emp_id gender salary
0 John Cena 33 M 50000
1 Michael Corneol 44 M 44000
2 Robert Duvval 42 M 67000
3 Anne Adams 92 F 34000
4 Peter Parker 57 M 100000
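toPandas() collects the entire DataFrame to the driver, so it only suits data small enough to fit in driver memory. The reverse conversion is a one-line sketch:
# pandas -> Spark: createDataFrame accepts a pandas DataFrame directly
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.show()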
DF Show with and without Truncate:
columns = ["line_no","Sentance"]
data = [("1", "This is Bangalore, the Garden city"),
("2", "This is Hyderabad, city if Biryanies."),
("3", "This is Chennai, city of people"),
("4", "This is Coiambattore."),
("5", "This is Mysore the Royal city.")]
df = spark.createDataFrame(data,columns)
df.show()
O/p:
+-------+--------------------+
|line_no|            Sentence|
+-------+--------------------+
|      1|This is Bangalore...|
|      2|This is Hyderabad...|
|      3|This is Chennai, ...|
|      4| This is Coimbatore.|
|      5|This is Mysore, t...|
+-------+--------------------+
# Use truncate=False to display full column contents
df.show(truncate=False)
O/p:
+-------+------------------------------------+
|line_no|Sentence                            |
+-------+------------------------------------+
|1      |This is Bangalore, the Garden city  |
|2      |This is Hyderabad, city of Biryanis.|
|3      |This is Chennai, city of people     |
|4      |This is Coimbatore.                 |
|5      |This is Mysore, the Royal city.     |
+-------+------------------------------------+
# Display 3 rows, truncating each value to 10 characters
df.show(3,truncate=10)
O/p:
+-------+----------+
|line_no|  Sentence|
+-------+----------+
| 1|This is...|
| 2|This is...|
| 3|This is...|
+-------+----------+
only showing top 3 rows
# Display rows vertically
df.show(n=3,truncate=10,vertical=True)
O/p:
-RECORD 0--------------
line_no | 1
Sentence | This is...
-RECORD 1--------------
line_no | 2
Sentence | This is...
-RECORD 2--------------
line_no | 3
Sentence | This is...
only showing top 3 rows
Using Row class on PySpark DataFrame:
from pyspark.sql import Row
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
df=spark.createDataFrame(data)
df.printSchema()
df.show()
O/p:
root
|-- name: string (nullable = true)
|-- lang: array (nullable = true)
| |-- element: string (containsNull = true)
|-- state: string (nullable = true)
+-----------+--------------------+---------+
| name| lang| state|
+-----------+--------------------+---------+
|Saravanavel|[Tamil, Telugu, Eng]| Chennai|
| Siddappa|[Kannada, Telugu,...|Bangalore|
| Rao|[Telugu, Urdu, Hi...|Hyderabad|
+-----------+--------------------+---------+
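Rows collected back to the driver can be read by attribute or by key. A small sketch against the DataFrame above:
first_row = df.collect()[0]
print(first_row.name)      # attribute access -> 'Saravanavel'
print(first_row["state"])  # key access -> 'Chennai'
print(first_row.lang[0])   # array element -> 'Tamil'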
PySpark Column Class:
# access column from DF
data=[("Rao",43),("Ram",44)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
# Selecting a column using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()
O/p:
+---+
|age|
+---+
| 43|
| 44|
+---+
+---+
|age|
+---+
| 43|
| 44|
+---+
+----------+
|name.fname|
+----------+
| Rao|
| Ram|
+----------+
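The same selections can be written with the col() function, which is handy when no DataFrame variable is in scope. A sketch:
from pyspark.sql.functions import col
df.select(col("age")).show()
df.select(col("`name.fname`")).show()  # backticks are still needed for dotted names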
PySpark Column Operators:
Arithmetic operators (add, subtract, multiply, divide) work directly on columns; multiplication and division are sketched after the outputs below.
data=[(10,20),(20,33),(30,50)]
df=spark.createDataFrame(data).toDF("col1","col2")
# Arithmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show()
O/p:
+-------------+
|(col1 + col2)|
+-------------+
| 30|
| 53|
| 80|
+-------------+
+-------------+
|(col1 - col2)|
+-------------+
| -10|
| -13|
| -20|
+-------------+
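The remaining arithmetic operators work the same way; a sketch of multiplication, division, and modulus on the same columns:
df.select(df.col1 * df.col2).show()  # (col1 * col2)
df.select(df.col1 / df.col2).show()  # (col1 / col2), result is a double
df.select(df.col1 % df.col2).show()  # (col1 % col2)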
Alias Column:
#
data = [("John","Cena","33","M",50000),
("Michael","Corneol","44","M",44000),
("Robert","Duvval","42","M",67000),
("Anne","Adams","92","F",34000),
("Peter","Parker","57","M",100000)
]
columns=["fname","lname","age","gender", "sal"]
df=spark.createDataFrame(data,columns)
df.select(df.fname.alias("first_name"), \
df.lname.alias("last_name")
).show()
O/p:
+----------+---------+
|first_name|last_name|
+----------+---------+
| John| Cena|
| Michael| Corneol|
| Robert| Duvval|
| Anne| Adams|
| Peter| Parker|
+----------+---------+
# Concatenate the first and last name columns
from pyspark.sql.functions import expr
df.select(expr(" fname ||','|| lname").alias("fullName") \
).show()
O/p:
+---------------+
| fullName|
+---------------+
| John,Cena|
|Michael,Corneol|
| Robert,Duvval|
| Anne,Adams|
| Peter,Parker|
+---------------+
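The same concatenation can be written without SQL expression syntax using concat_ws(), which takes a separator followed by the columns. A sketch:
from pyspark.sql.functions import concat_ws
df.select(concat_ws(",", df.fname, df.lname).alias("fullName")).show()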
DataFrame Filter:
# contains
df.filter(df.fname.contains("John")).show()
O/p:
+-----+-----+---+------+-----+
|fname|lname|age|gender| sal|
+-----+-----+---+------+-----+
| John| Cena| 33| M|50000|
+-----+-----+---+------+-----+
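Column objects expose several other predicates usable inside filter(); a sketch on the same employee DataFrame:
df.filter(df.fname.startswith("Jo")).show()  # prefix match
df.filter(df.lname.endswith("er")).show()    # suffix match
df.filter(df.age.isin("33", "44")).show()    # membership test (age is a string here)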
Select Columns:
df.select("fname","lname").show()
df.select(df.fname,df.lname).show()
df.select(df["fname"],df["lname"]).show()
O/p:
+-------+-------+
| fname| lname|
+-------+-------+
| John| Cena|
|Michael|Corneol|
| Robert| Duvval|
| Anne| Adams|
| Peter| Parker|
+-------+-------+
# Select all columns from a list
df.select(*columns).show()
O/p:
+-------+-------+---+------+------+
| fname| lname|age|gender| sal|
+-------+-------+---+------+------+
| John| Cena| 33| M| 50000|
|Michael|Corneol| 44| M| 44000|
| Robert| Duvval| 42| M| 67000|
| Anne| Adams| 92| F| 34000|
| Peter| Parker| 57| M|100000|
+-------+-------+---+------+------+
# Select the first 2 columns and top 2 rows
df.select(df.columns[:2]).show(2)
O/p:
+-------+-------+
| fname| lname|
+-------+-------+
| John| Cena|
|Michael|Corneol|
+-------+-------+
only showing top 2 rows
# Update the value of an existing column
from pyspark.sql.functions import col
df.withColumn("sal",col("sal")*100).show()
O/p:
+-------+-------+---+------+--------+
| fname| lname|age|gender| sal|
+-------+-------+---+------+--------+
| John| Cena| 33| M| 5000000|
|Michael|Corneol| 44| M| 4400000|
| Robert| Duvval| 42| M| 6700000|
| Anne| Adams| 92| F| 3400000|
| Peter| Parker| 57| M|10000000|
+-------+-------+---+------+--------+
# Create a column from an existing one
df.withColumn("New_sal",col("sal") / 100).show()
O/p:
+-------+-------+---+------+------+-------+
| fname| lname|age|gender| sal|New_sal|
+-------+-------+---+------+------+-------+
| John| Cena| 33| M| 50000| 500.0|
|Michael|Corneol| 44| M| 44000| 440.0|
| Robert| Duvval| 42| M| 67000| 670.0|
| Anne| Adams| 92| F| 34000| 340.0|
| Peter| Parker| 57| M|100000| 1000.0|
+-------+-------+---+------+------+-------+
# Add new columns using withColumn()
from pyspark.sql.functions import *
df.withColumn("City", lit("Bangalore")) \
.withColumn("Date_column",current_date()) \
.show()
O/p:
+-------+-------+---+------+------+---------+-----------+
| fname| lname|age|gender| sal| City|Date_column|
+-------+-------+---+------+------+---------+-----------+
| John| Cena| 33| M| 50000|Bangalore| 2022-04-15|
|Michael|Corneol| 44| M| 44000|Bangalore| 2022-04-15|
| Robert| Duvval| 42| M| 67000|Bangalore| 2022-04-15|
| Anne| Adams| 92| F| 34000|Bangalore| 2022-04-15|
| Peter| Parker| 57| M|100000|Bangalore| 2022-04-15|
+-------+-------+---+------+------+---------+-----------+
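withColumn() is also the usual way to change a column's type. Since age was created as a string above, a cast sketch (df_cast is an illustrative name):
from pyspark.sql.functions import col
df_cast = df.withColumn("age", col("age").cast("int"))
df_cast.printSchema()  # age is now integer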
# Drop a column from the DataFrame
df.drop("Date_column") \
.show()
O/p:
+-------+-------+---+------+------+
| fname| lname|age|gender| sal|
+-------+-------+---+------+------+
| John| Cena| 33| M| 50000|
|Michael|Corneol| 44| M| 44000|
| Robert| Duvval| 42| M| 67000|
| Anne| Adams| 92| F| 34000|
| Peter| Parker| 57| M|100000|
+-------+-------+---+------+------+
# Rename multiple columns
df = df.withColumnRenamed("fname","First_name").withColumnRenamed("lname","last_name")
df.printSchema()
O/p:
root
|-- First_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- age: string (nullable = true)
|-- gender: string (nullable = true)
|-- sal: long (nullable = true)
# DataFrame filter() with a column condition
df.filter(df.gender == "F").show(truncate=False)
O/p:
+----------+---------+---+------+-----+
|First_name|last_name|age|gender|sal |
+----------+---------+---+------+-----+
|Anne |Adams |92 |F |34000|
+----------+---------+---+------+-----+
# Filter with multiple conditions
df.filter( (df.gender == "F") & (df.last_name == "Adams") ) \
.show(truncate=False)
O/p:
+----------+---------+---+------+-----+
|First_name|last_name|age|gender|sal |
+----------+---------+---+------+-----+
|Anne |Adams |92 |F |34000|
+----------+---------+---+------+-----+
# like - SQL LIKE pattern
df.filter(df.last_name.like("%da%")).show()
O/p:
+----------+---------+---+------+-----+
|First_name|last_name|age|gender| sal|
+----------+---------+---+------+-----+
| Anne| Adams| 92| F|34000|
+----------+---------+---+------+-----+
# Get distinct rows (comparing all columns)
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)
O/p:
Distinct count: 5
+----------+---------+---+------+------+
|First_name|last_name|age|gender|sal |
+----------+---------+---+------+------+
|Anne |Adams |92 |F |34000 |
|Robert |Duvval |42 |M |67000 |
|Peter |Parker |57 |M |100000|
|John |Cena |33 |M |50000 |
|Michael |Corneol |44 |M |44000 |
+----------+---------+---+------+------+
# Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)
O/p: same rows and count as distinct() above, since the data contains no duplicate rows.
# sort
df.sort("last_name","age").show(truncate=False)
O/p:
+----------+---------+---+------+------+
|First_name|last_name|age|gender|sal |
+----------+---------+---+------+------+
|Anne |Adams |92 |F |34000 |
|John |Cena |33 |M |50000 |
|Michael |Corneol |44 |M |44000 |
|Robert |Duvval |42 |M |67000 |
|Peter |Parker |57 |M |100000|
+----------+---------+---+------+------+
# order by
df.orderBy("sal","age").show(truncate=False)
O/p:
+----------+---------+---+------+------+
|First_name|last_name|age|gender|sal |
+----------+---------+---+------+------+
|Anne |Adams |92 |F |34000 |
|Michael |Corneol |44 |M |44000 |
|John |Cena |33 |M |50000 |
|Robert |Duvval |42 |M |67000 |
|Peter |Parker |57 |M |100000|
+----------+---------+---+------+------+
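Both sort() and orderBy() default to ascending order; descending order is requested per column. A sketch:
from pyspark.sql.functions import col
df.orderBy(col("sal").desc()).show(truncate=False)            # highest salary first
df.sort(df.gender.asc(), df.sal.desc()).show(truncate=False)  # mixed directions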
Group by:
simpleData = [("John","Merging","NY",89000,34,1000),
("Michael","Merging","NY",76000,29,12000),
("Robert","Civil","WA",91000,35,9000),
("Sean","Finance","WA",60000,24,3000),
("Peter","Finance","WA",99000,40,20000),
("Parket","Finance","WA",83000,36,23000),
("Scotty","Merging","NY",79000,53,15000),
("Harvey","Merging","NY",990000,40,50000),
("Litt","Merging","NY",94000,50,40000)
]
schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
#df.printSchema()
df.show(truncate=False)
O/p:
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|John |Merging |NY |89000 |34 |1000 |
|Michael |Merging |NY |76000 |29 |12000|
|Robert |Civil |WA |91000 |35 |9000 |
|Sean |Finance |WA |60000 |24 |3000 |
|Peter |Finance |WA |99000 |40 |20000|
|Parket |Finance |WA |83000 |36 |23000|
|Scotty |Merging |NY |79000 |53 |15000|
|Harvey |Merging |NY |990000|40 |50000|
|Litt |Merging |NY |94000 |50 |40000|
+-------------+----------+-----+------+---+-----+
#
df.groupBy("department").count().show(truncate=False)
O/p:
+----------+-----+
|department|count|
+----------+-----+
|Merging |5 |
|Finance |3 |
|Civil |1 |
+----------+-----+
#
df.groupBy("department").sum("salary").show(truncate=False)
O/p:
+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Merging |1328000 |
|Finance |242000 |
|Civil |91000 |
+----------+-----------+
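Several aggregates can be computed in one pass with agg(), aliasing each result. A sketch over the same grouping:
from pyspark.sql.functions import sum, avg, max
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"),
         avg("salary").alias("avg_salary"),
         max("bonus").alias("max_bonus")) \
    .show(truncate=False)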
PySpark Joins:
emp_data = [("John","Merging","NY",89000,34,1000, 50),
("Michael","Merging","NY",76000,29,12000, 50),
("Robert","Civil","WA",91000,35,9000, 40),
("Sean","Finance","WA",60000,24,3000, 30),
("Peter","Finance","WA",99000,40,20000, 30),
("Parket","Finance","WA",83000,36,23000, 30),
("Scotty","Merging","NY",79000,53,15000, 50),
("Harvey","Merging","NY",990000,40,50000, 50),
("Litt","Merging","NY",94000,50,40000, 50)
]
emp_Columns = ["employee_name","department","state","salary","age","bonus", "dept_id"]
empDF = spark.createDataFrame(data=emp_data, schema = emp_Columns)
# empDF.printSchema()
print ('****emp_table')
print('************************************')
empDF.show(truncate=False)
dept = [("Merging",50), \
("Civil",40), \
("Finance",30), \
("Law", 20)
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
# deptDF.printSchema()
deptDF.show(truncate=False)
O/p:
****emp_table
************************************
+-------------+----------+-----+------+---+-----+-------+
|employee_name|department|state|salary|age|bonus|dept_id|
+-------------+----------+-----+------+---+-----+-------+
|John |Merging |NY |89000 |34 |1000 |50 |
|Michael |Merging |NY |76000 |29 |12000|50 |
|Robert |Civil |WA |91000 |35 |9000 |40 |
|Sean |Finance |WA |60000 |24 |3000 |30 |
|Peter |Finance |WA |99000 |40 |20000|30 |
|Parket |Finance |WA |83000 |36 |23000|30 |
|Scotty |Merging |NY |79000 |53 |15000|50 |
|Harvey |Merging |NY |990000|40 |50000|50 |
|Litt |Merging |NY |94000 |50 |40000|50 |
+-------------+----------+-----+------+---+-----+-------+
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Merging |50 |
|Civil |40 |
|Finance |30 |
|Law |20 |
+---------+-------+
#
empDF.join(deptDF,empDF.dept_id == deptDF.dept_id,"fullouter") \
.show(truncate=False)
O/p:
+-------------+----------+-----+------+----+-----+-------+---------+-------+
|employee_name|department|state|salary|age |bonus|dept_id|dept_name|dept_id|
+-------------+----------+-----+------+----+-----+-------+---------+-------+
|null         |null      |null |null  |null|null |null   |Law      |20     |
|Sean         |Finance   |WA   |60000 |24  |3000 |30     |Finance  |30     |
|Peter        |Finance   |WA   |99000 |40  |20000|30     |Finance  |30     |
|Parket       |Finance   |WA   |83000 |36  |23000|30     |Finance  |30     |
|Robert       |Civil     |WA   |91000 |35  |9000 |40     |Civil    |40     |
|John         |Merging   |NY   |89000 |34  |1000 |50     |Merging  |50     |
|Michael      |Merging   |NY   |76000 |29  |12000|50     |Merging  |50     |
|Scotty       |Merging   |NY   |79000 |53  |15000|50     |Merging  |50     |
|Harvey       |Merging   |NY   |990000|40  |50000|50     |Merging  |50     |
|Litt         |Merging   |NY   |94000 |50  |40000|50     |Merging  |50     |
+-------------+----------+-----+------+----+-----+-------+---------+-------+
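Other join types use the same signature; only the join-type string changes. A sketch (an inner join drops the unmatched "Law" row, while a left join keeps every employee):
empDF.join(deptDF, empDF.dept_id == deptDF.dept_id, "inner").show(truncate=False)
empDF.join(deptDF, empDF.dept_id == deptDF.dept_id, "left").show(truncate=False)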
Union:
data1 = [("John","Merging","NY",89000,34,1000, 50),
("Michael","Merging","NY",76000,29,12000, 50),
("Robert","Civil","WA",91000,35,9000, 40),
("Sean","Finance","WA",60000,24,3000, 30)
]
emp_Columns = ["employee_name","department","state","salary","age","bonus", "dept_id"]
emp_df1 = spark.createDataFrame(data = data1, schema = emp_Columns)
data2= [("Peter","Finance","WA",99000,40,20000, 30),
("Parket","Finance","WA",83000,36,23000, 30),
("Scotty","Merging","NY",79000,53,15000, 50),
("Harvey","Merging","NY",990000,40,50000, 50),
("Litt","Merging","NY",94000,50,40000, 50)]
emp_df2 = spark.createDataFrame(data = data2, schema = emp_Columns)
#
unionDF = emp_df1.union(emp_df2)
unionDF.show(truncate=False)
O/p:
+-------------+----------+-----+------+---+-----+-------+
|employee_name|department|state|salary|age|bonus|dept_id|
+-------------+----------+-----+------+---+-----+-------+
|John |Merging |NY |89000 |34 |1000 |50 |
|Michael |Merging |NY |76000 |29 |12000|50 |
|Robert |Civil |WA |91000 |35 |9000 |40 |
|Sean |Finance |WA |60000 |24 |3000 |30 |
|Peter |Finance |WA |99000 |40 |20000|30 |
|Parket |Finance |WA |83000 |36 |23000|30 |
|Scotty |Merging |NY |79000 |53 |15000|50 |
|Harvey |Merging |NY |990000|40 |50000|50 |
|Litt |Merging |NY |94000 |50 |40000|50 |
+-------------+----------+-----+------+---+-----+-------+
# Merge without duplicates
unionDF = emp_df1.union(emp_df2).distinct()
unionDF.show(truncate=False)
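union() matches columns purely by position; when the two DataFrames declare their columns in different orders, unionByName() is the safer choice. A sketch:
# resolves columns by name instead of position
unionDF = emp_df1.unionByName(emp_df2)
unionDF.show(truncate=False)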
DataFrame Aggregate Functions:
data = [("John","Cena","33","M",50000),
("Michael","Corneol","44","M",44000),
("Robert","Duvval","42","M",67000),
("Anne","Adams","92","F",34000),
("Peter","Parker","57","M",100000),
("Peter", "Hains", "47", "M", 500000),
("Penelope", 'Cruze', "44", "F", 1000000)
]
columns=["fname","lname","age","gender", "sal"]
df=spark.createDataFrame(data,columns)
# Using SQL
df.createOrReplaceTempView("EMP")
spark.sql("select * from EMP ORDER BY sal asc") \
.show(truncate=False)
O/p:
+--------+-------+---+------+-------+
|fname |lname |age|gender|sal |
+--------+-------+---+------+-------+
|Anne |Adams |92 |F |34000 |
|Michael |Corneol|44 |M |44000 |
|John |Cena |33 |M |50000 |
|Robert |Duvval |42 |M |67000 |
|Peter |Parker |57 |M |100000 |
|Peter |Hains |47 |M |500000 |
|Penelope|Cruze |44 |F |1000000|
+--------+-------+---+------+-------+
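With the temp view registered, any SQL statement can run against it; a sketch of an aggregate query over the same view:
spark.sql("select gender, avg(sal) as avg_sal from EMP group by gender") \
    .show(truncate=False)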
from pyspark.sql.types import *
from pyspark.sql.functions import *
# collect()[0][0] returns the aggregate as a plain Python value; print() to display it
print(df.select(approx_count_distinct("sal")).collect()[0][0])
print(df.select(avg("sal")).collect()[0][0])
df.select(collect_list("sal")).show(truncate=False)
df.select(countDistinct("gender", "sal")).show(truncate=False)
print(df.select(count("sal")).collect()[0][0])
df.select(first("sal")).show(truncate=False)
df.select(last("sal")).show(truncate=False)
df.select(max("sal")).show(truncate=False)
df.select(min("sal")).show(truncate=False)
df.select(mean("sal")).show(truncate=False)
df.select(sum("sal")).show(truncate=False)
O/p:
7
256428.57142857142
+-----------------------------------------------------+
|collect_list(sal) |
+-----------------------------------------------------+
|[50000, 44000, 67000, 34000, 100000, 500000, 1000000]|
+-----------------------------------------------------+
+---------------------------+
|count(DISTINCT gender, sal)|
+---------------------------+
|7 |
+---------------------------+
7
+----------+
|first(sal)|
+----------+
|50000 |
+----------+
+---------+
|last(sal)|
+---------+
|1000000 |
+---------+
+--------+
|max(sal)|
+--------+
|1000000 |
+--------+
+--------+
|min(sal)|
+--------+
|34000 |
+--------+
+------------------+
|avg(sal) |
+------------------+
|256428.57142857142|
+------------------+
+--------+
|sum(sal)|
+--------+
|1795000 |
+--------+