Tightly-Coupled Analytics with Spark

Data engineers and data scientists are becoming increasingly comfortable with scale-out architectures such as Spark, using flexible programming languages such as Scala and Python to operate on the data. These developers often use notebook technologies such as Apache Zeppelin or Jupyter to create and share documents that contain live code, equations, visualizations, and explanatory text. They use notebooks for data cleaning and transformation, numerical simulation, statistical modeling, machine learning, and much more.

We deliver a Splice Machine interpreter and Spark integration tightly embedded within Zeppelin notebooks. This enables data scientists to simultaneously benefit from the libraries delivered with Spark, such as MLlib and Spark Streaming, while also getting the ACID guarantees and superior optimization of Splice Machine.

The Splice Machine Native Spark DataSource

The Splice Machine Native Spark DataSource provides dramatic performance improvements for large-scale data operations. Why? Because it works directly on native DataFrames and RDDs, eliminating the need to serialize data over the wire using the JDBC protocol.

Spark is optimized to work on DataFrames, its main data structure. A DataFrame is a distributed collection of data (an RDD) organized into named columns, with a schema that specifies the data types; it is designed to support efficient operations on massive, scalable datasets.
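
For readers new to Spark, here is a minimal PySpark sketch of a DataFrame with an explicit schema; the column names are borrowed from the shipment data used later in this chapter purely for illustration:

from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, LongType, StringType, StructField, StructType

spark = SparkSession.builder.appName("dataframe-schema-sketch").getOrCreate()

# A schema is a collection of named, typed columns
schema = StructType([
    StructField("SHIPMENTID", LongType(), nullable=False),
    StructField("SHIPMODE", StringType(), nullable=True),
    StructField("ARRIVAL_DATE", DateType(), nullable=True),
])

# The DataFrame itself is a distributed collection of rows organized by that schema
df = spark.createDataFrame([(1, "VESSEL", date(2018, 11, 1))], schema)
df.printSchema()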

The Splice Machine DataSource is native to Spark, which means that it operates directly on these DataFrames and in the same Spark executors that your programs are using to analyze or transform the data. Instead of accessing, inserting, or manipulating data one record at a time over a serialized connection, you can now use the Splice Machine Native Spark DataSource to pull the contents of an entire DataFrame into your database, and to push database query results into a DataFrame.
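
For example, a notebook round trip looks roughly like the sketch below. It assumes the defaultJDBCURL and sqlContext variables that the Zeppelin environment provides in the full example later in this chapter, and the insert call mirrors the Scala SplicemachineContext.insert used in the streaming example below; the target table name is hypothetical.

# Create the Splice context from the notebook's JDBC URL and SQL context
# (PySpliceContext is provided by the Splice Machine notebook environment)
splice = PySpliceContext(defaultJDBCURL, sqlContext)

# Push a database query result into a Spark DataFrame, with no JDBC serialization
orders_df = splice.df("select * from ASN.SHIPMENT_HISTORY")

# ... transform orders_df with ordinary Spark operations ...

# Pull the contents of the DataFrame back into a database table in one operation
# (ASN.SHIPMENT_HISTORY_COPY is a hypothetical target table)
splice.insert(orders_df, "ASN.SHIPMENT_HISTORY_COPY")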

We’ve seen 100x performance increases compared to using JDBC for operations such as inserting millions of records into a database!

JDBC connections require serializing and deserializing data one record at a time. When you use the Splice Machine Native Spark DataSource, the contents of the database table are typically already sitting in a DataFrame in memory on the same Spark executor that is performing the query. The query takes place in memory, and there’s no need to serialize the results over a wire. Similarly, when the application sends updates to the database, the data is inserted into the database directly from the DataFrame. As a result, a great deal of overhead is eliminated, and the performance gains can be remarkable.
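
For contrast, here is a sketch of the conventional JDBC path that the Native Spark DataSource replaces. The URL, credentials, and target table are placeholders, and the Splice Machine JDBC driver would need to be on the Spark classpath; every record is serialized and deserialized across this connection.

# Conventional JDBC access: rows are serialized over the wire one at a time.
# URL, credentials, and table names below are placeholders.
jdbc_df = (spark.read
           .format("jdbc")
           .option("url", "jdbc:splice://<host>:1527/splicedb")
           .option("dbtable", "ASN.SHIPMENT_HISTORY")
           .option("user", "<user>")
           .option("password", "<password>")
           .load())

# Writing back over JDBC pays the same per-record serialization cost
(jdbc_df.write
        .format("jdbc")
        .option("url", "jdbc:splice://<host>:1527/splicedb")
        .option("dbtable", "ASN.SHIPMENT_HISTORY_COPY")
        .option("user", "<user>")
        .option("password", "<password>")
        .mode("append")
        .save())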

This makes machine learning native to the database. For instance, here is a notebook example that uses a cross-validated logistic regression model on supply chain order data to predict how late an order will be. First, we label the examples with SQL:

CREATE table ASN.FEATURES AS
    SELECT 
    SHIPMENTID,
    STATUS,
    SHIPMODE,
    PRODUCT_DESCRIPTION,
    CONSIGNEE,
    SHIPPER,
    ARRIVAL_DATE,
    GROSS_WEIGHT_LB,
    GROSS_WEIGHT_KG,
    FOREIGN_PORT,
    US_PORT,
    VESSEL_NAME,
    COUNTRY_OF_ORIGIN,
    CONSIGNEE_ADDRESS,
    SHIPPER_ADDRESS,
    ZIPCODE,
    NO_OF_CONTAINERS,
    CONTAINER_NUMBER,
    CONTAINER_TYPE,
    QUANTITY,
    QUANTITY_UNIT,
    MEASUREMENT,
    MEASUREMENT_UNIT,
    BILL_OF_LADING,
    HOUSE_VS_MASTER,
    DISTRIBUTION_PORT,
    MASTER_BL,
    VOYAGE_NUMBER,
    SEAL,
    SHIP_REGISTERED_IN,
    INBOND_ENTRY_TYPE,
    CARRIER_CODE,
    CARRIER_NAME,
    CARRIER_CITY,
    CARRIER_STATE,
    CARRIER_ZIP,
    CARRIER_ADDRESS,
    NOTIFY_PARTY,
    NOTIFY_ADDRESS,
    PLACE_OF_RECEIPT,
    DATE_OF_RECEIPT,
    CASE
        WHEN ASN.SHIPMENT_HISTORY.QUANTITY > 1000 THEN 3
        WHEN ASN.SHIPMENT_HISTORY.QUANTITY > 100 THEN 2
        WHEN ASN.SHIPMENT_HISTORY.QUANTITY > 10 THEN 1
        ELSE 0
    END AS QUANTITY_BIN,
    ASN.SHIPMENT_HISTORY.DATE_OF_RECEIPT - ASN.SHIPMENT_HISTORY.ARRIVAL_DATE AS LATENESS,
    CASE
        WHEN ASN.SHIPMENT_HISTORY.DATE_OF_RECEIPT - ASN.SHIPMENT_HISTORY.ARRIVAL_DATE > 10 THEN 3
        WHEN ASN.SHIPMENT_HISTORY.DATE_OF_RECEIPT - ASN.SHIPMENT_HISTORY.ARRIVAL_DATE > 5 THEN 2
        WHEN ASN.SHIPMENT_HISTORY.DATE_OF_RECEIPT - ASN.SHIPMENT_HISTORY.ARRIVAL_DATE > 0 THEN 1
        ELSE 0
    END AS LABEL
FROM ASN.SHIPMENT_HISTORY

Then we use the following snippet of Python code to build, train, and apply the model.

# Spark ML imports; PySpliceContext is provided by the Splice Machine Python
# adapter bundled with the notebook environment (the exact import path may
# vary by release)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Set up our SpliceContext
splice = PySpliceContext(defaultJDBCURL, sqlContext)

# Query Features
df = splice.df("select * from ASN.Features")

# Normalize the column names to lower case so they match the StringIndexer and
# VectorAssembler column names used below
df = df.toDF(*[c.lower() for c in df.columns])

# Assemble Vectors
assembler = VectorAssembler(inputCols=[
    "consigneeIndex",
    "shipperIndex",
    "gross_weight_lbIndex",
    "foreign_portIndex",
    "us_portIndex",
    "vessel_nameIndex",

    "container_numberIndex",
    "container_typeIndex",
    "ship_registered_inIndex",
    "carrier_codeIndex",
    "carrier_cityIndex",
    "notify_partyIndex",
    "place_of_receiptIndex",
    "zipcodeIndex",
    "quantity_bin"
], outputCol='features')

# Transform strings into numbers
zipcodeIndexer = StringIndexer(inputCol="zipcode", outputCol="zipcodeIndex", handleInvalid="skip")
consigneeIndexer = StringIndexer(inputCol="consignee", outputCol="consigneeIndex", handleInvalid="skip")
shipperIndexer = StringIndexer(inputCol="shipper", outputCol="shipperIndex", handleInvalid="skip")
statusIndexer = StringIndexer(inputCol="status", outputCol="statusIndex", handleInvalid="skip")
shipmodeIndexer = StringIndexer(inputCol="shipmode", outputCol="shipmodeIndex", handleInvalid="skip")
gross_weight_lbIndexer = StringIndexer(inputCol="gross_weight_lb", outputCol="gross_weight_lbIndex",
                                       handleInvalid="skip")
foreign_portIndexer = StringIndexer(inputCol="foreign_port", outputCol="foreign_portIndex", handleInvalid="skip")
us_portIndexer = StringIndexer(inputCol="us_port", outputCol="us_portIndex", handleInvalid="skip")
vessel_nameIndexer = StringIndexer(inputCol="vessel_name", outputCol="vessel_nameIndex", handleInvalid="skip")
country_of_originIndexer = StringIndexer(inputCol="country_of_origin", outputCol="country_of_originIndex",
                                         handleInvalid="skip")
container_numberIndexer = StringIndexer(inputCol="container_number", outputCol="container_numberIndex",
                                        handleInvalid="skip")
container_typeIndexer = StringIndexer(inputCol="container_type", outputCol="container_typeIndex",
                                      handleInvalid="skip")
distribution_portIndexer = StringIndexer(inputCol="distribution_port", outputCol="distribution_portIndex",
                                         handleInvalid="skip")
ship_registered_inIndexer = StringIndexer(inputCol="ship_registered_in", outputCol="ship_registered_inIndex",
                                          handleInvalid="skip")
inbond_entry_typeIndexer = StringIndexer(inputCol="inbond_entry_type", outputCol="inbond_entry_typeIndex",
                                         handleInvalid="skip")
carrier_codeIndexer = StringIndexer(inputCol="carrier_code", outputCol="carrier_codeIndex", handleInvalid="skip")
carrier_cityIndexer = StringIndexer(inputCol="carrier_city", outputCol="carrier_cityIndex", handleInvalid="skip")
carrier_stateIndexer = StringIndexer(inputCol="carrier_state", outputCol="carrier_stateIndex", handleInvalid="skip")
carrier_zipIndexer = StringIndexer(inputCol="carrier_zip", outputCol="carrier_zipIndex", handleInvalid="skip")
notify_partyIndexer = StringIndexer(inputCol="notify_party", outputCol="notify_partyIndex", handleInvalid="skip")
place_of_receiptIndexer = StringIndexer(inputCol="place_of_receipt", outputCol="place_of_receiptIndex",
                                        handleInvalid="skip")

lr = LogisticRegression(maxIter=30, labelCol="label", featuresCol="features", regParam=0.3)

lrPipeline = Pipeline(stages=
                      [consigneeIndexer,
                       shipperIndexer,
                       shipmodeIndexer,
                       gross_weight_lbIndexer,
                       foreign_portIndexer,
                       us_portIndexer,
                       vessel_nameIndexer,
                       country_of_originIndexer,
                       container_numberIndexer,
                       container_typeIndexer,
                       ship_registered_inIndexer,
                       carrier_codeIndexer,
                       carrier_cityIndexer,
                       notify_partyIndexer,
                       place_of_receiptIndexer,
                       zipcodeIndexer,
                       assembler,
                       lr]
                      )
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=lrPipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation and choose the best parameters, to demonstrate grid search in Spark MLlib
lrModel = crossval.fit(df)

transformed_df = lrModel.transform(df)
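
To close the loop, the scored DataFrame can be pushed straight back into the database with the same Native Spark DataSource. The sketch below assumes a pre-created results table; the ASN.PREDICTED_LATENESS name is hypothetical.

# Keep the shipment identifier and the predicted lateness class, then insert the
# results back into the database in a single operation
# (ASN.PREDICTED_LATENESS is a hypothetical, pre-created table)
results_df = transformed_df.select("shipmentid", "prediction")
splice.insert(results_df, "ASN.PREDICTED_LATENESS")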

Even streaming is simplified and performant with the Splice Machine Native Spark DataSource. Each micro-batch, represented as a DataFrame, is inserted into the database in a single operation. Check out this example of streaming weather forecast data.

%spark
// Spark Streaming, Kafka 0.10, and Splice Machine adapter imports
// (the SplicemachineContext package name may vary by release)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.types.{LongType, StructType, TimestampType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import com.splicemachine.spark.splicemachine.SplicemachineContext

// Our Kafka topic
val topicName = "WeatherForecast"

// Our Kafka broker
val brokerList = "ec2-xxxxxxxxxxx.compute-1.amazonaws.com:9092"

// Splice Machine table we are populating
val SPLICE_TABLE_NAME = "WEATHER.FORECAST"

// Forecast schema
val schema = new StructType()
      .add("CITY_ID", LongType)
      .add("AS_OF_TS", TimestampType)
      .add("FORECAST_TS", TimestampType)
      .add("WEATHER_CODE_ID", LongType)

// Set Kafka Queue parameters
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokerList,
    "group.id"-> "cg",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer]
    )

// JDBC_URL is the Splice Machine JDBC connection string (assumed to be defined earlier in the notebook)
val splicemachineContext = new SplicemachineContext(JDBC_URL)

// Create Streaming Context
val ssc = new StreamingContext(sc, Seconds(5))

// Stop gracefully when the driver is stopped
sys.ShutdownHookThread {
  ssc.stop(true, true)
}

// Create Direct Stream
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array(topicName), kafkaParams)
)

// Parse the queue messages; we only need the message value (the JSON payload)
val msgs = stream.map(record => record.value)

msgs.foreachRDD { rdd =>
  // Each micro-batch becomes a DataFrame and is inserted in a single operation
  val df = spark.read.schema(schema).json(rdd)
  if (df.count > 0)
    splicemachineContext.insert(df, SPLICE_TABLE_NAME)
}

ssc.start()
// collectionTime (milliseconds) is assumed to be defined earlier in the notebook
ssc.awaitTerminationOrTimeout(collectionTime)
ssc.stop(stopSparkContext = false, stopGracefully = true)