In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the notebook's Spark session (or reuse one that is already running),
# then display it as the cell result.
spark = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)
spark

In [54]:
# Read heart.csv: header=True takes column names from the first line and
# inferSchema=True asks Spark to infer each column's type.
# The bare `df` on the last line displays the resulting schema.
df = spark.read.csv(
    'heart.csv',
    header=True,
    inferSchema=True,
)
df

DataFrame[Age: int, Sex: string, ChestPainType: string, RestingBP: int, Cholesterol: int, FastingBS: int, RestingECG: string, MaxHR: int, ExerciseAngina: string, Oldpeak: double, ST_Slope: string, HeartDisease: int]

In [55]:
# Check the Python type of the object returned by the CSV reader.
type(df)

pyspark.sql.dataframe.DataFrame

In [56]:
# Fetch the first five rows as a Python list of Row objects.
df.head(5)

[Row(Age=40, Sex='M', ChestPainType='ATA', RestingBP=140, Cholesterol=289, FastingBS=0, RestingECG='Normal', MaxHR=172, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0),
 Row(Age=49, Sex='F', ChestPainType='NAP', RestingBP=160, Cholesterol=180, FastingBS=0, RestingECG='Normal', MaxHR=156, ExerciseAngina='N', Oldpeak=1.0, ST_Slope='Flat', HeartDisease=1),
 Row(Age=37, Sex='M', ChestPainType='ATA', RestingBP=130, Cholesterol=283, FastingBS=0, RestingECG='ST', MaxHR=98, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0),
 Row(Age=48, Sex='F', ChestPainType='ASY', RestingBP=138, Cholesterol=214, FastingBS=0, RestingECG='Normal', MaxHR=108, ExerciseAngina='Y', Oldpeak=1.5, ST_Slope='Flat', HeartDisease=1),
 Row(Age=54, Sex='M', ChestPainType='NAP', RestingBP=150, Cholesterol=195, FastingBS=0, RestingECG='Normal', MaxHR=122, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0)]

In [57]:
# Print column names, inferred types and nullability as a tree.
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [58]:
# List the column names as plain Python strings.
df.columns

['Age',
 'Sex',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease']

In [59]:
# Render the leading rows of the DataFrame as a text table.
df.show()

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

In [60]:
# Project the frame down to two columns and print the result.
age_sex_df = df.select('Age', 'Sex')
age_sex_df.show()

+---+---+
|Age|Sex|
+---+---+
| 40|  M|
| 49|  F|
| 37|  M|
| 48|  F|
| 54|  M|
| 39|  M|
| 45|  F|
| 54|  M|
| 37|  M|
| 48|  F|
| 37|  F|
| 58|  M|
| 39|  M|
| 49|  M|
| 42|  F|
| 54|  F|
| 38|  M|
| 43|  F|
| 60|  M|
| 36|  M|
+---+---+
only showing top 20 rows



In [61]:
# Indexing by name yields a Column expression object, not the column's values;
# pass it to select(...) and call show() to see the data.
df['Cholesterol']

Column<'Cholesterol'>

In [62]:
# (column name, type name) pairs for every column.
df.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int')]

In [63]:
# describe() returns another DataFrame of summary statistics, so the bare
# expression only displays its schema (every statistic is typed as string).
df.describe()

DataFrame[summary: string, Age: string, Sex: string, ChestPainType: string, RestingBP: string, Cholesterol: string, FastingBS: string, RestingECG: string, MaxHR: string, ExerciseAngina: string, Oldpeak: string, ST_Slope: string, HeartDisease: string]

In [64]:
# Build the summary-statistics frame, then print it as a table.
summary_df = df.describe()
summary_df.show()

+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age| Sex|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918| 918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|null|         null|132.39651416122004| 198.7995642701525|0.23311546840958605|      null|136.80936819172112|          null|0.8873638344226581|    null| 0.5533769063180828|
| std

In [65]:
# Add a derived boolean column: true where the patient is younger than 50.
# The column is referenced by its exact schema name ('Age') so the lookup does
# not depend on Spark's case-insensitive column resolution.
df = df.withColumn('dummy1', df['Age'] < 50)

In [66]:
# Print the first 10 rows to inspect the dummy1 column.
df.show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|dummy1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|  true|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|  true|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|  true|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|  true|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0| false|


In [67]:
# Drop column: drop() returns a new DataFrame without 'dummy1'; rebinding df
# discards the version that still carried it.
df = df.drop('dummy1')
df.show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

In [68]:
# Drop every row that contains a null in any column.
# NOTE(review): the describe() output earlier reports a count of 918 for every
# column, so no rows appear to be removed for this dataset.
df = df.na.drop()

In [69]:
# Replace nulls with the string 'NA'. A string fill value is only applied to
# string-typed columns; numeric columns are left untouched.
# NOTE(review): this runs after the null rows were dropped, so it should not
# change anything here -- kept as a demonstration of the API.
df = df.na.fill('NA')
df.show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

In [70]:
# Mean imputation: configure an Imputer that writes a filled copy of each
# listed column to a new column named "<column>_i".
from pyspark.ml.feature import Imputer

impute_cols = ['Age']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f'{name}_i' for name in impute_cols],
    strategy='mean',
)

In [73]:
# Fit the imputer on df, then apply the fitted model to the same frame;
# the result gains the Age_i column.
imputer_model = imputer.fit(df)
df = imputer_model.transform(df)
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age_i|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|   40|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|   49|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|   37|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|   48|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|   54|
+---+---

In [75]:
# Filter with a SQL expression string: keep patients aged 50 or younger.
# The column is spelled 'Age' so that it matches the schema exactly.
df.filter('Age <= 50').show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age_i|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|   40|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|   49|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|   37|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|   48|
| 39|  M|          NAP|      120|        339|        0|    Normal|  170|             N|    0.0|      Up|           0|   39|
| 45|  F

In [76]:
# Filter first, then keep three columns. select() labels its output with the
# names exactly as written here, which is why the printed header shows
# 'age' and 'sex' in lower case.
young_df = df.filter('age<=50')
young_df.select(['age', 'sex', 'Cholesterol']).show(10)

+---+---+-----------+
|age|sex|Cholesterol|
+---+---+-----------+
| 40|  M|        289|
| 49|  F|        180|
| 37|  M|        283|
| 48|  F|        214|
| 39|  M|        339|
| 45|  F|        237|
| 37|  M|        207|
| 48|  F|        284|
| 37|  F|        211|
| 39|  M|        204|
+---+---+-----------+
only showing top 10 rows



In [92]:
# Combine Column conditions with & (logical and). Each comparison needs its
# own parentheses because & binds more tightly than >=.
# Keeps patients aged 55 or older whose cholesterol is at least 300.
df.filter((df['Age'] >= 55) & (df['Cholesterol'] >= 300)).show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age_i|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 65|  M|          ASY|      140|        306|        1|    Normal|   87|             Y|    1.5|    Flat|           1|   65|
| 59|  M|          NAP|      130|        318|        0|    Normal|  120|             Y|    1.0|    Flat|           0|   59|
| 59|  F|          ASY|      130|        338|        1|        ST|  130|             Y|    1.5|    Flat|           1|   59|
| 58|  F|          ATA|      180|        393|        0|    Normal|  110|             Y|    1.0|    Flat|           1|   58|
| 56|  M|          ASY|      170|        388|        0|        ST|  122|             Y|    2.0|    Flat|           1|   56|
| 56|  M

In [86]:
# Negate a compound condition with ~: keep rows where neither Age >= 50 nor
# Cholesterol >= 250 holds, i.e. Age < 50 and Cholesterol < 250.
df.filter(~((df['Age'] >= 50) | (df['Cholesterol'] >= 250))).show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age_i|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|   49|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|   48|
| 45|  F|          ATA|      130|        237|        0|    Normal|  170|             N|    0.0|      Up|           0|   45|
| 37|  M|          ASY|      140|        207|        0|    Normal|  130|             Y|    1.5|    Flat|           1|   37|
| 37|  F|          NAP|      130|        211|        0|    Normal|  142|             N|    0.0|      Up|           0|   37|
| 39|  M

In [93]:
# groupBy on its own only returns a GroupedData handle; an aggregate such as
# sum(), mean() or count() must follow to get a DataFrame back.
df.groupBy('Sex')

<pyspark.sql.group.GroupedData at 0x2a6d5d3a108>

In [95]:
# A grouping is always followed by an aggregate. sum() with no arguments
# totals every numeric column within each group.
by_sex = df.groupBy('Sex')
by_sex.sum().show(10)

+---+--------+--------------+----------------+--------------+----------+------------------+-----------------+----------+
|Sex|sum(Age)|sum(RestingBP)|sum(Cholesterol)|sum(FastingBS)|sum(MaxHR)|      sum(Oldpeak)|sum(HeartDisease)|sum(Age_i)|
+---+--------+--------------+----------------+--------------+----------+------------------+-----------------+----------+
|  F|   10131|         25517|           46551|            26|     28205|129.10000000000002|               50|     10131|
|  M|   38992|         96023|          135947|           188|     97386| 685.5000000000001|              458|     38992|
+---+--------+--------------+----------------+--------------+----------+------------------+-----------------+----------+



In [100]:
# Average of the HeartDisease column per sex.
sex_vs_disease = df.select('Sex', 'HeartDisease')
sex_vs_disease.groupBy('Sex').mean().show()

+---+-------------------+
|Sex|  avg(HeartDisease)|
+---+-------------------+
|  F|0.25906735751295334|
|  M| 0.6317241379310344|
+---+-------------------+



In [101]:
# Average HeartDisease for each (Sex, ChestPainType) combination.
# Uses groupBy, the spelling used by the other grouping cells, instead of
# the groupby alias.
(
    df.select('Sex', 'ChestPainType', 'HeartDisease')
    .groupBy('Sex', 'ChestPainType')
    .mean()
    .show()
)

+---+-------------+-------------------+
|Sex|ChestPainType|  avg(HeartDisease)|
+---+-------------+-------------------+
|  M|          ASY| 0.8286384976525821|
|  M|           TA| 0.5277777777777778|
|  F|           TA|                0.1|
|  F|          ATA|0.06666666666666667|
|  M|          ATA|0.17699115044247787|
|  F|          NAP|0.11320754716981132|
|  M|          NAP|               0.44|
|  F|          ASY| 0.5571428571428572|
+---+-------------+-------------------+



In [102]:
# Number of rows per sex.
df.groupBy('Sex').count().show()

+---+-----+
|Sex|count|
+---+-----+
|  F|  193|
|  M|  725|
+---+-----+



In [103]:
# pyspark ML
# Look at the frame that the modelling cells start from.
df.show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age_i|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|   40|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|   49|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|   37|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|   48|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|   54|
| 39|  M

In [104]:
# Column list at this point; it includes the imputed Age_i column.
df.columns

['Age',
 'Sex',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease',
 'Age_i']

In [117]:
from pyspark.ml.feature import VectorAssembler

# Pack the chosen numeric predictors into one vector column, which is the
# column handed to the regression as its features input.
feature_cols = ['Age', 'Cholesterol', 'MaxHR']
va = VectorAssembler(
    inputCols=feature_cols,
    outputCol='linregvector',
)
partial_df = va.transform(df)

In [118]:
# Keep only the feature vector and the label column for modelling.
partial_df = partial_df.select('linregvector', 'HeartDisease')
partial_df.show(5)

+------------------+------------+
|      linregvector|HeartDisease|
+------------------+------------+
|[40.0,289.0,172.0]|           0|
|[49.0,180.0,156.0]|           1|
| [37.0,283.0,98.0]|           0|
|[48.0,214.0,108.0]|           1|
|[54.0,195.0,122.0]|           0|
+------------------+------------+
only showing top 5 rows



In [121]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 80/20 split -- and therefore the fitted coefficients and
# the test error -- are the same on every run.
SPLIT_SEED = 42
train, test = partial_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Keep the unfitted estimator and the fitted model under separate names.
# `reg` is the fitted model; later cells read its coefficients and call
# evaluate() on it.
lin_reg = LinearRegression(featuresCol='linregvector', labelCol='HeartDisease')
reg = lin_reg.fit(train)
# NOTE(review): HeartDisease only takes the values 0 and 1 in the rows shown,
# so a classifier may suit this label better than linear regression --
# confirm the intent.

In [122]:
# Fitted weights, one per entry of the feature vector.
reg.coefficients

DenseVector([0.0075, -0.0007, -0.0065])

In [127]:
# evaluate() scores the model on the held-out split. Despite its name, `preds`
# is the evaluation result object; the per-row predictions are its
# `predictions` DataFrame.
preds = reg.evaluate(test)
preds.predictions.show()

+------------------+------------+-------------------+
|      linregvector|HeartDisease|         prediction|
+------------------+------------+-------------------+
|[31.0,270.0,153.0]|           1|0.23378003434147465|
|[32.0,225.0,184.0]|           0|0.06999129351883981|
|[34.0,182.0,174.0]|           0|0.17764327934494628|
|[34.0,214.0,168.0]|           0|0.19560299548239402|
|  [35.0,0.0,130.0]|           1| 0.5884572609894168|
|[35.0,183.0,182.0]|           0|0.13268425253787974|
|[35.0,264.0,168.0]|           0|0.17046113262256912|
|[35.0,308.0,180.0]|           0|0.06412926600709068|
|[36.0,267.0,160.0]|           1|0.22772526423494832|
|  [38.0,0.0,128.0]|           1| 0.6237676066537855|
|[38.0,190.0,150.0]|           1|0.35754412753864584|
|[38.0,282.0,170.0]|           1|0.16815429946632832|
|[38.0,289.0,105.0]|           1| 0.5841693434030422|
|[39.0,147.0,160.0]|           0|0.32833076150693075|
|[39.0,182.0,180.0]|           0|0.17610305155880968|
|[39.0,241.0,146.0]|        

In [128]:
# Mean squared error of the model on the test split.
preds.meanSquaredError

0.20845707856697832