Typesafe Entities – Part II

Summary

This article is the second in a series about type-safe application of entity definitions. The first one was posted a few weeks ago. To recap, it touched upon an intent-based augmentation of entity definitions, preparing them for type-safe application, and a serialization technique to represent entity state using the human-optimized config object notation (HOCON) format. Such formats preserve hierarchical and linear associations between entities as they exist in a given problem domain – for example, atomics, composites, and collections of these varieties. This article provides a detailed context into object serialization and deserialization techniques.

Guidelines

The implementation technique adopts the following guidelines:

  • Type-safe serialization and deserialization of objects and their composite parts, using generic stateless methods.
  • Support for simple and complex attribute types.
  • Minimal code generation, where object definitions will drive both composition as well as their type appropriateness.
  • Use of external libraries is limited to the essential. Still favor DRY!
  • And some future thoughts on enhancing performance and extensibility.

As a side note of interest to the readers – type-safe data operations are part of well-established data processing frameworks such as Spark and Flink. These frameworks allow users to assign a target schema to the data processing logic, for example as case classes in Scala or POJOs in Java. This article presents a similar idea, at least on the surface, where the structure and types of composites drive the mapping of source to target values. However, it is not the intent of this article to compare frameworks or describe how this is implemented in the respective frameworks. The design and implementation approach described in this article applies Scala language features – Macros (adding minimal boilerplate to make entities type-safe) and Reflection, along with recursive constructs implemented as stateless routines.

Scenario

The previous article considered a simpler entity composition – the Person type. In this article we are going to consider a somewhat more complex type composition with a mix of types, including collections and wrappers. A picture is a better compression technique than words, so here is a canonical representation of our sample entity:

[Figure: gBoxCPCEPPipeDeploymentSpec – canonical representation of the sample entity]

Here’s the class definition of the above:

@ItsAGBoxADT(doesValidations = true)
case class gBoxCPCEPPipeDeploymentSpec(
  contextName: String,
  feederCfg: gBoxCPCEPPipeFeederCfg,
  writerCfg: gBoxCPCEPPipeWriterCfg,
  targetDefaultNodes: Array[String] = Array(
    "akka.tcp://gBoxCEPFramework@ghorse1.gbox.net:2551",
    "akka.tcp://gBoxCEPFramework@ghorse2.gbox.net:2551",
    "akka.tcp://gBoxCEPFramework@ghorse3.gbox.net:2551"),
  customConfig: Array[CustomConfig] = Array(
    CustomConfig("ThrottlePipeSingleton", true).get,
    CustomConfig("PauseBetweenRetries", 1000).get),
  bootStrapSettings: Map[String, Any] = Map(
    "MaxRetries" -> 3, "TimeInMillis" -> 10000L, "SeedNodeRole" -> "seed"))

Note, the only side-effect of `doesValidations = true` is that, during compile time, the macro expansion enriches the default constructor to return an Option of the indicated type, i.e., Option[gBoxCPCEPPipeDeploymentSpec]. This is because the require(…) validations are expected to fail when the criteria are not met. (This is also why the default customConfig entries above call .get on the Option returned by CustomConfig(...).)
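
To make that behavior concrete, here is a minimal, hypothetical sketch of what the annotation effectively provides – not the actual macro output. The class name PipeFeederCfgSketch and the Try-based wrapping are assumptions for illustration only:

import scala.util.Try

//Illustrative stand-in (not the real macro expansion): require(...) runs in the constructor,
//and any failure is absorbed into None rather than an exception escaping the call site
class PipeFeederCfgSketch(val contextName: String, val feederCfg: String) {
  require(Option(contextName).isDefined && contextName.nonEmpty && Option(feederCfg).isDefined && feederCfg.nonEmpty)
}

object PipeFeederCfgSketch {
  def apply(contextName: String, feederCfg: String): Option[PipeFeederCfgSketch] =
    Try(new PipeFeederCfgSketch(contextName, feederCfg)).toOption
}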

Besides that, the naming convention of the composite parts is self-explanatory. As you can observe, the specification is for a pipeline framework based on Akka (an event-driven framework with Actors at its core). For background, this example is drawn from another effort around an event-based streaming pipeline developed to handle data requirements for use cases such as low-latency IoT scenarios as well as API-based data services.

Getting back to the current context, the following are the entity definitions for the composite parts of our pipe deployment specification:

  • Pipe feeder’s configuration Entity
@ItsAGBoxADT(doesValidations = true)
case class gBoxCPCEPPipeFeederCfg(
  val contextName:  String,
  val feederCfg: String) {
  require(Option(contextName).isDefined && contextName.nonEmpty && Option(feederCfg).isDefined && feederCfg.nonEmpty)
}
  • Pipe Writers Configuration Entity
@ItsAGBoxADT(doesValidations = true)
case class gBoxCPCEPPipeWriterCfg(
  contextName:         String,
  writerCfg:           String,
  bootStrapParameters: Map[String, Any] = Map("MaxRetries" -> 3, "TimeInMillis" -> 10000L, "AuthServiceNodeRole" -> "AuthServices")) {
  require(Option(contextName).isDefined && contextName.nonEmpty && Option(writerCfg).isDefined && writerCfg.nonEmpty)
}
  • Pipe Deployment Spec’s Custom-config Entity
@ItsAGBoxADT(doesValidations = true)
case class CustomConfig(val key: String, val value: Any) {
  require(Option(key).isDefined && key.nonEmpty)
}

A quick note about those require(…) declarations: these sections can be independently managed and maintained by the entity designers and developers, with an expectation of type and value assertions. Folks who handle behaviors, i.e., workflows, can be totally ignorant of that fact and focus on the side-effect handling associated with workflows, i.e., drawing from the Either[Throwable, T] idiom covered in the sections below.

That said, below is the serialized representation of the specification in HOCON format. It is auto-generated using the serialization approach initially discussed in the first post. There have been some enhancements since then; notable among them are the handling of attribute ordering (via TreeMap[]), support for broader collection types, and handling of null values.

gBoxCPCEPPipeDeploymentSpec {
    CustomConfig=[
        {
            key=ThrottlePipeSingleton
            value=true
        },
        {
            key=PauseBetweenRetries
            value=1000
        }
    ]
    bootStrapSettings {
        MaxRetries=3
        SeedNodeRole=seed
        TimeInMillis=10000
    }
    contextName=PipeRuntime
    gBoxCPCEPPipeFeederCfg {
        contextName=helloWorldPipeCoordinator
        feederCfg=someFeederCfg
    }
    gBoxCPCEPPipeWriterCfg {
        bootStrapParameters {
            AuthServiceNodeRole=AuthServices
            MaxRetries=3
            TimeInMillis=10000
        }
        contextName=helloWorldPipeWriters
        writerCfg=someWriterCfg
    }
    targetDefaultNodes=[
        "akka.tcp://gBoxCEPFramework@ghorse1.gbox.net:2551",
        "akka.tcp://gBoxCEPFramework@ghorse2.gbox.net:2551",
        "akka.tcp://gBoxCEPFramework@ghorse3.gbox.net:2551"
    ]
}
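
As a quick sanity check – assuming the serialized string above is held in a variable named serializedSpec (a name chosen here purely for illustration) – the Typesafe Config library can parse it back and resolve dot-notation paths directly:

import com.typesafe.config.ConfigFactory

//Dot-notation paths resolve against the nested HOCON structure shown above
val cfg = ConfigFactory.parseString(serializedSpec)
assert(cfg.getString("gBoxCPCEPPipeDeploymentSpec.contextName") == "PipeRuntime")
assert(cfg.getStringList("gBoxCPCEPPipeDeploymentSpec.targetDefaultNodes").size() == 3)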

Implementation

Entity Serialization

The first part of the article briefly touched on the serialization approach. Serialization follows a level-order traversal of the object hierarchy, flattening along the path to generate a key-value map. Keys follow a dot-notation format, while values are captured as `Any`. The map is then parsed using the Typesafe Config library to produce a formatted HOCON string. The dot notation preserves the path relevance of attributes and types. Types that extend gBoxADTBase[_] are preserved with their namespace information, i.e., package hierarchy. This is a functional provision to preserve the namespace-to-entity-type association, since it is possible to have entities with the same name across namespaces – for example, an object of type User in Accounts Receivable vs. Accounts Payable, with slight variations in their composition. The guide wire for the traversal is simply the object's type composition. Here's the canonical representation of the serialization approach:

[Figure: gBoxCPCEPPipeSpecSerialization – canonical representation of the serialization approach]
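
Purely to illustrate the flatten-then-render idea – and not the library's actual implementation – the following simplified sketch walks a nested Map, emits dot-notation keys, and lets the Typesafe Config library render the HOCON string. The flatten helper and the sample values are assumptions:

import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}

//Simplified sketch: flatten a nested structure into dot-notation keys with values kept as Any
def flatten(prefix: String, value: Any): Map[String, Any] = value match {
  case m: Map[_, _] =>
    m.toSeq.flatMap { case (k, v) =>
      flatten(if (prefix.isEmpty) k.toString else s"$prefix.$k", v)
    }.toMap
  case leaf => Map(prefix -> leaf)
}

val flat = flatten("gBoxMySpec", Map(
  "contextName" -> "PipeRuntime",
  "bootStrapSettings" -> Map("MaxRetries" -> 3, "SeedNodeRole" -> "seed")))

//ConfigFactory.parseMap treats the keys as path expressions, re-nesting the flattened map
val javaMap = new java.util.HashMap[String, Object]()
flat.foreach { case (k, v) => javaMap.put(k, v.asInstanceOf[Object]) }
println(ConfigFactory.parseMap(javaMap).root()
  .render(ConfigRenderOptions.concise().setFormatted(true).setJson(false)))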

Entity Deserialization

Entity deserialization takes a different route: it first needs to determine the order in which the dependencies must be initialized and memoized before building their parent type. So the process has two parts:

  • Derive the necessary build-order, where the lowest composite entity T of type gBoxADTBase[T] is built before its parent entity P of type gBoxADTBase[P]. The build-order is produced in LIFO order, i.e., a Stack, with the lowest entity on top of the stack.
  • Based on the build-order, materialize the source into the corresponding entities. The materialized entities are accumulated onto a stack, from which they are pulled into the construction of their composite parent – effectively leaving just one entry in the materialized-entity stack, which is the type to which the source is ultimately mapped.

The process is fail-fast. That is, if an error is encountered at any point in the materialization process, the process is aborted immediately, indicating the specific reason for failure. The return type is always an Either[Throwable, T] result, an idiom upstream code can fit nicely into its implementation. Here's the canonical representation of the deserialization process:

[Figure: gBoxCPCEPPipeSpecDeserialization – canonical representation of the deserialization process]
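
The following is a hypothetical, heavily simplified sketch of the two-phase idea described above – not the library's actual code. ADTNode, buildOrder, and materializeAll are illustrative stand-ins: the first phase derives the LIFO build-order, the second materializes entities bottom-up and fails fast with a Left:

import scala.util.{Failure, Success, Try}

//Illustrative stand-in for a node of the entity composition (assumption, not the library API)
final case class ADTNode(name: String, children: Seq[ADTNode])

//Phase 1: level-order walk; the accumulator ends with the deepest composite at its head,
//i.e., the lowest entity is the first to be built
def buildOrder(root: ADTNode): List[ADTNode] = {
  def walk(frontier: List[ADTNode], acc: List[ADTNode]): List[ADTNode] = frontier match {
    case Nil          => acc
    case node :: rest => walk(rest ++ node.children, node :: acc)
  }
  walk(List(root), Nil)
}

//Phase 2: materialize in build-order, short-circuiting on the first failure (fail-fast)
def materializeAll(order: List[ADTNode], source: Map[String, Any]): Either[Throwable, Map[String, Any]] =
  order.foldLeft[Either[Throwable, Map[String, Any]]](Right(Map.empty)) {
    case (Right(built), node) =>
      Try(source(node.name)) match { //stand-in for the real per-entity materialization
        case Success(value) => Right(built + (node.name -> value))
        case Failure(error) => Left(error)
      }
    case (left, _) => left
  }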

Both serialization and deserialization leverage recursive constructs and immutable objects, and the procedure is type-safe. For higher-order types where type information is potentially lost to type erasure, implicit type information is preserved inside their wrapping containers and applied during interpretation.
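
As a minimal illustration of that idea – the container name and shape are assumptions, not the library's actual wrapper – a TypeTag captured at construction time keeps element types available at interpretation time:

import scala.reflect.runtime.universe.TypeTag

//Illustrative wrapper: the implicit TypeTags captured here survive erasure and can be
//consulted later when the wrapped map is unwrapped into a constructor argument
final case class TypedMap[K, V](underlying: Map[K, V])(implicit val keyTag: TypeTag[K], val valueTag: TypeTag[V]) {
  def unwrap(): Map[K, V] = underlying
}

val wrapped = TypedMap(Map("MaxRetries" -> 3))
//wrapped.keyTag and wrapped.valueTag can now drive type-appropriate argument handling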

Object construction, i.e., the call to instantiate an object, depends on Scala's reflection capabilities. A reference to the primary constructor is obtained using the reflection API. Once the required arguments are initialized, the sequence is passed to the generic constructor invocation. Here's a quick excerpt:

import scala.reflect.runtime.{universe => ru}

//Assumes an implicit ru.TypeTag[T] named tTag and the argument sequence unwrappedArgs are in scope
val runtimeMirror = ru.runtimeMirror(getClass.getClassLoader)
val adtClassSymbol = tTag.tpe.typeSymbol.asClass
val adtClassMirror: ru.ClassMirror = runtimeMirror.reflectClass(adtClassSymbol)
val primaryConstructorSymbol: ru.Symbol = adtClassSymbol.info.decls
  .filter(member => member.isMethod && member.asMethod.isPrimaryConstructor).head
val primaryConstructor: ru.MethodMirror = adtClassMirror.reflectConstructor(primaryConstructorSymbol.asMethod)
//...unwrap the complex types here; say a map container with appropriate type info exposes a `.unwrap()` method
primaryConstructor(unwrappedArgs: _*).asInstanceOf[T]

It is important that type information is available implicitly to the runtime. Lacking it, argument type-mismatch errors will surface when the last line in the above excerpt is invoked. The source of such an error lies deep inside the Java library. One should not treat this as a defect, but rather as an expected side-effect, since the runtime is unable to recognize certain value types for lack of type information.

Application

The proof is in the pudding. Let's take a quick walk-through of the signatures this library implementation offers to serialize and deserialize marked entities – that is, entities of type gBoxADTBase[_] marked using the annotation @ItsAGBoxADT(...). A hypothetical sketch of how the de-serialization extension could be wired follows the list.

  • To serialize the object instance
<<ADT Instance variable>>.toString()
  • To de-serialize from serialized HOCON string
<<serializedStringTypeVariable>>.hoconToType[<<ADTType i.e., class name>>]
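
As promised above, here is a hypothetical sketch of how such a string-side extension could be wired – an implicit class delegating to a per-type builder. The names HoconStringOps and HoconBuilder are assumptions for illustration; the real hoconToType is macro- and reflection-driven and considerably richer:

import com.typesafe.config.{Config, ConfigFactory}
import scala.util.control.NonFatal

//Illustrative per-type builder the extension below delegates to (assumption, not the library API)
trait HoconBuilder[T] {
  def build(config: Config): Either[Throwable, T]
}

object HoconSyntax {
  implicit class HoconStringOps(serialized: String) {
    def hoconToType[T](implicit builder: HoconBuilder[T]): Either[Throwable, T] =
      try builder.build(ConfigFactory.parseString(serialized))
      catch { case NonFatal(error) => Left(error) }
  }
}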

For brevity, let’s define a simpler ADT (to not to clutter in this page):

@ItsAGBoxADT(doesValidations = true)
case class gBoxMyADT(name: String, context: String) {
  //Note - we could be more FP-aligned below by using `Option(argument).isDefined` instead of `!= null`
  require(name != null && name.nonEmpty && context != null && context.nonEmpty)
}

And a test case to demonstrate:

it should "flatten simple ADT" in {
    //Note - the doesValidations assignment lets macro-expansion drive .apply() to produce an Option[] instead of the actual type itself
    gBoxMyADT(name = "Foo-Bar", context = "HelloWorld") match {
      case None => logger.error("Failed instantiating object of type - gBoxMyADT")
      case Some(myADT) => {
        //Valid attempt
        myADT.toString().hoconToType[gBoxMyADT] match {
          case Left(error: Throwable) => logger.error(s"Attempt to Ser-De gBoxMyADT returned error - ${error.getMessage()}")
          case Right(myADTAgain)      => logger.debug(s"Successfully produced myADT from it's serialized format; myADTAgain -> $myADTAgain")
        }

        //Invalid attempt
        myADT.toString().hoconToType[gBoxCPCEPPipeDeploymentSpec] match {
          case Left(error: Throwable) => logger.error(s"Attempt to Ser-De gBoxMyADT returned error - ${error.getMessage()}")
          case Right(myADTAgain)      => logger.debug(s"Successfully produced myADT from it's serialized format; myADTAgain -> $myADTAgain")
        }
      }
    }
}

Here’s the console output to describe the outcomes:

17:51:40.594 [pool-4-thread-7-ScalaTest-running-TestgBoxCPConfigBase] DEBUG m.g.g.commons.adt.gBoxADTBuilder$ - Time to build gBoxMyADT  in time millis - 0
17:51:40.596 [pool-4-thread-7-ScalaTest-running-TestgBoxCPConfigBase] DEBUG TestgBoxCPConfigBase - Successfully produced myADT from it's serialized format; myADTAgain -> gBoxMyADT {
    context=HelloWorld
    name=Foo-Bar
}

17:51:40.599 [pool-4-thread-7-ScalaTest-running-TestgBoxCPConfigBase] ERROR TestgBoxCPConfigBase - Attempt to Ser-De gBoxMyADT returned error - Path gBoxCPCEPPipeDeploymentSpec.contextName is not supported at source.

To conclude, moving on to our original sample entity, here's the test case demonstrating the capability:

it should "flatten ADT" in {
    (gBoxCPCEPPipeFeederCfg(
      contextName = "helloWorldPipeCoordinator",
      feederCfg = "someFeederCfg"), gBoxCPCEPPipeWriterCfg(
      contextName = "helloWorldPipeWriters",
      writerCfg = "someWriterCfg")) match {
      case (Some(pipeSingleton), Some(pipeWriter)) => {
        gBoxCPCEPPipeDeploymentSpec(contextName = "PipeRuntime", feederCfg = pipeSingleton, writerCfg = pipeWriter) match {
          case Some(pipeRuntime) => {
            val serializedPipeRuntime = pipeRuntime.toString()
            logger.debug(s"Flattened to HOCON format as - $serializedPipeRuntime")
            serializedPipeRuntime.hoconToType[gBoxCPCEPPipeDeploymentSpec] match {
              case Left(error: Throwable)                          => logger.error(s"HOCON To type returned error - ${error.getMessage}", error)
              case Right(pipeRuntime: gBoxCPCEPPipeDeploymentSpec) => logger.debug(s"HOCON to type returned deserialized type -> $pipeRuntime")
            }
          }
          case None => assert(false, "Invalid attempt at initializing CEP pipe runtime.")
        }
      }
      case (_, _) => assert(false, "Invalid attempt at initializing either the pipe singleton or the writer definitions.")
    }
  }

And the console output for above test case:

17:51:40.564 [pool-4-thread-7-ScalaTest-running-TestgBoxCPConfigBase] DEBUG m.g.g.commons.adt.gBoxADTBuilder$ - Time to build gBoxCPCEPPipeDeploymentSpec  in time millis - 0
17:51:40.574 [pool-4-thread-7-ScalaTest-running-TestgBoxCPConfigBase] DEBUG TestgBoxCPConfigBase - HOCON to type returned deserialized type -> gBoxCPCEPPipeDeploymentSpec {
    CustomConfig=[
        {
            key=ThrottlePipeSingleton
            value=true
        },
...

Some future-thoughts!

The approach discussed here is a starting point and, as such, has scope for future improvements and extensibility additions. To name a few:

  • Refactor the build-order and type-inference capabilities for speed, i.e., cache or pre-warm the runtime with known types – for example, in the case of dedicated API endpoints that cater to certain known services.
  • Lazy evaluation and caching to support dynamic semantic discovery and validation.
  • Include data and application security hooks into serialization and deserialization aspects, primarily using implicit extensions to support pluggability.
  • Support for other data serialization formats such as Parquet, Avro, etc.
  • Support for implicit compression techniques when serializing objects with complex structures, especially those expected to hold a good amount of data – for example, audio-video data.
  • Higher-order functions to handle basic data operations such as Like, Group, Sum, Lineage-checkpoints, etc. These are essential and useful operations when treating streams.
  • Push-down predicates as part of the deserialization process, so that we can extend the capability beyond the `is this path supported or not` variants.

Upcoming works!

An event-driven pipeline for serving IoT and other event-based systems with low-latency, infrequent data-collection needs. The idea here is not to replace or ignore mature data processing frameworks, but rather to serve needs at the edge, while evangelizing the writer's thoughts about the respective use cases.
