Friday, September 8, 2017

Spark Notes

URL ::
http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter
http://tparhegapd034.nielsen.com:4040
http://tparhegapd017.nielsen.com:8088/proxy/application_1440579785423_4732/jobs/
hdfs://nameservice1/user/spark/applicationHistory/


http://apachesparkbook.blogspot.in/2015/12/sortbykey-example.html
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues
http://spark.apache.org/docs/latest/programming-guide.html

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py

http://www.analyticsvidhya.com/blog/2016/01/complete-tutorial-learn-data-science-python-scratch-2/
http://www.analyticsvidhya.com/blog/2015/04/pycon-montreal-2015-data-science-workshops/
https://www.youtube.com/watch?v=9xYfNznjClE

https://www.youtube.com/playlist?list=PLf0swTFhTI8rJvGpOp-LujOcpk-Rlz-yE
https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

http://nbviewer.ipython.org/gist/fperez/6384491/00-Setup-IPython-PySpark.ipynb
https://github.com/ipython/ipython/wiki/A-gallery-of-interesting-IPython-Notebooks


https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

short example::
https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#join
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues

http://blog.brakmic.com/data-science-for-losers-part-5-spark-dataframes/

http://www.supergloo.com/fieldnotes/apache-spark-examples-of-transformations/




CONCEPTS ::
    IN-MEMORY, PARALLEL, DISTRIBUTED PROCESSING ENGINE.

        1. EASY TO USE 
        2. MANY USE CASES OTHER THAN BATCH JOBS.
   
    Distribute the data when it is stored
        Data is distributed in memory
    Run computation where the data is
   
    SPARK APPLICATION REQUIRES SPARK CONTEXT - MAIN ENTRY TO SPARK API
   
    DRIVER PROGRAMS ACCESS SPARK THROUGH A SPARKCONTEXT OBJECT, WHICH REPRESENTS A CONNECTION TO A COMPUTING CLUSTER. A SPARK CONTEXT OBJECT (SC) IS THE MAIN ENTRY POINT FOR SPARK FUNCTIONALITY.
   
    DRIVER PROGRAM - USER'S MAIN FUNCTION - EXECUTES PARALLEL OPERATION
        RDD - COLLECTION OF ELEMENTS PARTITIONED ACROSS NODES OF THE CLUSTER
        SHARED VARIABLES - CAN BE USED IN PARALLEL OPERATIONS.
            WHEN A FUNCTION RUNS IN PARALLEL AS A SET OF TASKS ON NODES, VARIABLES ARE SHIPPED TO EACH TASK AND BETWEEN TASKS AND THE DRIVER PROGRAM.
            BROADCAST VARIABLE - CACHE A VALUE IN MEMORY OF ALL NODES
            ACCUMULATORS - VARIABLES THAT ARE ONLY 'ADDED' TO, E.G. COUNTS AND SUMS.
        http://10.7.192.215:4040   
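
        A minimal sketch tying the driver / RDD / broadcast / accumulator pieces together
        (written as a standalone script; the app name and sample values are illustrative,
        not from the course material):

        from pyspark import SparkContext

        sc = SparkContext(appName="ConceptsSketch")        # main entry point to the Spark API
        rdd = sc.parallelize([1, 2, 3, 4])                 # RDD: elements partitioned across nodes

        lookup_bc = sc.broadcast({1: 'one', 2: 'two'})     # broadcast variable: cached on every node
        total = sc.accumulator(0)                          # accumulator: workers can only add to it

        def tally(x):
            total.add(x)

        rdd.foreach(tally)
        print total.value                                  # only the driver reads the value -> 10
        print rdd.map(lambda x: lookup_bc.value.get(x, 'other')).collect()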
       
    RDD - 

       RESILIENT (RECREATED WHEN LOST / PRODUCT OF TRACKING LINEAGE)
       DISTRIBUTED (IN MEMORY)
       DATASET (FROM FILE, ...)
   
    TASK - one task per partition
    Driver - Task - executor
    RDD - FUNDAMENTAL UNIT OF DATA.
    FUNCTION - FUNDAMENTAL UNIT OF WORK/PROGRAMMING.
   
    RDD - CAN HAVE
        NAMED FUNCTIONS
        LAMBDA FUNCTIONS
        pyspark
   
    sc - sparkContext - tells SPARK how to connect to cluster
    (sparkContext methods) with dot notation
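
    For example, the same transformation written with a named function and with a lambda
    (a small sketch; frostroad.txt is the course sample file used elsewhere in these notes):

    # named function passed to map()
    def to_upper(line):
        return line.upper()

    mydata = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")
    print mydata.map(to_upper).take(2)

    # equivalent lambda function
    print mydata.map(lambda line: line.upper()).take(2)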
   
    Pair(RDD) - KV PAIR ::
   
        MAP
            users = sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1]))
            sc.textFile(file).map(lambda line: line.split()).map(lambda fields: (fields[0],(fields[1],fields[2])))
            ONE TO ONE
        FLATMAP/FLATMAPVALUES
            sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1])) .flatMapValues(lambda skus: skus.split(':'))
            ONE TO MANY
           
            testJsonDataRDD.flatMap(lambda s: s.array).collect()
                [[1, 2, 3, 4], [2, 4, 6, 8], [3, 6, 9, 11]] -> [1, 2, 3, 4, 2, 4, 6, 8, 3, 6, 9, 11]
               
            singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
            singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
           
                [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
                ['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
               
                from test_helper import Test
                Test.assertEquals(singularAndPluralWordsRDDMap.collect(), [('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')],
                  'incorrect values for singularAndPluralWordsRDDMap (1d)')
                 
        KEYBY
            sc.textFile(logfile).keyBy(lambda line: line.split(' ')[2]) - THE KEY CAN BE TAKEN FROM ANY POSITION.
            zipcode = accfile.keyBy(lambda line: line.split(',')[8])
            zipcode.take(5)

               
        REDUCEBYKEY
            counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
        COUNTBYKEY - see the sketch after this list (together with KEYS, VALUES, LOOKUP and the outer joins)
        GROUPBYKEY VS FLATMAPVALUES
            ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
                                                .map(lambda (usr,ip): (usr,ip)).groupByKey()
                                               
            wordsGrouped = wordPairs.groupByKey() -- Iterable RDD
            for key, value in wordsGrouped.collect():
                print '{0}: {1}'.format(key, list(value))
               
                rat: [1, 1]
                elephant: [1]
                cat: [1, 1]
               
            wordCountsGrouped = wordsGrouped.map(lambda (k, v): (k, sum(v)))
           
                [('rat', 2), ('elephant', 1), ('cat', 2)]
               
        SORTBYKEY
            sort_user = hit_user.sortByKey(0)
        JOIN
            movies = moviegross.join(movieyear) - COMBINED BASED ON KEY IN TWO PAIRED RDDS.
            final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
        KEYS
        VALUES
        LOOKUP
        LEFTOUTERJOIN,RIGHTOUTERJOIN,
        MAPVALUES
            zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
        FLATMAPVALUES
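
        A small sketch of the pair-RDD operations listed above without examples
        (COUNTBYKEY, KEYS, VALUES, LOOKUP and the outer joins); the keys and values here are made up:

            moviegross = sc.parallelize([('m1', 100), ('m2', 250)])
            movieyear  = sc.parallelize([('m1', 1994), ('m3', 2001)])

            print moviegross.keys().collect()                    # ['m1', 'm2']
            print moviegross.values().collect()                  # [100, 250]
            print moviegross.countByKey()                        # dict-like result: one count per key
            print movieyear.lookup('m1')                         # [1994] - all values for the given key
            print moviegross.leftOuterJoin(movieyear).collect()  # e.g. [('m1', (100, 1994)), ('m2', (250, None))]
            print moviegross.rightOuterJoin(movieyear).collect() # keeps keys from the right-hand RDD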
       
    DOUBLE RDD - NUMERIC ::
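
        The heading above has no example in these notes; a minimal sketch of the numeric
        (double) RDD actions, with made-up values:

        nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])
        print nums.sum()               # 10.0
        print nums.mean()              # 2.5
        print nums.stdev()             # population standard deviation
        print nums.max(), nums.min()   # 4.0 1.0
        print nums.stats()             # count, mean, stdev, max, min in one pass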


   
    # CREATE RDD ::
        TWO METHODS:
        # 1
        my_data = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")
       
        # 2
        logfile = "file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
        logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
           
        logs = sc.textFile(logfile)
        logss = sc.textFile(logfiles)
       
    # COUNT NUMBER OF LINES IN THE RDD ::
        logs.count() - 3566
        logss.count() - 1080926
       
    # DISPLAY ALL THE LINES IN THE RDD ::
   
        logs.first()
            -- 233.19.62.103 - 16261 [15/Sep/2013:23:55:57 +0100] "GET /titanic_1100.jpg HTTP/1.0" 200 16713 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F10L"
        jpglogs.take(10)
       
    # MAP ::
        MAP - MAPS EACH RECORD TO ONE NEW RECORD (ONE TO ONE).
        jpglogs.map(lambda x: len(x)).take(4) - New RDD - MAP(function) applies the function to EACH record.
       
        # split() ::
       
            jpglogs.map(lambda x: x.split()).take(5)
           
            ips = jpglogs.map(lambda x: x.split()[0]) - takes only the first element -- could also be x.split(':')
           
            ip_id = logs.map(lambda x: x.split()).map(lambda x: (x[0],x[2])).take(5) - take first and third element from RDD
           
            ip_ids = logs.map(lambda x: x.split()).map(lambda x: (x[0] + "/" + x[2])).take(5) -- u'116.180.70.237/128'
            for ip in ips.take(5): print ip --print IP
           
            users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
            users = logs.map(lambda s: (s.split()[2],1))
       
        logs.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
       
        mean_x = textdata.map(lambda lines: float(lines.split(',')[1])).mean()
       
        hit_user = users_info.map(lambda x: (x[1],x[0])) - after split
       
        # for loop that re-maps the RDD repeatedly, adding 1 on each pass
        for i in range(200):
            mydatas = mydatas.map(lambda x: x + 1)

       
    # FILTER ::
        filter(condition) - based on boolean include/exclude
       
        sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()
       
        shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != '')
        or
        shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: len(x) > 0)
    # FILTER & MAP ::
   
        jpglogs = logs.filter(lambda x: ".jpg" in x)
                 
        sc.textFile("purplecow.txt").map(lambda line: line.upper()).filter(lambda line: line.startswith('I')).count()
           
       
        htmlog = logs.filter(lambda x: '.htm' in x)
        htm_ip = htmlog.map(lambda x: x.split())
        htm_five = htm_ip.take(5) -- LIST OF LISTS
        htm_five[0] - FIRST LIST
        len(htm_five) = 5
        htm_five[0][6] - address
       
        # RDD and FOR loop ::
       
        fiverec = logrdd.filter(lambda x: '.jpg' in x).map(lambda x: x.split()).take(5)
        for item in fiverec:
            print "%s/%s" % (item[0],item[2])
           
        htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
        for x in htmllogs.take(5):
            print x[0]+"/"+x[1]

            233.19.62.103/16261
            136.98.55.4/22232
            219.248.10.104/66652
            47.148.67.112/40002
            29.110.195.184/39859
       
        # RDD and Explicit Functions ::
        def changec(x):
            spt = x.split()
            return spt[6].lower()
        logs.filter(lambda x: '.htm' in x).map(lambda x: x.split()).map(changec).take(2)
            [u'/kbdoc-00031.html', u'/kbdoc-00273.html']   
           
        import re
        def getRequestDoc(s):
            return re.search(r'KBDOC-[0-9]*',s).group()
        kbreqs = sc.textFile(logfile).filter(lambda line: 'KBDOC-' in line).map(lambda line: (getRequestDoc(line),line.split(' ')[2])).distinct()
        kblist = sc.textFile(kblistfile).map(lambda line: line.split(':')).map(lambda fields: (fields[0],fields[1]))
        titlereqs = kbreqs.join(kblist)
        titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey()

    # REDUCEBYKEY::
        users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
           
        counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
   

        # PARTITIONS::
            RDDs ARE STORED IN THE MEMORY OF SPARK EXECUTOR JVMS
                - DATA IS PARTITIONED ACROSS WORKER NODES
                    - EACH PARTITION IN A SEPARATE EXECUTOR
                    -    ACTION  - EXECUTOR - TASKS - LOADS FROM EACH FILE BLOCK INTO SINGLE PARTITION RDD_1_0, RDD_1_1, RDD_1_2
                - DATA IS DISTRIBUTED ACROSS ALL NODES
                - spark.default.parallelism 10
                - words.reduceByKey(lambda v1, v2: v1 + v2, 15)
            RDD OPERATION EXECUTED IN PARALLEL IN EACH PARTITION
                PRESERVE PARTITIONING
                    map
                        avglens = sc.textFile(file).flatMap(lambda line: line.split())
                                    .map(lambda word: (word[0],len(word))).groupByKey().map(lambda (k, values): (k, sum(values)/len(values)))
                        > avglens.count()
                    flatMap
                    filter
                REPARTITION
                    reduce
                    sort
                    group
                   
                OPERATION ON EACH Partition
                foreachPartition - see the sketch after this list
                mapPartitions
                    # Step 3 - Parse each partition as a file into an activation XML record
                    activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
                mapPartitionsWithIndex
                    # Count JPG requests per file, work with small files that fit in single partition
                    def countJpgs(index,partIter):
                        jpgcount = 0
                        for line in partIter:
                            if "jpg" in line: jpgcount += 1
                        yield (index,jpgcount)
                    > jpgcounts = sc.textFile("weblogs/*").mapPartitionsWithIndex(countJpgs)
                    $
                    (0,237)
                    (1,132)
                    (2,188)
                    (3,193)
                   
                    Second Example:
                     def output_index_and_count(index, iter):
                          count = 0
                          for item in iter:
                            count += 1
                          yield (index, count)
                    sqlContext.sql("select * from my_large_table join my_small_temp_table on my_large_table.bit = my_small_temp_table.bit")\
                        .rdd.mapPartitionsWithIndex(output_index_and_count).filter(lambda p: p[1] > 0).collect()
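
                A minimal sketch of foreachPartition (referenced in the list above); the
                function receives one iterator per partition and returns nothing, which is
                useful for per-partition setup such as opening one connection per partition:

                    def show_partition_size(records):
                        count = 0
                        for line in records:
                            count += 1
                        print "records in this partition:", count   # printed on the worker (visible in local mode)

                    sc.textFile("weblogs/*").foreachPartition(show_partition_size)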
                   
            DRIVER(SC) - MASTER - WORKER - EXECUTOR - JOB - TASKS(STAGES) IN PARALLEL
            EXECUTOR TASKS LOADS DATA FROM BLOCK TO PARTITIONS
            JOB - STAGE - TASKS
           
            MASTER:
                http://localhost:18080/
            WORKER:
                http://localhost:18081/
            STAGE:
                http://localhost:4040/

        # CACHING ::
            RDD SAVES DATA IN MEMORY
           
                > mydata = sc.textFile("purplecow.txt")
                > myrdd = mydata.map(lambda s: s.upper())
                > myrdd.cache()
                > myrdd2 = myrdd.filter(lambda s: s.startswith('I'))
                > myrdd2.count()
                3
                > myrdd2.count()
                3
               
                # Caching RDDs Exercise Optional Step 9
                    from pyspark import StorageLevel
                    models.unpersist()
                    models.persist(StorageLevel.DISK_ONLY)
                    models.count()
                   
                    models.cache()
                   
            CACHED PARTITIONS ARE STORED IN MEMORY IN EXECUTOR JVMs
           
            METHOD
                cache
                    STORES DATA IN MEMORY ONLY
                persist
                    STORAGE LEVELS
                        - MEMORY_ONLY (DEFAULT)
                        - MEMORY_AND_DISK - SPILLING
                        - DISK_ONLY
                        - MEMORY_ONLY_SER
                        - MEMORY_AND_DISK_SER
                    rdd.unpersist() - stop persisting and remove from memory/disk
           
            CHECK POINT - SAVES DATA TO HDFS
                sc.setCheckpointDir(directory)
                myrdd = …initial value….
                for x in xrange(100):
                    myrdd = myrdd.transform(…)
                    if x % 3 == 0:
                        myrdd.checkpoint()
                        myrdd.count()
                myrdd.saveAsTextFile()
               
        BROADCAST VARIABLES ::

            BROADCAST VARIABLE - SET BY DRIVER, RETRIEVED BY WORKERS
                               - READ ONLY
            SINGLE:
                KBLIST_TXT()
                WEBLOG()
               
                # read file
                weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
                kblistfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/kblist.txt")
               
                # filter KBDOC line and get userid, kb request
               
                import re
                def getkbvalue(s):
                    return re.search(r'KBDOC-[0-9]*',s).group()
               
                kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
               
                # get kblist in key value pair
                kblist = kblistfile.map(lambda line: line.split(":")).map(lambda field: (field[0],field[1]))

                # get userid and titles in kv pair
                titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
               
                # get userid and title in key values pair
                titlereqjoin = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
               
                # get RDD values as list
                titlegroup = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().mapValues(list).collect()
               
                # get RDD values using reduceby key
                titlereduce = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).reduceByKey(lambda t1,t2: t1 + ":" + t2)
               
                # sorting based on USERID
                titlesort = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).sortByKey()
               
                # list of titles using  map function
                titlegroup2 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): (userid,list(titles)))
               
                # REDUCELIST3 EXTERNAL FUNCTION IN MAP
                titlegroup3 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): reducelist3(userid,titles))
                def reducelist3(userid, titles):
                    list_profile = []
                    list_profile.append(userid)
                    list_titles = ','.join(titles)
                    list_profile.append(list_titles)
                    return list_profile


               
                #
                (u'160',
                  [u'Titanic 4000 - reboots',
                   u'iFruit 2 - Change the phone ringtone and notification sound',
                   u'Titanic 2000 - Transfer Contacts']),

  
                # function reducelist2
                def reducelist2(userid, titles):
                        return ','.join(titles)
                       
                # function reducelist3 (alternative version returning only the joined titles)
                def reducelist3(userid, titles):
                        return ','.join(titles)

               
                # for loop to access list RDD

                for (userid,titles) in titlegroup:
                    print userid
                    for title in titles:
                        print title

                for (userid,titles) in titlereqgroup.take(10):
                    print 'user id: ',userid
                    for title in titles: print '\t',title
                   
                for (userid,titles) in titlereqgroup.collect():
                    print "UserID :", userid
                    count = 0
                    for title in titles:
                        count += 1
                        print "\t Title (%r): %s" % (count,title)
                       
                # output
                    UserID : 40
                     Title (0): Ronin Novelty Note 3 - rebooting
                     Title (1): Ronin Novelty Note 1 - Battery Life
                     Title (2): Titanic 2000 - Back up files

                titlereduce.saveAsTextFile("file:/home/training/training_materials/sparkdev/data/titles/")
       

                BROADCAST AS LIST ::
               
                # Read in list of target models from a local file
                targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
                targetlist = list(map(lambda l: l.strip(),open(targetfile)))

                # broadcast the target list to all workers
                targetlistbc = sc.broadcast(targetlist)

                # filter out requests from devices not in the target list
                # web server log file(s)
                logfile="hdfs://localhost/user/training/weblogs/*"
                targetreqs = sc.textFile(logfile)\
                    .filter(lambda line:  any(model in line for model in targetlistbc.value) )

                targetreqs.take(5)
               
                sp_logs = sc.textFile(logfile).filter(lambda line: any(model in line for model in targetlist))

                BROADCAST AS DICTIONARY:: when lookup file has KV pairs
           
                # read local file into an RDD
                weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
               
                # filter KBDOC line and get userid, kb request
               
                import re
                def getkbvalue(s):
                    return re.search(r'KBDOC-[0-9]*',s).group()
                kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
                    --1306 COUNT
                   
                    #
                    [(u'KBDOC-00172', u'29161'),
                     (u'KBDOC-00081', u'47010'),
                     (u'KBDOC-00164', u'165'),
                     (u'KBDOC-00059', u'42075'),
                     (u'KBDOC-00281', u'55510')]
                   
                # Read local file as DICTIONARY
                kblisttxt = "/home/training/training_materials/sparkdev/data/kblist.txt"
               
                kb_dict = dict(map(lambda line: line.split(":"), open(kblisttxt)))

                for (key,value) in kb_dict.iteritems():
                    print key
                    print value
                   
                    #
                    KBDOC-00066
                    Sorrento F21L - overheating

                    KBDOC-00270
                    Titanic 2400 - overheating

                    KBDOC-00060
                    MeeToo 1.0 - overheating
               
                for key in kb_dict:
                    print key,kb_dict[key]

                    #
                    KBDOC-00066 Sorrento F21L - overheating
                    KBDOC-00270 Titanic 2400 - overheating
                    KBDOC-00060 MeeToo 1.0 - overheating
               
                # SEND THIS DICTIONARY AS BROADCAST VARIABLE KB_DICT
                kb_dict_bc = sc.broadcast(kb_dict)


                kblookup = kbreqs.map(lambda (kbid,uid): (uid,kb_dict_bc.value[kbid]))
               
                    [(u'29161', 'Sorrento F41L - reboots\n'),
                     (u'47010', 'Titanic 2200 - Back up files\n'),
                     (u'165', 'Titanic 4000 - Transfer Contacts\n'),
                     (u'42075', 'MeeToo 1.0 - Battery Life\n'),
                     (u'55510', 'Ronin S1 - Battery Life\n')]



        # ACCUMULATORS ::
            WORKER NODES CAN ADD VALUE
            ONLY DRIVER CAN ACCESS VALUE
           
            # JPG_COUNT IS ACCUMULATOR THAT SENDS TO ALL NODES
            jpg_count = sc.accumulator(0)

            # EXTERNAL FUNCTION TO ADD VALUE TO ACCUMULATOR
            def func_count(s):
                if '.jpg' in s:
                    jpg_count.add(1)

            # apply func_count to each record in the RDD (foreach is an action and returns nothing)
            weblogfile.foreach(lambda line: func_count(line))
           
            # access accumulator from driver
            print jpg_count.value
           
            # verify with filter
            weblogfile.filter(lambda line: '.jpg' in line).count() -- 237

SQOOP ::

sqoop list-databases --connect jdbc:mysql://localhost --username training --password training
    information_schema
    movielens
    mysql
    test
sqoop list-tables --connect jdbc:mysql://localhost/movielens --username training --password training
    genre
    movie
    moviegenre
    movierating
    occupation
    user
   
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movie
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movierating

In [109]: movierating = sc.textFile("hdfs://localhost/user/training/movierating/part*")
movierating_value = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2])))

     [(u'1193', 5), (u'661', 3), (u'914', 3), (u'3408', 4), (u'2355', 5)]
   
movierating_value_grp = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2]))).groupByKey()

    [(u'593', <pyspark.resultiterable.ResultIterable at 0x2153c90>),
     (u'1200', <pyspark.resultiterable.ResultIterable at 0x2153250>),
     (u'3724', <pyspark.resultiterable.ResultIterable at 0x2153310>),
     (u'1868', <pyspark.resultiterable.ResultIterable at 0x2153950>),
     (u'643', <pyspark.resultiterable.ResultIterable at 0x2153f90>)]

movierating_value_grp.mapValues(lambda ratings: sum(ratings)).take(5)
instead of
    movierating_value.reduceByKey(lambda value1,value2: value1 + value2).take(5)
   
sum_movie_rating = movierating_value_grp.mapValues(lambda ratings: sum(ratings)/float(len(ratings)))

        Out[126]:
        [(u'593', 4.3518231186966645),
         (u'1200', 4.1258241758241763),
         (u'3724', 3.7179487179487181),
         (u'1868', 2.0),
         (u'643', 2.0)]

In [110]: movie = sc.textFile("hdfs://localhost/user/training/movie/part*")
In [111]: movie_value = movie.map(lambda line: line.split("\t")).map(lambda value: (value[0],value[1]))



        Out[127]:
        [(u'1', u'Toy Story'),
         (u'2', u'Jumanji'),
         (u'3', u'Grumpier Old Men'),
         (u'4', u'Waiting to Exhale'),
         (u'5', u'Father of the Bride Part II')]

final_movie = movie_value.join(sum_movie_rating).map(lambda (movie_id,(title,rating)): title + "\t" + str(rating))

    [u"Wayne's World\t3.60089285714",
     u'Aliens\t4.12582417582',
     u'Prince of the City\t3.49122807018',
     u'Coming Home\t3.71794871795',
     u'American Pop\t3.07142857143']
   

final_movie.saveAsTextFile("hdfs://localhost/user/training/averagerating")




       
STREAMING ::
 http://spark.apache.org/docs/latest/streaming-programming-guide.html
STREAMING :: hdfscount.py

    https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
   
    """
     Counts words in new text files created in the given directory
     Usage: hdfs_wordcount.py <directory>
       <directory> is the directory that Spark Streaming will use to find and read new text files.
     To run this on your local machine on directory `localdir`, run this example
        $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
     Then create a text file in `localdir` and the words in the file will get counted.
    """
    from __future__ import print_function

    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
            exit(-1)

        sc = SparkContext(appName="PythonStreamingHDFSWordCount")
        ssc = StreamingContext(sc, 1)

        lines = ssc.textFileStream(sys.argv[1])
        counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda x: (x, 1))\
                      .reduceByKey(lambda a, b: a+b)
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()
STREAMING :: COUNTS

    // start the shell with this command:
    // spark-shell --master local[2]
    //
    // in a separate terminal
    // nc -lkv 1234

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.Seconds

    var ssc = new StreamingContext(sc,Seconds(5))
    var mystream = ssc.socketTextStream("localhost",1234)
    var words = mystream.flatMap(line => line.split("\\W"))
    var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

UsingTheSparkShell.pyspark::

    # Step 3 - create an RDD based on a data file
    mydata = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")

    # Step 4 - count the number of lines in the RDD
    mydata.count()

    # Step 5 - display all the lines in the RDD
    mydata.collect()

LogIPs.pyspark::

    # Steps 8, 9 - create an RDD based on a data file
    logfile="file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
    logs = sc.textFile(logfile)

    # count the number of records (lines) in the RDD
    logs.count()

    # Step 10 - Display all lines which are requests for JPG files
    jpglogs=logs.filter(lambda x: ".jpg" in x)
    jpglogs.collect()

    # Step 12 - Display the JPG requests, this time using a single command line
    sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()

    # Step 13 - Create an RDD of the length of each line in the file
    lengths = logs.map(lambda s: len(s))
    # Display the first 5 line lengths
    lengths.take(5)

    # Map the log data to an RDD of arrays of the words on each line
    logwords = logs.map(lambda line: line.split())

    # Step 15 - Map the log data to an RDD of IP addresses for each line
    ips = logs.map(lambda line: line.split()[0])

    # Step 17 - Save the IP addresses to text file(s)
    ips.saveAsTextFile("file:/home/training/iplist")

    # Challenge Exercise 1 - Do the same thing but use the whole data set
    sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*").map(lambda s: s.split()[0]).saveAsTextFile("file:/home/training/iplist-entire")

    # Challenge Exercise 2 - Display "ip-address/user-id" for the first 5 HTML requests
    # in the data set
    htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
    for x in htmllogs.take(5): print x[0]+"/"+x[1]
   
    165.32.101.206/8
    100.219.90.44/102
    182.4.148.56/173
    246.241.6.175/45395
UserRequests.pyspark - working with pair RDD::

    # Step 1 - Create an RDD based on all the weblogs
    logs=sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*")
    # map each request (line) to a pair (userid, 1), then sum the values
    userreqs = logs \
       .map(lambda line: line.split()) \
       .map(lambda words: (words[2],1))  \
       .reduceByKey(lambda count1,count2: count1 + count2)
      
    # Step 2 - Show the records for the 10 users with the highest counts
    userreqs.map(lambda pair: (pair[1],pair[0])).sortByKey(False).take(10)

    # Step 3 - Group IPs by user ID
    userips = logs \
       .map(lambda line: line.split()) \
       .map(lambda words: (words[2],words[0])) \
       .groupByKey()
    # print out the first 10 user ids, and their IP list
    for (userid,ips) in userips.take(10):
       print userid, ":"
       for ip in ips: print "\t",ip

    # Step 4a - Map account data to (userid,[values....])
    accounts = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
       .map(lambda s: s.split(',')) \
       .map(lambda account: (account[0],account[1:]))

    # Step 4b - Join account data with userreqs then merge hit count into valuelist  
    accounthits = accounts.join(userreqs)

    # Step 4c - Display userid, hit count, first name, last name for the first 5 elements
    for (userid,(values,count)) in accounthits.take(5) :
        print  userid, count, values[2],values[3]
      
    # Challenge 1 - key accounts by postal/zip code
    accountsByPCode = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
       .map(lambda s: s.split(','))\
       .keyBy(lambda account: account[8])
   
     # Challenge 2 - map account data to lastname,firstname 
    namesByPCode = accountsByPCode\
       .mapValues(lambda account: account[4] + ',' + account[3]) \
       .groupByKey()

    # Challenge 3 - print the first 5 zip codes and list the names
    for (pcode,names) in namesByPCode.sortByKey().take(5):
       print "---" ,pcode
       for name in names: print name

  
    userid1 4 Cheryl West
    userid2 8 Elizabeth Kerns
    userid3 1 Melissa Roman

    PAIR RDD - EXAMPLES::
   
        # READS TWO FILES
        logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
        logs = sc.textFile(logfiles)
        accfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv")
       
        # GET USERS ALONE
        users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
        users = logs.map(lambda s: (s.split()[2],1))
       
        # ADD USERS
        users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
   
        # REVERSE USERS AND HITS
        hit_user = users_info.map(lambda x: (x[1],x[0]))
       
        # SORT
        sort_user = hit_user.sortByKey(0)
       
        # GET USER ID AND IP ADDRESS
        ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
       
        # GROUP USER WITH MULTIPLE IP ADDRESS
        ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0])).map(lambda (usr,ip): (usr,ip)).groupByKey()

        for (ip,user) in ip_user.take(5):
            print ip
            for i in user: print '\t',i
           
        # ACCOUNT USER AND INFORMATION
        acc_info = accfile.map(lambda line: line.split(",")).map(lambda s: (s[0],(s[3],s[4])))
       
        # merge two RDD based on USER ID
        final_info = acc_info.join(users_info).take(25)
        final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
       
        #CHALLENGE 1
        zipcode = accfile.keyBy(lambda line: line.split(',')[8])
        zipcode.take(5)
       
        #CHALLENGE 2
        zip_info = zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4])
        zip_info.take(5)
       
        # CHALLENGE 3
        zip_info_sort = zip_info.sortByKey()
        zip_info_group = zip_info_sort.groupByKey()
       
        for (zip,users) in zip_info_group.collect():
            print "---------",zip
            for user in users:
                print "\t", user
               
       
        --- 85003
        Jenkins,Thad
        Rick,Edward
        Lindsay,Ivy
        …
        --- 85004
        Morris,Eric
        Reiser,Hazel
        Gregg,Alicia
        Preston,Elizabeth

SparkHDFS.pyspark::

    # Steps 14, 15 - Read in a weblog file, then filter and save the JPG requests
    logs=sc.textFile("hdfs://localhost/user/training/weblogs/2014-03-08.log")
    logs.filter(lambda s: ".jpg" in s).saveAsTextFile("hdfs://localhost/user/training/jpgs")

TopModels.pyspark:: working with PARTITION & CACHE

    # Step 1 - Stub code to copy into Spark Shell
    # load XML files containing device activation records.
    # Find the most common device models activated

    import xml.etree.ElementTree as ElementTree

    # Given a partition containing multi-line XML, parse the contents.
    # Return an iterator of activation Elements contained in the partition
    def getactivations(fileiterator):
        s = ''
        for i in fileiterator: s = s + str(i)
        filetree = ElementTree.fromstring(s)
        return filetree.getiterator('activation')

    # Get the model name from a device activation record
    def getmodel(activation):
        return activation.find('model').text

    # Step 2 - Read XML files into an RDD and show number of partitions
    filename="hdfs://localhost/user/training/activations/*.xml"
    # Load activation files
    activations = sc.textFile(filename)
    # Show the partitioning
    print "Activations: ",activations.toDebugString()

    # Step 3 - Parse each partition as a file into an activation XML record
    activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))

    # Step 4 - Map each activation record to a device model name
    models = activationTrees.map(lambda activation: getmodel(activation))

    # Step 5 - Show the partitioning
    print "Models: ",models.toDebugString()

    # Step 6 - Count activations by model
    modelcounts = models.map(lambda model: (model,1)).reduceByKey(lambda v1,v2: v1+v2)

    # Optionally, show the partitioning.
    print "Model Counts: ",modelcounts.toDebugString()

    # Step 7 - Display the top 10 models
    for (count,model) in modelcounts.map(lambda (model,count): (count,model)).top(10):
        print "Model %s (%s)" % (model,count)

    # Caching RDDs Exercise Optional Step 9
    from pyspark import StorageLevel
    models.unpersist()
    models.persist(StorageLevel.DISK_ONLY)
    models.count()
   
    output ::
        iFruit 1 (392)
        Sorrento F00L (224)
        MeeToo 1.0 (12)
IterationTest.pyspark :: Checkpointing RDDs

    # Step 1 - create an RDD of integers
    mydata = sc.parallelize([1,2,3,4,5])

    # Step 2 - loop 200 times
    for i in range(200):
        mydata = mydata.map(lambda myInt: myInt + 1)

    # Step 3 - collect and display the data in the RDD
    for x in mydata.collect(): print x

    # Step 4 - show the final RDD
    mydata.toDebugString()

    # Steps 5,6 - repeat steps 2 - 4 above until you receive an error

    # Note: The steps above demonstrated the error without checkpointing
    # The steps below are a simple program to iteratively create child RDDs
    # from parent RDDs. Use iPython %paste to run.

    # Step 9 - enable checkpointing.
    sc.setCheckpointDir("checkpoints")

    # Step 10 - create an RDD of integers
    mydata = sc.parallelize([1,2,3,4,5])

    # Steps 11, 12 - Iterate to generate a new RDD which descends from prior RDDs
    # resulting in a long lineage
    for i in range(1000):
       mydata = mydata.map(lambda myInt: myInt + 1)
       print "Iteration",i
       # Every 10 iterations, checkpoint the RDD, and materialize it to save the checkpoint file
       # shorten the lineage
       if i % 10 == 0:
           print "Checkpoint"
           mydata.checkpoint()
           mydata.count()

    # Step 13 - collect the contents of the RDD to an array and display
    for x in mydata.collect(): print x
       
    # Step 14 -  Display the lineage (formatted)
    for rddstring in mydata.toDebugString().split('\n'): print rddstring.strip()


COUNTJPGs.PY :: APPLICATION
    import sys
    from pyspark import SparkContext
    from pyspark import SparkConf

    if __name__ == "__main__":
        if len(sys.argv) < 2:
            print >> sys.stderr, "Usage: CountJPGs <file>"
            exit(-1)
           
        sc = SparkContext()
        # Challenge: Configure app name and UI port programmatically
        # sconf = SparkConf().setAppName("My Spark App").set("spark.ui.port","4444")
        # sc = SparkContext(conf=sconf)
        logfile = sys.argv[1]
        count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()
        print "Number of JPG requests: ", count
   
    COMMAND::
        spark-submit CountJPGs.py weblogs/*
        spark-submit --master spark://localhost:7077 --name 'Count JPGs' CountJPGs.py weblogs/*
                     --properties-file myspark.conf
       
        myspark.conf
            spark.app.name My Spark App
            spark.ui.port 4141
            spark.master spark://localhost:7077
localfile.py ::

# _*_ coding: utf-8 _*_
from pyspark import SparkContext

textfile = "file:/ap_data/etl/bin/spark/training_materials/sparkdev/data/purplecow.txt"
sc = SparkContext("local", "Test_app")
textdata = sc.textFile(textfile)
#textdata = sc.textFile(textfile).cache()

#################################################
# number of line with a '1' in it.
num_1 = textdata.map(lambda line:line.upper()).filter(lambda line: line.startswith('I')).count()
print "**********************************************************************************************************", num_1

spark-submit localfile.py

countjpgs_2.py::
import sys
from pyspark import SparkContext
from pyspark import SparkConf

if __name__ == "__main__":
    print "Counting starting..."
    if len(sys.argv) < 2:
        print >> sys.stderr, "usage: CountJPGs <file>"
        exit(-1)
    else:
        print "Counting Started"
    sc = SparkContext()
    logfile = sys.argv[1]
    log = sc.textFile(logfile)
    logfilter = log.filter(lambda line: '.jpg' in line)
    count = logfilter.count()
    print "Number of JPG request", count
TargetModels ::

# Read in list of target models from a local file
targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
targetlist = list(map(lambda l: l.strip(),open(targetfile)))

# broadcast the target list to all workers
targetlistbc = sc.broadcast(targetlist)

# filter out requests from devices not in the target list
# web server log file(s)
logfile="hdfs://localhost/user/training/weblogs/*"
targetreqs = sc.textFile(logfile)\
    .filter(lambda line:  any(model in line for model in targetlistbc.value) )

targetreqs.take(5)

RequestAccumulator ::

jpgcount = sc.accumulator(0)
htmlcount = sc.accumulator(0)
csscount = sc.accumulator(0)

def countFileType(s):
    if '.jpg' in s: jpgcount.add(1)
    elif '.html' in s: htmlcount.add(1)
    elif '.css' in s: csscount.add(1)

filename="hdfs://localhost/user/training/weblogs/*"

logs = sc.textFile(filename)
logs.foreach(lambda line: countFileType(line))

print  'Request Totals:'
print '.css requests: ', csscount.value
print '.html requests: ', htmlcount.value
print '.jpg requests: ', jpgcount.value


PROJECT ::        
line = sc.textFile("hdfs://nameservice1/user/hive/warehouse/ap_cptsph2_pl_dev.db/sff_prod_cptsph2")
line.map(lambda s: s.split(",")).take(5)
prodline = line.filter(lambda l: l.startswith('2'))

/analytics_platform_dev/etl/Colgate/cp_tsp_h/charn/cp_tsp_h/
hdfs dfs -cat /analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item

file = sc.textFile("hdfs://nameservice1/analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item")
file.count()
line = file.map(lambda s: s.split(";"))
fields = line.map(lambda fields: (fields[0],fields[1],fields[2]))


file.map(lambda s: s.split(";")).filter(lambda fields: int(fields[0]) == 2).take(10)

#hdfs://tparhegapd017.nielsen.com:8020/analytics_platform_dev/

How to use pycharm or Ipython with spark
connect to hive table through pyspark - column separator
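
A hedged sketch for the first question: making PySpark importable from a plain IPython/PyCharm session by putting the Spark Python libraries on sys.path (paths taken from the SPARK-SUBMIT section below; adjust them for the local install):

import os, sys

spark_home = os.environ.get("SPARK_HOME", "/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark")
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.8.2.1-src.zip"))

from pyspark import SparkContext
sc = SparkContext("local", "ipython_test")
print sc.parallelize(range(10)).count()   # 10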


import pandas as pd
from pyspark.sql import HiveContext,SQLContext
sqlContext = HiveContext(sc)
df = sqlContext.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)
store_char_pd = pd.DataFrame(df, columns=('store_id','char_abr','char_val'))
sc_p = store_char_pd.pivot(index='store_id',columns='char_abr',values='char_val')
sc_p.to_csv("/ap_data/dev/Basir/sample.csv")

>>> sc_p
<class 'pandas.core.frame.DataFrame'>

sqlc = SQLContext(sc)

sdf = sqlc.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)

pdf = sqlc.createDataFrame(store_char_pd)
>>> type(pdf)
<class 'pyspark.sql.dataframe.DataFrame'>
============================================================================================================================================

readme = sc.textFile("file:/ap_data/etl/bin/spark/labfiles/README.md")

LOGGER ::



    You can get the logger from the SparkContext object:
    log4jLogger = sc._jvm.org.apache.log4j
    LOGGER = log4jLogger.LogManager.getLogger(__name__)
    LOGGER.info("pyspark script logger initialized")

to turn OFF or set only WARNING logs

    import logging
    logger = logging.getLogger(__name__)
    loggerSpark = logging.getLogger('py4j')
    loggerSpark.setLevel('WARNING')
   
sc logs
    sc.setLogLevel("FATAL")
         ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
       
HEADER ::

    header = rdd.first()
    rdd = rdd.filter(lambda line:line != header)
   
   
DMLE ::

dmleuser
tempDevUse

ctolab/ctolab

SPARK-SUBMIT ::

    export JAVA_HOME="/usr/java/jdk1.7.0_67-cloudera/"
    export SPARK_HOME="/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark"
    export PATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PATH"
    export YARN_CONF_DIR=$HADOOP_CONF_DIR
    export SPARK_DIST_CLASSPATH=$(hadoop classpath)
    spark-submit --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser --py-files oozie_egg_test.egg "$@"

    pyspark --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser --py-files oozie_egg_test.egg "$@"

    https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-submit.html
    http://blog.appliedinformaticsinc.com/how-to-write-spark-applications-in-python/
    https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
    http://www.slideshare.net/BenjaminBengfort/fast-data-analytics-with-spark-and-python

Wednesday, November 5, 2014

DS services stop


ps -ef|grep -i dscs    
ps -ef |grep -i dsapi
ps -ef | grep -i osh  
ps -ef |grep -i phantom

Check datastage shared memory segments:
ipcs -mop | grep ade

If there any shared memory segments returned, remove them, use command:

ipcrm -m ID, where ID is the number from the second column of ipcs -mop


dsadm - DS Engine
-----------------
cd /opt/IBM/InformationServer/Server/DSEngine
. ./dsenv
./bin/uv -admin -info
./bin/uv -admin -stop
./bin/uv -admin -start


dsadm - IS
-----------------
cd /opt/IBM/InformationServer/ASBNode/bin
./NodeAgents.sh stop
./NodeAgents.sh start

root - WAS
-----------------
cd /opt/IBM/InformationServer/ASBServer/bin
./MetadataServer.sh stop
./MetadataServer.sh start
ps -ef|grep dsrpcd
ps -ef|grep agent
ps -ef|grep  server1 

ps -ef | grep phantom | grep java | grep agent

netstat -an | grep dsrpc
netstat -an | grep -i  9080
netstat -an | grep -i 31531
netstat -an | grep -i 31533
netstat -an | grep -i 31535
netstat -an | grep -i 31538



netstat -a | grep dsrpc

Tuesday, May 27, 2014

Overview of Database Normalization

Normalization reduces data redundancy and inconsistent data dependency, avoiding the insertion, deletion and update anomalies.

1NF: No repeating column values
2NF: No partial functional dependency
3NF: No transitive dependency

A database table is in 1NF if it contains no repeating fields/columns.
Separate the repeating fields into new tables along with the key from the unnormalized table.
The primary key of the new tables may be a composite key.
A database table is in 2NF if it is in 1NF and contains only fields/columns that are functionally dependent on the primary key (the value of a field is determined by the value of another field or fields).
Remove the partial dependencies (a functional dependency where a field depends on only part of the primary key) of any non-key field.
If field B depends on field A and vice versa, and for a given value of B there is only one possible value of A and vice versa, then move field B into a new table where B is the primary key, and mark it as a foreign key in the parent table.
A database table is in 3NF if it is in 2NF, all non-key fields depend on the primary key, and no field is transitively functionally dependent on the primary key.
    Remove the transitive dependencies (a functional dependency where a field depends on a field that is not the primary key, so its value is determined only indirectly by the primary key).
Make a separate table for the transitively dependent field.
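
A small illustrative sketch of the decomposition described above, written in Python with sqlite3 (the movie/genre example mirrors the movielens tables used earlier in these notes; the studio columns are made up for illustration):

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Unnormalized: repeating genre columns (genre1, genre2, genre3) would violate 1NF.
# 1NF: move the repeating field into its own table keyed by (movie_id, genre).
cur.execute("CREATE TABLE moviegenre (movie_id INTEGER, genre TEXT, PRIMARY KEY (movie_id, genre))")

# 3NF: studio_city depends on studio_id, not on movie_id (a transitive dependency),
# so it moves to its own table and movie keeps only the studio_id foreign key.
cur.execute("CREATE TABLE movie (movie_id INTEGER PRIMARY KEY, title TEXT, studio_id INTEGER)")
cur.execute("CREATE TABLE studio (studio_id INTEGER PRIMARY KEY, studio_city TEXT)")

conn.commit()
conn.close()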


KEY that can uniquely determine records

https://www.youtube.com/watch?v=G9SA0Yv-o28
https://www.simple-talk.com/community/forums/thread/111401.aspx
http://holowczak.com/database-normalization/


Thursday, January 30, 2014

By the Morning brightness

morning
by Raiiq Ridwan
Tough times come in the life of every human being, young or old, rich or poor, king or slave. And tough times, more often than not, make us who we are.
In fact, tough times (and learning to deal with them) help our true nature emerge. For example, during bad times a really “nice” person might show complete impatience and displeasure at the will of Allah (swt), whereas a not-so-nice person might actually turn towards Allah in times of need, bringing about a change in his life that puts him among the pious.
The Way Out of Depression
In recent times depression and sadness have been rife in the Earth. We have more than ever before but we’re more depressed than ever before. Anti-depressants are fast becoming the most prescribed drug, suicide is high, and morality has gone to an all-time low. The Messenger used to say, The best speech is the speech of Allah, and the best of guidance is the guidance of Muhammad.
Surah Duha
A man who is depressed, sad, or grieving requires the nurturing and caring of a well-wishing soul more than he needs the pill that alleviates his symptoms for a little while. Who better to speak to you to calm you down than Allah (swt)? And which better way but the way of the messenger? No human being suffered more than him, and it was at one of the saddest points of Mohamed’s life that Allah (swt) revealed Surah Duha.
The Messenger has just started giving the message to his people who were mocking and making fun of him. At that time revelation stopped for six months and their mockery and slandering just increased as they said things like, “No revelation today? Your Lord surely hates you now!”
Wake Up and See the Sunshine
So often do we not feel that Allah (swt) really hates us? He must hate us for all the pain we get, right? No— the answer comes!
By the morning brightness! [Quran, 93:1]
Your life was all hustle and bustle like the morning brightness! It was bright and sunny before, and don’t worry, it still is today! Wake up, look at the sunshine, look at life around you— it’s not all doom and gloom.
And by the night when it grows still . . . [Quran, 93:2]
Yes, a depressed life is like the stillness of the night; nothing moves, nothing seems to be going forward. But what comes after the stillness of the night? The bright morning! It was a bright morning only hours earlier, and another bright morning is coming! It's okay, don't worry, good times are ahead. Get some sleep.
Allah (swt) Loves You and Has not Deserted You
Your Master has not said goodbye to you nor does He hate! [Quran, 93:3]
Your Master has not even said goodbye, so don’t worry! Wadda’a in Arabic is a loving goodbye. Your Master has not even lovingly said goodbye to you and doesn’t “hate.” Still not impressed? Look at how Allah (swt) gives you evidence that He doesn’t hate you or that He hasn’t said goodbye to you.
Things Will Get Better!
And the future will be better for you than the past. [Quran, 93:4]
Look forward. After bad times come good, and after pain comes ease. And if not, then the believer knows that after death comes paradise. Yes, it’s painful, but hold on, the future will be better!
Your Master will give you so much that you will be pleased. [Quran, 93:5]
Don’t worry, be patient. A time is coming that Allah will give you so much that you will be pleased. For many of us in this world, if not then at least in the hereafter in sha Allah! Imagine all that you wanted. Your wildest dreams that you know will never come true, Allah will make them all come true in paradise. You still need proof right?
Remember What He’s Already Saved You From
Did he not find you an orphan and shelter you? [Quran, 93:6]
Many of you were orphaned once upon a time, either you lost your father or both father and mother. But didn’t Allah help you out of it? Aren’t you better now? What about us with both parents? Were we not helpless and small babies, and didn’t Allah give us parents as a shelter? Or maybe uncles and aunties? Or brothers and sisters? Did Allah not give us a shelter?
And He found you lost(or searching) and guided you. [Quran, 93:7]
Were we not lost and wandering in the ocean of misguidance once and Allah guided us? Were we not searching for happiness everywhere (some of us still might be) whereas it was in the guidance of Allah? Ask Allah (swt) to guide you through man. Look what He did for you before, is He really going to forsake and bid you goodbye?
Remember His Generosity to You
And He found you in need and satisfied your need? [Quran, 93:8]
How many times have we needed something and it was satisfied by Allah (swt)? We needed parents to take care of us and Allah (swt) gave that to us. We needed food and Allah created the world in such a way that we can extract food. We needed help and Allah (swt) created someone to help us. We needed and He satisfied? Are you yet not convinced that Allah (swt) is going to help? If you are, now is the time to take action. What do you do?
So do not be harsh with the orphan. And do not turn away the one who asks for help. [Quran, 93: 9]
In short, get active in the community. Help those who are in a worse condition than you are. That will humble your heart, take your mind off your own problems as you focus on others’. It will make you a stronger man and you will also be reminded of the bounties of God in your life and you will thus be grateful. Did I say grateful?
Talk About the Blessings of Your Master
Any of those people writing books on “being happy” or on “positive thinking” will tell you to look at the good side of life. This is exactly what Allah (swt) prescribes, just that He did 1400 years before today in the Quran and also in previous scriptures. Talk about the blessings Allah (swt) has given you. You’ve whined and complained enough. Now talk about how much He’s given you and thank Him for it. When was the last time we thanked Him for our eyes? For good skin? For not having a stomach ulcer? For the immune system?
Talk about the blessings of your Master— therein is peace.