Friday, September 8, 2017

Spark Notes

URL ::
http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter
http://tparhegapd034.nielsen.com:4040
http://tparhegapd017.nielsen.com:8088/proxy/application_1440579785423_4732/jobs/
hdfs://nameservice1/user/spark/applicationHistory/


http://apachesparkbook.blogspot.in/2015/12/sortbykey-example.html
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues
http://spark.apache.org/docs/latest/programming-guide.html

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py

http://www.analyticsvidhya.com/blog/2016/01/complete-tutorial-learn-data-science-python-scratch-2/
http://www.analyticsvidhya.com/blog/2015/04/pycon-montreal-2015-data-science-workshops/
https://www.youtube.com/watch?v=9xYfNznjClE

https://www.youtube.com/playlist?list=PLf0swTFhTI8rJvGpOp-LujOcpk-Rlz-yE
https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

http://nbviewer.ipython.org/gist/fperez/6384491/00-Setup-IPython-PySpark.ipynb
https://github.com/ipython/ipython/wiki/A-gallery-of-interesting-IPython-Notebooks


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

short example::
https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#join
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues

http://blog.brakmic.com/data-science-for-losers-part-5-spark-dataframes/

http://www.supergloo.com/fieldnotes/apache-spark-examples-of-transformations/




CONCEPTS ::
    IN-MEMORY, PARALLEL, DISTRIBUTED PROCESSING ENGINE.

        1. EASY TO USE 
        2. MANY USE CASES OTHER THAN BATCH JOBS.
   
    Distribute the data when it is stored
        Data is distributed in memory
    Run computation where the data is
   
    SPARK APPLICATION REQUIRES SPARK CONTEXT - MAIN ENTRY TO SPARK API
   
    DRIVER PROGRAMS ACCESS SPARK THROUGH A SPARKCONTEXT OBJECT, WHICH REPRESENTS A CONNECTION TO A COMPUTING CLUSTER. A SPARK CONTEXT OBJECT (SC) IS THE MAIN ENTRY POINT FOR SPARK FUNCTIONALITY.
   
    DRIVER PROGRAM - USER'S MAIN FUNCTION - EXECUTES PARALLEL OPERATION
        RDD - COLLECTION OF ELEMENTS PARTITIONED ACROSS NODES OF THE CLUSTER
        SHARED VARIABLE - CAN BE USED IN PARALLEL.
            WHEN A FUNCTION RUNS IN PARALLEL AS A SET OF TASKS ON NODES, VARIABLES ARE SHIPPED WITH EACH TASK AND BETWEEN TASKS AND THE DRIVER PROGRAM.
            BROADCAST VARIABLE - CACHES A VALUE IN THE MEMORY OF ALL NODES
            ACCUMULATORS - VARIABLES THAT ARE ONLY 'ADDED' TO, E.G. COUNTS AND SUMS.
        http://10.7.192.215:4040   
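
        A minimal sketch of both shared-variable types (assumes an existing SparkContext sc; data and names are illustrative):

            # broadcast: read-only value cached in memory on every worker
            lookup = sc.broadcast({'a': 1, 'b': 2})

            # accumulator: workers add to it, only the driver reads the result
            hits = sc.accumulator(0)

            def check(x):
                if x in lookup.value: hits.add(1)

            sc.parallelize(['a', 'b', 'c', 'a']).foreach(check)
            print hits.value        # 3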
       
    RDD - 

       RESILIENT (CAN BE RECREATED WHEN LOST, BY TRACKING LINEAGE)
       DISTRIBUTED (IN MEMORY, ACROSS THE CLUSTER)
       DATASET (FROM FILE, ...)
   
    TASK - one task per partition
    Driver - Task - executor
    RDD - FUNDAMENTAL UNIT OF DATA.
    FUNCTION - FUNDAMENTAL UNIT OF WORK/PROGRAMMING.
   
    RDD - CAN HAVE
        NAMED FUNCTIONS
        LAMBDA FUNCTIONS
        pyspark
   
    sc - sparkContext - tells SPARK how to connect to cluster
    (sparkContext methods are called with dot notation)
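
    A small sketch of creating a context outside the shell (the pyspark shell already provides sc; the app name and master below are illustrative):

        from pyspark import SparkConf, SparkContext

        conf = SparkConf().setAppName("Notes Demo").setMaster("local[2]")
        sc = SparkContext(conf=conf)

        print sc.version                            # sparkContext methods via dot notation
        print sc.parallelize(range(10)).count()     # 10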
   
    Pair(RDD) - KV PAIR ::
   
        MAP
            users = sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1]))
            sc.textFile(file).map(lambda line: line.split()).map(lambda fields: (fields[0],(fields[1],fields[2])))
            ONE TO ONE
        FLATMAP/FLATMAPVALUES
            sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1])) .flatMapValues(lambda skus: skus.split(':'))
            ONE TO MANY
           
            testJsonDataRDD.flatMap(lambda s: s.array).collect()
                [[1, 2, 3, 4], [2, 4, 6, 8], [3, 6, 9, 11]] -> [1, 2, 3, 4, 2, 4, 6, 8, 3, 6, 9, 11]
               
            singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
            singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
           
                [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
                ['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
               
                from test_helper import Test
                Test.assertEquals(singularAndPluralWordsRDDMap.collect(),
                  [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')],
                  'incorrect values for singularAndPluralWordsRDDMap (1d)')
                 
        KEYBY
            sc.textFile(logfile).keyBy(lambda line: line.split(' ')[2]) - THE KEY CAN COME FROM ANY POSITION IN THE RECORD.
            zipcode = accfile.keyBy(lambda line: line.split(',')[8])
            zipcode.take(5)

               
        REDUCEBYKEY
            counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
        COUNTBYKEY
        GROUPBYKEY VS FLATMAPVALUES
            ip_user = logs.map(lambda line: line.split()) \
                          .map(lambda key_value: (key_value[2],key_value[0])) \
                          .groupByKey()
                                               
            wordsGrouped = wordPairs.groupByKey() -- RDD of (key, iterable of values)
            for key, value in wordsGrouped.collect():
                print '{0}: {1}'.format(key, list(value))
               
                rat: [1, 1]
                elephant: [1]
                cat: [1, 1]
               
            wordCountsGrouped = wordsGrouped.map(lambda (k, v): (k, sum(v)))
           
                [('rat', 2), ('elephant', 1), ('cat', 2)]
               
        SORTBYKEY
            sort_user = hit_user.sortByKey(0) -- 0 (False) = descending order
        JOIN
            movies = moviegross.join(movieyear) - COMBINED BASED ON KEY IN TWO PAIRED RDDs.
            final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
        KEYS
        VALUES
        LOOKUP
        LEFTOUTERJOIN,RIGHTOUTERJOIN,
        MAPVALUES
            zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
        FLATMAPVALUES
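
        The operations listed above without examples (countByKey, keys, values, lookup and the outer joins) in one small sketch, using made-up data and an existing sc:

            pets = sc.parallelize([('cat', 1), ('dog', 1), ('cat', 2)])
            owners = sc.parallelize([('cat', 'Ann'), ('fish', 'Bob')])

            print pets.countByKey()                      # defaultdict: {'cat': 2, 'dog': 1}
            print pets.keys().collect()                  # ['cat', 'dog', 'cat']
            print pets.values().collect()                # [1, 1, 2]
            print pets.lookup('cat')                     # [1, 2]
            print pets.leftOuterJoin(owners).collect()   # includes ('dog', (1, None))
            print pets.rightOuterJoin(owners).collect()  # includes ('fish', (None, 'Bob'))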
       
    DOUBLE RDD - NUMERIC ::
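
        Numeric (double) RDDs support statistical actions; a small sketch assuming an existing sc:

            nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])

            print nums.sum()       # 10.0
            print nums.mean()      # 2.5
            print nums.stdev()     # population standard deviation
            print nums.stats()     # count, mean, stdev, max, min in one pass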


   
    # CREATE RDD ::
        TWO METHODS:
        # 1
        my_data = sc.textFile("file:file:/home/training/training_materials/sparkdev/data/frostroad.txt")
       
        # 2
        logfile = "file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
        logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
           
        logs = sc.textFile(logfile)
        logss = sc.textFile(logfiles)
       
    # COUNT NUMBER OF LINES IN THE RDD ::
        logs.count() - 3566
        logss.count() - 1080926
       
    # DISPLAY LINES FROM THE RDD ::
   
        logs.first()
            -- 233.19.62.103 - 16261 [15/Sep/2013:23:55:57 +0100] "GET /titanic_1100.jpg HTTP/1.0" 200 16713 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F10L"
        jpglogs.take(10)
       
    # MAP ::
        MAP - EACH RECORD TO ONE/MORE NEW RECORDS.
        jpglogs.map(lambda x: len(x)).take(4) - returns a new RDD - map(function) applies the function to each record.
       
        # split() ::
       
            jpglogs.map(lambda x: x.split()).take(5)
           
            ips = jpglogs.map(lambda x: x.split()[0]) - take only the first element; another delimiter could also be used, e.g. x.split(':')
           
            ip_id = logs.map(lambda x: x.split()).map(lambda x: (x[0],x[2])).take(5) - take first and third element from RDD
           
            ip_ids = logs.map(lambda x: x.split()).map(lambda x: (x[0] + "/" + x[2])).take(5) -- u'116.180.70.237/128'
            for ip in ips.take(5): print ip --print IP
           
            users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
            users = logs.map(lambda s: (s.split()[2],1))
       
        logs.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
       
        mean_x = textdata.map(lambda lines: float(lines.split(',')[1])).mean()
       
        hit_user = users_info.map(lambda x: (x[1],x[0])) - after split
       
        # loop that repeatedly transforms the RDD (builds up a long lineage)
        for i in range(200):
            mydatas = mydatas.map(lambda myInt: myInt + 1)

       
    # FILTER ::
        filter(condition) - based on boolean include/exclude
       
        sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()
       
        shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != '')
        or
        shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: len(x) > 0)
    # FILTER & MAP ::
   
        jpglogs = logs.filter(lambda x: ".jpg" in x)
                 
        sc.textFile("purplecow.txt").map(lambda line: line.upper()).filter(lambda line: line.startswith('I')).count()
           
       
        htmlog = logs.filter(lambda x: '.htm' in x)
        htm_ip = htmlog.map(lambda x: x.split())
        htm_five = htm_ip.take(5) -- LIST OF LIST
        htm_five[0] - FIRST LIST
        len(htm_five) = 5
        htm_five[0][6] - address
       
        # RDD and FOR loop ::
       
        fiverec = logrdd.filter(lambda x: '.jpg' in x).map(lambda x: x.split()).take(5)
        for item in fiverec:
            print "%s/%s" % (item[0],item[2])
           
        htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
        for x in htmllogs.take(5):
            print x[0]+"/"+x[1]

            233.19.62.103/16261
            136.98.55.4/22232
            219.248.10.104/66652
            47.148.67.112/40002
            29.110.195.184/39859
       
        # RDD and Explicit Functions ::
        def changec(x):
            spt = x.split()
            return spt[6].lower()
        logs.filter(lambda x: '.htm' in x).map(lambda x: x.split()).map(changec).take(2)
            [u'/kbdoc-00031.html', u'/kbdoc-00273.html']   
           
        import re
        def getRequestDoc(s):
            return re.search(r'KBDOC-[0-9]*',s).group()
        kbreqs = sc.textFile(logfile).filter(lambda line: 'KBDOC-' in line).map(lambda line: (getRequestDoc(line),line.split(' ')[2])).distinct()
        kblist = sc.textFile(kblistfile).map(lambda line: line.split(':')).map(lambda fields: (fields[0],fields[1]))
        titlereqs = kbreqs.join(kblist)
        titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey()

    # REDUCEBYKEY::
        users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
           
        counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
    KEYBY::
        sc.textFile(logfile).keyBy(lambda line: line.split(' ')[2]) - THE KEY CAN COME FROM ANY POSITION IN THE RECORD.
        zipcode = accfile.keyBy(lambda line: line.split(',')[8])
        zipcode.take(5)


    COUNTBYKEY ::
    GROUPBYKEY VS FLATMAPVALUES ::
        ip_user = logs.map(lambda line: line.split()) \
                      .map(lambda key_value: (key_value[2],key_value[0])) \
                      .groupByKey()
    SORTBYKEY ::
        sort_user = hit_user.sortByKey(0) -- 0 (False) = descending order
    JOIN ::
        movies = moviegross.join(movieyear) - COMBINED BASED ON KEY IN TWO PAIRED RDDs.
        final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
    KEYS ::
    VALUES ::
    LOOKUP ::
    LEFTOUTERJOIN ::
    RIGHTOUTERJOIN ::
    MAPVALUES ::
        zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
    FLATMAPVALUES ::
   

        # PARTITIONS::
            RDDs ARE STORED IN THE MEMORY OF SPARK EXECUTOR JVMS
                - DATA IS PARTITIONED ACROSS WORKER NODES
                    - EACH PARTITION IN A SEPARATE EXECUTOR
                    - ON AN ACTION, EXECUTOR TASKS LOAD EACH FILE BLOCK INTO A SEPARATE PARTITION (RDD_1_0, RDD_1_1, RDD_1_2)
                - DATA IS DISTRIBUTED ACROSS ALL NODES
                - spark.default.parallelism 10
                - words.reduceByKey(lambda v1, v2: v1 + v2, 15)
            RDD OPERATION EXECUTED IN PARALLEL IN EACH PARTITION
                PRESERVE PARTITIONING
                    map
                        avglens = sc.textFile(file).flatMap(lambda line: line.split()) \
                                    .map(lambda word: (word[0],len(word))).groupByKey() \
                                    .map(lambda (k, values): (k, sum(values)/len(values)))
                        > avglens.count()
                    flatMap
                    filter
                REPARTITION
                    reduce
                    sort
                    group
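
                    A small sketch for inspecting and changing partition counts (assumes an existing sc and the purplecow.txt file used elsewhere in these notes):

                        rdd = sc.textFile("purplecow.txt", 4)               # ask for at least 4 partitions
                        print rdd.getNumPartitions()

                        pairs = rdd.flatMap(lambda line: line.split()).map(lambda w: (w, 1))
                        counts = pairs.reduceByKey(lambda a, b: a + b, 8)   # 8 partitions after the shuffle
                        print counts.getNumPartitions()

                        fewer = counts.coalesce(2)                          # shrink without a full shuffle
                        print fewer.getNumPartitions()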
                   
                OPERATION ON EACH Partition
                foreachPartition
                mapPartitions
                    # Step 3 - Parse each partition as a file into an activation XML record
                    activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
                mapPartitionsWithIndex
                    # Count JPG requests per file; works with small files that fit in a single partition
                        def countJpgs(index,partIter):
                            jpgcount = 0
                            for line in partIter:
                                if "jpg" in line: jpgcount += 1
                            yield (index,jpgcount)
                    > jpgcounts = sc.textFile("weblogs/*").mapPartitionsWithIndex(countJpgs)
                    $
                    (0,237)
                    (1,132)
                    (2,188)
                    (3,193)
                   
                    Second Example:
                     def output_index_and_count(index, iter):
                          count = 0
                          for item in iter:
                            count += 1
                          yield (index, count)
                    sqlContext.sql("select * from my_large_table join my_small_temp_table on my_large_table.bit = my_small_temp_table.bit")\
                        .rdd.mapPartitionsWithIndex(output_index_and_count).filter(lambda p: p[1] > 0).collect()
                   
            DRIVER (SC) - MASTER - WORKER - EXECUTOR - JOB - STAGES - TASKS IN PARALLEL
            EXECUTOR TASKS LOAD DATA FROM FILE BLOCKS INTO PARTITIONS
            JOB - STAGE - TASKS
           
            MASTER:
                http://localhost:18080/
            WORKER:
                http://localhost:18081/
            STAGE:
                http://localhost:4040/

        # CACHING ::
            RDD SAVES DATA IN MEMORY
           
                > mydata = sc.textFile("purplecow.txt")
                > myrdd = mydata.map(lambda s: s.upper())
                > myrdd.cache()
                > myrdd2 = myrdd.filter(lambda s: s.startswith('I'))
                > myrdd2.count()
                3
                > myrdd2.count()
                3
               
                # Caching RDDs Exercise Optional Step 9
                    from pyspark import StorageLevel
                    models.unpersist()
                    models.persist(StorageLevel.DISK_ONLY)
                    models.count()
                   
                    models.cache()
                   
            CACHED PARTITIONS ARE STORED IN MEMORY IN EXECUTOR JVMs
           
            METHOD
                cache
                    STORES DATA IN MEMORY ONLY
                persist
                    STORAGE LEVELS
                        - MEMORY_ONLY (default)
                        - MEMORY_AND_DISK - SPILLING
                        - DISK_ONLY
                        - MEMORY_ONLY_SER
                        - MEMORY_AND_DISK_SER
                    rdd.unpersist() - stop persisting and remove from memory/disk
           
            CHECK POINT - SAVES DATA TO HDFS
                sc.setCheckpointDir(directory)
                myrdd = …initial value….
                for x in xrange(100):
                    myrdd = myrdd.transform(…)
                    if x % 3 == 0:
                        myrdd.checkpoint()
                        myrdd.count()
                myrdd.saveAsTextFile()
               
        BROADCAST VARIABLES ::

            BROADCAST VARIABLE - SET BY DRIVER, RETRIEVED BY WORKERS
                               - READ ONLY
            SINGLE:
                KBLIST_TXT()
                WEBLOG()
               
                # read file
                weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
                kblistfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/kblist.txt")
               
                # filter KBDOC line and get userid, kb request
               
                import re
                def getkbvalue(s):
                    return re.search(r'KBDOC-[0-9]*',s).group()
               
                kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
               
                # get kblist in key value pair
                kblist = kblistfile.map(lambda line: line.split(":")).map(lambda field: (field[0],field[1]))

                # get userid and titles in kv pair
                titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
               
                # get userid and title in key values pair
                titlereqjoin = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
               
                # get RDD values as list
                titlegroup = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().mapValues(list).collect()
               
                # get RDD values using reduceby key
                titlereduce = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).reduceByKey(lambda t1,t2: t1 + ":" + t2)
               
                # sorting based on USERID
                titlesort = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).sortByKey()
               
                # list of titles using  map function
                titlegroup2 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): (userid,list(titles)))
               
                # REDUCELIST3 EXTERNAL FUNCTION IN MAP
                titlegroup3 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): reducelist3(userid,titles))
                def reducelist3(userid, titles):
                    list_profile = []
                    list_profile.append(userid)
                    list_titles = ','.join(titles)
                    list_profile.append(list_titles)
                    return list_profile


               
                #
                (u'160',
                  [u'Titanic 4000 - reboots',
                   u'iFruit 2 - Change the phone ringtone and notification sound',
                   u'Titanic 2000 - Transfer Contacts']),

  
                # function reducelist2
                def reducelist2(userid, titles):
                        return ','.join(titles)
                       
                # function reducelist3
                def reducelist3(userid, titles):
                       
                        return ','.join(titles)

               
                # for loop to access list RDD

                for (userid,titles) in titlegroup:
                    print userid
                    for title in titles:
                        print title

                for (userid,titles) in titlereqgroup.take(10):
                    print 'user id: ',userid
                    for title in titles: print '\t',title
                   
                for (userid,titles) in titlereqgroup.collect():
                    print "UserID :", userid
                    count = 0
                    for title in titles:
                        count += 1
                        print "\t Title (%r): %s" % (count,title)
                       
                # output
                    UserID : 40
                     Title (1): Ronin Novelty Note 3 - rebooting
                     Title (2): Ronin Novelty Note 1 - Battery Life
                     Title (3): Titanic 2000 - Back up files

                titlereduce.saveAsTextFile("file:/home/training/training_materials/sparkdev/data/titles/")
       

                BROADCAST AS LIST ::
               
                # Read in list of target models from a local file
                targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
                targetlist = list(map(lambda l: l.strip(),open(targetfile)))

                # broadcast the target list to all workers
                targetlistbc = sc.broadcast(targetlist)

                # filter out requests from devices not in the target list
                # web server log file(s)
                logfile="hdfs://localhost/user/training/weblogs/*"
                targetreqs = sc.textFile(logfile)\
                    .filter(lambda line:  any(model in line for model in targetlistbc.value) )

                targetreqs.take(5)
               
                sp_logs = sc.textFile(logfile).filter(lambda line: any(model in line for model in targetlist))   # same filter without the broadcast variable

                BROADCAST AS DICTIONARY:: when lookup file has KV pairs
           
                # read a local file into an RDD
                weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
               
                # filter KBDOC line and get userid, kb request
               
                import re
                def getkbvalue(s):
                    return re.search(r'KBDOC-[0-9]*',s).group()
                kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
                    --1306 COUNT
                   
                    #
                    [(u'KBDOC-00172', u'29161'),
                     (u'KBDOC-00081', u'47010'),
                     (u'KBDOC-00164', u'165'),
                     (u'KBDOC-00059', u'42075'),
                     (u'KBDOC-00281', u'55510')]
                   
                # Read local file as DICTIONARY
                kblisttxt = "/home/training/training_materials/sparkdev/data/kblist.txt"
               
                kb_dict = dict(map(lambda line: line.split(":"), open(kblisttxt)))

                for (key,value) in kb_dict.iteritems():
                    print key
                    print value
                   
                    #
                    KBDOC-00066
                    Sorrento F21L - overheating

                    KBDOC-00270
                    Titanic 2400 - overheating

                    KBDOC-00060
                    MeeToo 1.0 - overheating
               
                for key in kb_dict:
                    print key,kb_dict[key]

                    #
                    KBDOC-00066 Sorrento F21L - overheating
                    KBDOC-00270 Titanic 2400 - overheating
                    KBDOC-00060 MeeToo 1.0 - overheating
               
                # SEND THIS DICTIONARY AS A BROADCAST VARIABLE KB_DICT
                kb_dict_bc = sc.broadcast(kb_dict)


                kblookup = kbreqs.map(lambda (kbid,uid): (uid,kb_dict_bc.value[kbid]))
               
                    [(u'29161', 'Sorrento F41L - reboots\n'),
                     (u'47010', 'Titanic 2200 - Back up files\n'),
                     (u'165', 'Titanic 4000 - Transfer Contacts\n'),
                     (u'42075', 'MeeToo 1.0 - Battery Life\n'),
                     (u'55510', 'Ronin S1 - Battery Life\n')]



        # ACCUMULATORS ::
            WORKER NODES CAN ADD VALUE
            ONLY DRIVER CAN ACCESS VALUE
           
            # jpg_count IS AN ACCUMULATOR SHARED WITH ALL NODES
            jpg_count = sc.accumulator(0)

            # EXTERNAL FUNCTION TO ADD VALUE TO ACCUMULATOR
            def func_count(s):
                if '.jpg' in s:
                    jpg_count.add(1)

            # foreach applies the function to every record (returns nothing)
            weblogfile.foreach(lambda line: func_count(line))
           
            # access accumulator from driver
            print jpg_count.value
           
            # verify with filter
            weblogfile.filter(lambda line: '.jpg' in line).count() -- 237

SQOOP ::

sqoop list-databases --connect jdbc:mysql://localhost --username training --password training
    information_schema
    movielens
    mysql
    test
sqoop list-tables --connect jdbc:mysql://localhost/movielens --username training --password training
    genre
    movie
    moviegenre
    movierating
    occupation
    user
   
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movie
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movierating

In [109]: movierating = sc.textFile("hdfs://localhost/user/training/movierating/part*")
movierating_value = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2])))

     [(u'1193', 5), (u'661', 3), (u'914', 3), (u'3408', 4), (u'2355', 5)]
   
movierating_value_grp = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2]))).groupByKey()

    [(u'593', <pyspark.resultiterable.ResultIterable at 0x2153c90>),
     (u'1200', <pyspark.resultiterable.ResultIterable at 0x2153250>),
     (u'3724', <pyspark.resultiterable.ResultIterable at 0x2153310>),
     (u'1868', <pyspark.resultiterable.ResultIterable at 0x2153950>),
     (u'643', <pyspark.resultiterable.ResultIterable at 0x2153f90>)]

movierating_value_grp.mapValues(lambda ratings: sum(ratings)).take(5)
instead of
    movierating_value.reduceByKey(lambda value1,value2: value1 + value2).take(5)
   
sum_movie_rating = movierating_value_grp.mapValues(lambda ratings: sum(ratings)/float(len(ratings)))

        Out[126]:
        [(u'593', 4.3518231186966645),
         (u'1200', 4.1258241758241763),
         (u'3724', 3.7179487179487181),
         (u'1868', 2.0),
         (u'643', 2.0)]
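
The same averages can be computed without groupByKey; a sketch reusing movierating_value from above (not part of the original exercise):

    sum_count = movierating_value.mapValues(lambda rating: (rating, 1)) \
                                 .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    avg_rating = sum_count.mapValues(lambda (total, count): float(total) / count)
    avg_rating.take(5)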

In [110]: movie = sc.textFile("hdfs://localhost/user/training/movie/part*")
In [111]: movie_value = movie.map(lambda line: line.split("\t")).map(lambda value: (value[0],value[1]))



        Out[127]:
        [(u'1', u'Toy Story'),
         (u'2', u'Jumanji'),
         (u'3', u'Grumpier Old Men'),
         (u'4', u'Waiting to Exhale'),
         (u'5', u'Father of the Bride Part II')]

final_movie = movie_value.join(sum_movie_rating).map(lambda (movie_id,(title,rating)): title + "\t" + str(rating))

    [u"Wayne's World\t3.60089285714",
     u'Aliens\t4.12582417582',
     u'Prince of the City\t3.49122807018',
     u'Coming Home\t3.71794871795',
     u'American Pop\t3.07142857143']
   

final_movie.saveAsTextFile("hdfs://localhost/user/training/averagerating")




       
STREAMING ::
 http://spark.apache.org/docs/latest/streaming-programming-guide.html
STREAMING :: hdfscount.py

    https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
   
    """
     Counts words in new text files created in the given directory
     Usage: hdfs_wordcount.py <directory>
       <directory> is the directory that Spark Streaming will use to find and read new text files.
     To run this on your local machine on directory `localdir`, run this example
        $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
     Then create a text file in `localdir` and the words in the file will get counted.
    """
    from __future__ import print_function

    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
            exit(-1)

        sc = SparkContext(appName="PythonStreamingHDFSWordCount")
        ssc = StreamingContext(sc, 1)

        lines = ssc.textFileStream(sys.argv[1])
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda x: (x, 1))\
                      .reduceByKey(lambda a, b: a+b)
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()
STREAMING :: COUNTS

    // start the shell with this command:
    // spark-shell --master local[2]
    //
    // in a separate terminal
    // nc -lkv 1234

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.Seconds

    var ssc = new StreamingContext(sc,Seconds(5))
    var mystream = ssc.socketTextStream("localhost",1234)
    var words = mystream.flatMap(line => line.split("\\W"))
    var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

UsingTheSparkShell.pyspark::

    # Step 3 - create an RDD based on a data file
    mydata = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")

    # Step 4 - count the number of lines in the RDD
    mydata.count()

    # Step 5 - display all the lines in the RDD
    mydata.collect()

LogIPs.pyspark::

    # Steps 8, 9 - create an RDD based on a data file
    logfile="file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
    logs = sc.textFile(logfile)

    # count the number of records (lines) in the RDD
    logs.count()

    # Step 10 - Display all lines which are requests for JPG files
    jpglogs=logs.filter(lambda x: ".jpg" in x)
    jpglogs.collect()

    # Step 12 - Display the JPG requests, this time using a single command line
    sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()

    # Step 13 - Create an RDD of the length of each line in the file
    lengths = logs.map(lambda s: len(s))
    # Display the first 5 line lengths
    lengths.take(5)

    # Map the log data to an RDD of arrays of the words on each line
    logwords = logs.map(lambda line: line.split())

    # Step 15 - Map the log data to an RDD of IP addresses for each line
    ips = logs.map(lambda line: line.split()[0])

    # Step 17 - Save the IP addresses to text file(s)
    ips.saveAsTextFile("file:/home/training/iplist")

    # Challenge Exercise 1 - Do the same thing but use the whole data set
    sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*").map(lambda s: s.split()[0]).saveAsTextFile("file:/home/training/iplist-entire")

    # Challenge Exercise 2 - Display "ip-address/user-id" for the first 5 HTML requests
    # in the data set
    htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
    for x in htmllogs.take(5): print x[0]+"/"+x[1]
   
    165.32.101.206/8
    100.219.90.44/102
    182.4.148.56/173
    246.241.6.175/45395
UserRequests.pyspark - working with pair RDD::

    # Step 1 - Create an RDD based on all the weblogs
    logs=sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*")
    # map each request (line) to a pair (userid, 1), then sum the values
    userreqs = logs \
       .map(lambda line: line.split()) \
       .map(lambda words: (words[2],1))  \
       .reduceByKey(lambda count1,count2: count1 + count2)
      
    # Step 2 - Show the records for the 10 users with the highest counts
    userreqs.map(lambda pair: (pair[1],pair[0])).sortByKey(False).take(10)

    # Step 3 - Group IPs by user ID
    userips = logs \
       .map(lambda line: line.split()) \
       .map(lambda words: (words[2],words[0])) \
       .groupByKey()
    # print out the first 10 user ids, and their IP list
    for (userid,ips) in userips.take(10):
       print userid, ":"
       for ip in ips: print "\t",ip

    # Step 4a - Map account data to (userid,[values....])
    accounts = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
       .map(lambda s: s.split(',')) \
       .map(lambda account: (account[0],account[1:]))

    # Step 4b - Join account data with userreqs then merge hit count into valuelist  
    accounthits = accounts.join(userreqs)

    # Step 4c - Display userid, hit count, first name, last name for the first 5 elements
    for (userid,(values,count)) in accounthits.take(5) :
        print  userid, count, values[2],values[3]
      
    # Challenge 1 - key accounts by postal/zip code
    accountsByPCode = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
       .map(lambda s: s.split(','))\
       .keyBy(lambda account: account[8])
   
     # Challenge 2 - map account data to lastname,firstname 
    namesByPCode = accountsByPCode\
       .mapValues(lambda account: account[4] + ',' + account[3]) \
       .groupByKey()

    # Challenge 3 - print the first 5 zip codes and list the names
    for (pcode,names) in namesByPCode.sortByKey().take(5):
       print "---" ,pcode
       for name in names: print name

  
    userid1 4 Cheryl West
    userid2 8 Elizabeth Kerns
    userid3 1 Melissa Roman

    PAIR RDD - EXAMPLES::
   
        # READS TWO FILES
        logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
        logs = sc.textFile(logfiles)
        accfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv")
       
        # GET USERS ALONE
        users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
        users = logs.map(lambda s: (s.split()[2],1))
       
        # ADD USERS
        users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
   
        # REVERSE USERS AND HITS
        hit_user = users_info.map(lambda x: (x[1],x[0]))
       
        # SORT
        sort_user = hit_user.sortByKey(0)
       
        # GET USER ID AND IP ADDRESS
        ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
       
        # GROUP USER WITH MULTIPLE IP ADDRESS
        ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0])).groupByKey()

        for (user,ips) in ip_user.take(5):
            print user
            for ip in ips: print '\t',ip
           
        # ACCOUNT USER AND INFORMATION
        acc_info = accfile.map(lambda line: line.split(",")).map(lambda s: (s[0],(s[3],s[4])))
       
        # merge two RDD based on USER ID
        final_info = acc_info.join(users_info).take(25)
        final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
       
        #CHALLENGE 1
        zipcode = accfile.keyBy(lambda line: line.split(',')[8])
        zipcode.take(5)
       
        #CHALLENGE 2
        zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
       
        # CHALLENGE 3
        zip_info = zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4])
        zip_info_group = zip_info.groupByKey()
        zip_info_sort = zip_info_group.sortByKey()

        for (zip,users) in zip_info_sort.take(5):
            print "---------",zip
            for user in users:
                print "\t", user
               
       
        --- 85003
        Jenkins,Thad
        Rick,Edward
        Lindsay,Ivy
        …
        --- 85004
        Morris,Eric
        Reiser,Hazel
        Gregg,Alicia
        Preston,Elizabeth

SparkHDFS.pyspark::

    # Steps 14, 15 - Read in a weblog file, then filter and save the JPG requests
    logs=sc.textFile("hdfs://localhost/user/training/weblogs/2014-03-08.log")
    logs.filter(lambda s: ".jpg" in s).saveAsTextFile("hdfs://localhost/user/training/jpgs")

TopModels.pyspark:: working with PARTITION & CACHE

    # Step 1 - Stub code to copy into Spark Shell
    # load XML files containing device activation records.
    # Find the most common device models activated

    import xml.etree.ElementTree as ElementTree

    # Given a partition containing multi-line XML, parse the contents.
    # Return an iterator of activation Elements contained in the partition
    def getactivations(fileiterator):
        s = ''
        for i in fileiterator: s = s + str(i)
        filetree = ElementTree.fromstring(s)
        return filetree.getiterator('activation')

    # Get the model name from a device activation record
    def getmodel(activation):
        return activation.find('model').text

    # Step 2 - Read XML files into an RDD and show number of partitions
    filename="hdfs://localhost/user/training/activations/*.xml"
    # Load activation files
    activations = sc.textFile(filename)
    # Show the partitioning
    print "Activations: ",activations.toDebugString()

    # Step 3 - Parse each partition as a file into an activation XML record
    activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))

    # Step 4 - Map each activation record to a device model name
    models = activationTrees.map(lambda activation: getmodel(activation))

    # Step 5 - Show the partitioning
    print "Models: ",models.toDebugString()

    # Step 6 - Count activations by model
    modelcounts = models.map(lambda model: (model,1)).reduceByKey(lambda v1,v2: v1+v2)

    # Optionally, show the partitioning.
    print "Model Counts: ",modelcounts.toDebugString()

    # Step 7 - Display the top 10 models
    for (count,model) in modelcounts.map(lambda (model,count): (count,model)).top(10):
        print "Model %s (%s)" % (model,count)

    # Caching RDDs Exercise Optional Step 9
    from pyspark import StorageLevel
    models.unpersist()
    models.persist(StorageLevel.DISK_ONLY)
    models.count()
   
    output ::
        iFruit 1 (392)
        Sorrento F00L (224)
        MeeToo 1.0 (12)
IterationTest.pyspark :: Checkpointing RDDs

    # Step 1 - create an RDD of integers
    mydata = sc.parallelize([1,2,3,4,5])

    # Step 2 - loop 200 times
    for i in range(200):
        mydata = mydata.map(lambda myInt: myInt + 1)

    # Step 3 - collect and display the data in the RDD
    for x in mydata.collect(): print x

    # Step 4 - show the final RDD
    mydata.toDebugString()

    # Steps 5,6 - repeat steps 2 - 4 above until you receive an error

    # Note: The steps above demonstrated the error without checkpointing
    # The steps below are a simple program to iteratively create child RDDs
    # from parent RDDs. Use iPython %paste to run.

    # Step 9 - enable checkpointing.
    sc.setCheckpointDir("checkpoints")

    # Step 10 - create an RDD of integers
    mydata = sc.parallelize([1,2,3,4,5])

    # Steps 11, 12 - Iterate to generate a new RDD which descends from prior RDDs
    # resulting in a long lineage
    for i in range(1000):
       mydata = mydata.map(lambda myInt: myInt + 1)
       print "Iteration",i
       # Every 10 iterations, checkpoint the RDD, and materialize it to save the checkpoint file
       # shorten the lineage
       if i % 10 == 0:
           print "Checkpoint"
           mydata.checkpoint()
           mydata.count()

    # Step 13 - collect the contents of the RDD to an array and display
    for x in mydata.collect(): print x
       
    # Step 14 -  Display the lineage (formatted)
    for rddstring in mydata.toDebugString().split('\n'): print rddstring.strip()


COUNTJPGs.PY :: APPLICATION
    import sys
    from pyspark import SparkContext
    from pyspark import SparkConf

    if __name__ == "__main__":
        if len(sys.argv) < 2:
            print >> sys.stderr, "Usage: CountJPGs <file>"
            exit(-1)
           
        sc = SparkContext()
        # Challenge: Configure app name and UI port programmatically
        # sconf = SparkConf().setAppName("My Spark App").set("spark.ui.port","4444")
        # sc = SparkContext(conf=sconf)
        logfile = sys.argv[1]
        count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()
        print "Number of JPG requests: ", count
   
    COMMAND::
        spark-submit CountJPGs.py weblogs/*
        spark-submit --master spark://localhost:7077 --name 'Count JPGs' CountJPGs.py weblogs/*
        spark-submit --properties-file myspark.conf CountJPGs.py weblogs/*
       
        myspark.conf
            spark.app.name My Spark App
            spark.ui.port 4141
            spark.master spark://localhost:7077
localfile.py ::

# _*_ coding: utf-8 _*_
from pyspark import SparkContext

textfile = "file:/ap_data/etl/bin/spark/training_materials/sparkdev/data/purplecow.txt"
sc = SparkContext("local", "Test_app")
textdata = sc.textFile(textfile)
#textdata = sc.textFile(textfile).cache()

#################################################
# number of line with a '1' in it.
num_1 = textdata.map(lambda line:line.upper()).filter(lambda line: line.startswith('I')).count()
print "**********************************************************************************************************", num_1

spark-submit localfile.py

countjpgs_2.py::
import sys
from pyspark import SparkContext
from pyspark import SparkConf

if __name__ == "__main__":
    print "Counting starting..."
    if len(sys.argv) < 2:
        print >> sys.stderr, "usage: CountJPGs <file>"
        exit(-1)
    else:
        print "Counting Started"
    sc = SparkContext()
    logfile = sys.argv[1]
    log = sc.textFile(logfile)
    logfilter = log.filter(lambda line: '.jpg' in line)
    count = logfilter.count()
    print "Number of JPG request", count
TargetModels ::

# Read in list of target models from a local file
targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
targetlist = list(map(lambda l: l.strip(),open(targetfile)))

# broadcast the target list to all workers
targetlistbc = sc.broadcast(targetlist)

# filter out requests from devices not in the target list
# web server log file(s)
logfile="hdfs://localhost/user/training/weblogs/*"
targetreqs = sc.textFile(logfile)\
    .filter(lambda line:  any(model in line for model in targetlistbc.value) )

targetreqs.take(5)

RequestAccumulator ::

jpgcount = sc.accumulator(0)
htmlcount = sc.accumulator(0)
csscount = sc.accumulator(0)

def countFileType(s):
    if '.jpg' in s: jpgcount.add(1)
    elif '.html' in s: htmlcount.add(1)
    elif '.css' in s: csscount.add(1)

filename="hdfs://localhost/user/training/weblogs/*"

logs = sc.textFile(filename)
logs.foreach(lambda line: countFileType(line))

print  'Request Totals:'
print '.css requests: ', csscount.value
print '.html requests: ', htmlcount.value
print '.jpg requests: ', jpgcount.value


PROJECT ::        
line = sc.textFile("hdfs://nameservice1/user/hive/warehouse/ap_cptsph2_pl_dev.db/sff_prod_cptsph2")
line.map(lambda s: s.split(",")).take(5)
prodline = line.filter(lambda l: l.startswith('2'))

/analytics_platform_dev/etl/Colgate/cp_tsp_h/charn/cp_tsp_h/
hdfs dfs -cat /analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item

file = sc.textFile("hdfs://nameservice1/analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item")
file.count()
line = file.map(lambda s: s.split(";"))
fields = line.map(lambda fields: (fields[0],fields[1],fields[2]))


file.map(lambda s: s.split(";")).filter(lambda fields: int(fields[0]) == 2).take(10)

#hdfs://tparhegapd017.nielsen.com:8020/analytics_platform_dev/

How to use pycharm or Ipython with spark
connect to hive table through pyspark - column separator


import pandas as pd
from pyspark.sql import HiveContext,SQLContext
sqlContext = HiveContext(sc)
df = sqlContext.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)
store_char_pd = pd.DataFrame(df, columns=('store_id','char_abr','char_val'))
sc_p = store_char_pd.pivot(index='store_id',columns='char_abr',values='char_val')
sc_p.to_csv("/ap_data/dev/Basir/sample.csv")

>>> type(sc_p)
<class 'pandas.core.frame.DataFrame'>

sqlc = SQLContext(sc)

sdf = sqlc.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)

pdf = sqlc.createDataFrame(store_char_pd)
>>> type(pdf)
<class 'pyspark.sql.dataframe.DataFrame'>
============================================================================================================================================

readme = sc.textFile("file:/ap_data/etl/bin/spark/labfiles/README.md")

LOGGER ::





    You can get the logger from the SparkContext object:
    log4jLogger = sc._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)
    LOGGER.info("pyspark script logger initialized")

To turn logging OFF or show only WARNING-level logs:

    LoggerManager()
    logger = logging.getLogger(__name__)
    loggerSpark = logging.getLogger('py4j')
    loggerSpark.setLevel('WARNING')
   
sc logs
    sc.setLogLevel("FATAL")
         ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
       
HEADER ::

    header = rdd.first()
    rdd = rdd.filter(lambda line:line != header)
   
   
DMLE ::

dmleuser
tempDevUse

ctolab/ctolab

SPARK-SUBMIT ::

    export JAVA_HOME="/usr/java/jdk1.7.0_67-cloudera/"
    export SPARK_HOME="/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark"
    export PATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PATH"
    export YARN_CONF_DIR=$HADOOP_CONF_DIR
    export SPARK_DIST_CLASSPATH=$(hadoop classpath)
    spark-submit --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser --py-files oozie_egg_test.egg "$@"

    pyspark --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/ --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser oozie_egg_test.egg "$@"

    https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-submit.html
    http://blog.appliedinformaticsinc.com/how-to-write-spark-applications-in-python/
    https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
    http://www.slideshare.net/BenjaminBengfort/fast-data-analytics-with-spark-and-python
