Spark Structured Streaming with ExpressionEncoder[gBoxADTBase[_]]

Summary

The previous post walked through an introductory exercise in using gBoxADTBase types with the Apache Spark framework. The example considered a streaming-API-based implementation, wherein a custom receiver is implemented to handle deserialization of HOCON strings. These are serialized versions of some gBoxADTBase type objects. The receiver is registered on the RDD stream as a converter. The converter reads in-bound streams using a new-line delimiter, and for each captured string applies the `hoconToType[T]` transformation. Just to refresh memories, the canonical definition is `gBoxCPETLSocketStreamIterator[T]`, where the iterator’s next() call returns an `Either[Throwable, O]`; here the type parameter O is any O <: gBoxADTBase[O]. The post concluded with a plan for a similar integration with the Structured Streaming API, but going a bit beyond hello-world. Hence this post!

TL;DR

For a given T <: gBoxADTBase[_], produce an ExpressionEncoder[gBoxADTBase[_]] instance and use it inside the Spark Structured Streaming API.

Context

To paraphrase the formal definition, the Structured Streaming API provides flow semantics through which one can enforce structure over the data in context and gain type-safety of operations, for example the application of relational operators where type-wise references to the data are made. The view projected from the stream can be considered a typical relational database table, except that the records form an unbounded set, or stream.

A few vocabulary items to consider before delving further:

  • Dataset: typed representation of domain-specific data sets.
  • DataFrame: structured representation of data sets; the default is Dataset[Row].
  • Row: each in-bound record is mapped to a Row, an abstraction over a set of fields.
  • StructType: meta-information on domain-specific types that provides the semantic detail required for the inner workings.
  • InternalRow: internal representation of user data as it gets exchanged between data processing nodes, i.e., tasks inside Spark.
  • Executor: a compute entity that receives tasks as defined in the physical plan, translated from the logical plan.
  • Task: a step in the data processing logic as it must be executed over the in-bound data.

Spark is a distributed data processing engine, and it ships code, i.e., lambdas, to executors, to be applied as tasks over data. The nature of the data source, in combination with the routes within the transformation logic, drives the level of parallelism. Besides that, the plan analyzer employs various cost optimization techniques to further processing efficiency. The latter, however, is greatly dependent on something called a Schema (a StructType/DataType). The structure represents the semantic composition of the underlying data sets, allowing Spark to apply selective actions (operators, serialization, deserialization, etc.) and to leverage columnar methods. Think about relational operators such as select and group-by, and reducing functions such as count, avg, etc., that act on commonly identified sub-sets within a given data set. When Spark needs to run such compute and we do not overlay a structure that allows it to dig deep, whole objects are serialized and deserialized, effectively slowing down throughput, i.e., increasing latency.
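
To make the schema point concrete, here is a minimal, self-contained sketch (using a hypothetical Person case class rather than a gBox type) of how Spark derives a schema for a product type via an Encoder, which is what enables the field-level, columnar access described above:

import org.apache.spark.sql.{Encoder, Encoders}

//Hypothetical domain type, purely for illustration
case class Person(firstName: String, secondName: String, country: String)

//Spark derives an Encoder, and hence a StructType schema, for product (case class) types
val personEncoder: Encoder[Person] = Encoders.product[Person]

//The schema is what allows Spark to reach into individual fields (columnar access)
//instead of serializing/deserializing whole objects for every operator
println(personEncoder.schema.treeString)
// root
//  |-- firstName: string (nullable = true)
//  |-- secondName: string (nullable = true)
//  |-- country: string (nullable = true)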

Approach

That brings me to the core aspect of this post – application of gBoxADTBase types as part of structured streaming compute, following the expectations of the API constructs. Review the code snippet below:

val materializedADTDF: DataFrame = sparkSession
      .readStream
      .format("socket")
      .option("host", socketCfg.hostName)
      .option("port", socketCfg.portNumber)
      .load()
      .as[String]
      .mapPartitions {
          //gBoxADTBase deserialization code goes here!
      }(gBoxADTTypeEncoder.encocderForADTFromFQCN(inputADTTypeFQCNBCastRef.value).get).toDF()

Two interesting parts in the above code are format("socket") and as[String]. The desire I started with was to achieve a custom source, say socket2gBoxADT, together with as[gBoxADTBase]. That, however, requires a bit more involved extension work – simply put, a custom data source implementation similar to the custom Receiver that I presented in my earlier post. Once a receiver materializes the objects as the gBoxADTBase[_] type and enables them for downstream access via the DataFrame API, constructs like as[gBoxADTBase[_]] become achievable. The `as[_]` does not have much intelligence beyond exposing the underlying stream as a view of the type you pass to it as a type parameter. See the code snippet from the Spark code base on Git:

@Experimental
@InterfaceStability.Evolving
def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)

Here’s the link to the source. I hope you’d ask the question too: why not simplify the approach? Yes, that’s the next User Story that I intend to work on, and I’m very excited about it.
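
A side note on as[String] in the snippet above: as[U] needs an implicit Encoder[U] in scope. For String, Spark’s built-in implicits supply one, whereas for gBoxADTBase[_] no such implicit exists, which is exactly why the encoder is handed to mapPartitions explicitly. A rough sketch of the difference (assuming a local socket source; not the exact pipeline code):

import org.apache.spark.sql.{Dataset, SparkSession}

val sparkSession: SparkSession = SparkSession.builder().appName("encoder-resolution-sketch").getOrCreate()
import sparkSession.implicits._  //brings Encoder[String] (and other primitive encoders) into scope

val lines: Dataset[String] = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]  //implicit Encoder[String] resolved here

//For gBoxADTBase[_] there is no implicit Encoder, so it must be supplied explicitly, e.g.:
//lines.mapPartitions(deserializePartition _)(gBoxADTTypeEncoder.encocderForADTFromFQCN(fqcn).get)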

For now, the approach I have taken is to leverage the existing capabilities around sockets, gain access to the serialized HOCON strings as a Dataset[String], and move on to generating the corresponding ADT for each record that comes through the stream. I have, however, taken care not to repeat myself for each data pipe I want to implement. Instead, I introduced a trait that sets the base foundations of the flow semantics and leaves just the business function implementation for repeat use cases where Spark Structured Streaming is the solution of choice.

gBoxADTTypeEncoder

Before checking out the boiler-plate that abstracts repeated configuration items like stream initialization, let’s take a look at the kicker in the game – the gBoxADTTypeEncoder. Here’s the signature and definition of the corresponding method (a bit of verbose code; excuse me there):

def encocderForADTFromFQCN(classFQCN: String): Try[ExpressionEncoder[gBoxADTBase[_]]] = Try {
    val adtTTag: ru.TypeTag[gBoxADTBase[_]] =
      if (classFQCN.charAt(classFQCN.length - 1) == '$') {
        //FQCNs of module/companion classes end with '$'; strip it before the type-tag lookup
        gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(classFQCN.substring(0, classFQCN.length - 1)).right.get
      } else {
        gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(classFQCN).right.get
      }

    val adtCTag: scala.reflect.ClassTag[gBoxADTBase[_]] =
      gBoxTypeHelper.adtClassTag(adtTTag).get.asInstanceOf[scala.reflect.ClassTag[gBoxADTBase[_]]]

    //Uses a combination of Java and Scala reflection inside the gBoxTypeHelper implementation.
    val typeArgumentMetaInfo: Array[(String, ru.Symbol)] =
      gBoxTypeHelper.typesConstructorMetaInfo(adtTTag).get._3

    //Approach taken from how Spark internally applies Encoders
    val dataType: DataType = ScalaReflection.dataTypeFor(adtTTag)
    val boundReference: BoundReference = BoundReference(0, dataType, nullable = true)
    val serializer: CreateNamedStruct = ScalaReflection.serializerFor(boundReference)(adtTTag)
    val deserializer: Expression = ScalaReflection.deserializerFor(adtTTag)

    new ExpressionEncoder[gBoxADTBase[_]](
      schema = serializer.dataType,
      flat = false,
      serializer.flatten,
      deserializer,
      adtCTag)
}
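
For comparison, when the concrete type is statically known at compile time, Spark can already build such an encoder from a TypeTag; the method above exists because here only the FQCN is known at runtime. A minimal sketch of the statically-typed route (assuming the mock gBoxPersonProfile type shown later in this post):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile

//Works only when the type is known at compile time (an implicit TypeTag is available)
val profileEncoder: ExpressionEncoder[gBoxPersonProfile] = ExpressionEncoder[gBoxPersonProfile]()

//encocderForADTFromFQCN does the equivalent at runtime, starting from just a class name,
//and widens the result to ExpressionEncoder[gBoxADTBase[_]] so one signature serves all ADTs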

Boiler plate – Stream Initialization & Deserialization

Here’s the boiler plate from the trait – gBoxCPETLStructuredStreamingApp, defined as:

trait gBoxCPETLStructuredStreamingApp[T <: gBoxADTBase[T]]

One key aspect that I’d like to highlight is the inability to serialize types. In an ideal world, I’d like to be able to indicate the in-bound gBoxADTBase type using a type parameter set on the base trait. The type parameter on the above trait, however, only serves a few basic signatures. Anything to do with lambdas must happen inside the lambda or on its reference, i.e., via implicit evidence. Since I cannot pass types, which would require the whole of the driver code to be serialized and pushed to each task, I’ve taken the approach of setting some broadcast variables. Here are those variables:

  • Fully Qualified Class Name (FQCN), such that gBoxTypeHelper can be used to establish the corresponding type-tag reference of the ADT in context. The type tag is necessary in two places – gBoxSerDeImplicits#hoconToADT, which deserializes a String to the corresponding ADT, and gBoxADTTypeEncoder#encocderForADTFromFQCN, which generates the corresponding ExpressionEncoder[gBoxADTBase[_]] evidence. See the sketch right after this list (the boiler-plate follows further down).
  • Serialized version of gBoxCPCommonsCacheImplConfig (remember, it’s also a gBoxADTBase type), such that the cache configuration is localized to each executor’s environment that handles a partition of data. Use of cached type-meta speeds up processing of the partitioned in-bound data sets.
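
Below is a rough sketch of how these two broadcast variables could be set up on the driver; the variable names match those referenced in the mapPartitions block further down, but the setup itself is my paraphrase rather than the verbatim trait code:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession = SparkSession.builder().appName("gbox-structured-streaming").getOrCreate()

//1. FQCN of the in-bound ADT, e.g., the mock profile type used later in this post
val inputADTTypeFQCNBCastRef: Broadcast[String] =
  sparkSession.sparkContext.broadcast("me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile")

//2. Cache configuration, already serialized to a HOCON string (itself a gBoxADTBase type)
val cacheImplConfigHocon: String = ???  //HOCON produced by serializing a gBoxCPCommonsCacheImplConfig
val gboxADTTypeHelperCacheCfg: Broadcast[String] = sparkSession.sparkContext.broadcast(cacheImplConfigHocon)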

And, now to the actual boiler plate in two segments – stream initialization and ADT materialization (i.e., deserialization):

  • Part that initializes the socket stream:
sparkSession
      .readStream
      .format("socket")
      .option("host", socketCfg.hostName)
      .option("port", socketCfg.portNumber)
      .load()
      .as[String]
      .mapPartitions{
         //..logic to produce gBoxADTBase types goes here!
      }
  • And, the part that handles deserialization of in-bound records from String to gBoxADTBase:
.mapPartitions {
  partitionInStream: Iterator[String] => {
      //Initialize cache reference
      if (gboxADTTypeHelperCacheCfg.value.nonEmpty) {
        //This will not utilize the cache, as it is not yet set
        gboxADTTypeHelperCacheCfg.value.hoconToType[gBoxCPCommonsCacheImplConfig]() match {
          case Left(error: Throwable) => throw error
          //Calling this again may find the cache already set; that can be ignored since it is per-JVM
          case Right(cacheImplCfg: gBoxCPCommonsCacheImplConfig) => {
            gBoxTypeHelper.updateCacheRef(gBoxCPCommonsCacheImpl.initialize(cacheImplCfg).get)
          }
        }
      }

      //Derive gBoxADTBase[_] instance without specificity, but generality of base type
      val adtTTag = gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(inputADTTypeFQCNBCastRef.value).right.get
      val listBuffer: ListBuffer[gBoxADTBase[_]] = ListBuffer.empty[gBoxADTBase[_]]

      //Iterate through in-bound serialized records and de-serialize them to corresponding gBoxADTBase[_] type
      while (partitionInStream.hasNext) {
        partitionInStream.next().hoconToADT(adtTTag) match {
          case Left(error: Throwable) => fdc.add(1)
          case Right(adtDeserialized: gBoxADTBase[_]) =>
            listBuffer.+=(adtDeserialized); vdc.add(1)
          case Right(_) => ivdc.add(1)
        }
      }
      //Log cache utilization to the executor logs
      println(gBoxTypeHelper.getCacheStats())
      //Return the Iterator[gBoxADTBase[_]] back to Spark
      listBuffer.iterator
    }
}(gBoxADTTypeEncoder.encocderForADTFromFQCN(inputADTTypeFQCNBCastRef.value).get).toDF()

The DataFrame reference from the above partition logic is then passed on to the object that extends this trait, as the argument to the data processing function, represented as functionToActOnInputStream(materializedADTDF, sparkSession): Unit.
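
To make that hand-off concrete, here is a rough skeleton of the trait from the extending object’s point of view. This is a sketch based on my description above, not the verbatim trait; in particular materializeADTStream is an illustrative name for the boiler-plate shown earlier:

import org.apache.spark.sql.{DataFrame, SparkSession}

trait gBoxCPETLStructuredStreamingAppSketch[T <: gBoxADTBase[T]] {

  //Boiler-plate lives here: session/broadcast setup, socket stream, and the mapPartitions
  //deserialization, ending with the materialized DataFrame of ADT records
  protected def materializeADTStream(sparkSession: SparkSession): DataFrame

  //The only piece a concrete pipeline implements: the business logic
  def functionToActOnInputStream(materializedADTDF: DataFrame, sparkSession: SparkSession): Unit

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    functionToActOnInputStream(materializeADTStream(sparkSession), sparkSession)
  }
}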

The implementation for example can be something like below:

materializedDF
  .withColumn("timestamp", lit(current_timestamp()))
  .select($"firstName", $"secondName", $"address.country" as "country", $"timestamp")
  .groupBy("country").count().withColumnRenamed("count", "CountByCountry")
  .orderBy("country")
  .writeStream
  .trigger(Trigger.ProcessingTime(Duration(10, TimeUnit.SECONDS)))
  .outputMode(OutputMode.Complete)
  .format("console")
  .start()
  .awaitTermination()

Parts in italics above can probably be refactored back into the base trait – that is, the return type of functionToActOnInputStream(materializedADTDF, sparkSession) becomes Dataset[Row]. That way the flow semantics are handled end to end within the base, leaving aside only the piece that deals with the use case requirements, i.e., the business logic. Note that all of this only specifies how we’d like our logic to execute. Everything inside Spark is lazily evaluated until a call to start() is made; at that point the call-to-action is applied, finally processing the streams as they are ingested at the source.
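
Here is a sketch of that refactoring, with the write-side boiler-plate pulled into the base; the trait and method names other than functionToActOnInputStream are illustrative:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

trait gBoxCPETLStructuredStreamingAppV2Sketch {

  //Business logic now returns the transformed Dataset[Row] instead of starting the query itself
  def functionToActOnInputStream(materializedADTDF: DataFrame, sparkSession: SparkSession): Dataset[Row]

  //The base owns the trigger, output mode, sink, and the terminal start()/awaitTermination()
  protected def runQuery(materializedADTDF: DataFrame, sparkSession: SparkSession): Unit =
    functionToActOnInputStream(materializedADTDF, sparkSession)
      .writeStream
      .trigger(Trigger.ProcessingTime(Duration(10, TimeUnit.SECONDS)))
      .outputMode(OutputMode.Complete)
      .format("console")
      .start()
      .awaitTermination()
}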

Data Flow – Empirical View

Okay, unlike prior posts, I have reserved the visual description for the middle-to-later part of the post. So here is the visual that describes the flow of events and highlights some key functions within the data pipe:

Empirical flow using ExpressionEncoder[gBoxADTBase[_]]

Sample Runtime References

Now to some samples. I’ll break these down into the following:

  • gBoxADTBase types that represent domain objects
  • Serialized HOCON Sample
  • Schema (original and transformed)
  • Logical Plan
  • Compute Sample (i.e., groupBy("country").count().orderBy("country"))
  • Accumulator Counts
  • Cache Utilization

Domain Objects – ADT Definition

@ItsAGBoxADT(doesValidations = true)
case class gBoxAddress(streetNumber: Int, streetName: String, city: String, zipCode: String, state: String, country: String)

@ItsAGBoxADT(doesValidations = true)
case class gBoxPersonProfile(firstName: String, secondName: String, address: gBoxAddress)

Serialized HOCON Sample

I’ve tried with a 20-record sample, 2 of them incorrect, i.e., they result in parser errors and are captured during the deserialization attempt. These numbers are logged to the accumulators. Here’s one sample HOCON:

me{ganaakruti{gboxcp{etl{structuredstreaming{spark{mock{gBoxPersonProfile{secondName ="Doe",firstName ="Name001",me{ganaakruti{gboxcp{etl{structuredstreaming{spark{mock{gBoxAddress={country ="India",state ="FirstState",zipCode ="Z12341",city ="FirstCity",streetName ="FirstStreet",streetNumber =1234}}}}}}}}}}}}}}}}

Schema (Original & Transformed)

Original

root
 |-- firstName: string (nullable = true)
 |-- secondName: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- streetNumber: integer (nullable = false)
 |    |-- streetName: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipCode: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- country: string (nullable = true)

Transformed

root
 |-- country: string (nullable = true)
 |-- CountByCountry: long (nullable = false)

Logical Plan

Sort [country#21 ASC NULLS FIRST], true
+- Project [country#21, count#32L AS CountByCountry#35L]
   +- Aggregate [country#21], [country#21, count(1) AS count#32L]
      +- Project [firstName#10, secondName#11, address#12.country AS country#21, timestamp#16]
         +- Project [firstName#10, secondName#11, address#12, current_timestamp() AS timestamp#16]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).firstName, true, false) AS firstName#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).secondName, true, false) AS secondName#11, if (isnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address)) null else named_struct(streetNumber, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).streetNumber, streetName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).streetName, true, false), city, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).city, true, false), zipCode, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).zipCode, true, false), state, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).state, true, false), country, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).country, true, false)) AS address#12]
               +- MapPartitions , obj#9: me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile
                  +- DeserializeToObject cast(value#1 as string).toString, obj#8: java.lang.String
                     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@ca10069,socket,List(),None,List(),None,Map(host -> localhost, port -> 9999),None), textSocket, [value#1]

Compute Sample (i.e., groupBy("country").count().orderBy("country"))

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------------+
|  country|CountByCountry|
+---------+--------------+
|Argentina|             1|
|Australia|             1|
|   Brazil|             1|
|   Canada|             1|
|    India|             7|
|Indonesia|             1|
|    Japan|             1|
|    Kenya|             1|
|Singapore|             1|
|       UK|             1|
|       US|             2|
+---------+--------------+

Accumulator Counts

Accumulators i.e., Counters (Good vs Bad records)
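
The three counters referenced in the mapPartitions block (vdc, fdc, ivdc – likely valid, failed, and invalid record counts) are plain Spark LongAccumulators. Here is a sketch of how they might be registered on the driver; the accumulator display names are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

val sparkSession: SparkSession = SparkSession.builder().getOrCreate()

//Valid, failed, and invalid record counters; visible in the Spark UI and usable from tasks
val vdc: LongAccumulator = sparkSession.sparkContext.longAccumulator("validRecordCount")
val fdc: LongAccumulator = sparkSession.sparkContext.longAccumulator("failedRecordCount")
val ivdc: LongAccumulator = sparkSession.sparkContext.longAccumulator("invalidRecordCount")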

Cache Utilization

Note – the lines below are progressive output, where you can see that the miss-count value remains the same. That is to say, there were about 8 objects for which type information needed to be generated, while cache utilization took an upward streak.

Next Stop

Well before delving into gBoxADTBase[_] types, I ran a few experiments in building a CEP implementation using Akka. Later, my interest moved a bit more into type-safe handling of data. I’ll try to put together a similar write-up on integrating gBoxADTBase[_] types with that CEP framework, although it’s quite a deal of refactoring, and it also means addressing the fault-tolerance support in clustered mode, etc., that is often found in the commercial offerings from Lightbend. I also have some plans to roll these into Docker containers. So we’ll see whichever I can grab as a quick next experiment, finally allowing me to try out the continuous experimentation box described in my first (non-technical) blog on gBox – Experimenting with data in a box!

References

Nothing is quite done without acknowledging the many in the open source community who helped me distill what I’m sharing here. Here are a few references that helped me (and that remind me of topics I may have forgotten as well):

