I am trying to get live JSON data from RabbitMQ into Apache Spark using Java and do some real-time analytics on it.
I am able to get the data and run some basic SQL queries on it, but I can't figure out the grouping part.
Below is the JSON I have:
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
I would like to group the records by DeviceId, so that I can run and gather analytics against individual devices. Below is the sample code snippet I am trying:
public static void main(String[] args) {
    try {
        SparkConf mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(mconf, Durations.seconds(10));
        // getOrCreate() reuses the SparkContext already started by the streaming context
        SparkSession spksess = SparkSession
                .builder()
                .master("local[*]")
                .appName("RabbitMqReceiver2")
                .getOrCreate();
        SQLContext sqlctxt = spksess.sqlContext();
        JavaReceiverInputDStream<String> jsonData = jssc.receiverStream(
                new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
        //jsonData.print();
        JavaDStream<String> machineData = jsonData.window(Durations.minutes(1), Durations.seconds(20));
        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {
                    Dataset<Row> data = sqlctxt.read().json(rdd);
                    //Dataset<Row> data = spksess.read().json(rdd).select("*");
                    data.createOrReplaceTempView("DeviceData");
                    data.printSchema();
                    //data.show(false);
                    // The select query below works
                    //Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData where DeviceId='MAC-101'");
                    // The SQL below fails...
                    Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData GROUP BY DeviceId");
                    groupedData.show();
                }
            }
        });
        jssc.start();
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
What I am looking to do with the streamed data is to see if I can push the incoming data into individual buckets.
Let's say we have the incoming data from RabbitMQ shown above.
What I want is either a single key/value collection with the device id as the key and a List as the value, or some kind of individual dynamic collection for each device id.
Can we do something like the code below (from http://backtobazics.com/big-data/spark/apache-spark-groupby-example/)?
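import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class GroupByExample {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("GroupByExample").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Parallelized with 3 partitions
        JavaRDD<String> rddX = sc.parallelize(
                Arrays.asList("Joseph", "Jimmy", "Tina",
                        "Thomas", "James", "Cory",
                        "Christine", "Jackeline", "Juan"), 3);
        // Group the names by their first character
        JavaPairRDD<Character, Iterable<String>> rddY = rddX.groupBy(word -> word.charAt(0));
        System.out.println(rddY.collect());
        sc.close();
    }
}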
So in our case, the key function passed to groupBy would need to return the DeviceId of each message.
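Applied to the device messages, the key function returns the DeviceId instead of the first character. The plain-Java sketch below (no Spark) builds the device-id-to-List collection described earlier with `Collectors.groupingBy`. It assumes each message is a single JSON object with a top-level string field `DeviceId`, uses a regex as a stand-in for a real JSON parser, and puts messages without that field under a hypothetical `UNKNOWN` key; `DeviceBuckets`, `extractDeviceId` and `bucketByDevice` are illustrative names. Inside Spark, the same key function could be passed to `JavaRDD.groupBy`, or used in `mapToPair` followed by `groupByKey`, which yields one `Iterable` of messages per device for each batch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class DeviceBuckets {
    // Matches "DeviceId":"<value>"; assumes the value contains no escaped quotes.
    private static final Pattern DEVICE_ID =
            Pattern.compile("\"DeviceId\"\\s*:\\s*\"([^\"]*)\"");

    // Key function: the local counterpart of word -> word.charAt(0) in GroupByExample.
    // Returns the hypothetical placeholder "UNKNOWN" when the field is missing.
    static String extractDeviceId(String json) {
        Matcher m = DEVICE_ID.matcher(json);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    // Device id -> list of raw messages, with keys kept in sorted order.
    static Map<String, List<String>> bucketByDevice(List<String> messages) {
        return messages.stream()
                .collect(Collectors.groupingBy(DeviceBuckets::extractDeviceId,
                        TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Abbreviated versions of the sample messages.
        List<String> messages = Arrays.asList(
                "{\"DeviceId\":\"MAC-101\",\"DeviceType\":\"Simulator-1\"}",
                "{\"DeviceId\":\"MAC-101\",\"DeviceType\":\"Simulator-1\"}",
                "{\"DeviceId\":\"MAC-102\",\"DeviceType\":\"Simulator-1\"}",
                "{\"DeviceId\":\"MAC-102\",\"DeviceType\":\"Simulator-1\"}");
        bucketByDevice(messages).forEach((id, msgs) ->
                System.out.println(id + " -> " + msgs.size() + " message(s)"));
    }
}
```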
Working code:
JavaDStream<String> strmData = jssc.receiverStream(
        new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
// This is just a sliding window I have kept
JavaDStream<String> machineData = strmData.window(Durations.minutes(1), Durations.seconds(10));
machineData.print();
// Key each message by its device id (needs import scala.Tuple2). The offsets assume every
// message starts with {"DeviceId":" followed by a 7-character id such as MAC-101.
JavaPairDStream<String, String> pairedData = machineData.mapToPair(s -> new Tuple2<>(s.substring(13, 20), s));
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();
groupedData.print();
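The pair key here is taken from fixed character offsets of the raw JSON string, so it helps to check what those offsets actually select. The check below is plain Java and uses an abbreviated copy of the first sample message; `OffsetCheck`, `SAMPLE` and `LONGER_ID` are hypothetical names, and the longer id is made up purely to show truncation.

```java
public class OffsetCheck {
    // Abbreviated copy of the first sample message; only its prefix matters here.
    static final String SAMPLE = "{\"DeviceId\":\"MAC-101\",\"DeviceType\":\"Simulator-1\"}";
    // Hypothetical id one character longer than the sample ids.
    static final String LONGER_ID = "{\"DeviceId\":\"MAC-1001\",\"DeviceType\":\"Simulator-1\"}";

    public static void main(String[] args) {
        // Index 0 is '{' and 1 is '"'; the field name DeviceId occupies indices 2..9,
        // so substring(5, 10) lands inside the field name, not the value.
        System.out.println(SAMPLE.substring(5, 10));     // iceId
        // The prefix {"DeviceId":" is 13 characters, so the value starts at index 13;
        // MAC-101 is 7 characters long and ends before index 20.
        System.out.println(SAMPLE.substring(13, 20));    // MAC-101
        // The same offsets truncate an id of a different length.
        System.out.println(LONGER_ID.substring(13, 20)); // MAC-100
    }
}
```

With the sample layout, `substring(5, 10)` returns the same `iceId` for every message, so all records would end up under a single key, while `substring(13, 20)` is correct only as long as every id is exactly seven characters and the JSON has no extra whitespace. Extracting the `DeviceId` field by name (a JSON parser, or a regex on the field) removes that dependence on layout.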
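It's because in queries with GROUP BY, only the following can be used in the select list:
- columns listed in the GROUP BY clause
- aggregate functions (such as count, sum, max) applied to other columns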
If you use "*", then all columns are in the select list, and that's why the query fails. Change the query to, for example:
select DeviceId, count(distinct DeviceType) as deviceTypeCount from DeviceData group by DeviceId
and it will work, because it uses only the column in the GROUP BY and columns inside aggregate functions.
The GROUP BY statement is often used with aggregate functions (COUNT, MAX, MIN, SUM, AVG) to group the result set by one or more columns.
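To see why `select *` has no meaning with `GROUP BY DeviceId`, it helps to spell out what the corrected query computes: each group collapses to one output row, so every selected value must be either the grouping column or a single value aggregated from the whole group. The plain-Java sketch below (no Spark; `GroupBySemantics` and `Reading` are hypothetical names) computes the same result as `select DeviceId, count(distinct DeviceType) ... group by DeviceId` over the four sample rows.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class GroupBySemantics {
    // Hypothetical minimal row type holding only the two columns the query uses.
    static final class Reading {
        final String deviceId;
        final String deviceType;

        Reading(String deviceId, String deviceType) {
            this.deviceId = deviceId;
            this.deviceType = deviceType;
        }
    }

    // Equivalent of: select DeviceId, count(distinct DeviceType) from DeviceData group by DeviceId
    static Map<String, Integer> distinctTypeCount(List<Reading> rows) {
        // Step 1: form the groups, collecting the distinct DeviceType values per DeviceId.
        Map<String, Set<String>> typesPerDevice = new TreeMap<>();
        for (Reading r : rows) {
            typesPerDevice.computeIfAbsent(r.deviceId, k -> new HashSet<>()).add(r.deviceType);
        }
        // Step 2: reduce each group to exactly one value (the aggregate).
        Map<String, Integer> result = new TreeMap<>();
        typesPerDevice.forEach((id, types) -> result.put(id, types.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Reading> rows = Arrays.asList(
                new Reading("MAC-101", "Simulator-1"),
                new Reading("MAC-101", "Simulator-1"),
                new Reading("MAC-102", "Simulator-1"),
                new Reading("MAC-102", "Simulator-1"));
        System.out.println(distinctTypeCount(rows)); // {MAC-101=1, MAC-102=1}
    }
}
```

There is no single value of, say, `data.FR` for a whole group, which is why the analyzer rejects `select *` here. If the goal is to keep every message per device in SQL, an aggregate such as `collect_list(data)` can gather the group's values into one array column.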