Friday 18 August 2017

Apache Spark Structured Streaming



Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. In short, Structured Streaming rebuilds streaming functionality on top of Catalyst and DataFrames.

Structured Streaming is the first Spark API to build stream processing on top of the SQL engine. Traditional streaming processes only real-time events, whereas Structured Streaming takes a novel approach: it processes structured data as either a discrete table (where you know all the data) or an infinite table (where new data is continuously arriving). Viewed through this structured lens, as either a discrete table or an infinite table, streaming becomes simpler.


Stream as a Table
Model

Logically, each input stream is an append-only table (i.e. a DataFrame), where records can arrive in any order with respect to the system's processing time (event time is just a field in each record).

Users define queries as ordinary SQL or DataFrame queries on this whole table, which return a new table whenever they are executed in processing time.

Users set a trigger to say when to run each query and produce its output; triggers are based purely on processing time. The system makes a best effort to meet this requirement. In some cases, the trigger can be "as soon as possible".

Finally, users set an output mode for each query. The different output modes are:

Delta: Although the output of each query is logically always a table, users can set a "delta" output mode that writes only the records of the query result that changed since the last firing of the trigger.

  1. These are physical deltas, not logical deltas. That is, they specify which rows were added and removed, but not the logical difference for a given row.
  2. Users must specify a primary key (which can be composite) for the records. The output schema includes an extra "status" field indicating whether the record is an add, remove, or update delta for that primary key.

Append: A special case of the Delta mode that does not include removals. There is no need to specify a primary key, and the output does not include the status field.

Update (in-place): Update the result directly in place (e.g. update a MySQL table). As with Delta, a primary key must be specified.

Complete: For each run of the query, create a complete snapshot of the query result.
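
To make the difference between the output modes concrete, here is a minimal sketch in plain Scala. It is a model of the idea only and does not use the Spark API; the names `OutputModesSketch`, `DeltaRecord`, and the card-type counts are made up for illustration, and the append case assumes that existing rows are never updated.

```scala
// A plain-Scala model of the output modes; this is NOT Spark API.
object OutputModesSketch {
  // A query result is modelled as a table keyed by its primary key
  // (hypothetical example: card type -> number of transactions).
  type Table = Map[String, Long]

  // One delta record: primary key, new value (if any), and the extra "status" field.
  final case class DeltaRecord(key: String, value: Option[Long], status: String)

  // Complete mode: every firing of the trigger emits the whole result.
  def complete(current: Table): Table = current

  // Delta mode: emit only the rows that changed since the previous firing.
  // These are physical deltas: the new row is reported, not the difference.
  def delta(previous: Table, current: Table): Seq[DeltaRecord] = {
    val keys = (previous.keySet ++ current.keySet).toSeq.sorted
    keys.flatMap { k =>
      (previous.get(k), current.get(k)) match {
        case (None, Some(v))              => Some(DeltaRecord(k, Some(v), "add"))
        case (Some(_), None)              => Some(DeltaRecord(k, None, "remove"))
        case (Some(a), Some(b)) if a != b => Some(DeltaRecord(k, Some(b), "update"))
        case _                            => None // unchanged row: nothing to emit
      }
    }
  }

  // Append mode (assumption: rows are only ever added): emit the new rows,
  // with no primary-key bookkeeping and no status field.
  def append(previous: Table, current: Table): Table =
    current.filter { case (k, _) => !previous.contains(k) }

  def main(args: Array[String]): Unit = {
    val firing1: Table = Map("visa" -> 2L, "amex" -> 1L)
    val firing2: Table = Map("visa" -> 3L, "mastercard" -> 1L)
    delta(firing1, firing2).foreach(println)
    println(append(firing1, firing2))
  }
}
```

Between the two firings in `main`, the delta output reports one removed row, one added row, and one updated row, while complete mode would simply write `firing2` again in full.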

Repeated queries (RQ): The RQ model simply states that the majority of streaming applications can be seen as asking the same question over and over again. The results of an RQ are identical to running the query manually at various points in time and saving either its whole output or just the difference.

Example: How many credit card transactions have happened today, by card type?

                       SELECT card_type, COUNT(*) AS transaction_count FROM tbl_credit_card_transactions GROUP BY card_type
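
The same repeated query might look roughly like this with the DataFrame API. This is a sketch under stated assumptions, not a reference implementation: `RepeatedQuerySketch`, `countByCardType`, and `startQuery` are hypothetical names, the input is Spark's built-in `rate` test source (available from Spark 2.2), the `card_type` column is derived artificially from its counter, and the 10-second trigger is arbitrary. Note that the output modes Spark actually accepts in `outputMode` are `append`, `complete`, and `update`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object RepeatedQuerySketch {
  // The query itself is an ordinary aggregation. It can be applied to a
  // static table or to a streaming (unbounded) table without changes.
  def countByCardType(transactions: DataFrame): DataFrame =
    transactions.groupBy("card_type").count()

  // Hypothetical streaming input: the "rate" test source emits rows with
  // `timestamp` and `value` columns; card_type is faked from `value`.
  def startQuery(spark: SparkSession): StreamingQuery = {
    val transactions = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .withColumn("card_type", col("value") % 3)

    countByCardType(transactions).writeStream
      .outputMode("complete")                        // full snapshot on every run
      .trigger(Trigger.ProcessingTime("10 seconds")) // re-run the query every 10 s
      .format("console")
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("repeated-query-sketch")
      .getOrCreate()
    val query = startQuery(spark)
    query.awaitTermination(30000L) // let it run for about 30 seconds
    query.stop()
    spark.stop()
  }
}
```

Keeping the aggregation in its own function is a deliberate choice here: it shows that the "question" is defined once, and only the table it runs against (static or continuously growing) differs.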
           







  • https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Saturday 12 August 2017

Apache Spark Broadcast vs Accumulators


Broadcast variable :


A broadcast variable is a read-only variable that the driver program (which runs the SparkContext object) makes available to the nodes that execute the computation. This is very useful in applications that need to make the same data available to the worker nodes efficiently, such as machine learning algorithms.

If a large read-only piece of data (e.g., a lookup table) is used in multiple parallel operations, it is preferable to distribute it to the workers only once instead of packaging it with every closure. Spark lets the programmer create a "broadcast variable" object that wraps the value and ensures that it is only copied to each worker once.


When to use broadcast variables:

  • Allow for an efficient sharing of potentially large data sets
  • Intended for workers as reference data
  • Cached, transported via broadcast protocol, (de)serialized
Spark makes creating broadcast variables as simple as calling a method on SparkContext as follows:

> val broadcastvarableAList = sc.broadcast(List("Vaquar", "khan", "test1", "test2", "test3"))

The console output shows that the broadcast variable was stored in memory,
taking up approximately 488 bytes, and that about 297 MB of memory is still
free:

14/01/30 07:13:32 INFO MemoryStore: ensureFreeSpace(488) called with curMem=96414, maxMem=311387750

14/01/30 07:13:32 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 488.0 B, free 296.9 MB)

broadcastvarableAList : org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(1)
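
Once created, a broadcast variable is read on the workers through its `value` method. The following is a small sketch of the lookup-table use case mentioned earlier; `BroadcastLookupSketch`, `resolveNames`, and the card-type table are hypothetical names and data chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastLookupSketch {
  // Hypothetical read-only lookup table: card-type code -> display name.
  val cardTypeNames: Map[Int, String] =
    Map(1 -> "visa", 2 -> "mastercard", 3 -> "amex")

  def resolveNames(spark: SparkSession, codes: Seq[Int]): Seq[String] = {
    val sc = spark.sparkContext
    // Ship the table to each executor once instead of with every task closure.
    val lookup = sc.broadcast(cardTypeNames)
    sc.parallelize(codes)
      .map(code => lookup.value.getOrElse(code, "unknown")) // read-only access on the workers
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-lookup-sketch")
      .getOrCreate()
    resolveNames(spark, Seq(1, 3, 9)).foreach(println)
    spark.stop()
  }
}
```

For a table this small the saving is negligible; the pattern pays off when the wrapped value is large and reused across many tasks.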


Accumulators

An accumulator is also a variable that is broadcast to the worker nodes. The key difference between a broadcast variable and an accumulator is that while the broadcast variable is read-only, the accumulator can be added to.

There are limitations, however. In particular, the addition must be an associative (and commutative) operation so that the global accumulated value can be correctly computed in parallel and returned to the driver program. Each worker node can only access and add to its own local accumulator value, and only the driver program can access the global value, which it reads using the value method.

These are variables that workers can only "add" to using an associative operation, and that only the driver can read. They can be used to implement counters as in MapReduce and to provide a more imperative syntax for parallel sums. Accumulators can be defined for any type that has an "add" operation and a "zero" value. Due to their "add-only" semantics, they are easy to make fault-tolerant.
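
The "zero plus associative add" requirement can be illustrated without Spark at all. The sketch below is a plain-Scala model, not the Spark accumulator API: `AccumulatorModelSketch`, `Accumulable`, and `accumulate` are hypothetical names, the inner sequences stand in for partitions handled by different workers, and the final fold stands in for the driver merging the partial results.

```scala
// A plain-Scala model of accumulator semantics; this is NOT Spark API.
object AccumulatorModelSketch {
  // Any type with a "zero" value and an associative "add" can be accumulated.
  trait Accumulable[A] {
    def zero: A
    def add(x: A, y: A): A
  }

  val intSum: Accumulable[Int] = new Accumulable[Int] {
    def zero: Int = 0
    def add(x: Int, y: Int): Int = x + y
  }

  // Each "worker" folds its own partition starting from zero;
  // the "driver" then merges the partial results.
  def accumulate[A](partitions: Seq[Seq[A]], acc: Accumulable[A]): A = {
    val partials = partitions.map(_.foldLeft(acc.zero)(acc.add))
    partials.foldLeft(acc.zero)(acc.add)
  }

  def main(args: Array[String]): Unit = {
    // Because addition is associative, how the data is split does not matter.
    println(accumulate(Seq(Seq(1, 2), Seq(3, 4)), intSum)) // prints 10
    println(accumulate(Seq(Seq(1), Seq(2, 3, 4)), intSum)) // prints 10
  }
}
```

If `add` were not associative (subtraction, for example), the two splits in `main` could give different answers, which is why the restriction exists.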

When to use accumulators:

  • Counters or sums that can be reliably used in parallel processing
  • Native support for numeric types, extensions possible via API
  • Workers can modify state, but cannot read content
  • Only a driver program can read the accumulated value

> val accum = sc.accumulator(0)
 accum: spark.Accumulator[Int] = 0

> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

> accum.value
 res2: Int = 10
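
The session above uses `sc.accumulator`, which is deprecated as of Spark 2.0. On Spark 2.x the same sum could be written with `longAccumulator`, roughly as in this sketch; `LongAccumulatorSketch` and `sumWithAccumulator` are hypothetical names, and the accumulator name `"sum"` is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

object LongAccumulatorSketch {
  def sumWithAccumulator(spark: SparkSession, numbers: Seq[Int]): Long = {
    val sc = spark.sparkContext
    // Created on the driver; the name is only a label (e.g. for the web UI).
    val accum = sc.longAccumulator("sum")
    // Workers can only add to the accumulator inside an action.
    sc.parallelize(numbers).foreach(x => accum.add(x.toLong))
    // Only the driver reads the merged value.
    accum.value.longValue()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("long-accumulator-sketch")
      .getOrCreate()
    println(sumWithAccumulator(spark, Seq(1, 2, 3, 4))) // prints 10
    spark.stop()
  }
}
```

The update happens inside `foreach`, an action, rather than inside a transformation such as `map`; updates made in transformations may be applied more than once if a task is re-executed.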



  • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
  • http://spark.apache.org/docs/latest/rdd-programming-guide.html
  • https://people.eecs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf