Let's assume for the following that only one Spark job is running at any point in time.
What I get so far
Here is my understanding of what happens in Spark:
When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the jar of the driver program. Quitting the driver shuts down the executors. Each executor can hold some partitions.
When a job is executed, an execution plan is created according to the lineage graph.
The job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles.
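For example (a minimal sketch; the RDD names and paths below are illustrative and not from the question), a word-count style lineage contains exactly one shuffle, so Spark splits it into two stages at the reduceByKey boundary:
val lines = sc.textFile("hdfs:///data/input.txt")    // Stage 0
val words = lines.flatMap(_.split(" "))              // narrow dependency, still Stage 0
val pairs = words.map(word => (word, 1))             // narrow dependency, still Stage 0
val counts = pairs.reduceByKey(_ + _)                // shuffle boundary -> Stage 1
counts.saveAsTextFile("hdfs:///data/counts")         // action: triggers the job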
I understand that
A task is a command sent from the driver to an executor by serializing the Function object.
The executor deserializes (with the driver jar) the command (task) and executes it on a partition.
but
Question(s)
How do I split the stage into those tasks?
Specifically:
I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in Scala than in Python, for obvious reasons. With that assumption, I thought to learn and write the Scala version of some very common preprocessing code for about 1 GB of data. The data is taken from the SpringLeaf competition on Kaggle. To give an overview, it contains 1936 dimensions and 145,232 rows, with values of various types, e.g. int, float, string, boolean. I am using 6 of 8 cores for Spark processing; that's why I used minPartitions=6 so that every core has something to process.
Scala code:
val input = sc.textFile("train.csv", minPartitions = 6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter }   // drop the header line
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")
  vals
}
I am using https://github.com/databricks/spark-csv and I am trying to write a single CSV file, but I am not able to; it creates a folder instead.
I need a Scala function which takes parameters like path and file name and writes that CSV file.
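One commonly suggested approach (a hedged sketch, not a definitive solution; writeSingleCsv and the temporary-directory naming are my own illustrative choices) is to coalesce to a single partition, write into a temporary folder, and then move the lone part file to the requested path with the Hadoop FileSystem API:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode}

def writeSingleCsv(df: DataFrame, path: String): Unit = {
  val tmpDir = path + "_tmp"
  // Single partition => spark-csv writes exactly one part file into tmpDir.
  df.coalesce(1)
    .write
    .mode(SaveMode.Overwrite)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(tmpDir)
  val fs = FileSystem.get(new Configuration())
  // Move that one part file to the requested file name, then drop the folder.
  val partFile = fs.globStatus(new Path(tmpDir + "/part-*"))(0).getPath
  fs.rename(partFile, new Path(path))
  fs.delete(new Path(tmpDir), true)
}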
I already have a cluster of 3 machines (ubuntu1, ubuntu2, ubuntu3 via VirtualBox VMs) running Hadoop 1.0.0. I installed Spark on each of these machines. ubuntu1 is my master node and the other nodes work as slaves. My questions are: what exactly is a Spark driver? Should we set an IP and port for the Spark driver via spark.driver.host? And where will it be executed and located (master or slave)?
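As a hedged illustration of the configuration part of the question (the host name and port below are assumptions for the ubuntu1 setup described, not recommended values), spark.driver.host and spark.driver.port can be set on the SparkConf of the application that creates the SparkContext, since the driver runs wherever that application runs:
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: pin the driver's host/port so executors on the slave VMs
// know where to connect back to.
val conf = new SparkConf()
  .setAppName("driver-host-example")
  .setMaster("spark://ubuntu1:7077")
  .set("spark.driver.host", "ubuntu1")   // host the executors connect back to
  .set("spark.driver.port", "51000")     // fixed port instead of a random one
val sc = new SparkContext(conf)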
Hi. At university, in the data science area, we learned that if we want to work with small data we should use pandas, and if we work with big data we should use Spark; in the case of Python programmers, PySpark.
Recently, at a hackathon in the cloud (Azure Synapse, which runs on Spark), I saw pandas being imported in the notebook (I suppose the code is good, since it was written by Microsoft people):
import pandas
from azureml.core import Dataset
training_pd = training_data.toPandas().to_csv('training_pd.csv', index=False)
I'm trying to implement a Lambda Architecture using the following tools: Apache Kafka to receive all the datapoints, Spark for batch processing (Big Data), Spark Streaming for real time (Fast Data), and Cassandra to store the results.
Also, all the datapoints I receive are related to a user session, and therefore, for the batch processing I'm only interested to process the datapoints once the session finishes. So, since I'm using Kafka, the only way to solve this (assuming that all the datapoints are stored in the same topic) is for the batch to fetch all the messages in the topic, and then ignore those that correspond to sessions that have not yet finished.
So, what I'd like to ask is:
Is this a good approach to implement the Lambda Architecture? Or should I use Hadoop and Storm instead? (I can't find information about people using Kafka and Apache Spark for batch processing / MapReduce.)
Is there a better approach to solve the user sessions problem?
True ... it has been discussed quite a lot.
However, there is a lot of ambiguity in some of the answers provided ... including duplicating jar references in the jars/executor/driver configuration or options.
The ambiguous and/or omitted details
The following ambiguous, unclear, and/or omitted details should be clarified for each option:
How the ClassPath is affected
Driver
Executor (for running tasks)
Both
Not at all
Separation character: comma, colon, semicolon
If provided files are automatically distributed
for the tasks (to each executor)
for the remote Driver (if run in cluster mode)
Type of URI accepted: local file, hdfs, http, etc.
If copied into a common location, where that location is (hdfs, local?)
The options this affects:
--jars
SparkContext.addJar(...) method
SparkContext.addFile(...) method
--conf spark.driver.extraClassPath=... or --driver-class-path ...
--conf spark.driver.extraLibraryPath=..., or --driver-library-path ...
--conf spark.executor.extraClassPath=...
--conf ...
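For context, a hedged example of how several of these options can appear together on one spark-submit invocation (all class names and paths are illustrative):
spark-submit \
  --class com.example.Main \
  --master yarn \
  --deploy-mode cluster \
  --jars /opt/libs/dep1.jar,/opt/libs/dep2.jar \
  --files /opt/conf/app.conf \
  --conf spark.driver.extraClassPath=/opt/libs/dep1.jar \
  --conf spark.executor.extraClassPath=/opt/libs/dep1.jar \
  app.jar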
I'm trying to understand the relationship between the number of cores and the number of executors when running a Spark job on YARN.
The test environment is as follows:
Number of data nodes: 3
Data node machine spec:
CPU: Core i7-4790 (# of cores: 4, # of threads: 8)
RAM: 32GB (8GB x 4)
HDD: 8TB (2TB x 4)
I have a Spark Streaming application which produces a dataset every minute. I need to save/overwrite the results of the processed data.
When I tried to overwrite the dataset, org.apache.hadoop.mapred.FileAlreadyExistsException stops the execution.
I set the Spark property set("spark.files.overwrite", "true"), but no luck.
How do I overwrite or pre-delete the files from Spark?
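A commonly used workaround (a minimal sketch, assuming the output is written with saveAsTextFile or a similar RDD output method; the helper name is mine) is to delete the target directory through the Hadoop FileSystem API before each write:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Illustrative helper: remove the output directory if it already exists,
// so the next micro-batch can write to the same location.
def deleteIfExists(sc: SparkContext, dir: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val path = new Path(dir)
  if (fs.exists(path)) fs.delete(path, true)   // recursive delete
}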
I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name"). I have tried:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Error which I got:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail but found
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at ...
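For reference, the error above arises because sqlContext.load defaults to the Parquet data source. A hedged sketch of loading the same file through the spark-csv package instead (Spark 1.x API; the header option is an assumption about the file):
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")   // assumption: the file has a header row
  .load("hdfs:///csv/file/dir/file.csv")
df.registerTempTable("table_name")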
I tried to start Spark 1.6.0 (spark-1.6.0-bin-hadoop2.4) on Mac OS Yosemite 10.10.5 using "./bin/spark-shell".
It fails with the error below. I also tried installing different versions of Spark, but all have the same error. This is the second time I'm running Spark; my previous run worked fine.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/04 13:49:40 WARN Utils: Service...
I'm new to big data processing and I'm reading about tools for stream processing and building data pipelines. I found Apache Spark and Spring Cloud Data Flow. I want to know the main differences between them and the pros and cons of each. Could anybody help me?
Assume df1 and df2 are two DataFrames in Apache Spark, computed using two different mechanisms, e.g., Spark SQL vs. the Scala/Java/Python API.
Is there an idiomatic way to determine whether the two data frames are equivalent (equal, isomorphic), where equivalence is determined by the data (column names and column values for each row) being identical save for the ordering of rows and columns?
The motivation for the question is that there are often many ways to compute some big data result, each with its own trade-offs. As one explores these trade-offs, it is important to maintain correctness, and hence the need to check for equivalence/equality on a meaningful test data set.
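One hedged sketch of such a check (sameData is an illustrative name; it assumes the two frames have compatible column types, and note that except ignores duplicate-row multiplicity, so exact multiset equality would need extra care):
import org.apache.spark.sql.DataFrame

def sameData(df1: DataFrame, df2: DataFrame): Boolean = {
  val cols = df1.columns.sorted
  cols.sameElements(df2.columns.sorted) && {
    // Align column order before comparing, since except is positional.
    val a = df1.select(cols.head, cols.tail: _*)
    val b = df2.select(cols.head, cols.tail: _*)
    a.except(b).count() == 0 && b.except(a).count() == 0
  }
}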
I am trying to get live JSON data from RabbitMQ into Apache Spark using Java and do some real-time analytics with it.
I am able to get the data and also do some basic SQL queries on it, but I am not able to figure out the grouping part.
Below is the JSON I have
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017... less
I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values in the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative way. There is the method iterator(), but it requires some additional information that I can't provide.
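One approach that is often suggested for this situation (a hedged sketch; iterateLocally is an illustrative helper name): RDD.toLocalIterator streams one partition at a time to the driver, so driver memory only needs to hold the largest partition rather than the whole RDD:
import org.apache.spark.rdd.RDD

def iterateLocally[T](rdd: RDD[T])(handle: T => Unit): Unit = {
  // Pulls partitions to the driver one at a time instead of all at once.
  rdd.toLocalIterator.foreach(handle)
}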
I'd like to stop the various messages that appear on the Spark shell. I tried to edit the log4j.properties file in order to stop these messages. Here are the contents of log4j.properties:
# Define the root logger with appender file
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
But messages are still getting displayed on the console.
Here are some example messages
15/01/05 15:11:45 INFO SparkEnv: Registering BlockManagerMaster
15/01/05 15:11:45 INFO DiskBlockManager: Created local...
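As an aside, a hedged alternative that avoids log4j.properties entirely (available from Spark 1.4 onward) is to lower the level from inside the shell:
// Quieten INFO output for the current SparkContext from the shell itself.
sc.setLogLevel("WARN")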
I did come across a mini tutorial for data preprocessing using spark here: http://ampcamp.berkeley.edu/big-data-mini-course/featurization.html
However, this discusses only text-file parsing. Is there a way to parse XML files from Spark?
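A hedged sketch of one simple route (the path and element name are illustrative; record-oriented splitting of large XML files would instead need something like the spark-xml package): read each file whole with wholeTextFiles and parse it with scala.xml:
import scala.xml.XML

// Each element of `files` is a (path, fullFileContents) pair.
val files = sc.wholeTextFiles("hdfs:///data/xml/*.xml")
val titles = files.map { case (_, content) =>
  val doc = XML.loadString(content)
  (doc \\ "title").text   // "title" is an illustrative element name
}
titles.take(5).foreach(println)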
What are the differences between Apache Spark SQLContext and HiveContext?
Some sources say that since HiveContext is a superset of SQLContext, developers should always use HiveContext, which has more features than SQLContext. But the current APIs of the two contexts are mostly the same.
In what scenarios is SQLContext/HiveContext more useful?
Is HiveContext more useful only when working with Hive?
Or is SQLContext all that is needed to implement a Big Data app using Apache Spark?
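For concreteness, a brief sketch of how the two contexts are created in Spark 1.x (nothing here beyond the standard constructors):
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

// Both are built on the same SparkContext; HiveContext additionally
// understands HiveQL and can talk to a Hive metastore.
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)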
I am using spark-csv to load data into a DataFrame. I want to do a simple query and display the content:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("my.csv")
df.registerTempTable("tasks")
val results = sqlContext.sql("select col from tasks")
results.show()
I installed Spark using the AWS EC2 guide and I can launch the program fine using the bin/pyspark script to get to the Spark prompt, and can also complete the Quick Start guide successfully.
However, I cannot for the life of me figure out how to stop all of the verbose INFO logging after each command.
I have tried nearly every possible scenario in the code below (commenting out, setting to OFF) within my log4j.properties file in the conf folder where I launch the application from, as well as on each node, and nothing does anything. I still get the INFO logging statements printing after executing each statement.
I am very confused with how this is supposed to work.
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout...
I am confused as to where Talend and Apache Spark fit in the big data ecosystem, as both can be used for ETL.
Could someone please explain this with an example?
Quoting the Spark DataFrames, Datasets and SQL manual:
A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are less important due to Spark SQL’s in-memory computational model. Others are slotted for future releases of Spark SQL.
Being new to Spark, I'm a bit baffled by this for two reasons:
Spark SQL is designed to process Big Data, and at least in my use case the data size far exceeds the size of available memory. Assuming this is not uncommon, what is meant by "Spark SQL’s in-memory computational model"? Is Spark SQL recommended only for cases where the data fits in memory?
Even assuming the data fits in memory, a full scan over a very large dataset can take a long time. I read this argument against indexing in in-memory databases, but I was not convinced. The example there discusses a scan of a 10,000,000-record table, but that's not really big data. Scanning a table with billions of records can cause simple queries of the "SELECT x WHERE y=z"...
Hi :) I have this code in Spark/Scala that partitions big data (more than 50 GB) by category into CSV files.
df.write
.mode(SaveMode.Overwrite)
.partitionBy("CATEGORY_ID")
.format("csv")
.option("header", "true")
.option("sep", "|")
.option("quoteAll", true)
.csv("output/inventory_backup")
The DataFrame df is the result of aggregations on data imported from a CSV file:
df.groupBy("PRODUCT_ID","LOC_ID","DAY_ID")
.agg(
functions.sum("ASSORTED_STOCK_UNIT").as("ASSORTED_STOCK_UNIT_sum"),
functions.sum("SOLID_STOCK_UNIT").as("SOLID_STOCK_UNIT_sum")
)
I would like to tune the performance of this program. Through the Spark UI, I was able to see that the performance bottleneck occurs at the stage that exports the data into CSV files.
More details: I'm using a 16-core / 120 GB RAM instance.
Do you have any ideas on how to tune the performance? (It currently takes more than 17 minutes.) Any help will be much appreciated. Thank you.
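One tuning idea worth trying (a hedged sketch under the assumption that many small, skewed output files are the cost here, not a guaranteed fix): repartition on the same column used by partitionBy just before the write, so all rows of a category are produced by a single task:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

df.repartition(col("CATEGORY_ID"))
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("CATEGORY_ID")
  .option("header", "true")
  .option("sep", "|")
  .option("quoteAll", true)
  .csv("output/inventory_backup")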
I need to implement a big data storage + processing system.
The data increases on a daily basis (about 50 million rows/day at most); the data consists of very simple JSON documents of about 10 fields (dates, numbers, text, ids).
The data could then be queried online (if possible), making arbitrary groupings on some of the fields of the document (date range queries, ids, etc.).
I'm thinking of using a MongoDB cluster to store all this data and build indices for the fields I need to query on, then process the data in an Apache Spark cluster (mostly simple aggregations + sorting). Maybe use Spark Jobserver to build a REST API around it.
I have concerns about MongoDB's scaling possibilities (i.e. storing 10B+ rows), its throughput (quickly sending 1B+ rows to Spark for processing), and its ability to maintain indices in such a large database.
In contrast, I am considering using Cassandra or HBase, which I believe are more suitable for storing large datasets, but offer less performance in querying, which I'd...
According to Learning Spark:
Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.
One difference I get is that with repartition() the number of partitions can be increased/decreased, but with coalesce() the number of partitions can only be decreased.
If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?
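A minimal sketch of the difference (illustrative numbers): repartition(n) is essentially coalesce(n, shuffle = true), whereas plain coalesce(n) builds a narrow dependency that merges existing partitions without a full shuffle:
// 10 initial partitions of a small illustrative dataset.
val rdd = sc.parallelize(1 to 1000, 10)
val merged = rdd.coalesce(2)          // narrow dependency: existing partitions are merged in place
val reshuffled = rdd.repartition(2)   // same as coalesce(2, shuffle = true): full shuffle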