Summary
The previous post walked through an introductory exercise in using gBoxADTBase types with the Apache Spark framework. The example was based on the Spark Streaming API, where a custom receiver handles deserialization of HOCON strings, which are serialized versions of gBoxADTBase type objects. The receiver is registered on the RDD stream as a converter; it reads in-bound streams using a new-line delimiter and, for each captured string, applies the `hoconToType[T]` transformation. To refresh memories, the canonical definition is `gBoxCPETLSocketStreamIterator[T]`, where the iterator’s next() call returns an `Either[Throwable, O]`, the type parameter O being any O <: gBoxADTBase[O]. The post concluded with a plan to do a similar integration with the Structured Streaming API, but with a bit more than a hello-world. Hence this post!
TL;DR
For a given `T <: gBoxADTBase[_]`, produce an `ExpressionEncoder[gBoxADTBase[_]]` instance, and use it inside the Spark Structured Streaming API.
Context
To paraphrase the formal definition, the Structured Streaming API provides flow semantics through which one can enforce structure over the data in context and gain type-safety of operations – for example, applying relational operators that reference the data by column and type. The view projected from the stream can be thought of as a typical relational database table, except that the records form an unbounded set, i.e., a stream.
A few vocabulary items to consider before dwelling further (a short illustration follows the list):
- DataSet: typed representation of domain-specific data sets
- DataFrame: structured representation of data sets; by default a DataSet[Row]
- Row: each in-bound record is mapped to a Row, which is an abstraction over a set of fields
- StructType: meta-information on domain-specific types that provides the semantic detail required for the inner workings
- InternalRow: internal representation of user data as it gets exchanged between data-processing nodes, i.e., tasks inside Spark
- Executor: a compute entity that receives tasks as defined in the physical plan, translated from the logical plan
- Task: a step in the data-processing logic as it must be executed over the in-bound data
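To make the DataSet/StructType relationship concrete, here is a minimal, self-contained sketch (the Person case class is hypothetical, not from the gBox code base) showing how Spark derives a StructType schema from a domain type:

import org.apache.spark.sql.Encoders

case class Person(firstName: String, country: String)

// Encoders.product derives the encoder for a case class; its schema is the StructType Spark works with
val schema = Encoders.product[Person].schema
println(schema.treeString)
// root
//  |-- firstName: string (nullable = true)
//  |-- country: string (nullable = true)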
Spark is a distributed data-processing engine; it ships code, i.e., lambdas, to executors to be applied as tasks over data. The nature of the data source, combined with the routes within the transformation logic, drives the level of parallelism. Besides that, the plan analyzer employs various cost-optimization techniques to further the processing efficiency. The latter, however, depends greatly on something called the Schema (a StructType/DataType). The structure represents the semantic composition of the underlying data sets, allowing Spark to apply selective actions (operators, serialization, deserialization, etc.) and to leverage columnar methods. Think of relational operators such as select and group-by, and reducing functions such as count, avg, etc., that act on commonly identified sub-sets within a given data set. When Spark needs to run such compute and we do not overlay a structure that allows it to dig deep, whole objects get serialized and deserialized, effectively slowing down the throughput, i.e., increasing latency.
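As a rough illustration of that cost difference – a sketch only, reusing the hypothetical Person type from the previous snippet – a typed lambda pulls every record through the encoder as a full JVM object, whereas a column expression lets Catalyst work on the internal representation and prune what it does not need:

import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("encoder-cost-demo").getOrCreate()
import spark.implicits._

val ds: Dataset[Person] = Seq(Person("A", "India"), Person("B", "US")).toDS()

// Typed lambda: every Person is rebuilt as a JVM object before the field can be read
ds.map(_.country).distinct().show()

// Column expression: resolved by Catalyst over the internal rows, no per-record object materialization
ds.select($"country").distinct().show()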
Approach
That brings me to the core aspect of this post – applying gBoxADTBase types as part of Structured Streaming compute, while following the expectations of the API constructs. Review the code snippet below:
val materializedADTDF: DataFrame = sparkSession
  .readStream
  .format("socket")
  .option("host", socketCfg.hostName)
  .option("port", socketCfg.portNumber)
  .load()
  .as[String]
  .mapPartitions {
    //gBoxADTBase deserialization code goes here!
  }(gBoxADTTypeEncoder.encocderForADTFromFQCN(inputADTTypeFQCNBCastRef.value).get)
  .toDF()
Two interesting parts in the above code are `format("socket")` and `as[String]`. What I would have liked to start with is a custom source such as `socket2gBoxADT`, together with `as[gBoxADTBase]`. That, however, requires a bit more involved extension work – simply put, a custom data source implementation similar to the custom Receiver presented in my earlier post. Once such a source materializes the objects as gBoxADTBase[_] and exposes them for downstream access via the DataFrame API, constructs like `as[gBoxADTBase[_]]` become achievable. The `as[_]` call itself has no more intelligence than to expose the underlying stream as a view of the type you pass as its type parameter. See the code snippet from the Spark code base on Git:
@Experimental
@InterfaceStability.Evolving
def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
Here’s the link to the source. I hope you’d ask the obvious question too: why not simplify the approach? Yes, that’s the next User Story I intend to work on, and I’m very excited about it.
For now, the approach I have taken is to leverage the existing capabilities around sockets, gain access to the serialized HOCON strings as a DataSet[String], and then generate the corresponding ADT for each record that comes through the stream. I have, however, taken care not to repeat myself for each data pipe I want to implement. Instead, I introduced a trait that sets the base foundations of the flow semantics and leaves just the business function to be implemented for repeat use cases where Spark Structured Streaming is the solution of choice.
gBoxADTTypeEncoder
Before checking out the boiler-plate that abstracts the repeat configuration items such as stream initialization, let’s take a look at the kicker in the game – the gBoxADTTypeEncoder. Here’s the signature and definition of the corresponding method (a bit verbose; excuse me there):
def encocderForADTFromFQCN(classFQCN: String): Try[ExpressionEncoder[gBoxADTBase[_]]] = Try {
  val adtTTag: ru.TypeTag[gBoxADTBase[_]] =
    if (classFQCN.charAt(classFQCN.length - 1) == '$') {
      gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(classFQCN.substring(0, classFQCN.length - 1)).right.get
    } else {
      gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(classFQCN).right.get
    }

  val adtCTag: scala.reflect.ClassTag[gBoxADTBase[_]] =
    gBoxTypeHelper.adtClassTag(adtTTag).get.asInstanceOf[scala.reflect.ClassTag[gBoxADTBase[_]]]

  //Uses a combination of Java and Scala reflection inside the gBoxTypeHelper implementation.
  val typeArgumentMetaInfo: Array[(String, ru.Symbol)] = gBoxTypeHelper.typesConstructorMetaInfo(adtTTag).get._3

  //Approach taken from how Spark internally applies Encoders
  val dataType: DataType = ScalaReflection.dataTypeFor(adtTTag)
  val boundReference: BoundReference = BoundReference(0, dataType, nullable = true)
  val serializer: CreateNamedStruct = ScalaReflection.serializerFor(boundReference)(adtTTag)
  val deserializer: Expression = ScalaReflection.deserializerFor(adtTTag)

  new ExpressionEncoder[gBoxADTBase[_]](
    schema = serializer.dataType,
    flat = false,
    serializer.flatten,
    deserializer,
    adtCTag)
}
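A hypothetical usage of this method (the FQCN below is the mock profile type used later in this post; whether it is already registered with gBoxTypeHelper at this point is an assumption):

val profileEncoder =
  gBoxADTTypeEncoder
    .encocderForADTFromFQCN("me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile")
    .get

// The derived schema should line up with the "Original" schema captured later in this post
println(profileEncoder.schema.treeString)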
Boiler plate – Stream Initialization & Deserialization
Here’s the boiler plate from the trait – gBoxCPETLStructuredStreamingApp – declared as `trait gBoxCPETLStructuredStreamingApp[T <: gBoxADTBase[T]]`!
One key aspect I’d like to highlight is the inability to serialize types themselves. In an ideal world, I’d indicate the in-bound gBoxADTBase type via the type parameter set on the base trait. The type parameter on the trait above, however, only serves a few basic signatures; anything to do with the lambdas has to happen inside the lambda or on its reference, i.e., via implicit evidences. Since I cannot pass types around without forcing the whole of the driver code to be serialized and pushed to each task, I’ve taken the approach of setting a couple of broadcast variables (a minimal sketch follows the list below). Here are those variables:
- The Fully Qualified Class Name (FQCN), so that gBoxTypeHelper can establish the corresponding type-tag reference for the ADT in context. The type tag is needed in two places – gBoxSerDeImplicits#hoconToADT, which deserializes a String to the corresponding ADT, and gBoxADTTypeEncoder#encocderForADTFromFQCN, which generates the corresponding ExpressionEncoder[gBoxADTBase[_]] evidence. See the code snippets further down.
- A serialized version of gBoxCPCommonsCacheImplConfig (remember, it is also a gBoxADTBase type), so that the cache configuration is localized to each executor environment that handles a partition of data. Using cached type-meta speeds up processing of the partitioned in-bound data sets.
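A minimal sketch of how these two broadcast variables might be set up on the driver, assuming a sparkSession is in scope; the variable names match those used in the mapPartitions block further down, while the broadcast values (the FQCN literal and cacheImplConfigHocon) are placeholders:

import org.apache.spark.broadcast.Broadcast

// FQCN of the in-bound ADT type, e.g. the mock profile type used later in this post
val inputADTTypeFQCNBCastRef: Broadcast[String] =
  sparkSession.sparkContext.broadcast("me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile")

// Serialized (HOCON) gBoxCPCommonsCacheImplConfig; an empty string is treated as "no cache configured"
val gboxADTTypeHelperCacheCfg: Broadcast[String] =
  sparkSession.sparkContext.broadcast(cacheImplConfigHocon)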
And, now to the actual boiler plate in two segments – stream initialization and ADT materialization (i.e., deserialization):
- Part that initializes the socket stream:
sparkSession
  .readStream
  .format("socket")
  .option("host", socketCfg.hostName)
  .option("port", socketCfg.portNumber)
  .load()
  .as[String]
  .mapPartitions {
    //..logic to produce gBoxADTBase types goes here!
  }
- And, the part that handles deserialization of in-bound records from String to gBoxADTBase:
.mapPartitions { partitionInStream: Iterator[String] => {
  //Initialize cache reference
  if (gboxADTTypeHelperCacheCfg.value.nonEmpty) {
    //This will not utilize the cache, as it is not yet set
    gboxADTTypeHelperCacheCfg.value.hoconToType[gBoxCPCommonsCacheImplConfig]() match {
      case Left(error: Throwable) => throw error
      //Down side of calling below will result in cache already set, which can be ignored at per-JVM
      case Right(cacheImplCfg: gBoxCPCommonsCacheImplConfig) => {
        gBoxTypeHelper.updateCacheRef(gBoxCPCommonsCacheImpl.initialize(cacheImplCfg).get)
      }
    }
  }

  //Derive gBoxADTBase[_] instance without specificity, but generality of base type
  val adtTTag = gBoxTypeHelper.getGBoxADTTypeTagUsingFQCN(inputADTTypeFQCNBCastRef.value).right.get
  val listBuffer: ListBuffer[gBoxADTBase[_]] = ListBuffer.empty[gBoxADTBase[_]]

  //Iterate through in-bound serialized records and de-serialize them to corresponding gBoxADTBase[_] type
  while (partitionInStream.hasNext) {
    partitionInStream.next().hoconToADT(adtTTag) match {
      case Left(error: Throwable) => fdc.add(1)
      case Right(adtDeserialized: gBoxADTBase[_]) => listBuffer.+=(adtDeserialized); vdc.add(1)
      case Right(_) => ivdc.add(1)
    }
  }

  //Log cache utilization to the executor logs
  println(gBoxTypeHelper.getCacheStats())

  //Return the Iterator[gBoxADTBase[_]] back to Spark
  listBuffer.iterator
} }(gBoxADTTypeEncoder.encocderForADTFromFQCN(inputADTTypeFQCNBCastRef.value).get).toDF()
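The vdc, fdc, and ivdc references above are Spark accumulators – going by how they are incremented, they count valid, failed, and invalid deserializations respectively. A minimal sketch of how they might be registered on the driver (the display names are assumptions):

// Assuming a sparkSession in scope; the accumulator display names are placeholders
val vdc  = sparkSession.sparkContext.longAccumulator("validRecordCount")
val fdc  = sparkSession.sparkContext.longAccumulator("failedDeserializationCount")
val ivdc = sparkSession.sparkContext.longAccumulator("invalidRecordCount")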
The DataFrame reference from the above partition logic is then passed on to the Object that extends this trait, via the data-processing function, represented as `functionToActOnInputStream(materializedADTDF, sparkSession):Unit`.
The implementation can, for example, be something like below:
materializedDF
  .withColumn("timestamp", lit(current_timestamp()))
  .select($"firstName", $"secondName", $"address.country" as "country", $"timestamp")
  .groupBy("country").count().withColumnRenamed("count", "CountByCountry")
  .orderBy("country")
  .writeStream
  .trigger(Trigger.ProcessingTime(Duration(10, TimeUnit.SECONDS)))
  .outputMode(OutputMode.Complete)
  .format("console")
  .start()
  .awaitTermination()
The sink-side portion of the snippet above (writeStream through awaitTermination) could probably be refactored back into the base trait – that is, by changing the return type to functionToActOnInputStream(materializedADTDF, sparkSession):DataSet[Row] (a sketch of that follows this paragraph). That way the flow semantics are handled end to end within the base, leaving aside only the piece that deals with the use-case requirements, i.e., the business logic. Note that all of this merely specifies how we’d like our logic to be executed. Everything inside Spark is lazily evaluated until the call to start() is made; at that point the call-to-action is applied, and the streams are finally processed as they are ingested at the source.
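A sketch of what that refactor could look like (hypothetical, not the current code; same imports as the example above):

// Hypothetical refactored contract: concrete pipelines return the transformed Dataset[Row],
// and the base trait owns the sink/trigger wiring shown in the example above
def functionToActOnInputStream(materializedADTDF: DataFrame, sparkSession: SparkSession): Dataset[Row]

// ...and, in the base trait:
functionToActOnInputStream(materializedADTDF, sparkSession)
  .writeStream
  .trigger(Trigger.ProcessingTime(Duration(10, TimeUnit.SECONDS)))
  .outputMode(OutputMode.Complete)
  .format("console")
  .start()
  .awaitTermination()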
Data Flow – Empirical View
Okay, unlike prior posts, I have reserved the visual description for the middle-to-later part of the post. So here is the visual that describes the flow of events and highlights some key functions within the data pipe:
Sample Runtime References
Now, to some samples. I will break these down into the following:
- gBoxADTBase types that represent domain objects
- Serialized HOCON Sample
- Schema (original and transformed)
- Logical Plan
- Compute Sample (i.e., groupBy(“country”).count().orderBy(“country”))
- Accumulator Counts
- Cache Utilization
Domain Objects – ADT Definition
@ItsAGBoxADT(doesValidations = true)
case class gBoxAddress(streetNumber: Int, streetName: String, city: String,
                       zipCode: String, state: String, country: String)

@ItsAGBoxADT(doesValidations = true)
case class gBoxPersonProfile(firstName: String, secondName: String, address: gBoxAddress)
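For reference, the sample HOCON record shown in the next sub-section deserializes to an instance along these lines:

// Sample record from the next sub-section, expressed as an instance (constructor arguments in declaration order)
val sampleProfile =
  gBoxPersonProfile("Name001", "Doe",
    gBoxAddress(1234, "FirstStreet", "FirstCity", "Z12341", "FirstState", "India"))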
Serialized HOCON Sample
I’ve tried with a sample of 20 records, 2 of which are incorrect, i.e., they result in parser errors that are captured during the deserialization attempt. These numbers are logged to the accumulators. Here’s one sample HOCON record:
me{ganaakruti{gboxcp{etl{structuredstreaming{spark{mock{gBoxPersonProfile{secondName ="Doe",firstName ="Name001",me{ganaakruti{gboxcp{etl{structuredstreaming{spark{mock{gBoxAddress={country ="India",state ="FirstState",zipCode ="Z12341",city ="FirstCity",streetName ="FirstStreet",streetNumber =1234}}}}}}}}}}}}}}}}
Schema (Original & Transformed)
Original
root
 |-- firstName: string (nullable = true)
 |-- secondName: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- streetNumber: integer (nullable = false)
 |    |-- streetName: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipCode: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- country: string (nullable = true)
Transformed
root
 |-- country: string (nullable = true)
 |-- CountByCountry: long (nullable = false)
Logical Plan
Sort [country#21 ASC NULLS FIRST], true
+- Project [country#21, count#32L AS CountByCountry#35L]
   +- Aggregate [country#21], [country#21, count(1) AS count#32L]
      +- Project [firstName#10, secondName#11, address#12.country AS country#21, timestamp#16]
         +- Project [firstName#10, secondName#11, address#12, current_timestamp() AS timestamp#16]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).firstName, true, false) AS firstName#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).secondName, true, false) AS secondName#11, if (isnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address)) null else named_struct(streetNumber, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).streetNumber, streetName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).streetName, true, false), city, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).city, true, false), zipCode, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).zipCode, true, false), state, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).state, true, false), country, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile, true]).address).country, true, false)) AS address#12]
               +- MapPartitions , obj#9: me.ganaakruti.gboxcp.etl.structuredstreaming.spark.mock.gBoxPersonProfile
                  +- DeserializeToObject cast(value#1 as string).toString, obj#8: java.lang.String
                     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@ca10069,socket,List(),None,List(),None,Map(host -> localhost, port -> 9999),None), textSocket, [value#1]
Compute Sample (i.e., groupBy(“country”).count().orderBy(“country”))
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------------+
|  country|CountByCountry|
+---------+--------------+
|Argentina|             1|
|Australia|             1|
|   Brazil|             1|
|   Canada|             1|
|    India|             7|
|Indonesia|             1|
|    Japan|             1|
|    Kenya|             1|
|Singapore|             1|
|       UK|             1|
|       US|             2|
+---------+--------------+
Accumulator Counts
Cache Utilization
Note – the lines below are progressive output, where you can see that the miss-count value remains the same. That is to say, there were about 8 objects for which type information needed to be generated, while the utilization (hit count) trended upward.
Next Stop
Well before dwelling into gBoxADTBase[_] types, I ran a few experiments in building a CEP implementation using Akka; later my interest moved more towards type-safe handling of data. I’ll try to put together a similar write-up on integrating gBoxADTBase[_] types with that CEP framework, although it is quite a deal of refactoring, plus addressing concerns such as fault-tolerance in clustered mode that are often found in the commercial offerings from Lightbend. I also have plans to roll these into Docker containers, so we’ll see whichever I can grab as the quick next experiment – finally allowing me to try out the continuous-experimentation box described in my first (non-technical) blog on gBox – Experimenting with data in a box!
References
Nothing is quite done without acknowledging the many in the open-source community who helped me distill what I’m sharing here. Here are a few references that helped me (and that remind me of topics I may have forgotten as well):
- Encoder — Internal Row Converter
- Are Spark’s “Encoders” fast because they are macro based serializers?
- Real-Time Analysis of Popular Uber Locations using Apache APIs: Spark Structured Streaming, Machine Learning, Kafka and MapR-DB
- Sample Data Set Uber Locations (referenced from Mapr blog-post and public git-hub)
- Adding StructType columns to Spark DataFrames (talks about solutions to eliminate Order dependencies from code)
- Transforming Complex Data Types in Spark SQL
- How is using encoders so much faster than Java Serialization?
- Stackoverflow: … No encoder found for <<type>>
- org.apache.spark.sql.Encoders.scala
- Scala StructType (Scala API)
- Scala StructField (Scala API)
- Scala StructField DataTypes (Scala API); see the direct known sub-classes of this type in the documentation.
- Streaming Query Listeners
- Spark ExpressionEncoder
- Typed Data Set
- Endless videos from Databricks on vimeo!