
599_AWS Interview Questions

0. AWS
Q. What is AWS?
-Amazon Web Services is a Cloud Provider 
-AWS provide servers and services that can be used on demand and scale easily.

Q. SSO?
Single Sign-On – using one set of credentials to log in to multiple applications.

Q. AWS Cloud Use Cases?
AWS is used in:
- Enterprise IT,  
- Data analytics
- Website hosting  
- Gaming
- Storage
- Mobile
- Social Apps

Q. AWS Well-Architected tool?
The AWS Well-Architected Tool provides a consistent approach for reviewing your architecture against AWS best practices.
AWS Well-Architected Tool → Workloads → Define workload

Q. Pricing in AWS?
It is based on 3 factors.
1) Compute (CPU, resources, memory)
2) Storage
3) Outbound data transfer (e.g., downloading from S3)

Q. How to Choose region?
1) Compliance with data governance and legal rules.
2) Proximity to Customers
3) Pricing
4) Available Services
5) Latency: choose a region close to your users to reduce network latency.

Q. Edge location use?
Used for caching the data.

For example, a user in India accessing a file from US east region for the first time the file will be accessed from US East region. If any other user from India tries to access the same file, then they can directly access it from caching server in India.

Q. Maximum limits set by AWS for roles and users in an account?
1000 IAM roles,
5000 IAM users,
300 IAM groups,
10 managed policies attached per user, group or role,
An IAM user can be a member of up to 10 groups.

You can raise a ticket with AWS Support and ask to increase a limit.




1. IAM
Q. IAM 
- Identity and Access Management
- enables you to manage the access to AWS services and resources securely.
- IAM Service usage is FREE!
- IAM is a Global service.

Q. Features of IAM:
- Securely manage services and resources.
- Creates IAM principals such as Users, Groups and Roles.
- Allow/Deny access via IAM Policy.
- Supports Identity Federation.
- Provides Multi-Factor Authentication.
- Provides Policy Simulator.

Q. How can users access AWS?
AWS can be accessed in 3 ways:
- AWS Management Console (Password is used to login)
- AWS Command Line Interface (access keys )
- AWS Software Developer Kit (access keys)

Q. What is a Role in IAM?
A role is not tied to a specific person or service. Instead, a user or service assumes the role, and the role's permissions grant access to the target service.

Q. What are policies in Iam?
There are 2 types of policies.
1) Managed policies (created by AWS)
2) Custom policy (Created by us and we have to manage them)

Policies are:
1. A policy is an entity in AWS that, when attached to an identity or resource, defines their permissions.
2. A policy contains permissions that determine whether a request is allowed or denied.
3. Policies are stored in AWS as JSON documents.
4. Policy is attached to either principals (Users, Roles or Groups) or to resources (e.g., S3 bucket).

Q. IAM policy consists of?
A policy consists of (a minimal example is sketched after this list):
1) Version: the policy language version; always use "2012-10-17"
2) Id: an identifier for the policy (optional; helps quickly recognize what the policy is for)
3) Statement: one or more statements (required)
A statement consists of:
1) Sid: statement identifier (optional)
2) Effect: whether the statement allows or denies access (Allow / Deny)
3) Principal: the account/user/role the statement applies to (identified by ARN; used in resource-based policies)
4) Action: the set of actions the policy allows or denies
5) Resource: the list of resources the actions apply to
6) Condition: conditions under which the policy is in effect
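
For reference, here is a minimal sketch of such a policy written as a Python dict and created with boto3 (the bucket and policy names are made up; assumes boto3 is installed and AWS credentials are configured):
#
import json
import boto3

# A minimal identity-based policy: read-only access to one (hypothetical) bucket
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowReadMyBucket",
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::my-example-bucket",
                "arn:aws:s3:::my-example-bucket/*"
            ]
        }
    ]
}

iam = boto3.client("iam")
iam.create_policy(
    PolicyName="MyReadOnlyBucketPolicy",       # hypothetical policy name
    PolicyDocument=json.dumps(policy_document)
)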


Q. IAM Terminologies:
- Resources: The things on which actions can be taken. Example: EC2 instances are resources, so someone who starts or stops an instance is acting on a resource.
- Principals: Things that can take action on resources. These include users, groups and roles, and are also called Identities.
- Users: IAM users are entities created in AWS that can be given permissions to perform actions.
- Group – A collection of IAM users.
- Role – An identity with a set of permissions that can be assumed by a user or an AWS service.

Q. What is ARN?
ARN – Amazon Resource Name
A unique identifier for every resource in AWS (format: arn:partition:service:region:account-id:resource).
It is useful when writing policies and rules.
It is assigned automatically to the resources we create.

Q. Default deny in IAM?
An IAM user or IAM group has no permissions to access AWS resources by default. This is called "Default Deny" or "Implicit (Non-Explicit) Deny".

Q. How to create/import 1000 users at a time in AWS?
By bulk-importing a CSV file using AWS SSO (Single Sign-On), we can create 1000 users at a time. (A scripted alternative is sketched below.)
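
If you prefer scripting it instead of the SSO bulk import, here is a minimal sketch using boto3 (the file users.csv with a "username" column and the "developers" group are hypothetical; assumes AWS credentials are configured):
#
import csv
import boto3

iam = boto3.client("iam")

# users.csv is a hypothetical file with a "username" column
with open("users.csv", newline="") as f:
    for row in csv.DictReader(f):
        iam.create_user(UserName=row["username"])
        # optionally add each new user to an existing (hypothetical) group
        iam.add_user_to_group(GroupName="developers", UserName=row["username"])

Remember the default quota of 5000 IAM users per account mentioned above.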

Q. What is MFA in IAM?
AWS Multi-Factor Authentication (MFA) is a simple best practice that adds an extra layer of protection on top of your user’s name and password. 

Q. What are identity-based policies?
Policies that you can attach to a principal (or identity) such as an IAM user, role or group.

These policies control what actions that identity can perform, on which resources, and under what conditions.

Q. Resource based policies?
Policies that you attach to a resource, such as an S3 bucket, a Lambda function, or an SQS queue.

Q. How are policies different from permission boundaries?
A permission boundary sets the maximum permissions that an identity-based policy can grant to an IAM entity.
Example:
Even if a user has an admin policy attached, if their permission boundary only allows DynamoDB actions, they are effectively an admin for DynamoDB only.

Q. What is Access Analyzer in IAM?
IAM Access Analyzer helps you identify resources in your account (such as S3 buckets or IAM roles) that are shared with an external entity, so you can review and remove unintended access.




2. EC2
Q. EC2
- Elastic Compute Cloud.
- Infrastructure as a Service (IaaS).
- Virtual servers to run applications.
- Scalable computing capacity in the AWS Cloud (increase/decrease as required).
- Scale up or down to handle changing requirements, reducing the need to forecast traffic.




Q. Types of EC2 instances?
- General purpose (t2, t3, t4, m4, m5) – balance between compute, memory and networking
- Compute optimized. (c4, c5, c6)
- Memory optimized. (r4, r5, r6, u, x1, x2, z1)
- Storage optimized. (d2, d3, h1, i3, im, is)
- Accelerated computing (f1, g2, g3, g4…)

Q. Instance purchasing options?
Amazon EC2 provides the following purchasing options.
On-Demand Instances – Pay by the second, with no long-term commitment.
Savings Plans – Commit to a consistent amount of usage for a 1- or 3-year term.
Reserved Instances – Reserve instances for a 1- or 3-year term.
Spot Instances – Request unused EC2 capacity at a steep discount (instances can be interrupted).
Dedicated Hosts – Pay for a physical host dedicated to your use.
Dedicated Instances – Pay, by the hour, for instances that run on single-tenant hardware.
Capacity Reservations – Reserve capacity for your EC2 instances in a specific Availability Zone for any duration.


Q. What type of scaling is recommended for Amazon databases?
Auto Scaling

Q. Diff type of Load balancers?
    - Application Load Balancer, 
    - Network Load Balancer, 
    - Gateway Load balancer, 
    - Classic load balancer.

Q. How can you auto-delete snapshots?
The AWS Ops Automator solution can be used to create and delete snapshots automatically.

Q. EC2 storage volumes?
- Storage volumes for temporary data, which are deleted when you stop, hibernate, or terminate your instance, known as instance store volumes.
Ex: used for testing, POC's

- Persistent storage volumes (permanently there) for your data using Amazon Elastic Block Store (Amazon EBS), known as Amazon EBS Volumes.





3. S3
Q. S3
Simple Storage Service

Q. Default storage class in S3?
S3 Standard.


Q. Can a bucket have different objects in different storage class?
Yes

Q. Amazon S3 is which type of storage service?
Object storage.

Q. For which types of data does S3 offer encryption?
    - Data in transit,
    - Data at rest

Q. Are S3 objects accessible from any region?
Yes – a bucket lives in a specific region, but its objects can be accessed from anywhere.

Q. S3 storage classes:
1. Standard
2. Standard-IA
3. Intelligent-Tiering
4. One Zone-IA
5. Glacier Instant retrieval
6. Glacier Flexible retrieval
7. Glacier Deep archive.

Q. Copy a file from Ec2 instance to s3?
- SSH into the EC2 instance.
>aws s3 cp abc.txt s3://my-bucket-1b

Q. Copy a file from s3 to Ec2 instance?
>aws s3 cp s3://my-bucket-1b/xyz.txt .
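
The same copies can be done from Python with boto3 (a sketch; the bucket and file names follow the CLI examples above and assume the instance has credentials or an IAM role with S3 access):
#
import boto3

s3 = boto3.client("s3")

# EC2 -> S3 (equivalent of: aws s3 cp abc.txt s3://my-bucket-1b)
s3.upload_file("abc.txt", "my-bucket-1b", "abc.txt")

# S3 -> EC2 (equivalent of: aws s3 cp s3://my-bucket-1b/xyz.txt .)
s3.download_file("my-bucket-1b", "xyz.txt", "xyz.txt")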



4. VPC
Q. VPC main components?
VPC: Networking Layer for EC2.
Subnet: A logical subdivision of the VPC's IP address range (a range of IP addresses within the VPC).
CIDR: a method for allocating IP addresses
Route Tables: Set of rules, called Routes that are used to determine where network traffic is directed.
Internet Gateway: A gateway that you attach to your VPC to enable communication between resources in your VPC and the internet
VPC Endpoints: Allows you to connect to AWS services using a private network instead of using the public Internet. (without Internet Gateway, NAT device, VPN connection, instances do not require public IP addresses to communicate with resources in the service)

Q. Host ID in IPv4?
IPv4 example (with a /16 network prefix):
172.16.47.5
172.16  -- network prefix
47.5    -- host ID

Q. IP address calculation in CIDR?
10.0.0.0/24 → 256 IP addresses (10.0.0.0 – 10.0.0.255)

An IPv4 address has 32 bits.
Host bits = 32 - CIDR prefix = 32 - 24 = 8
Number of IPs = 2^8 = 256
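
A quick way to check this calculation (a sketch using Python's standard ipaddress module):
#
import ipaddress

net = ipaddress.ip_network("10.0.0.0/24")
print(net.num_addresses)          # 256
print(net[0], "-", net[-1])       # 10.0.0.0 - 10.0.0.255

# General rule: number of addresses = 2 ** (32 - prefix length)
print(2 ** (32 - net.prefixlen))  # 256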

Q. IPv4 vs IPv6

IPv4:
1) Format is 32-bit, 4 groups of up to 3 decimal digits
2) Default and required for all VPCs
3) VPC CIDR block size can be from /16 to /28
4) Subnet CIDR block size can be from /16 to /28
5) You can choose the private IPv4 CIDR block for your VPC

IPv6:
1) Format is 128-bit, 8 groups of 4 hexadecimal digits
2) Optional
3) VPC CIDR block size is fixed at /56
4) Subnet CIDR block size is fixed at /64
5) You cannot choose the IPv6 CIDR block; it is assigned from Amazon's pool of IPv6 addresses


Q. NACL vs Security Group?
SG works at the instance level; NACL works at the subnet level.
SG is stateful (return traffic is automatically allowed); NACL is stateless (return traffic must be explicitly allowed).







5. Lambda




6. Glue
Q. Access S3 for glue job?
An IAM role is needed to access S3, e.g. GlueServiceRole, GlueServiceNotebookRole, GlueConsoleAccess

Q. Type (where job runs?)
Spark, Spark Streaming, Python shell

Q. Glue version?
Spark 3.1, Python 3 (Glue version 3.0)

Q. Script stored at?
In S3 bucket in specific folder

Q. Temporary directory in Glue job?
Location where temporary results are saved.

Q. Incremental load implementation?
Advanced properties -- job bookmark -- enable
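
Enabling the bookmark when creating the job from code: a hedged sketch using boto3's create_job (the job name, role and script path are hypothetical; bookmarks also require the script itself to call job.init()/job.commit() via the awsglue library):
#
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="my-incremental-job",                      # hypothetical job name
    Role="GlueServiceRole",                         # hypothetical role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-bucket/scripts/my_job.py",  # hypothetical path
        "PythonVersion": "3",
    },
    DefaultArguments={
        # what "job bookmark -- enable" sets under the hood
        "--job-bookmark-option": "job-bookmark-enable",
    },
    GlueVersion="3.0",
)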

Q. Monitoring options?
Job metrics (enable creation of CloudWatch metrics)
Continuous logging
Spark UI (enable Spark UI for monitoring) 

Security, Script libraries, job parameters
Q. Library path?
Specify S3 bucket folder/key path.

Q. Dependent jars path?
Specify S3 bucket folder/key path.

Q. Referenced files path?
Specify S3 bucket folder/key path.

Q. Worker type?
G.1X (for memory-intensive jobs) – max 299 workers
G.2X (for ML transformations) – max 149 workers
Capacity is measured in Data Processing Units (DPUs).

Q. Max number of concurrent runs allowed for this job?
Max concurrency (default 1), error is returned when threshold reached.

Q. Number of retries?
The number of times AWS Glue automatically restarts the job if it fails.

Q. Job time out in Glue job?
Job execution time limit. Default is 2880min (48hrs)

Q. Delay notification threshold (in min)?
If job runs longer than specified time, glue sends notification via CloudWatch. 

Q. Catalog options?
Use the Glue Data Catalog as the Hive metastore.

Q. Choose Data source?
The data source must be crawled first so that its schema/table exists in a Glue Data Catalog database.

Q. Choose data target?
Create tables in data target.
Use tables in the data Catalog and update your data target.
Data store: S3, JDBC (select connection)









Testing Interview Questions

Q.    What is Software testing?
Process of executing applications to find defects.


Q.    Why Software Testing?
-to find errors that are made during development
-to ensure quality of product
-saves money as defect identified in early stages.
-to build customer confidence.


Q.    Testing benefits?
-Finding defects before delivery
-level of quality 
-Ensure requirements are delivered to client


Q.    Quality?
Delivering the application on time and without defects.


Q.    SDLC models?
Waterfall
Agile


Q.    Software Development Life Cycle (SDLC)?
-Requirement gathering
-Analysis
-Designing
-Coding/Development
-Testing
-Deployment/Production
-Maintenance/Support


Q.    Waterfall model?
The Waterfall" approach, the whole process of software development is divided into separate phases. In this Waterfall model, typically, the outcome of one phase acts as the input for the next phase sequentially.
Advantages:
-simple and easy to understand
-Phases are processed and completed 1 at a time. (predefined outputs at every phase)
- Tracking changes is easy.
Dis-advantages:
-Not suitable for changing requirements
-Difficult to measure progress within stages
-Poor model for long and ongoing projects

Q.    Agile process?
The Agile software development approach is based on iterative development.
-No long-term upfront planning.
Phases in Agile:
Requirements gathering
Design the requirements
Construction/ iteration
Testing/ Quality assurance
Deployment
Feedback

Advantages:
-Each build is incremental in terms of features.
-Rapid development
-Little upfront planning required.
Disadvantages:
-Little formal documentation
-Transfer of knowledge to a new team is difficult
-Heavy reliance on customer interaction


Q.    Agile terms?
Story
Product owner
Product backlog

Standup meeting
Sprint
Scrum meeting

Scrum master



Q.    Software Testing life cycle Phases?
-requirement understanding
-Test plan
-test scenario
-test cases
-test execution
-bug reporting, retesting


Q.    Test case?
A set of test steps and test data with:
-preconditions
-expected results
-postconditions


Q.     Bug vs Defect?
Defect: Identified during development.
Bug:  Identified during testing.

Q.     Error vs Failure?
Error: Related to coding (a mistake in the code).
Failure: A problem identified during the production phase.


Q.    Query Surge?
QuerySurge is a tool used for automating and continuously testing data (ETL/data warehouse testing).


Q.    Components in Query surge?
-Users,
-Agents
-Application server
-Database server


Q.    Mutation testing?
Deliberately introducing small faults (mutations) into the code to check whether the existing test cases detect them.


Q.    Latent defect?
An existing defect that has not yet caused a failure because the exact conditions to trigger it have not occurred.


Q.    Steps to handle issues found during testing?
Record
Report
Control


Q.    Quality of test execution is measured by?
-Defect rejection ratio
-Defect leakage ratio.


Q.    Defect rejection ratio?
Number of rejected bug reports / total submitted bug reports.


Q.    Fault masking?
Presence of one defect hides the presence of another defect. 


Q.    Testing methodologies?
-Black box
-white box

Q.    Black box Testing?
Testing method in which the functionalities of software applications are tested without having knowledge of internal code structure, implementation details.


Q.     what is White box testing?
Testing technique in which internal structure, design and coding of software are tested to verify flow of input-output and to improve design, usability and security. 


Q.    Incremental testing?
Two approaches:
1. Bottom-up (uses drivers) – testing from low-level components up to high-level ones.
2. Top-down (uses stubs) – testing from high-level components down to low-level ones.


Q.    Adhoc testing?
Testing application w/o proper planning.

Q.    Static testing?
Testing the application (documents, code) without executing it, e.g. reviews and walkthroughs.


Q.    Retesting?
Product is already tested and due to some problems it needs to be tested again.






PySpark Interview Questions
1. PySpark Basic
Q1.  What is PySpark?
PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities.
PySpark is used to run applications in parallel on a distributed cluster (multiple nodes).


Q2.  What are Spark features?
1. In-memory computation
2. Distributed processing using parallelize
3. Can be used with many cluster managers (Spark, Yarn, Mesos etc)
4. Fault-tolerant
5. Immutable
6. Lazy evaluation
7. Cache & persistence
8. Built-in optimization when using DataFrames
9. Supports ANSI SQL


Q3.  What is SparkSession?
Since Spark 2.0 SparkSession has become an entry point to PySpark to work with RDD, DataFrame.
Prior to 2.0, SparkContext used to be an entry point.


Q.  Create spark session?
#
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark.version
O/p:
3.2.1




Q.  Get SparkContext Configurations?
#
configurations = spark.sparkContext.getConf().getAll()
for item in configurations:
  print(item)


Q.  Get Specific configuration?
#
print(spark.sparkContext.getConf().get("spark.master"))
O/p:
local


Q.  Lazy evaluation in PySpark?
Transformations are not evaluated when they are defined; instead they are queued with other transformations until an Action is called.
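
A small illustration (a sketch; assumes the SparkSession created earlier):
#
rdd = spark.sparkContext.parallelize(range(10))

# Transformations only build the lineage; nothing runs yet
mapped = rdd.map(lambda x: x * 2)
filtered = mapped.filter(lambda x: x > 5)

# The action triggers the actual computation
print(filtered.collect())   # [6, 8, 10, 12, 14, 16, 18]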


Q.   Benefits of Spark vs MapReduce?
- Spark processes data in memory.
- Spark can be 10-100x faster than MapReduce.
- Spark works well when the working data set fits in cluster memory.
- Hadoop MapReduce is more cost-effective for processing massive data sets that do not fit in memory.


Q.  Cluster Manager Types?
1. Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
2. Apache Mesos – Mesos is a cluster manager that can also run Hadoop MapReduce and PySpark applications.
3. Hadoop YARN – the resource manager in Hadoop 2. (Most used)
4. Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.


Q.  Spark integrates with languages?
-Python,
-Scala,
-R
-Java



2. RDD
Q.  What is RDD?
RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark which is fault-tolerant, immutable distributed collections of objects.

Immutable meaning once you create an RDD you cannot change it.


Q.  How many ways RDD can be created?
2 ways.
1. Parallelizing an existing collection in your driver program.
#Create RDD from parallelize    
data = [1,2,3,4,5]
rdd=spark.sparkContext.parallelize(data)
rdd
2. Referencing a dataset in an external storage system.
# Create RDD from an external dataset (each line becomes an element)
rdd_external = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/text1.txt")
rdd_external.collect()


Q.  coalesce vs repartition?
repartition() shuffles data across all nodes (a full shuffle).
coalesce() minimizes data movement by merging existing partitions where possible.
example:
If data is in 4 partitions, coalesce(2) moves data from only 2 partitions.

repartition () - increase/ decrease RDD, DataFrame, Dataset partitions  
coalesce () - Only decreases the number of partitions
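
For example (a sketch; assumes the SparkSession created earlier):
#
rdd = spark.sparkContext.parallelize(range(100), 4)
print(rdd.getNumPartitions())                 # 4

print(rdd.repartition(8).getNumPartitions())  # 8 (full shuffle)
print(rdd.coalesce(2).getNumPartitions())     # 2 (merges partitions, minimal data movement)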



Q.  RDD transformation vs RDD action?
RDD transformations – Transformations are lazy operations, instead of updating an RDD, these return new RDD. (Eg: filter(), map() )

RDD actions – After an action, we get values. (Eg: max(), min(), sum() )


Q. What is narrow transformation?
In narrow transformations, no data movement across partitions occurs (each output partition depends on a single input partition).
Examples: map(), mapPartition(), flatMap(), filter(), union()


Q.  What is wider transformation?
Data from many partitions is required, so data movement (a shuffle) happens across the cluster.
Examples: groupByKey(), aggregateByKey(), aggregate(), join(), repartition()
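
A quick contrast (a sketch; assumes the SparkSession created earlier):
#
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)], 2)

# Narrow: each output partition depends on a single input partition (no shuffle)
narrow = rdd.map(lambda kv: (kv[0], kv[1] * 10))

# Wide: values for the same key may live in different partitions, so a shuffle is needed
wide = rdd.reduceByKey(lambda a, b: a + b)

print(narrow.collect())   # [('a', 10), ('b', 20), ('a', 30)]
print(wide.collect())     # e.g. [('b', 2), ('a', 4)]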


Q.  RDD transformations?
map(func)
flatMap(func)
filter(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
reduceByKey(func, [numPartitions])
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
sortByKey([ascending], [numPartitions])
join(otherDataset, [numPartitions])
cogroup(otherDataset, [numPartitions])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)


Q. RDD Actions?
reduce(func)
collect()
count()
first()
max()
min()
reduce()
take()
takeSample()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()


Q.  map() transformation?
map() transformation is used to apply an operation to every element, e.g. adding a field or updating a value.
# Output of map transformation always have same no: of records as input 
rdd3 = rdd2.map(lambda x: (x,1))
In the above code, each element is paired with the value 1 (a new column with value 1 is added).


Q.  Select vs Collect() in PySpark?
select() – a transformation (returns a new DataFrame lazily).

collect() – an action.
Returns all data from the RDD/DataFrame to the driver as a list.
Be careful when using this action on a huge data set with millions or billions of rows;
you may run out of memory on the driver.
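
A small contrast (a sketch; df is any DataFrame with an fname column, as in the examples above):
#
# select() is a transformation: it returns a new (lazy) DataFrame
names_df = df.select("fname")

# collect() is an action: it pulls all rows back to the driver as a list of Row objects
rows = names_df.collect()
print(rows[:5])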

 
Q.  count vs countApprox vs countApproxDistinct?
count() – Return the count of elements in the dataset.

countApprox() – Return approximate count of elements in the dataset, this method returns incomplete when execution time meets timeout.

countApproxDistinct() – Return an approximate number of distinct elements in the dataset.

countByValue() – Return Map[T,Long] key representing each unique value in dataset and value represents count each value present.
#
rdd.count()
rdd.countApprox(1)
rdd.countApproxDistinct()
rdd.countByValue()



Q.  RDD min, max?
rdd.max() # returns max value
rdd.min()   # returns min value
rdd.first() # returns first value
rdd.top(2)  # returns top 2 values


Q.  Types of RDD?
PairRDD (PairRDDFunctions): an RDD of key-value pairs; the most commonly used RDD type.
ShuffledRDD: produced when shuffle-triggering transformations such as groupByKey(), reduceByKey() or join() are applied.
Other internal RDD types include DoubleRDD, SequenceFileRDD, HadoopRDD and ParallelCollectionRDD.
# PairRDD, aggregate elements by key Example
rdd = spark.sparkContext.parallelize(
      ['Bengaluru Hyderabad Chennai','Bengaluru Hyderabad Pune','Hyderabad Mumbai Mysore Coiambattore']
    )
wordsRdd = rdd.flatMap(lambda x: x.split(" "))
pairRDD = wordsRdd.map(lambda f: (f,1))
pairRDD1 = pairRDD.reduceByKey(lambda a,b: a+b)

pairRDD1.collect()

O/p:
[('Bengaluru', 2), ('Hyderabad', 3), ('Chennai', 1), ('Pune', 1), ('Mumbai', 1), ('Mysore', 1), ('Coiambattore', 1)]


Q. Shuffle operations in PySpark?
Shuffle is Spark's mechanism for re-distributing data so that it is grouped differently across partitions.
It involves copying data across executors and machines, which makes it a complex and costly operation (disk I/O, data serialization, and network I/O).
To organize data for the shuffle, Spark generates sets of tasks: map tasks to organize the data and reduce tasks to aggregate it.


Q. What is RDD Persistence?
Persisting (caching) keeps a dataset in memory across operations. (One of the most important capabilities in Spark.)
When an RDD is persisted, each node stores in memory the partitions it computes, and they are reused in other actions on that dataset (a minimal sketch follows).
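
A minimal persistence sketch (assumes the SparkSession created earlier):
#
from pyspark import StorageLevel

rdd = spark.sparkContext.parallelize(range(1000000))
doubled = rdd.map(lambda x: x * 2)

doubled.persist(StorageLevel.MEMORY_AND_DISK)   # or doubled.cache() for memory-only

print(doubled.count())   # first action computes and caches the partitions
print(doubled.sum())     # reuses the cached data instead of recomputing

doubled.unpersist()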

Q.  Save RDD to a location?
rdd1.saveAsTextFile("content/sample_data/text1.txt")


Q.  Count no: of words in a file in RDD?
#
from operator import add
rdd = spark.sparkContext.textFile("/content/drive/MyDrive/0_AWS/text1.txt")
rdd.collect()

#
rdd1 = rdd.flatMap(lambda x: x.split(' '))
rdd2 = rdd1.map(lambda x: (x, 1))
rdd3 = rdd2.reduceByKey (lambda a,b:  a+b)
rdd5 = rdd3.map(lambda x: (x[1]))
reres = rdd5.reduce(add)
reres

O/p:
40 (includes spaces, '//' )



3. PySpark DataFrame
Q.  Create DataFrame from RDD?
# create RDD
data = [("HYd", "200000"), ("Bng", "300000"), ("Chennai", "200000")]
rdd = spark.sparkContext.parallelize(data)

# create data frame from RDD
columns = ["Cities","it_Population"]
df1 = rdd.toDF(columns)

# Show data frame
df1.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
|    HYd|       200000|
|    Bng|       300000|
|Chennai|       200000|
+-------+-------------+

#
df2  = spark.createDataFrame(rdd).toDF(*columns)
df2.show()
O/p:
+-------+-------------+
| Cities|it_Population|
+-------+-------------+
|    HYd|       200000|
|    Bng|       300000|
|Chennai|       200000|
+-------+-------------+



Q.  Create DataFrame from a schema?
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("John","Cena","33","M",50000),
    ("Michael","Corneol","44","M",44000),
    ("Robert","Duvval","42","M",67000),
    ("Anne","Adams","92","F",34000),
    ("Peter","Parker","57","M",100000)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("emp_id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
O/p:
root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- emp_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+--------+------+------+------+
|firstname|lastname|emp_id|gender|salary|
+---------+--------+------+------+------+
|John     |Cena    |33    |M     |50000 |
|Michael  |Corneol |44    |M     |44000 |
|Robert   |Duvval  |42    |M     |67000 |
|Anne     |Adams   |92    |F     |34000 |
|Peter    |Parker  |57    |M     |100000|
+---------+--------+------+------+------+


Q.  Create DataFrame from a source?
# read from CSV
california_df = spark.read.csv("sample_data/california_housing_train.csv", \
                    header=True, inferSchema=True)

california_df.show(3) # shows 3 rows
O/p:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows


Q.  Convert a PySpark DataFrame to a pandas DataFrame?
#
pandasDF = pysparkDF.toPandas()
print(pandasDF)



Q.  DataFrame show?
To show contents of DF.
df.show() # Display full df
df.show(truncate=False) #Display full column contents
# Display 2 rows and full column contents
df.show(2,truncate=False)  
 
# Display 3 rows with column length 10 characters
df.show(3,truncate=10)
# Display DataFrame rows & columns vertically
df.show(n=3,truncate=10,vertical=True)




Q.  PySpark StructType & StructField?
PySpark StructType & StructField classes are used to specify schema to DataFrame
and are used in creating complex columns like nested struct, array and map columns.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

#Using PySpark StructType & StructField with DataFrame
data = [("James","","Watt","36636","M",30000),
    ("Michael","Corleone","","40288","M",40000),
    ("Robert","","Duvval","42114","M",40000),
    ("Maria","smith","Sharapova","39192","F",4000)
    ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)




Q. PySpark Row?
PySpark Row class is available by importing pyspark.sql.Row which is represented as a record/row in DataFrame.
from pyspark.sql import Row

# row in RDD
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
    Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
    Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())

#row in DataFrame
from pyspark.sql import Row
data = [Row(name="Saravanavel",lang=["Tamil","Telugu","Eng"],state="Chennai"),
    Row(name="Siddappa",lang=["Kannada","Telugu","Tulu"],state="Bangalore"),
    Row(name="Rao",lang=["Telugu","Urdu", "Hindi"],state="Hyderabad")]
df=spark.createDataFrame(data)
df.printSchema()
df.show()




Q.  Alias example?
#####################
#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name")
   ).show()

#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") \
   ).show()



Q.  Access columns?
# access column from DF
data=[("Rao",43),("Ram",44)]
df=spark.createDataFrame(data).toDF("name.fname","gender")

# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()



Q.  Sort a DF asc and dsc?
########  ASC & DSC  ########
#############################
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()



Q.  Convert data type using cast?
#######  cast  ########
# Used to convert the data Type.
#############################
df.select(df.fname, df.id.cast("int")).printSchema()



Q.  Get values in between a range from DF?
#   4.4 between() –
#   Returns a Boolean expression when a column values in between lower and upper bound.
#############################
df.filter(df.id.between(100,300)).show()


Q.  Contains usage in DF?
# contains #############################
#############################
df.filter(df.fname.contains("John")).show()



Q.  startswith and endswith usage?
# #startswith, endswith()
#############################
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()



Q.  check nulls in column?
#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()


Q.  Like usage?
#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")) \
  .show()


Q.  Select usage on columns?
# select usage on columns
df.select("fname","lname").show()
df.select(df.fname,df.lname).show()
df.select(df["fname"],df["lname"]).show()

#By using col() function
from pyspark.sql.functions import col
df.select(col("fname"),col("lname")).show()

#Select columns by regular expression
df.select(df.colRegex("`^.*name*`")).show()


Q.  Select all columns?
# Select All columns from List
df.select(*columns).show()

# Select All columns
df.select([col for col in df.columns]).show()
df.select("*").show()


Q.  Select columns by index?
### Select Columns by Index
#Selects the first 2 columns and shows the top 2 rows
df.select(df.columns[:2]).show(2)

#Selects the 3rd and 4th columns and shows the top 3 rows
df.select(df.columns[2:4]).show(3)


Q.  PySpark withColumn() usage?
PySpark withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.
#Change DataType using PySpark withColumn()
df.withColumn("salary",col("salary").cast("Integer")).show()

#2. Update The Value of an Existing Column
df.withColumn("sal",col("sal")*100).show()

#3. Create a Column from an Existing
df.withColumn("New_sal",col("sal") / 100).show()

# 4. Add a New Column using withColumn()
from pyspark.sql.functions import *
df.withColumn("City", lit("Bengaluru")) \
  .withColumn("Date_column",current_date()) \
  .show()

# 5. Rename Column Name
df.withColumnRenamed("gender","sex") \
  .show(truncate=False)

# 6. Drop Column From PySpark DataFrame
df.drop("Date_column") \
  .show()

#2. rename multiple columns in DF
df = df.withColumnRenamed("fname","First_name").withColumnRenamed("lname","last_name")

# rename nested columns
df4 = df.withColumn("fname",col("name.firstname")) \
      .withColumn("mname",col("name.middlename")) \
      .withColumn("lname",col("name.lastname")) \
      .drop("name")

#7. Using toDF() – To change all columns in a PySpark DataFrame
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*newColumns).printSchema()


Q.  PySpark filter?
#2. DataFrame filter() with Column Condition
df.filter(df.gender == "F").show(truncate=False)

# 3. DataFrame filter() with SQL Expression
#Using SQL Expression
df.filter("gender == 'M'").show()
#For not equal
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()

#4. PySpark Filter with Multiple Conditions
df.filter( (df.state  == "OH") & (df.gender  == "M") ) \
    .show(truncate=False)  

#4. PySpark Filter with Multiple Conditions
df.filter( (df.gender  == "F") & (df.last_name  == "Adams") ) \
    .show(truncate=False)

# like - SQL LIKE pattern
df.filter(df.last_name.like("%da%")).show()


Q.  Distinct usage?
# 1. Get Distinct Rows (By Comparing All Columns)
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))

# Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))

# 2. PySpark Distinct of Selected Multiple Columns
dropDisDF = df.dropDuplicates(["department","salary"])



Q.  PySpark orderBy() and sort()?
# sort
df.sort("last_name","age").show(truncate=False)

# order by
df.orderBy("sal","age").show(truncate=False)

# Sort by Ascending (ASC)
df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)


Q. Group by usage?
#
df.groupBy("department").count().show(truncate=False)

df.groupBy("department").min("salary").show(truncate=False)

df.groupBy("department").sum("salary").show(truncate=False)

#GroupBy on multiple columns
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show()


Q. Aggregate functions?
#
df.select(collect_list("salary")).show(truncate=False)
df.select(collect_set("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)

df.select(approx_count_distinct("sal")).collect()[0][0]
df.select(avg("sal")).collect()[0][0]
df.select(collect_list("sal")).show(truncate=False)
df.select(countDistinct("gender", "sal")).show(truncate=False)
df.select(count("sal")).collect()[0]
df.select(first("sal")).show(truncate=False)
df.select(last("sal")).show(truncate=False)
df.select(max("sal")).show(truncate=False)
df.select(min("sal")).show(truncate=False)
df.select(mean("sal")).show(truncate=False)
df.select(sum("sal")).show(truncate=False)


Q.  Joins?
# join
empDF.join(deptDF,empDF.dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

empDF.join(deptDF,empDF.dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)

empDF.join(deptDF,empDF.dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)


Q.  Union?
#
unionDF = emp_df1.union(emp_df2).show(truncate=False)

# Merge without Duplicates
unionDF = emp_df1.union(emp_df2).distinct().show(truncate=False)

# merge different column dfs
merged_df = df1.unionByName(df2, allowMissingColumns=True)



Q.  Get 1st column, 1st row from DataFrame.
#
df1.select(df1.columns[:1]).show(1, truncate=False)

 
Q.  Parquet files benefits?
Uses less storage space (data is compressed).
Stores data in a columnar format, so reading a subset of columns is efficient.


Q.   Add new columns in dataset using PySpark.
from pyspark.sql.functions import *
df.withColumn("City", lit("Bangalore")) \
  .withColumn("Date_column",current_date()) \
  .show()


Q.  Replace a string column value?
from pyspark.sql.functions import regexp_replace
df.withColumn('address', regexp_replace('address', 'res', 'Residence')) \
  .show(truncate=False)


Q.  Convert a timestamp string to DateType.
from pyspark.sql.functions import to_timestamp, to_date, current_timestamp
new_df = df.withColumn("timestamp", to_timestamp("input_timestamp"))
df.withColumn("date_type", to_date(current_timestamp()))


Q.  Flatten a nested array
#
from pyspark.sql.functions import flatten
df.select(flatten(df.col_name)).show()


Q.  Case usage example in PySPark?
from pyspark.sql.functions import *
df2 = df.withColumn("new_Residence", when(df.Residence == "v","Village")
                                 .when(df.Residence == "twn","Town")
                                 .when(df.Residence == "cty","City")
                                 .when(df.Residence.isNull() ,"")
                                 .otherwise(df.Residence))



Q.  Create a DataFrame from a source.
#
df = spark.read.csv("sample_data/california_housing_train.csv", \
                header=True, inferSchema=True)


Q.  Write DataFrame to a location in pyspark in parquet format.
#
df.write.parquet("sample_data/california_housing_train.parquet")


Q. Null Usage?
#Replace 0 for null for all integer columns
emp_df2.na.fill(value=0).show()

#Replace 0 for null on only bonus column
df.na.fill(value=0,subset=["bonus"]).show()

# replace NULL/None values with an empty string or
# any constant values String on all DataFrame String columns.
df.na.fill("").show() # not working

df.na.fill("unknown",["state"]).show()


Q.  Pivot usage?
#
df.show()
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+
#
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show(truncate=False)
O/p:
+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+


Q. Write to disk?
#partitionBy()
df.write.option("header",True) \
        .partitionBy("housing_median_age") \
        .mode("overwrite") \
        .csv("sample_data/california_housing_train_partiton")


Q.  Partition?
#partitionBy() multiple columns
df.write.option("header",True) \
        .partitionBy("housing_median_age","latitude") \
        .mode("overwrite") \
        .csv("sample_data/california_housing_train_partiton_test")

##
#Use repartition() and partitionBy() together
df.repartition(4) \
        .write.option("header",True) \
        .partitionBy("latitude") \
        .mode("overwrite") \
        .csv("sample_data/california_housing_train_partiton_test_repartiton_2")

# 7. Data Skew – Control Number of Records per Partition File
#partitionBy() control number of partitions
df.write.option("header",True) \
        .option("maxRecordsPerFile", 2) \
        .partitionBy("latitude") \
        .mode("overwrite") \
        .csv("sample_data/california_housing_train_partiton_test_repartiton_2")

#read specific partition
dfSinglePart=spark.read.option("header",True) \
            .csv("sample_data/california_housing_train_partiton_test_repartiton_2/latitude=32.56")
dfSinglePart.printSchema()
dfSinglePart.show()

#9. PySpark SQL – Read Partition Data
dfSinglePart=spark.read.option("header",True) \
            .csv("sample_data/california_housing_train_partiton_test_repartiton_2")
dfSinglePart.createOrReplaceTempView("ZIPCODE")
spark.sql("select * from ZIPCODE where latitude='32.56'") \
    .show()


Q.  PySpark Window Functions?
PySpark Window functions are used to calculate results such as the rank, row number e.t.c over a range of input rows.

PySpark SQL supports three kinds of window functions:
1.  ranking functions
2.  analytic functions
3.  aggregate functions
df.show(truncate=False)
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+
# row_number() window function is used to give the sequential row number
# starting from 1 to the result of each window partition
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+


# rank() window function is used to provide a rank to the result within a window partition.
# This function leaves gaps in rank when there are ties.
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

# dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps.
# This is similar to rank() function difference being rank function leaves gaps in rank when there are ties.
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

# percent_rank Window Function
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()


Q.    Read csv files?
df2 = spark.read.option("header",True) \
     .csv("sample_data/california_housing_test.csv") \
     .show(5, truncate=False)


Q.  Read multiple csv files?
# 1.2 Read Multiple CSV Files
# df = spark.read.csv("path1,path2,path3")
###########################################

df = spark.read.options(header=True) \
    .csv(["sample_data/california_housing_test.csv", "sample_data/mnist_test.csv"]) \
    .show(5, truncate=False)

###########################################
# 1.3 Read all CSV Files in a Directory
# df = spark.read.csv("Folder path")
###########################################
df = spark.read.options(header=True) \
    .csv(["sample_data/"]) \
    .show(5, truncate=False)

###########################################
# 2.1 delimiter
#
###########################################
df3 = spark.read.options(delimiter=',') \
  .csv(["sample_data/california_housing_test.csv"]) \
  .show(5, truncate =False)

###########################################
# 2.2 inferSchema
#
###########################################
df4 = spark.read.options(inferSchema='True',delimiter=',') \
  .csv(["sample_data/california_housing_test.csv"]) \
  .show(5, truncate =False)

###########################################
# 2.3 header
# This option is used to read the first line of the CSV file as column names.
# By default the value of this option is False.
###########################################
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("sample_data/california_housing_test.csv") \
  .show(5, truncate =False)


Q. Write DataFrame to a CSV file?
###########################################
# 5. Write PySpark DataFrame to CSV file
#
###########################################
df3 = spark.read.options(header='True') \
  .csv("sample_data/california_housing_test.csv")
df3.write.options(header='True', delimiter=',') \
 .csv("My_sample_data/")

 ###########################################
# 5.2 Saving modes
# PySpark DataFrameWriter also has a method mode() to specify saving mode.

# overwrite – mode is used to overwrite the existing file.
# append – To add the data to the existing file.
# ignore – Ignores write operation when the file already exists.
# error – This is a default option when the file already exists, it returns an error.
###########################################

df3.write.mode('overwrite').csv("My_sample_data/")



Q.  Work on Parquet files?
# Write DataFrame to Parquet file format
df.write.parquet("My_sample_data/people.parquet")

# Pyspark Read Parquet file into DataFrame
###########################################
#
parquetDF = spark.read.parquet("My_sample_data/people.parquet")
parquetDF.count()

# Append or Overwrite an existing Parquet file
#
df.write.mode('append').parquet("My_sample_data/people.parquet")

parquetDF = spark.read.parquet("My_sample_data/people.parquet")
parquetDF.count()

# Executing SQL queries DataFrame
#
parquetDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show()

# Creating a table on Parquet file
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"My_sample_data/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

# Create Parquet partition file
#
df.write.partitionBy("gender","salary").mode("overwrite").parquet("My_sample_data/people2.parquet")

# Retrieving from a partitioned Parquet file
#
parDF2=spark.read.parquet("My_sample_data/people2.parquet/gender=M")
parDF2.show(truncate=False)

# Creating a table on Partitioned Parquet file
#
# spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"people2.parquet/gender=F\")")
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"My_sample_data/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()


Q.  Work on JSON files?
# Read JSON file into dataframe
df = spark.read.json("sample_data/anscombe.json")

# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("sample_data/anscombe.json")

# Read multiple files
df2 = spark.read.json(
    ['sample_data/anscombe.json','sample_data/anscombe_1.json'])

# Read all JSON files from a folder
df3 = spark.read.json("sample_data/*.json")
df3.show(5, truncate=False)

# Read JSON file using PySpark SQL
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
      " (path 'sample_data/anscombe.json')")
spark.sql("select * from zipcode").show(5, truncate=False)

# Write PySpark DataFrame to JSON file
df3.write.json("My_sample_data/anscombe.json")

# PySpark Saving modes
# PySpark DataFrameWriter also has a method mode() to specify SaveMode;
# the argument to this method either takes overwrite, append, ignore, errorifexists.
#
df3.write.mode('Overwrite').json("My_sample_data/anscombe.json")


Q.  Bring non-null values from two DataFrames using coalesce?
#
joined_df = df1.join(df2, df1.citY_1 == df2.citY_2, 'fullouter')
joined_df.show(truncate=False)
O/p:
+-------+--------------+------+--------------+
|citY_1 |Pop_in_million|citY_2|Pop_in_million|
+-------+--------------+------+--------------+
|bng    |2             |null  |null          |
|chennai|3             |null  |null          |
|null   |null          |coiam |5             |
|hyd    |1             |hyd   |1             |
|null   |null          |mysore|4             |
+-------+--------------+------+--------------+
from pyspark.sql import Row, Window, functions as F
joined_df = joined_df.withColumn('new_city_names', F.coalesce(df1.citY_1, df2.citY_2))
joined_df.show()
O/p:
+-------+--------------+------+--------------+--------------+
| citY_1|Pop_in_million|citY_2|Pop_in_million|new_city_names|
+-------+--------------+------+--------------+--------------+
|    bng|             2|  null|          null|           bng|
|chennai|             3|  null|          null|       chennai|
|   null|          null| coiam|             5|         coiam|
|    hyd|             1|   hyd|             1|           hyd|
|   null|          null|mysore|             4|        mysore|
+-------+--------------+------+--------------+--------------+