I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a pair RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

The following code sequence is a less-than-optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible, but as you'll see in the answer section, there is a more concise and efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

A much better way to do this is to use the rdd.aggregateByKey() method. That method is poorly documented in the Apache Spark with Python documentation, which is why I wrote this Q&A and why, until recently, I had been using the above code sequence. The sequence above is less efficient, since countByKey() runs as a separate job and its result has to be broadcast, so avoid doing it that way unless necessary.

Here's how to do the same using the rdd.aggregateByKey() method (recommended) ...

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute), and COUNT (the denominator for the average that we want to compute):

>>> aTuple = (0,0) # Zero value for each key: (runningSum, runningCount). The lambdas index into tuples (a[0], a[1]) because Python 3 removed tuple parameter unpacking (PEP 3113).
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

Where the following is true about the meaning of each a and b pair above (so you can visualize what's happening):

   First lambda expression, for the within-partition reduction step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a SCALAR that holds the next value.

   Second lambda expression, for the cross-partition reduction step:
   a: is a TUPLE that holds (runningSum, runningCount).
   b: is a TUPLE that holds (nextPartitionsSum, nextPartitionsCount).
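
To make the two steps concrete, here is a minimal plain-Python sketch that imitates what the two lambdas do. It is not Spark code: the partitions list is hypothetical sample data, and the helper names (seq_op, comb_op, aggregate_partition, merge_partitions) are made up for illustration.

```python
from functools import reduce

# Hypothetical data: each inner list stands in for one RDD partition.
partitions = [
    [("2013-10-09", 7.0), ("2013-10-10", 9.0), ("2013-10-09", 21.0)],
    [("2013-10-10", 28.0), ("2013-10-09", 2.0)],
]

zero = (0, 0)  # (runningSum, runningCount)

def seq_op(a, b):
    """Within-partition step: a is (sum, count), b is the next scalar value."""
    return (a[0] + b, a[1] + 1)

def comb_op(a, b):
    """Cross-partition step: a and b are both (sum, count) tuples."""
    return (a[0] + b[0], a[1] + b[1])

def aggregate_partition(records):
    """Fold one partition's values into a {key: (sum, count)} dict."""
    acc = {}
    for key, value in records:
        acc[key] = seq_op(acc.get(key, zero), value)
    return acc

def merge_partitions(left, right):
    """Merge two per-partition dicts key by key using comb_op."""
    merged = dict(left)
    for key, pair in right.items():
        merged[key] = comb_op(merged[key], pair) if key in merged else pair
    return merged

per_partition = [aggregate_partition(p) for p in partitions]
sum_count_by_key = reduce(merge_partitions, per_partition)
print(sum_count_by_key)
```

Each key ends up with one (sum, count) tuple, which is the shape the averaging step then divides.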

Finally, calculate the average for each KEY, and collect results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

I hope this question and answer with aggregateByKey() will help.

    • "key1", (1, 1) and "key1", (2, 1) => "key1", (3, 2). Based on the same explanation of a and b, this is what worked for me: .aggregateByKey(aTuple, lambda a, b: (a[0] + b[0], a[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1]))
    • This is really a great answer. I will note, however, that due to PEP 3113 this is only Python 2.x compatible, as tuple unpacking in lambda expressions is no longer supported in Python 3.x.
    • @Tgsmith61591 Thank you. I added the intermediate "aTuple" variable to address this. (Sigh, I couldn't think of a better identifier name, LoL). Nice catch on PEP 3113!

To my mind a more readable equivalent to an aggregateByKey with two lambdas is:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

In this way the whole average calculation would be:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()
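
As a quick sanity check of that equivalence, the following plain-Python sketch (no Spark involved) folds the same values both ways for a single key. The values list is hypothetical, and functools.reduce only stands in for what Spark does per key.

```python
from functools import reduce

values = [10, 10, 13]  # hypothetical values that share one key

# aggregateByKey style: fold scalars into a (sum, count) accumulator.
folded = reduce(lambda a, b: (a[0] + b, a[1] + 1), values, (0, 0))

# mapValues + reduceByKey style: lift each value to (v, 1), then add pairwise.
lifted = [(v, 1) for v in values]
paired = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), lifted)

print(folded, paired)
```

Both forms arrive at the same (sum, count) tuple. One caveat: if the values are integers and the code runs under Python 2, v[0]/v[1] performs integer division, so convert with float() first.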

Just adding a note about an intuitive and shorter (but bad) solution to this problem. The book Sams Teach Yourself Apache Spark in 24 Hours explains this problem well in its last chapter.

Using groupByKey one can solve the problem easily like this:

rdd = sc.parallelize([
        (u'2013-10-09', 10),
        (u'2013-10-09', 10),
        (u'2013-10-09', 13),
        (u'2013-10-10', 40),
        (u'2013-10-10', 45),
        (u'2013-10-10', 50)
    ])

rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()

Output:

[('2013-10-10', 45.0), ('2013-10-09', 11.0)]

This is intuitive and appealing, but don't use it! groupByKey does not do any combining on the mappers; it sends every individual key-value pair across the shuffle to the reducers.

Avoid groupByKey as much as possible. Go with the reduceByKey solution like @pat's.
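
To illustrate the difference, here is a rough plain-Python sketch (not Spark code) that counts how many records would leave the map side with and without local combining. The partitions list and both function names are hypothetical.

```python
# Hypothetical partitions standing in for the map side of a shuffle.
partitions = [
    [("2013-10-09", 10), ("2013-10-09", 10), ("2013-10-10", 40)],
    [("2013-10-09", 13), ("2013-10-10", 45), ("2013-10-10", 50)],
]

def shuffled_without_combine(parts):
    """groupByKey style: every (key, value) record crosses the shuffle."""
    return [record for part in parts for record in part]

def shuffled_with_combine(parts):
    """reduceByKey style: each partition first collapses its records to
    one (key, (sum, count)) entry per key, then ships only those."""
    out = []
    for part in parts:
        local = {}
        for key, value in part:
            s, c = local.get(key, (0, 0))
            local[key] = (s + value, c + 1)
        out.extend(local.items())
    return out

raw = shuffled_without_combine(partitions)
combined = shuffled_with_combine(partitions)
print(len(raw), len(combined))
```

With local combining, each partition ships at most one entry per key, so the saving grows with the number of records per key in each partition.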


A slight enhancement to the answer of prismalytics.io.

Computing the sum could overflow when a huge number of values is being summed. Instead, we can carry the average itself and keep combining the averages and counts of the two parts being reduced.

If the two parts have averages and counts (a1, c1) and (a2, c2), the overall average is:

total/count = (total1 + total2)/(c1 + c2) = (a1*c1 + a2*c2)/(c1 + c2)

If we let R = c2/c1, dividing the numerator and denominator by c1 gives a1/(1+R) + a2*R/(1+R). If we further let Ri = 1/(1+R), this becomes a1*Ri + a2*R*Ri.

myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n: (n, 1))  # Each element becomes (average, count).

def avg(A, B):
    # A and B are (average, count) pairs for the two parts being merged.
    R = 1.0 * B[1] / A[1]
    Ri = 1.0 / (1 + R)
    av = A[0] * Ri + B[0] * R * Ri
    return (av, B[1] + A[1])

(av, counts) = sumcount_rdd.reduce(avg)
print(av)

This approach can be converted for key-value by simply using mapValues instead of map and reduceByKey instead of reduce.

This is from: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2
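
As a rough sketch of the key-value conversion described above (my own illustration, not taken from the linked post), the merge function can be exercised in plain Python without a Spark cluster. The records list and the name merge_avg are hypothetical, and the dictionary grouping only stands in for mapValues followed by reduceByKey.

```python
from functools import reduce

def merge_avg(a, b):
    """Merge two (average, count) pairs without forming the full sum."""
    r = float(b[1]) / a[1]    # R = c2 / c1
    ri = 1.0 / (1.0 + r)      # Ri = 1 / (1 + R)
    return (a[0] * ri + b[0] * r * ri, a[1] + b[1])

# Hypothetical key-value records.
records = [("a", 1.0), ("b", 10.0), ("a", 2.0), ("a", 6.0), ("b", 20.0)]

# Stand-in for mapValues(lambda v: (v, 1)) followed by reduceByKey(merge_avg).
grouped = {}
for key, value in records:
    grouped.setdefault(key, []).append((value, 1))

avg_by_key = {key: reduce(merge_avg, pairs)[0] for key, pairs in grouped.items()}
print(avg_by_key)
```

The results should agree with sum/count for each key up to floating-point rounding.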
