Spark: Best practice for retrieving big data from RDD to local machine

  • I have a big RDD (1 GB) in a YARN cluster. The local machine that uses this cluster has only 512 MB. I'd like to iterate over the values in the RDD on my local machine. I can't use collect(), because it would create a local array larger than my heap. I need some iterative way. There is an iterator() method, but it requires some additional information that I can't provide.

    UPD: committed the toLocalIterator method

     
      October 27, 2021 2:12 PM IST
  • Wildfire's answer seems semantically correct, but I'm sure you can be vastly more efficient by using the Spark API. If you want to process each partition in turn, I don't see why you can't use the map/filter/reduce/reduceByKey/mapPartitions operations. The only time you'd want everything in one place in one array is when you're going to perform a non-monoidal operation, but that doesn't seem to be what you want. You should be able to do something like:

    // Replace the body with your code that processes a single chunk; it must return an Iterator.
    rdd.mapPartitions(recordsIterator => recordsIterator)

    Or this:

    rdd.foreachPartition(partition => {
      val records = partition.toArray  // materializes one partition at a time, on the executor
      // Your code
    })
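
    In PySpark, the same idea might look like the sketch below: each partition is reduced to a small summary on the executors, so only a few tuples ever reach the driver. The function names (summarize_partition, merge_summaries, cluster_side_mean) and the choice of a mean as the computation are illustrative assumptions, not part of the original answer; rdd is assumed to be an existing RDD of numbers.

```python
def summarize_partition(records):
    """Reduce one partition's records to a single (count, total) pair."""
    count, total = 0, 0
    for value in records:
        count += 1
        total += value
    yield (count, total)


def merge_summaries(a, b):
    """Combine two (count, total) pairs; associative, so the order of merging does not matter."""
    return (a[0] + b[0], a[1] + b[1])


def cluster_side_mean(rdd):
    """Compute the mean on the cluster; only one small tuple per partition is merged."""
    # fold() with a neutral zero value also copes with an RDD that has no partitions.
    count, total = rdd.mapPartitions(summarize_partition).fold((0, 0), merge_summaries)
    return total / count if count else None
```

    With a live SparkContext sc, cluster_side_mean(sc.parallelize(range(100), 10)) would run the per-partition work on the executors. This only helps when the computation can be expressed as an associative merge; otherwise the records still have to be brought to one place.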
      October 29, 2021 3:12 PM IST
  • PySpark DataFrame solution using RDD.toLocalIterator():

    import io

    # hiveCtx (a HiveContext) and sql (the query string) are assumed to be defined already
    separator  = '|'
    df_results = hiveCtx.sql(sql)
    columns    = df_results.columns
    print(separator.join(columns))
    
    # Use toLocalIterator() rather than collect(), as this avoids pulling all of the
    # data to the driver at one time.  Rather, "the iterator will consume as much memory
    # as the largest partition in this RDD."
    MAX_BUFFERED_ROW_COUNT = 10000
    row_count              = 0
    output                 = io.StringIO()
    for record in df_results.rdd.toLocalIterator():
        d = record.asDict()
        output.write(separator.join([str(d[c]) for c in columns]) + '\n')
        row_count += 1
        if row_count % MAX_BUFFERED_ROW_COUNT == 0:
            print(output.getvalue().rstrip())
            # it is faster to create a new StringIO rather than clear the existing one
            # http://stackoverflow.com/questions/4330812/how-do-i-clear-a-stringio-object
            output = io.StringIO()
    # flush whatever is left in the last, partially filled buffer
    if row_count % MAX_BUFFERED_ROW_COUNT:
        print(output.getvalue().rstrip())
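
    A variant of the buffering above, as a minimal sketch: a helper that drains any row iterator in fixed-size batches with itertools.islice, so at most batch_size formatted lines are held at once. The helper name write_delimited and its parameters are hypothetical; it is assumed that each row is a plain sequence such as a tuple (which PySpark Row objects are).

```python
import io
import itertools


def write_delimited(rows, out, separator="|", batch_size=10000):
    """Write rows to `out` one batch at a time and return the number of rows written.

    `rows` can be any iterable of sequences, for example the iterator returned
    by toLocalIterator() in PySpark.
    """
    rows = iter(rows)
    written = 0
    while True:
        # islice pulls at most batch_size rows without consuming the rest
        batch = list(itertools.islice(rows, batch_size))
        if not batch:
            return written
        out.write("".join(separator.join(map(str, row)) + "\n" for row in batch))
        written += len(batch)
```

    For the DataFrame above, a call such as write_delimited(df_results.rdd.toLocalIterator(), sys.stdout) (after import sys) would stream the rows to standard output. The batching only bounds the formatted text; the iterator itself may still hold up to one partition in memory.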
      November 9, 2021 2:29 PM IST
  • Here is the same approach as suggested by @Wildfire, but written in PySpark.

    The nice thing about this approach is that it lets the user access the records in the RDD in order. I'm using this code to feed data from the RDD into the STDIN of a machine learning tool's process.

    # sc is an existing SparkContext
    rdd = sc.parallelize(range(100), 10)

    def make_part_filter(index):
        # Build a function that keeps only the elements of partition `index`
        def part_filter(split_index, iterator):
            if split_index == index:
                for el in iterator:
                    yield el
        return part_filter
    
    # Collect one partition at a time, so the driver holds at most one partition
    for part_id in range(rdd.getNumPartitions()):
        part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        print("partition id: %s elements: %s" % (part_id, data_from_part_rdd))

     

    Produces output:

    partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
    partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
    partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
    partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
    partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
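
    The hand-off to an external tool's STDIN mentioned above could be sketched roughly as follows. The function name feed_to_process, its parameters, and the one-record-per-line format are assumptions made for illustration; the original post does not show how the tool is invoked.

```python
import subprocess
import sys


def feed_to_process(records, command, stdout=None):
    """Stream records, one per line, into the STDIN of `command`; return its exit code.

    `records` can be any iterable, so nothing beyond the current record is
    buffered here. `stdout` is passed to Popen (None inherits the parent's).
    """
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=stdout, text=True)
    try:
        for record in records:
            proc.stdin.write(f"{record}\n")
    finally:
        # Closing STDIN signals end of input to the child process
        proc.stdin.close()
    return proc.wait()
```

    Records collected per partition as above, or an iterator such as rdd.toLocalIterator(), can be passed as records. If the child exits before reading all input, the write raises BrokenPipeError, which a real pipeline would need to handle.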
      October 28, 2021 4:32 PM IST