Hi :) I have this code in Spark/Scala that partitions big data (more than 50 GB) by category into CSV files:
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("CATEGORY_ID")
  .format("csv")
  .option("header", "true")
  .option("sep", "|")
  .option("quoteAll", true)
  .csv("output/inventory_backup")
The dataframe df is the result of aggregations on data imported from a CSV file:
df.groupBy("PRODUCT_ID", "LOC_ID", "DAY_ID")
  .agg(
    functions.sum("ASSORTED_STOCK_UNIT").as("ASSORTED_STOCK_UNIT_sum"),
    functions.sum("SOLID_STOCK_UNIT").as("SOLID_STOCK_UNIT_sum")
  )
I would like to tune the performance of this program. Through the Spark UI, I was able to see that the performance bottleneck occurs at the stage that exports the data into CSV files.
More details: I'm running on a 16-core / 120 GB RAM instance.
Do you have any ideas on how to tune the performance? (It currently takes more than 17 minutes.) Any help will be much appreciated. Thank you!
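(For illustration only, one direction that is often suggested for this write pattern, not necessarily the fix for this particular job: repartitioning by the partitionBy column first, so each CATEGORY_ID directory is produced by a single task rather than many small files. The sketch assumes the Spark 2.x Dataset API; the extra shuffle may or may not pay off depending on how skewed the categories are.)

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Sketch: cluster rows by CATEGORY_ID before the partitioned write
df.repartition(col("CATEGORY_ID"))
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("CATEGORY_ID")
  .option("header", "true")
  .option("sep", "|")
  .option("quoteAll", true)
  .csv("output/inventory_backup")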
Getting strange behavior when calling a function outside of a closure:
when the function is in an object, everything works;
when the function is in a class, I get:
Task not serializable: java.io.NotSerializableException: testing
The problem is that I need my code in a class and not an object. Any idea why this is happening? Is a Scala object serialized by default?
This is a working code example:
object working extends App {
  val list = List(1, 2, 3)
  val rddList = Spark.ctx.parallelize(list)

  // calling the function outside the closure
  val after = rddList.map(someFunc(_))

  def someFunc(a: Int) = a + 1

  after.collect().map(println(_))
}
This is the non-working example:
object NOTworking extends App {
  new testing().doIT
}
// adding extends Serializable won't help
class testing {
  val list = List(1, 2, 3)
  val rddList = Spark.ctx.parallelize(list)

  def doIT = {
    // again calling the function someFunc
    val after = rddList.map(someFunc(_))
    // this will crash (Spark is lazy, so the failure only surfaces here)
    after.collect().map(println(_))
  }

  def someFunc(a: Int) = a + 1
}
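(A sketch of one common workaround, reusing the question's hypothetical Spark.ctx wrapper; the class name testingLocal is made up. Capturing the function as a local value means the closure no longer references this, so the non-serializable class instance is never shipped to the executors.)

class testingLocal {
  val list = List(1, 2, 3)
  val rddList = Spark.ctx.parallelize(list)

  def doIT = {
    // local function value: the closure captures only f, not the enclosing instance
    val f = (a: Int) => a + 1
    val after = rddList.map(f)
    after.collect().map(println(_))
  }
}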
I'm just wondering: what is the difference between an RDD and a DataFrame in Apache Spark (in Spark 2.0.0, DataFrame is a mere type alias for Dataset[Row])?
Can you convert one to the other?
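(A rough sketch of the conversions, assuming a SparkSession named spark and Spark 2.x; the Person case class is made up.)

import spark.implicits._

case class Person(name: String, age: Int)

// RDD -> DataFrame / Dataset
val rdd = spark.sparkContext.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
val df  = rdd.toDF()          // DataFrame, i.e. Dataset[Row]
val ds  = rdd.toDS()          // Dataset[Person]

// DataFrame / Dataset -> RDD
val rowRdd    = df.rdd        // RDD[Row]
val personRdd = ds.rdd        // RDD[Person]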
I installed Spark using the AWS EC2 guide, and I can launch the program fine using the bin/pyspark script to get to the Spark prompt, and can also complete the Quick Start guide successfully. However, I cannot for the life of me figure out how to stop all of the verbose INFO logging after each command. I have tried nearly every possible scenario in the code below (commenting out, setting to OFF) in my log4j.properties file in the conf folder where I launch the application from, as well as on each node, and nothing does anything. I still get the INFO logging statements printed after executing each statement. I am very confused about how this is supposed to work.
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
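(If editing log4j.properties keeps being ignored, one programmatic alternative, available since roughly Spark 1.4 and shown here in Scala, is to set the level on the context for the current session; PySpark exposes the same sc.setLogLevel.)

// Silence the INFO chatter for this SparkContext only; valid levels include
// ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF.
sc.setLogLevel("WARN")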
I am doing a PoC on Spark's map-reduce performance for calculating a weighted average over 5,000 to 200,000 records, and it appears to be very slow, so I just wanted to check whether I am doing something wrong here. Here are my setup details:
• No. of worker nodes: 2
• CPUs: 8 per node (16 total)
For 5,000 orders, it takes around 9 seconds to do all of the following map-reduce operations to calculate the weighted average, i.e. (n1*v1 + n2*v2 + ...)/(n1 + n2 + ...).
// Calculation of the sum of n*v using map-reduce
JavaPairRDD<String, Double> jprMap = javaRDD.mapToPair(new PairFunction<Tuple2<Double, Double>, String, Double>() {
    public Tuple2<String, Double> call(Tuple2<Double, Double> t) {
        return new Tuple2<String, Double>("Numerator", t._1 * t._2);
    }
});
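(For comparison, a minimal Scala sketch of the same weighted average, assuming an RDD of (n, v) pairs; the tiny data set is made up. A single map + reduce pass sums n*v and n together instead of running two separate jobs.)

// pairs: RDD[(Double, Double)] of (n, v)
val pairs = sc.parallelize(Seq((2.0, 10.0), (3.0, 20.0)))

val (num, den) = pairs
  .map { case (n, v) => (n * v, n) }
  .reduce { case ((nv1, n1), (nv2, n2)) => (nv1 + nv2, n1 + n2) }

val weightedAverage = num / den   // (2*10 + 3*20) / (2 + 3) = 16.0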
I am using Apache Spark to perform sentiment analysis. I am using the Naive Bayes algorithm to classify the text. I don't know how to find out the probability of the labels. I would be grateful for a snippet in Python showing how to get the probability of the labels.
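(The question asks for Python; to keep the snippets on this page in one language, here is a rough Scala sketch of the spark.ml route with a made-up two-feature training set. The same probability column appears when using NaiveBayes from pyspark.ml.)

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.linalg.Vectors

// Hypothetical tiny training set: label + feature vector
val training = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(1.0, 0.0)),
  (1.0, Vectors.dense(0.0, 1.0))
)).toDF("label", "features")

val model = new NaiveBayes().fit(training)

// transform() adds a "probability" column holding the per-label probabilities
model.transform(training)
  .select("label", "probability", "prediction")
  .show(false)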
I have got a big data file loaded in Spark but wish to work on a small portion of it to run the analysis. Is there any way to do that? I tried doing a repartition but it brings a lot of reshuffling. Is there any good way of processing only a small chunk of a big file loaded in Spark?
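(Two standard ways to grab a small slice without a repartition; a sketch assuming a DataFrame named df.)

// roughly a 1% random sample, no shuffle of the remaining data
val tinyDf = df.sample(withReplacement = false, fraction = 0.01, seed = 42L)

// or simply the first N rows
val firstRows = df.limit(1000)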
I need to implement a big data storage + processing system.
The data increases on a daily basis (about 50 million rows/day at most); the data consists of very simple JSON documents of about 10 fields (date, numbers, text, ids).
Data could then be queried online (if possible), making arbitrary groupings on some of the fields of the document (date-range queries, ids, etc.).
I'm thinking of using a MongoDB cluster for storing all this data and building indices for the fields I need to query on, then processing the data in an Apache Spark cluster (mostly simple aggregations + sorting). Maybe use spark-jobserver to build a REST API around it.
I have concerns about MongoDB's scaling possibilities (i.e. storing 10B+ rows) and throughput (quickly sending 1B+ rows to Spark for processing), as well as its ability to maintain indices in such a large database.
In contrast, I am considering Cassandra or HBase, which I believe are more suitable for storing large datasets, but offer less performance in querying, which I'd …
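(For scale, a sketch of the kind of "simple aggregations + sorting" described above, assuming the documents reach Spark as JSON files; the path and field names are made up.)

import spark.implicits._

val docs = spark.read.json("s3://my-bucket/events/")       // ~10-field JSON documents

val grouped = docs
  .filter($"date".between("2016-01-01", "2016-01-31"))     // arbitrary date-range query
  .groupBy($"id")
  .count()
  .orderBy($"count".desc)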
According to Learning Spark:
"Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions."
One difference I get is that with repartition() the number of partitions can be increased or decreased, but with coalesce() the number of partitions can only be decreased.
If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?
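(A small illustration of the two calls; the partition counts are arbitrary.)

val rdd = sc.parallelize(1 to 1000000, 100)   // 100 initial partitions

val wider    = rdd.repartition(200)   // full shuffle; the count can go up or down
val narrower = rdd.coalesce(10)       // merges the existing 100 partitions into 10
                                      // without a full shuffle; can only decrease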