Data Acquisition

Overview

The steps below describe a fully automated process that connects to a weather station streaming sensor measurements and continuously extracts those data through a sliding window for further processing.

Processing the data and saving the results to a file are covered in the next section.

This project is based on assignments from the Big Data Specialization by the University of California San Diego on Coursera.

The analysis for this project was performed in Spark, using its Python API (PySpark).

Data

Below is an example of the streaming measurements coming from the weather station:

1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

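Each line consists of a timestamp, a message identifier such as 0R1 (wind measurements) or 0R2 (air temperature, relative humidity and air pressure), and a comma-separated list of measurements. Each measurement has the form abbreviation=value, and each value ends with a one-letter unit code; for example, Dm=066D is an average wind direction of 66 degrees. All abbreviations, their units and the corresponding unit codes are listed below: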

Abbr.  Measurement             Units                         Unit codes
Sn     Wind speed minimum      m/s, km/h, mph, knots         #, M, K, S, N
Sm     Wind speed average      m/s, km/h, mph, knots         #, M, K, S, N
Sx     Wind speed maximum      m/s, km/h, mph, knots         #, M, K, S, N
Dn     Wind direction minimum  deg                           #, D
Dm     Wind direction average  deg                           #, D
Dx     Wind direction maximum  deg                           #, D
Pa     Air pressure            hPa, Pa, bar, mmHg, inHg      #, H, P, B, M, I
Ta     Air temperature         °C, °F                        #, C, F
Tp     Internal temperature    °C, °F                        #, C, F
Ua     Relative humidity       %RH                           #, P
Rc     Rain accumulation       mm, in                        #, M, I
Rd     Rain duration           s                             #, S
Ri     Rain intensity          mm/h, in/h                    #, M, I
Rp     Rain peak intensity     mm/h, in/h                    #, M, I
Hc     Hail accumulation       hits/cm², hits/in², hits      #, M, I, H
Hd     Hail duration           s                             #, S
Hi     Hail intensity          hits/cm²h, hits/in²h, hits/h  #, M, I, H
Hp     Hail peak intensity     hits/cm²h, hits/in²h, hits/h  #, M, I, H
Th     Heating temperature     °C, °F                        #, C, F
Vh     Heating voltage         V                             #, N, V, W, F2
Vs     Supply voltage          V                             V
Vr     3.5 V ref. voltage      V                             V
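The project itself only needs Dm, but the record format above can be decoded in full. The following is a minimal sketch, not part of the original project: it splits a record into its timestamp, message identifier and fields, and separates each numeric value from its trailing unit code. The name parse_record is hypothetical, and the sketch assumes every value is a number followed by a single-letter unit code, as in the sample lines.

```python
from typing import Dict, Tuple


def parse_record(line: str) -> Tuple[int, str, Dict[str, Tuple[float, str]]]:
    """Split one station record into (timestamp, message id, fields).

    Each field maps an abbreviation to (numeric value, unit code), e.g.
    "Dm=066D" -> ("Dm", (66.0, "D")). Unit codes must be read per
    measurement using the table: "M" means m/s for wind speed but mmHg
    for air pressure.
    """
    # Whitespace (a tab in the samples) separates timestamp and payload.
    timestamp, payload = line.split(maxsplit=1)
    message_id, *pairs = payload.split(",")
    fields = {}
    for pair in pairs:
        name, raw = pair.split("=", 1)
        # Assumption: the last character is the unit code, the rest a number.
        fields[name] = (float(raw[:-1]), raw[-1])
    return int(timestamp), message_id, fields


record = parse_record("1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H")
print(record)
# (1419408016, '0R2', {'Ta': (13.9, 'C'), 'Ua': (28.5, 'P'), 'Pa': (889.9, 'H')})
```

float() is used rather than int() because several fields, such as Sx=10.3M, carry decimals.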

We extract only the average wind direction (abbreviation Dm), which is processed further in the next section.

First, we define a function that parses a line of the streaming data (such as 1419408015 0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M) and returns its average wind direction (Dm) as a one-element list, or an empty list if the line has no Dm field (as with the 0R2 lines):

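import re

def parse(line):
    """Return [Dm] if the line contains an average wind direction, else []."""
    # Raw string so that \d reaches the regex engine unchanged; "Dm=066D" -> "066".
    match = re.search(r"Dm=(\d+)", line)
    if match:
        val = match.group(1)
        return [int(val)]   # int("066") == 66

    # Lines without a Dm field (e.g. the 0R2 lines) yield no value.
    return []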
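Before connecting to the live feed, parse() can be tried on the sample lines from the Data section. The block below restates the function so it runs on its own; SAMPLE holds those six lines, assuming a tab separates the timestamp from the rest of each line.

```python
import re


def parse(line):
    """Return [Dm] if the line contains an average wind direction, else []."""
    match = re.search(r"Dm=(\d+)", line)
    return [int(match.group(1))] if match else []


# The sample lines from the Data section (tab separator assumed).
SAMPLE = [
    "1419408015\t0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H",
    "1419408017\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M",
    "1419408018\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M",
    "1419408019\t0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M",
]

for line in SAMPLE:
    print(parse(line))
# [66]
# [65]
# []
# [64]
# [64]
# [65]
```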

Next, we import and create a new instance of Spark’s StreamingContext:

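from pyspark.streaming import StreamingContext

# sc is the existing SparkContext (predefined in the PySpark shell and notebooks).
ssc = StreamingContext(sc, 1)   # 1-second batch interval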

The argument sc is the SparkContext, and 1 sets the batch interval to one second. Because the station sends roughly one wind (0R1) line per second, each 10-second sliding window defined below holds about ten Dm values.
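To see why a 10-second window holds roughly ten values, the sketch below groups the sample lines into one-second buckets by their leading timestamp and counts how many carry a Dm field. This is only an approximation: Spark assigns records to batches by arrival time, not by the timestamp written in the record, so bucketing by timestamp is an assumption made for illustration.

```python
from collections import Counter

# The sample lines from the Data section (tab separator assumed).
SAMPLE = [
    "1419408015\t0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H",
    "1419408017\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M",
    "1419408018\t0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M",
    "1419408019\t0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M",
]

BATCH_INTERVAL = 1   # seconds, as in StreamingContext(sc, 1)
WINDOW_LENGTH = 10   # seconds, as in vals.window(10, 5)

# Bucket each line by its leading timestamp (a stand-in for arrival time)
# and count only the lines that carry an average wind direction.
dm_per_batch = Counter(
    int(line.split()[0]) // BATCH_INTERVAL
    for line in SAMPLE
    if "Dm=" in line
)
values_per_batch = sum(dm_per_batch.values()) / len(dm_per_batch)
batches_per_window = WINDOW_LENGTH // BATCH_INTERVAL

print(dict(dm_per_batch))                      # one Dm value in each second
print(batches_per_window * values_per_batch)   # 10.0
```

The 0R2 line at 1419408016 shares a second with a 0R1 line but adds no Dm value, which is why the estimate counts values rather than lines.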

A connection to the streaming weather data is opened as follows:

lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12020)

This creates a DStream, lines, that connects over TCP to port 12020 of rtd.hpwren.ucsd.edu and yields each line of text sent by the weather station as one record.
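socketTextStream() treats the incoming TCP bytes as newline-delimited text, one record per line. The sketch below reproduces that line-oriented protocol with Python's standard socket module so the idea can be tried without Spark; it is not Spark's receiver. The helper read_lines is hypothetical, and socket.socketpair() stands in for the station so the example runs offline.

```python
import socket


def read_lines(sock, encoding="utf-8"):
    """Yield newline-delimited text records from a connected socket."""
    buffer = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:              # the peer closed the connection
            break
        buffer += chunk
        # Emit every complete line; keep a partial line for the next read.
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            yield raw.decode(encoding)
    if buffer:                     # trailing line without a final newline
        yield buffer.decode(encoding)


records = [
    "1419408015\t0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H",
]

# Offline stand-in for the station: one end of a socket pair writes, the other reads.
station, client = socket.socketpair()
station.sendall(("\n".join(records) + "\n").encode("utf-8"))
station.close()                    # closing signals end of stream to the reader
received = list(read_lines(client))
client.close()
print(received == records)         # True

# Against the live feed (requires network access and the station to be online):
# with socket.create_connection(("rtd.hpwren.ucsd.edu", 12020)) as sock:
#     for line in read_lines(sock):
#         print(line)
```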

Then we extract the average wind direction from each line and store it in a new DStream, vals:

vals = lines.flatMap(parse)

The flatMap() method applies parse() to every line received from the station and concatenates the returned lists, so each 0R1 line contributes one Dm value and lines without a Dm field contribute nothing.
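The reason parse() returns a list rather than a bare number is the flatMap() contract: each input record may produce zero or more output records, and the results are concatenated. The plain-Python sketch below mimics map() and flatMap() on two of the sample lines to show the difference; it does not use Spark, and parse is restated so the block runs alone.

```python
import re
from itertools import chain


def parse(line):
    """Return [Dm] if the line contains an average wind direction, else []."""
    match = re.search(r"Dm=(\d+)", line)
    return [int(match.group(1))] if match else []


lines = [
    "1419408016\t0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M",
    "1419408016\t0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H",
]

# map(): exactly one output per input, so the empty list survives as a record.
mapped = [parse(line) for line in lines]
# flatMap(): outputs are concatenated, so the line without Dm simply disappears.
flat_mapped = list(chain.from_iterable(parse(line) for line in lines))

print(mapped)       # [[65], []]
print(flat_mapped)  # [65]
```

In Spark, lines.map(parse) would likewise produce a DStream of lists, which is why the original code uses flatMap().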

Next, we create a sliding window over the measurements by calling the window() method:

window = vals.window(10, 5)

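This creates a windowed DStream: every five seconds (the slide interval), it produces the values received during the previous ten seconds (the window length). Both durations must be multiples of the one-second batch interval.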
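The sketch below is a simplified plain-Python model of window(10, 5) over one-second batches: after every five batches it emits the values of the last ten. It is not Spark's implementation; the function sliding_windows and the synthetic batch contents are hypothetical. It does enforce Spark's documented rule that the window length and slide interval be multiples of the batch interval.

```python
def sliding_windows(batches, window=10, slide=5, batch_interval=1):
    """Yield the records of each window, oldest first (simplified model).

    batches[i] holds the records received during the i-th batch. Every
    `slide` seconds the model emits the records of the last `window`
    seconds; before `window` seconds have elapsed, it emits whatever
    batches exist so far.
    """
    # Mirror Spark's rule: both durations must be multiples of the batch interval.
    if window % batch_interval or slide % batch_interval:
        raise ValueError("window and slide must be multiples of the batch interval")
    per_window = window // batch_interval
    per_slide = slide // batch_interval
    # A window closes after every `per_slide` batches.
    for end in range(per_slide, len(batches) + 1, per_slide):
        start = max(0, end - per_window)
        yield [value for batch in batches[start:end] for value in batch]


# Hypothetical input: 20 one-second batches; each holds its own index as a stand-in value.
batches = [[second] for second in range(20)]
for values in sliding_windows(batches):
    print(values)
```

Because the window is twice as long as the slide, each value lands in two consecutive windows once the stream is running, so downstream results overlap by half.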

Next step: Data Preparation