Data Preparation
Overview
This section explores, cleans, and scales the data to prepare it for a k-means cluster analysis that identifies distinct weather patterns at a weather station in San Diego, CA.
The dataset is described and imported in the previous section.
The analysis is discussed in the next section.
This project is based on assignments from Machine Learning With Big Data by University of California San Diego on Coursera.
The analysis for this project was performed in Spark.
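For reference, a minimal sketch of how the Spark session and import might look (the session setup, the appName, and the file name minute_weather.csv are assumptions; the actual import is covered in the previous section):
from pyspark.sql import SparkSession

# Start a Spark session and load the minute-level weather data
spark = SparkSession.builder.appName('WeatherClustering').getOrCreate()
df = spark.read.csv('minute_weather.csv', header=True, inferSchema=True)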
Data Exploration, Cleaning and Scaling
The imported dataset includes over 1.5 million rows, as indicated by df.count(). To keep the analysis manageable, a smaller dataset containing one-tenth of the data was used. The following code creates this subset by keeping every 10th row and dropping all the others:
filteredDF = df.filter((df.rowID % 10) == 0)
filteredDF.count()
The number of rows in the subset is 158,726.
Below are the summary statistics:
filteredDF.describe().toPandas().transpose()
summary              count    mean       stddev     min    max
rowID                158726   793625.0   458203.93  0      1587250
air_pressure         158726   916.83     3.05       905.0  929.5
air_temp             158726   61.85      11.83      31.64  99.5
avg_wind_direction   158680   162.15     95.27      0.0    359.0
avg_wind_speed       158680   2.77       2.05       0.0    31.9
max_wind_direction   158680   163.46     92.45      0.0    359.0
max_wind_speed       158680   3.40       2.41       0.1    36.0
rain_accumulation    158725   3.18E-4    0.01       0.0    3.12
rain_duration        158725   0.40       8.66       0.0    2960.0
relative_humidity    158726   47.60      26.21      0.9    93.0
The low average values for rain accumulation and duration in this dataset suggest that the data were collected during a dry period. The code below outputs the number of rows where the values of rain accumulation and duration are 0:
filteredDF.filter(filteredDF.rain_accumulation == 0.0).count()
filteredDF.filter(filteredDF.rain_duration == 0.0).count()
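Both counts can also be obtained in a single pass with conditional aggregation; a sketch (this agg form is an alternative, not part of the original assignment):
from pyspark.sql.functions import count, when

# Count zero-valued rows for both features in one job
filteredDF.agg(
    count(when(filteredDF.rain_accumulation == 0.0, True)).alias('zero_accumulation'),
    count(when(filteredDF.rain_duration == 0.0, True)).alias('zero_duration')
).show()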
For rain accumulation the count is 157,812 rows, and for rain duration it is 157,237 rows, which is nearly every row in the sample. Since these features are almost always 0 (i.e., they show very little variation) and to speed up the analysis, they are dropped from the DataFrame. We also drop the hpwren_timestamp feature, since it is not used in the analysis, as well as rowID, since it is just the row number:
workingDF = filteredDF.drop('rain_accumulation', 'rain_duration', 'hpwren_timestamp', 'rowID')
Next, we drop rows with missing values and count the number of rows that were dropped:
before = workingDF.count()       # number of rows before dropping
workingDF = workingDF.na.drop()  # drop rows with a missing value in any column
after = workingDF.count()        # number of rows after dropping
before - after                   # number of rows dropped
The code above indicates that 46 rows in the workingDF DataFrame had missing values in at least one feature before they were dropped.
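Had we wanted to see which features account for the missing values, a per-column null count on the pre-drop DataFrame would have worked; a minimal sketch (this diagnostic is an assumption, not part of the original assignment):
from pyspark.sql.functions import col, count, when

# Count nulls per column (run on workingDF before na.drop())
workingDF.select([count(when(col(c).isNull(), c)).alias(c)
                  for c in workingDF.columns]).show()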
Next, we combine the remaining features into a single vector column. We create a list of the columns we want to combine and use VectorAssembler to build the vector column:
featuresUsed = ['air_pressure',
'air_temp',
'avg_wind_direction',
'avg_wind_speed',
'max_wind_direction',
'max_wind_speed',
'relative_humidity']
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=featuresUsed, outputCol='features_unscaled')
assembled = assembler.transform(workingDF)
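To sanity-check the assembled column, one can inspect a few of the resulting vectors (this peek is illustrative):
assembled.select('features_unscaled').show(3, truncate=False)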
Finally, since the features are on very different scales (e.g., air temperature ranges from 31.6 to 99.5, while air pressure ranges from 905.0 to 929.5), they need to be scaled before clustering. We scale them using StandardScaler() so that each feature has a mean of 0 and a standard deviation of 1:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features_unscaled', outputCol='features', withStd=True, withMean=True)
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)
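To confirm the scaling worked as intended, one can check that the per-feature means are close to 0 and the variances close to 1; a sketch using Summarizer (assuming Spark 2.4 or later; this check is an addition, not part of the original assignment):
from pyspark.ml.stat import Summarizer

# After standardization, means should be ~0 and variances ~1
scaledData.select(Summarizer.mean(scaledData.features),
                  Summarizer.variance(scaledData.features)).show(truncate=False)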
Next step: Analysis