PySpark RDD:
RDD (Resilient Distributed Dataset) is the fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects.
Immutable means that once you create an RDD, you cannot change it.
There are two ways to create RDDs:
- Parallelizing an existing collection in your driver program.
- Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
data = ['''over under over pale
Thorough over thorough always
Over park over always
Thorough flood thorough fire
over under over pale
Thorough over thorough always
Over park over always
''']
# create RDD by parallelizing
rdd = spark.sparkContext.parallelize(data)
rdd
Read files from an external location:
# Create an RDD from an external dataset (each line becomes a separate record)
rdd_external = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/test.txt")
rdd_external.collect()
# wholeTextFiles() reads the entire file into an RDD as a single (filename, content) record.
rdd3 = spark.sparkContext.wholeTextFiles("/content/drive/MyDrive/0_AWS/test.txt")
rdd3.collect()
Partition:
# Get the number of partitions
rdd.getNumPartitions()
o/p:
10
# repartition
reparRdd = rdd.repartition(10)
RDD Operations:
RDDs support two types of operations:
- Transformations, which create a new dataset from an existing one (e.g. map).
- Actions, which return a value to the driver program after running a computation on the dataset (e.g. reduce).
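The transformation/action split can be mimicked with plain Python's lazy map — a rough analogy, not Spark itself: a transformation only records the computation, and nothing runs until an action forces a result back to the driver.

```python
from functools import reduce

# Plain-Python analogy (not Spark): map() is lazy, like an RDD transformation.
nums = [1, 2, 3, 4, 5]
squared = map(lambda x: x * x, nums)   # nothing computed yet, just a recipe

# Forcing the iterator plays the role of an action such as collect():
result = list(squared)
print(result)                          # [1, 4, 9, 16, 25]

# reduce() plays the role of the reduce action, combining values pairwise:
total = reduce(lambda a, b: a + b, result)
print(total)                           # 55
```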
Narrow transformations:
No data movement across partitions.
Examples: map(), mapPartitions(), flatMap(), filter(), union()
Wide transformations:
Data is combined from many partitions, which means data movement (a shuffle) happens.
Examples: groupByKey(), aggregateByKey(), aggregate(), join(), repartition()
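To see why wide transformations move data, here is a plain-Python sketch (an analogy, not Spark internals): a narrow operation like map can run inside each partition independently, while a groupByKey-style operation must gather every value for a key from all partitions.

```python
from collections import defaultdict

# Two hypothetical input partitions of (key, value) pairs
partitions = [[("a", 1), ("b", 1)], [("a", 1), ("c", 1)]]

# Narrow (map-like): each partition is processed on its own, no data movement
mapped = [[(k, v * 10) for k, v in part] for part in partitions]
print(mapped)          # [[('a', 10), ('b', 10)], [('a', 10), ('c', 10)]]

# Wide (groupByKey-like): records with the same key must be brought together
# from every partition, which on a cluster means a shuffle over the network
grouped = defaultdict(list)
for part in partitions:
    for k, v in part:
        grouped[k].append(v)
print(dict(grouped))   # {'a': [1, 1], 'b': [1], 'c': [1]}
```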
#read text file from drive
rdd = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/text1.txt")
# Apply flatMap
# flatMap() flattens the RDD after applying the function and returns a new RDD.
# The resulting RDD has a single word in each record.
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # each word becomes a separate record
# map
# map() is used to apply any complex operation, like adding a column or
# updating a column, etc.
# The output of a map transformation always has the same number of records as the input.
rdd3 = rdd2.map(lambda x: (x, 1))
# In our word count example, we add a new column with value 1 for each word.
# The result is a pair RDD (PairRDDFunctions) of key-value pairs:
# the word (String) as the key and 1 (Int) as the value.
# filter – filter() transformation is used to filter the records in an RDD.
# In our example we keep all words that contain the letter "a".
rdd4 = rdd3.filter(lambda x: 'a' in x[0])
print(rdd4.collect())
O/p:
[('data', 1), ('pale', 1), ('always', 1), ('park', 1), ('always', 1), ('pale', 1),
('always', 1), ('park', 1), ('always', 1)] ....
# reduceByKey – merges the values for each key with the given function (here, summing the counts)
rdd5 = rdd4.reduceByKey(lambda a, b: a + b)
# Swap each (word, count) to (count, word) and sort by key, i.e. by count
rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey()
#Print rdd6 result to console
print(rdd6.collect())
O/p:
[(1, 'data'), (2, 'pale'), (2, 'park'), (4, 'always')]
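The whole pipeline above can be traced in plain Python on an assumed sample of the text (the actual contents of text1.txt are not shown here), to make each step's output concrete:

```python
from collections import Counter

# Assumed sample lines; the blog's actual text1.txt may differ.
lines = ["over under over pale", "Thorough over thorough always"]

words = [w for line in lines for w in line.split(" ")]   # flatMap
pairs = [(w, 1) for w in words]                          # map to (word, 1)
kept = [p for p in pairs if "a" in p[0]]                 # filter: contains "a"
counts = Counter(w for w, _ in kept)                     # reduceByKey
result = sorted((c, w) for w, c in counts.items())       # map swap + sortByKey
print(result)   # [(1, 'always'), (1, 'pale')]
```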
Row class on PySpark RDD:
from pyspark.sql import Row
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())
O/p:
[Row(name='Saravanavel', lang=['Tamil', 'Telugu', 'Eng'], state='Chennai'),
Row(name='Siddappa', lang=['Kannada', 'Telugu', 'Tulu'], state='Bangalore'),
Row(name='Rao', lang=['Telugu', 'Urdu', 'Hindi'], state='Hyderabad')]