
Apache Spark -- Java , Group Live Stream data

  • I am trying to get live JSON data from RabbitMQ into Apache Spark using Java and do some real-time analytics on it.

    I am able to get the data and also run some basic SQL queries on it, but I am not able to figure out the grouping part.

    Below is the JSON I have:

    {"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
    {"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
    {"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
    {"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}

     

    I would like to group the records by DeviceId; that way I can run and gather analytics against individual devices. Below is the sample code snippet that I am trying:

    public static void main(String[] args) {

        try {

            mconf = new SparkConf();
            mconf.setAppName("RabbitMqReceiver");
            mconf.setMaster("local[*]");
            jssc = new JavaStreamingContext(mconf, Durations.seconds(10));

            SparkSession spksess = SparkSession
                    .builder()
                    .master("local[*]")
                    .appName("RabbitMqReceiver2")
                    .getOrCreate();
            SQLContext sqlctxt = new SQLContext(spksess);

            JavaReceiverInputDStream<String> jsonData = jssc.receiverStream(
                    new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
            //jsonData.print();

            JavaDStream<String> machineData = jsonData.window(Durations.minutes(1), Durations.seconds(20));

            machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> rdd) {
                    if (!rdd.isEmpty()) {
                        Dataset<Row> data = sqlctxt.read().json(rdd);
                        //Dataset<Row> data = spksess.read().json(rdd).select("*");
                        data.createOrReplaceTempView("DeviceData");
                        data.printSchema();
                        //data.show(false);

                        // The below select query works
                        //Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData where DeviceId='MAC-101'");

                        // The below sql fails...
                        Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData GROUP BY DeviceId");
                        groupedData.show();
                    }
                }
            });

            jssc.start();
            jssc.awaitTermination();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

     

    What I am looking to do with the streamed data is to push the incoming records into individual buckets per device.

    Let's say we have the incoming data shown above from RabbitMQ.

    What I want is either a single key/value collection with the DeviceId as the key and a List of records as the value, or some kind of separate dynamic collection for each DeviceId.
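    Outside Spark, that bucket idea is just a map from DeviceId to a list of records. A minimal plain-Java sketch (the payload strings here are made up for illustration; only the device IDs come from the sample JSON):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeviceBucketsSketch {

    // Fold (DeviceId, payload) pairs into one map: DeviceId -> that device's records.
    public static Map<String, List<String>> bucket(String[][] incoming) {
        Map<String, List<String>> buckets = new HashMap<>();
        for (String[] msg : incoming) {
            buckets.computeIfAbsent(msg[0], k -> new ArrayList<>()).add(msg[1]);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // (DeviceId, payload) pairs standing in for parsed RabbitMQ messages.
        String[][] incoming = {
                {"MAC-101", "reading-1"},
                {"MAC-101", "reading-2"},
                {"MAC-102", "reading-3"},
                {"MAC-102", "reading-4"},
        };

        Map<String, List<String>> buckets = bucket(incoming);
        System.out.println(buckets.get("MAC-101")); // [reading-1, reading-2]
        System.out.println(buckets.get("MAC-102")); // [reading-3, reading-4]
    }
}
```

    In Spark the same shape comes out of `mapToPair` + `groupByKey`, which is what the working code further down produces as a `JavaPairDStream<String, Iterable<String>>`.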

    Can we do something like the code below (from http://backtobazics.com/big-data/spark/apache-spark-groupby-example/)?

    public class GroupByExample {
        public static void main(String[] args) throws Exception {

            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("GroupByExample").setMaster("local[*]"));

            // Parallelized with 3 partitions
            JavaRDD<String> rddX = sc.parallelize(
                    Arrays.asList("Joseph", "Jimmy", "Tina",
                            "Thomas", "James", "Cory",
                            "Christine", "Jackeline", "Juan"), 3);

            // Group the names by their first character
            JavaPairRDD<Character, Iterable<String>> rddY = rddX.groupBy(word -> word.charAt(0));

            System.out.println(rddY.collect());
        }
    }
    

     

    So in our case the groupBy key needs to be the DeviceId instead of the first character.

    Working Code....

    JavaDStream<String> strmData = jssc.receiverStream(
            new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));

    // Sliding window: 1-minute window, evaluated every 10 seconds
    JavaDStream<String> machineData = strmData.window(Durations.minutes(1), Durations.seconds(10));
    machineData.print();

    // Key = fixed character slice of the raw JSON line (positions 5-9)
    JavaPairDStream<String, String> pairedData =
            machineData.mapToPair(s -> new Tuple2<>(s.substring(5, 10), s));

    JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();
    groupedData.print();
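    One caveat: on the sample JSON above, `s.substring(5, 10)` grabs fixed character positions from `{"DeviceId":...`, which is the slice `iceId`, the same for every line, so all records land in a single group. A hedged sketch of a key extractor that pulls the actual DeviceId value out with a regex (a proper JSON parser would be the robust choice in production):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeviceKey {

    private static final Pattern DEVICE_ID =
            Pattern.compile("\"DeviceId\"\\s*:\\s*\"([^\"]+)\"");

    // Pull the DeviceId value out of one raw JSON line;
    // the regex is for the sketch only, a real pipeline should parse the JSON.
    public static String extract(String json) {
        Matcher m = DEVICE_ID.matcher(json);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    public static void main(String[] args) {
        String s = "{\"DeviceId\":\"MAC-101\",\"DeviceType\":\"Simulator-1\"}";
        System.out.println(s.substring(5, 10)); // iceId   <- fixed positions, not the id
        System.out.println(extract(s));         // MAC-101
    }
}
```

    With something like this, the pairing step would become `machineData.mapToPair(s -> new Tuple2<>(DeviceKey.extract(s), s))`, giving one group per device.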
  October 29, 2021 3:24 PM IST
0
  • It's because in queries with GROUP BY, only the following columns can be used in the SELECT list:

    • columns listed in the GROUP BY
    • aggregations of any column

    If you use "*", then all columns are used in the SELECT list, and that's why the query fails. Change the query to, for example:

    select DeviceId, count(distinct DeviceType) as deviceTypeCount from DeviceData group by DeviceId
    

     

    and it will work, because it selects only the column in the GROUP BY plus columns inside aggregation functions.
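    The same rule, mirrored in plain Java: each group key can contribute exactly one output row, so everything other than the key has to be aggregated. A minimal sketch of the `count(distinct DeviceType)` query above over (DeviceId, DeviceType) pairs taken from the sample JSON (the second Simulator-2 row is invented so the counts differ):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByRule {

    // select DeviceId, count(distinct DeviceType) from DeviceData group by DeviceId,
    // expressed over (DeviceId, DeviceType) string pairs.
    public static Map<String, Long> countDistinctTypes(List<String[]> rows) {
        return rows.stream().collect(
                Collectors.groupingBy(r -> r[0],
                        Collectors.collectingAndThen(
                                Collectors.mapping(r -> r[1], Collectors.toSet()),
                                set -> (long) set.size())));
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[]{"MAC-101", "Simulator-1"},
                new String[]{"MAC-101", "Simulator-1"},
                new String[]{"MAC-102", "Simulator-1"},
                new String[]{"MAC-102", "Simulator-2"});

        Map<String, Long> counts = countDistinctTypes(rows);
        System.out.println(counts.get("MAC-101")); // 1
        System.out.println(counts.get("MAC-102")); // 2
    }
}
```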

     
      November 9, 2021 2:27 PM IST
    0
  • The GROUP BY statement is often used with aggregate functions (COUNT, MAX, MIN, SUM, AVG) to group the result set by one or more columns.
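    All five of those aggregates can be seen per group in one pass with plain Java as well. A sketch over hypothetical (DeviceId, FR) readings shaped like the sample JSON (the values are made up):

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupAggregates {

    // One pass that yields COUNT, MAX, MIN, SUM and AVG per DeviceId.
    public static Map<String, IntSummaryStatistics> summarize(List<Map.Entry<String, Integer>> readings) {
        return readings.stream().collect(
                Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.summarizingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> readings = List.of(
                Map.entry("MAC-101", 10),
                Map.entry("MAC-101", 30),
                Map.entry("MAC-102", 20));

        IntSummaryStatistics mac101 = summarize(readings).get("MAC-101");
        System.out.println(mac101.getCount());   // 2    (COUNT)
        System.out.println(mac101.getMax());     // 30   (MAX)
        System.out.println(mac101.getMin());     // 10   (MIN)
        System.out.println(mac101.getSum());     // 40   (SUM)
        System.out.println(mac101.getAverage()); // 20.0 (AVG)
    }
}
```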

     
      November 10, 2021 12:36 PM IST
    0
  • Streaming data is unstructured data generated continuously by thousands of data sources. It includes a wide variety of data such as log files generated by customers using your mobile or web applications, in-game player activity, information from social networks, financial trading, and telemetry from connected devices or instrumentation in data centers.

    What is Structured Streaming?

    Structured Streaming limits what queries can express, and those restrictions enable optimizations; at the same time, it lets us perform complex data computations that are hard to express in low-level stream processing.

    Why do we use Structured Streaming?

    In Structured Streaming, a data stream is treated as an unbounded table: new records arriving on the stream are like rows being appended to the table. This lets us treat both batch and streaming data as tables, so DataFrame/Dataset queries can be applied to both. Users describe the query they want to run, the input and output locations, and optionally a few more details. The system then runs the query incrementally, maintaining enough state to recover from failure and keeping the results consistent in external storage.
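    The unbounded-table model can be illustrated without Spark at all: treat the stream as a table that only grows, and keep a small piece of state (here a running count per DeviceId) that is updated incrementally as each row arrives. A toy sketch of the idea, not the Structured Streaming API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnboundedTable {

    // Incremental state: count of records seen per DeviceId.
    private final Map<String, Long> counts = new HashMap<>();

    // Appending a row to the "unbounded table" updates the result incrementally,
    // instead of recomputing over every row seen so far.
    public void append(String deviceId) {
        counts.merge(deviceId, 1L, Long::sum);
    }

    public long countFor(String deviceId) {
        return counts.getOrDefault(deviceId, 0L);
    }

    public static void main(String[] args) {
        UnboundedTable table = new UnboundedTable();
        for (String id : List.of("MAC-101", "MAC-101", "MAC-102")) {
            table.append(id); // new records are like rows appended to the table
        }
        System.out.println(table.countFor("MAC-101")); // 2
        System.out.println(table.countFor("MAC-102")); // 1
    }
}
```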
      November 12, 2021 1:48 PM IST
    0