URL ::
http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter
http://tparhegapd034.nielsen.com:4040
http://tparhegapd017.nielsen.com:8088/proxy/application_1440579785423_4732/jobs/
hdfs://nameservice1/user/spark/applicationHistory/
http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter
http://apachesparkbook.blogspot.in/2015/12/sortbykey-example.html
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues
http://spark.apache.org/docs/latest/programming-guide.html
https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
http://www.analyticsvidhya.com/blog/2016/01/complete-tutorial-learn-data-science-python-scratch-2/
http://www.analyticsvidhya.com/blog/2015/04/pycon-montreal-2015-data-science-workshops/
https://www.youtube.com/watch?v=9xYfNznjClE
https://www.youtube.com/playlist?list=PLf0swTFhTI8rJvGpOp-LujOcpk-Rlz-yE
https://www.youtube.com/watch?v=7ooZ4S7Ay6Y
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html
http://nbviewer.ipython.org/gist/fperez/6384491/00-Setup-IPython-PySpark.ipynb
https://github.com/ipython/ipython/wiki/A-gallery-of-interesting-IPython-Notebooks
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
short example::
https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#join
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapValues
http://blog.brakmic.com/data-science-for-losers-part-5-spark-dataframes/
http://www.supergloo.com/fieldnotes/apache-spark-examples-of-transformations/
CONCEPTS ::
IN-MEMORY, PARALLEL, DISTRIBUTED PROCESSING ENGINE.
1. EASY TO USE
2. MANY USE CASES OTHER THAN BATCH JOBS.
Distribute the data when it is stored
Data is distributed in memory
Run computation where the data is
SPARK APPLICATION REQUIRES SPARK CONTEXT - MAIN ENTRY TO SPARK API
DRIVER PROGRAMS ACCESS SPARK THROUGH A SPARKCONTEXT OBJECT, WHICH REPRESENTS A CONNECTION TO A COMPUTING CLUSTER. A SPARK CONTEXT OBJECT (SC) IS THE MAIN ENTRY POINT FOR SPARK FUNCTIONALITY.
DRIVER PROGRAM - USER'S MAIN FUNCTION - EXECUTES PARALLEL OPERATION
RDD - COLLECTION OF ELEMENTS PARTITIONED ACROSS NODES OF THE CLUSTER
SHARED VARIABLE - CAN BE USED IN PARALLEL.
FUNCTIONS RUN IN PARALLEL AS A SET OF TASKS ON NODES; VARIABLES ARE SHIPPED TO EACH TASK AND BETWEEN TASKS AND THE DRIVER PROGRAM.
BROADCAST VARIABLE - CACHE A VALUE IN MEMORY OF ALL NODES
ACCUMULATORS - VARIABLES THAT ARE ONLY 'ADDED' TO, E.G. COUNTS AND SUMS.
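A minimal sketch of both shared variable types, assuming an existing SparkContext sc (the numbers are made up for illustration):
# broadcast: read-only value set by the driver, cached on every worker
factor = sc.broadcast(2)
doubled = sc.parallelize([1, 2, 3, 4]).map(lambda x: x * factor.value)
doubled.collect()                              # [2, 4, 6, 8]
# accumulator: workers only add to it, the driver reads the result
total = sc.accumulator(0)
doubled.foreach(lambda x: total.add(x))
print total.value                              # 20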
http://10.7.192.215:4040
RDD -
RESILIENT (RECREATED WHEN LOST BY TRACKING LINEAGE), DISTRIBUTED (IN MEMORY)
DATASET (FROM A FILE, ...)
TASK - ONE TASK PER PARTITION
Driver - Task - executor
RDD - FUNDAMENTAL UNIT OF DATA.
FUNCTION - FUNDAMENTAL UNIT OF WORK/PROGRAMMING.
RDD - CAN HAVE
NAMED FUNCTIONS
LAMBDA FUNCTIONS
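A quick sketch of both styles against the same small RDD (illustrative data, assuming sc is available):
words = sc.parallelize(['cat', 'elephant', 'rat'])
# lambda function passed to map
words.map(lambda w: len(w)).collect()          # [3, 8, 3]
# named function passed to map
def word_length(w):
    return len(w)
words.map(word_length).collect()               # [3, 8, 3]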
pyspark
sc - sparkContext - tells SPARK how to connect to cluster
(sparkContext methods) with dot notation
Pair(RDD) - KV PAIR ::
MAP
users = sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1]))
sc.textFile(file).map(lambda line: line.split()).map(lambda fields: (fields[0],(fields[1],fields[2])))
ONE TO ONE
FLATMAP/FLATMAPVALUES
sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1])) .flatMapValues(lambda skus: skus.split(':'))
ONE TO MANY
testJsonDataRDD.flatMap(lambda s: s.array).collect()
[[1, 2, 3, 4], [2, 4, 6, 8], [3, 6, 9, 11]] -> [1, 2, 3, 4, 2, 4, 6, 8, 3, 6, 9, 11]
singularAndPluralWordsRDDMap = wordsRDD.map(lambda x: (x, x + 's'))
singularAndPluralWordsRDD = wordsRDD.flatMap(lambda x: (x, x + 's'))
[('cat', 'cats'), ('elephant', 'elephants'), ('rat', 'rats'), ('rat', 'rats'), ('cat', 'cats')]
['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats']
from test_helper import Test
Test.assertEquals(singularAndPluralWordsRDD.collect(), ['cat', 'cats', 'elephant', 'elephants', 'rat', 'rats', 'rat', 'rats', 'cat', 'cats'],
'incorrect values for singularAndPluralWordsRDD (1d)')
KEYBY
sc.textFile(logfile).keyBy(lambda line: line.split(' ')[2]) - THE KEY CAN BE TAKEN FROM ANY POSITION.
zipcode = accfile.keyBy(lambda line: line.split(',')[8]).take(5)
REDUCEBYKEY
counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
COUNTBYKEY
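countByKey is an action that returns a dict of per-key counts to the driver; a small sketch on made-up pairs:
pairs = sc.parallelize([('cat', 1), ('rat', 1), ('cat', 1)])
pairs.countByKey()          # defaultdict with {'cat': 2, 'rat': 1}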
GROUPBYKEY VS FLATMAPVALUES
ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
.map(lambda (usr,ip): (usr,ip)).groupByKey()
wordsGrouped = wordPairs.groupByKey() -- iterable of values per key
for key, value in wordsGrouped.collect():
print '{0}: {1}'.format(key, list(value))
rat: [1, 1]
elephant: [1]
cat: [1, 1]
wordCountsGrouped = wordsGrouped.map(lambda (k, v): (k, sum(v)))
[('rat', 2), ('elephant', 1), ('cat', 2)]
SORTBYKEY
sort_user = hit_user.sortByKey(0)
JOIN
movies = moviegross.join(movieyear) - COMBINED BASED ON THE KEY IN TWO PAIR RDDs.
final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
KEYS
VALUES
LOOKUP
LEFTOUTERJOIN,RIGHTOUTERJOIN,
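Small sketches of these pair-RDD operations on made-up movie data (collect() order may vary):
moviegross = sc.parallelize([('Titanic', 2186), ('Avatar', 2788)])
movieyear  = sc.parallelize([('Titanic', 1997), ('Up', 2009)])
moviegross.keys().collect()                    # ['Titanic', 'Avatar']
moviegross.values().collect()                  # [2186, 2788]
moviegross.lookup('Titanic')                   # [2186]
moviegross.leftOuterJoin(movieyear).collect()  # [('Titanic', (2186, 1997)), ('Avatar', (2788, None))]
moviegross.rightOuterJoin(movieyear).collect() # [('Titanic', (2186, 1997)), ('Up', (None, 2009))]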
MAPVALUES
zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
FLATMAPVALUES
DOUBLE RDD - NUMERIC ::
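Double (numeric) RDDs support extra statistical actions; a minimal sketch on made-up values:
nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])
nums.sum()        # 10.0
nums.mean()       # 2.5
nums.stdev()      # population standard deviation
nums.stats()      # count, mean, stdev, max, min in one pass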
# CREATE RDD ::
TWO METHODS:
# 1
my_data = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")
# 2
logfile = "file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
logs = sc.textFile(logfile)
logss = sc.textFile(logfiles)
# COUNT NUMBER OF LINES IN THE RDD ::
logs.count() - 3566
logss.count() - 1080926
# DISPLAY ALL THE LINES IN THE RDD ::
logs.first()
-- 233.19.62.103 - 16261 [15/Sep/2013:23:55:57 +0100] "GET /titanic_1100.jpg HTTP/1.0" 200 16713 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F10L"
jpglogs.take(10)
# MAP ::
MAP - APPLIES A FUNCTION TO EACH RECORD, PRODUCING ONE NEW RECORD PER INPUT.
jpglogs.map(lambda x: len(x)).take(4) - New RDD - map(function) applies the function to EACH record.
# split() ::
jpglogs.map(lambda x: x.split()).take(5)
ips = jpglogs.map(lambda x: x.split()[0]) - take only the first element -- could also use x.split(':')
ip_id = logs.map(lambda x: x.split()).map(lambda x: (x[0],x[2])).take(5) - take first and third element from RDD
ip_ids = logs.map(lambda x: x.split()).map(lambda x: (x[0] + "/" + x[2])).take(5) -- u'116.180.70.237/128'
for ip in ips.take(5): print ip --print IP
users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
users = logs.map(lambda s: (s.split()[2],1))
logs.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
mean_x = textdata.map(lambda lines: float(lines.split(',')[1])).mean()
hit_user = users_info.map(lambda x: (x[1],x[0])) - after split
# loop that repeatedly maps over the RDD (each iteration adds to the lineage lazily)
for i in range(200):
    mydatas = mydatas.map(lambda x: x + 1)
# FILTER ::
filter(function) - includes or excludes each record based on a boolean result
sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != '')
or
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: len(x) > 0)
# FILTER & MAP ::
jpglogs = logs.filter(lambda x: ".jpg" in x)
sc.textFile("purplecow.txt").map(lambda line: line.upper()).filter(lambda line: line.startswith('I')).count()
htmlog = logs.filter(lambda x: '.htm' in x)
htm_ip = htmlog.map(lambda x: x.split())
htm_five = htm_ip.take(5) -- LIST OF LISTS
htm_five[0] - FIRST LIST
len(htm_five) = 5
htm_five[0][6] - address
# RDD and FOR loop ::
fiverec = logrdd.filter(lambda x: '.jpg' in x).map(lambda x: x.split()).take(5)
for item in fiverec:
print "%s/%s" % (item[0],item[2])
htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
for x in htmllogs.take(5):
print x[0]+"/"+x[1]
233.19.62.103/16261
136.98.55.4/22232
219.248.10.104/66652
47.148.67.112/40002
29.110.195.184/39859
# RDD and Explicit Functions ::
def changec(x):
spt = x.split()
return spt[6].lower()
logs.filter(lambda x: '.htm' in x).map(lambda x: x.split()).map(changec).take(2)
[u'/kbdoc-00031.html', u'/kbdoc-00273.html']
import re
def getRequestDoc(s):
return re.search(r'KBDOC-[0-9]*',s).group()
kbreqs = sc.textFile(logfile).filter(lambda line: 'KBDOC-' in line).map(lambda line: (getRequestDoc(line),line.split(' ')[2])).distinct()
kblist = sc.textFile(kblistfile).map(lambda line: line.split(':')).map(lambda fields: (fields[0],fields[1]))
titlereqs = kbreqs.join(kblist)
titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey()
# REDUCEBYKEY::
users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
FLATMAP/FLATMAPVALUES
sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0],fields[1])) .flatMapValues(lambda skus: skus.split(':'))
counts = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda v1,v2: v1+v2)
KEYBY::
sc.textFile(logfile).keyBy(lambda line: line.split(' ')[2]) - THE KEY CAN BE TAKEN FROM ANY POSITION.
zipcode = accfile.keyBy(lambda line: line.split(',')[8]).take(5)
COUNTBYKEY ::
GROUPBYKEY VS FLATMAPVALUES ::
ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
.map(lambda (usr,ip): (usr,ip)).groupByKey()
SORTBYKEY ::
sort_user = hit_user.sortByKey(0)
JOIN ::
movies = moviegross.join(movieyear) - COMBINED BASED ON THE KEY IN TWO PAIR RDDs.
final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
KEYS ::
VALUES ::
LOOKUP ::
LEFTOUTERJOIN ::
RIGHTOUTERJOIN ::
MAPVALUES ::
zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
FLATMAPVALUES ::
# PARTITIONS::
RDDs ARE STORED IN THE MEMORY OF SPARK EXECUTOR JVMS
- DATA IS PARTITIONED ACROSS WORKER NODES
- EACH PARTITION IN A SEPARATE EXECUTOR
- ON AN ACTION, EXECUTOR TASKS LOAD EACH FILE BLOCK INTO A SEPARATE PARTITION (RDD_1_0, RDD_1_1, RDD_1_2)
- DATA IS DISTRIBUTED ACROSS ALL NODES
- spark.default.parallelism 10
- words.reduceByKey(lambda v1, v2: v1 + v2, 15)
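To inspect or change the partitioning directly, a sketch (assumes file points at an input path; actual counts depend on the input blocks):
words = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word, 1))
words.getNumPartitions()                                  # one partition per input block by default
counts = words.reduceByKey(lambda v1, v2: v1 + v2, 15)    # request 15 result partitions
counts.getNumPartitions()                                 # 15
words.repartition(10).getNumPartitions()                  # 10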
RDD OPERATION EXECUTED IN PARALLEL IN EACH PARTITION
PRESERVE PARTITIONING
map
flatMap
filter
REPARTITION
reduce
sort
group
avglens = sc.textFile(file).flatMap(lambda line: line.split()).map(lambda word: (word[0],len(word))).groupByKey().map(lambda (k, values): (k, sum(values)/len(values)))
> avglens.count()
OPERATION ON EACH Partition
foreachPartition
mapPartitions
# Step 3 - Parse each partition as a file into an activation XML record
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
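foreachPartition runs a function once per partition (its argument is an iterator over the partition's records), which is the usual place to open one connection or file handle per partition; a hedged sketch where save_batch is a hypothetical helper:
def write_partition(records):
    batch = list(records)                  # records is an iterator over one partition
    # open one connection per partition here, then write the whole batch
    # save_batch(batch)                    # hypothetical helper, not part of Spark
    print "records in partition:", len(batch)
activations.foreachPartition(write_partition)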
mapPartitionsWithIndex
# Count JPG requests per partition (per file when small files each fit in a single partition)
def countJpgs(index,partIter):
jpgcount = 0
for line in partIter:
if "jpg" in line: jpgcount += 1
yield (index,jpgcount)
> jpgcounts = sc.textFile("weblogs/*").mapPartitionsWithIndex(countJpgs)
output ::
(0,237)
(1,132)
(2,188)
(3,193)
Second Example:
def output_index_and_count(index, iter):
count = 0
for item in iter:
count += 1
yield (index, count)
sqlContext.sql("select * from my_large_table join my_small_temp_table on my_large_table.bit = my_small_temp_table.bit")\
.rdd.mapPartitionsWithIndex(output_index_and_count).filter(lambda p: p[1] > 0).collect()
DRIVER(SC) - MASTER - WORKER - EXECUTOR - JOB - TASKS(STAGES) IN PARALLEL
EXECUTOR TASKS LOAD DATA FROM BLOCKS INTO PARTITIONS
JOB - STAGE - TASKS
MASTER:
http://localhost:18080/
WORKER:
http://localhost:18081/
STAGE:
http://localhost:4040/
# CACHING ::
RDD SAVES DATA IN MEMORY
> mydata = sc.textFile("purplecow.txt")
> myrdd = mydata.map(lambda s: s.upper())
> myrdd.cache()
> myrdd2 = myrdd.filter(lambda s: s.startswith('I'))
> myrdd2.count()
3
> myrdd2.count()
3
# Caching RDDs Exercise Optional Step 9
from pyspark import StorageLevel
models.unpersist()
models.persist(StorageLevel.DISK_ONLY)
models.count()
models.cache()
CACHED PARTITIONS ARE STORED IN MEMORY IN EXECUTOR JVMs
METHOD
cache
STORES DATA IN MEMORY ONLY
persist
STORAGE LEVELS
- MEMORY_ONLY (DEFAULT)
- MEMORY_AND_DISK - SPILLING
- DISK_ONLY
- MEMORY_ONLY_SER
- MEMORY_AND_DISK_SER
rdd.unpersist() - stops persisting and removes it from memory/disk
CHECK POINT - SAVES DATA TO HDFS
sc.setCheckpointDir(directory)
myrdd = …initial value….
for x in xrange(100):
myrdd = myrdd.transform(…)
if x % 3 == 0:
myrdd.checkpoint()
myrdd.count()
myrdd.saveAsTextFile()
BROADCAST VARIABLES ::
BROADCAST VARIABLE - SET BY DRIVER, RETRIEVED BY WORKERS
- READ ONLY
SINGLE:
KBLIST_TXT()
WEBLOG()
# read file
weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
kblistfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/kblist.txt")
# filter KBDOC line and get userid, kb request
import re
def getkbvalue(s):
return re.search(r'KBDOC-[0-9]*',s).group()
kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
# get kblist in key value pair
kblist = kblistfile.map(lambda line: line.split(":")).map(lambda field: (field[0],field[1]))
# get userid and titles in kv pair
titlereqs = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
# get userid and title in key values pair
titlereqjoin = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title))
# get RDD values as list
titlegroup = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().mapValues(list).collect()
# get RDD values using reduceby key
titlereduce = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).reduceByKey(lambda t1,t2: t1 + ":" + t2)
# sorting based on USERID
titlesort = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).sortByKey()
# list of titles using map function
titlegroup2 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): (userid,list(titles)))
# REDUCELIST3 EXTERNAL FUNCTION IN MAP
titlegroup3 = kbreqs.join(kblist).map(lambda (docid,(userid,title)): (userid,title)).groupByKey().map(lambda (userid,titles): reducelist3(userid,titles))
def reducelist3(userid, titles):
list_profile = []
list_profile.append(userid)
list_titles = ','.join(titles)
list_profile.append(list_titles)
return list_profile
#
(u'160',
[u'Titanic 4000 - reboots',
u'iFruit 2 - Change the phone ringtone and notification sound',
u'Titanic 2000 - Transfer Contacts']),
# function reducelist2
def reducelist2(userid, titles):
return ','.join(titles)
# function reducelist3
def reducelist3(userid, titles):
return ','.join(titles)
# for loop to access list RDD
for (userid,titles) in titlegroup:
print userid
for title in titles:
print title
for (userid,titles) in titlereqgroup.take(10):
print 'user id: ',userid
for title in titles: print '\t',title
for (userid,titles) in titlereqgroup.collect():
print "UserID :", userid
count = 0
for title in titles:
count += 1
print "\t Title (%r): %s" % (count,title)
# output
UserID : 40
Title (0): Ronin Novelty Note 3 - rebooting
Title (1): Ronin Novelty Note 1 - Battery Life
Title (2): Titanic 2000 - Back up files
titlereduce.saveAsTextFile("file:/home/training/training_materials/sparkdev/data/titles/")
BROADCAST AS LIST ::
# Read in list of target models from a local file
targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
targetlist = list(map(lambda l: l.strip(),open(targetfile)))
# broadcast the target list to all workers
targetlistbc = sc.broadcast(targetlist)
# filter out requests from devices not in the target list
# web server log file(s)
logfile="hdfs://localhost/user/training/weblogs/*"
targetreqs = sc.textFile(logfile)\
.filter(lambda line: any(model in line for model in targetlistbc.value) )
targetreqs.take(5)
sp_logs = sc.textFile(logfile).filter(lambda line: any(model in line for model in targetlist)) -- same filter without using the broadcast variable
BROADCAST AS DICTIONARY:: when lookup file has KV pairs
# read a LOCAL file into an RDD
weblogfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log")
# filter KBDOC line and get userid, kb request
import re
def getkbvalue(s):
return re.search(r'KBDOC-[0-9]*',s).group()
kbreqs = weblogfile.filter(lambda line: 'KBDOC-' in line).map(lambda kbline: (getkbvalue(kbline),kbline.split()[2])).distinct()
--1306 COUNT
#
[(u'KBDOC-00172', u'29161'),
(u'KBDOC-00081', u'47010'),
(u'KBDOC-00164', u'165'),
(u'KBDOC-00059', u'42075'),
(u'KBDOC-00281', u'55510')]
# Read local file as DICTIONARY
kblisttxt = "/home/training/training_materials/sparkdev/data/kblist.txt"
kb_dict = dict(map(lambda line: line.split(":"), open(kblisttxt)))
for (key,value) in kb_dict.iteritems():
print key
print value
#
KBDOC-00066
Sorrento F21L - overheating
KBDOC-00270
Titanic 2400 - overheating
KBDOC-00060
MeeToo 1.0 - overheating
for key in kb_dict:
print key,kb_dict[key]
#
KBDOC-00066 Sorrento F21L - overheating
KBDOC-00270 Titanic 2400 - overheating
KBDOC-00060 MeeToo 1.0 - overheating
# SEND THIS DICTIONARY AS BROADCAST VARIABLE KB_DICT
kb_dict_bc = sc.broadcast(kb_dict)
kblookup = kbreqs.map(lambda (kbid,uid): (uid,kb_dict_bc.value[kbid]))
[(u'29161', 'Sorrento F41L - reboots\n'),
(u'47010', 'Titanic 2200 - Back up files\n'),
(u'165', 'Titanic 4000 - Transfer Contacts\n'),
(u'42075', 'MeeToo 1.0 - Battery Life\n'),
(u'55510', 'Ronin S1 - Battery Life\n')]
# ACCUMULATORS ::
WORKER NODES CAN ADD VALUE
ONLY DRIVER CAN ACCESS VALUE
# jpg_count IS AN ACCUMULATOR SHARED WITH ALL NODES
jpg_count = sc.accumulator(0)
# EXTERNAL FUNCTION TO ADD VALUE TO ACCUMULATOR
def func_count(s):
if '.jpg' in s:
jpg_count.add(1)
# foreach value in RDD
weblogfile.foreach(lambda line: func_count(line))
# access accumulator from driver
print jpg_count.value
# verify with filter
weblogfile.filter(lambda line: '.jpg' in line).count() -- 237
SQOOP ::
sqoop list-databases --connect jdbc:mysql://localhost --username training --password training
information_schema
movielens
mysql
test
sqoop list-tables --connect jdbc:mysql://localhost/movielens --username training --password training
genre
movie
moviegenre
movierating
occupation
user
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movie
sqoop import --connect jdbc:mysql://localhost/movielens --username training --password training --fields-terminated-by '\t' --table movierating
In [109]: movierating = sc.textFile("hdfs://localhost/user/training/movierating/part*")
movierating_value = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2])))
[(u'1193', 5), (u'661', 3), (u'914', 3), (u'3408', 4), (u'2355', 5)]
movierating_value_grp = movierating.map(lambda line: line.split("\t")).map(lambda value: (value[1],int(value[2]))).groupByKey()
[(u'593', <pyspark.resultiterable.ResultIterable at 0x2153c90>),
(u'1200', <pyspark.resultiterable.ResultIterable at 0x2153250>),
(u'3724', <pyspark.resultiterable.ResultIterable at 0x2153310>),
(u'1868', <pyspark.resultiterable.ResultIterable at 0x2153950>),
(u'643', <pyspark.resultiterable.ResultIterable at 0x2153f90>)]
movierating_value_grp.mapValues(lambda ratings: sum(ratings)).take(5)
instead of
movierating_value.reduceByKey(lambda value1,value2: value1 + value2).take(5)
sum_movie_rating = movierating_value_grp.mapValues(lambda ratings: sum(ratings)/float(len(ratings)))
Out[126]:
[(u'593', 4.3518231186966645),
(u'1200', 4.1258241758241763),
(u'3724', 3.7179487179487181),
(u'1868', 2.0),
(u'643', 2.0)]
In [110]: movie = sc.textFile("hdfs://localhost/user/training/movie/part*")
In [111]: movie_value = movie.map(lambda line: line.split("\t")).map(lambda value: (value[0],value[1]))
Out[127]:
[(u'1', u'Toy Story'),
(u'2', u'Jumanji'),
(u'3', u'Grumpier Old Men'),
(u'4', u'Waiting to Exhale'),
(u'5', u'Father of the Bride Part II')]
final_movie = movie_value.join(sum_movie_rating).map(lambda (movie_id,(title,rating)): title + "\t" + str(rating))
[u"Wayne's World\t3.60089285714",
u'Aliens\t4.12582417582',
u'Prince of the City\t3.49122807018',
u'Coming Home\t3.71794871795',
u'American Pop\t3.07142857143']
final_movie.saveAsTextFile("hdfs://localhost/user/training/averagerating")
STREAMING ::
http://spark.apache.org/docs/latest/streaming-programming-guide.html
STREAMING :: hdfscount.py
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
"""
Counts words in new text files created in the given directory
Usage: hdfs_wordcount.py <directory>
<directory> is the directory that Spark Streaming will use to find and read new text files.
To run this on your local machine on directory `localdir`, run this example
$ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
Then create a text file in `localdir` and the words in the file will get counted.
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingHDFSWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream(sys.argv[1])
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
STREAMING :: COUNTS
// start the shell with this command:
// spark-shell --master local[2]
//
// in a separate terminal
// nc -lkv 1234
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc,Seconds(5))
var mystream = ssc.socketTextStream("localhost",1234)
var words = mystream.flatMap(line => line.split("\\W"))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
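A rough PySpark equivalent of the Scala snippet above (start pyspark with --master local[2] and run nc -lkv 1234 in another terminal; a sketch, untested against any particular version):
import re
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5)
mystream = ssc.socketTextStream("localhost", 1234)
words = mystream.flatMap(lambda line: re.split(r'\W+', line))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()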
UsingTheSparkShell.pyspark::
# Step 3 - create an RDD based on a data file
mydata = sc.textFile("file:/home/training/training_materials/sparkdev/data/frostroad.txt")
# Step 4 - count the number of lines in the RDD
mydata.count()
# Step 5 - display all the lines in the RDD
mydata.collect()
LogIPs.pyspark::
# Steps 8, 9 - create an RDD based on a data file
logfile="file:/home/training/training_materials/sparkdev/data/weblogs/2013-09-15.log"
logs = sc.textFile(logfile)
# count the number of records (lines) in the RDD
logs.count()
# Step 10 - Display all lines which are requests for JPG files
jpglogs=logs.filter(lambda x: ".jpg" in x)
jpglogs.collect()
# Step 12 - Display the JPG requests, this time using a single command line
sc.textFile(logfile).filter(lambda x: ".jpg" in x).count()
# Step 13 - Create an RDD of the length of each line in the file
lengths = logs.map(lambda s: len(s))
# Display the first 5 line lengths
lengths.take(5)
# Map the log data to an RDD of arrays of the words on each line
logwords = logs.map(lambda line: line.split())
# Step 15 - Map the log data to an RDD of IP addresses for each line
ips = logs.map(lambda line: line.split()[0])
# Step 17 - Save the IP addresses to text file(s)
ips.saveAsTextFile("file:/home/training/iplist")
# Challenge Exercise 1 - Do the same thing but use the whole data set
sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*").map(lambda s: s.split()[0]).saveAsTextFile("file:/home/training/iplist-entire")
# Challenge Exercise 2 - Display "ip-address/user-id" for the first 5 HTML requests
# in the data set
htmllogs=logs.filter(lambda s: ".htm" in s).map(lambda s: (s.split()[0],s.split()[2]))
for x in htmllogs.take(5): print x[0]+"/"+x[1]
165.32.101.206/8
100.219.90.44/102
182.4.148.56/173
246.241.6.175/45395
UserRequests.pyspark - working with pair RDD::
# Step 1 - Create an RDD based on all the weblogs
logs=sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*")
# map each request (line) to a pair (userid, 1), then sum the values
userreqs = logs \
.map(lambda line: line.split()) \
.map(lambda words: (words[2],1)) \
.reduceByKey(lambda count1,count2: count1 + count2)
# Step 2 - Show the records for the 10 users with the highest counts
userreqs.map(lambda pair: (pair[1],pair[0])).sortByKey(False).take(10)
# Step 3 - Group IPs by user ID
userips = logs \
.map(lambda line: line.split()) \
.map(lambda words: (words[2],words[0])) \
.groupByKey()
# print out the first 10 user ids, and their IP list
for (userid,ips) in userips.take(10):
print userid, ":"
for ip in ips: print "\t",ip
# Step 4a - Map account data to (userid,[values....])
accounts = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
.map(lambda s: s.split(',')) \
.map(lambda account: (account[0],account[1:]))
# Step 4b - Join account data with userreqs then merge hit count into valuelist
accounthits = accounts.join(userreqs)
# Step 4c - Display userid, hit count, first name, last name for the first 5 elements
for (userid,(values,count)) in accounthits.take(5) :
print userid, count, values[2],values[3]
# Challenge 1 - key accounts by postal/zip code
accountsByPCode = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv") \
.map(lambda s: s.split(','))\
.keyBy(lambda account: account[8])
# Challenge 2 - map account data to lastname,firstname
namesByPCode = accountsByPCode\
.mapValues(lambda account: account[4] + ',' + account[3]) \
.groupByKey()
# Challenge 3 - print the first 5 zip codes and list the names
for (pcode,names) in namesByPCode.sortByKey().take(5):
print "---" ,pcode
for name in names: print name
userid1 4 Cheryl West
userid2 8 Elizabeth Kerns
userid3 1 Melissa Roman
PAIR RDD - EXAMPLES::
# READS TWO FILES
logfiles = "file:/home/training/training_materials/sparkdev/data/weblogs/*"
logs = sc.textFile(logfiles)
accfile = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv")
# GET USER IDS AS (user,1) PAIRS
users = logs.map(lambda s: s.split()[2]).map(lambda user: (user,1))
users = logs.map(lambda s: (s.split()[2],1))
# ADD USERS
users_info = users.reduceByKey(lambda value_1,value_2: value_1 + value_2)
# REVERSE USERS AND HITS
hit_user = users_info.map(lambda x: (x[1],x[0]))
# SORT
sort_user = hit_user.sortByKey(0)
# GET USER ID AND IP ADDRESS
ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0]))
# GROUP USER WITH MULTIPLE IP ADDRESS
ip_user = logs.map(lambda line: line.split()).map(lambda key_value: (key_value[2],key_value[0])).map(lambda (usr,ip): (usr,ip)).groupByKey()
for (ip,user) in ip_user.take(5):
print ip
for i in user: print '\t',i
# ACCOUNT USER AND INFORMATION
acc_info = accfile.map(lambda line: line.split(",")).map(lambda s: (s[0],(s[3],s[4])))
# merge two RDD based on USER ID
final_info = acc_info.join(users_info).take(25)
final_infos = users_info.join(acc_info).map(lambda (user_id,(cnt,user)): (user_id,(cnt,user))).take(25)
#CHALLENGE 1
zipcode = accfile.keyBy(lambda line: line.split(',')[8]).take(5)
#CHALLENGE 2
zipcode.mapValues(lambda x: x.split(',')[3]+ " " + x.split(',')[4]).take(5)
# CHALLENGE 3
zip_info_sort = zip_info.sortByKey()
zip_info_group = zip_info_sort.groupByKey()
for (pcode,users) in zip_info_group.collect():
print "---------",pcode
for user in users:
print "\t", user
--- 85003
Jenkins,Thad
Rick,Edward
Lindsay,Ivy
…
--- 85004
Morris,Eric
Reiser,Hazel
Gregg,Alicia
Preston,Elizabeth
SparkHDFS.pyspark::
# Steps 14, 15 - Read in a weblog file, then filter and save the JPG requests
logs=sc.textFile("hdfs://localhost/user/training/weblogs/2014-03-08.log")
logs.filter(lambda s: ".jpg" in s).saveAsTextFile("hdfs://localhost/user/training/jpgs")
TopModels.pyspark:: working with PARTITION & CACHE
# Step 1 - Stub code to copy into Spark Shell
# load XML files containing device activation records.
# Find the most common device models activated
import xml.etree.ElementTree as ElementTree
# Given a partition containing multi-line XML, parse the contents.
# Return an iterator of activation Elements contained in the partition
def getactivations(fileiterator):
s = ''
for i in fileiterator: s = s + str(i)
filetree = ElementTree.fromstring(s)
return filetree.getiterator('activation')
# Get the model name from a device activation record
def getmodel(activation):
return activation.find('model').text
# Step 2 - Read XML files into an RDD and show number of partitions
filename="hdfs://localhost/user/training/activations/*.xml"
# Load activation files
activations = sc.textFile(filename)
# Show the partitioning
print "Activations: ",activations.toDebugString()
# Step 3 - Parse each partition as a file into an activation XML record
activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
# Step 4 - Map each activation record to a device model name
models = activationTrees.map(lambda activation: getmodel(activation))
# Step 5 - Show the partitioning
print "Models: ",models.toDebugString()
# Step 6 - Count activations by model
modelcounts = models.map(lambda model: (model,1)).reduceByKey(lambda v1,v2: v1+v2)
# Optionally, show the partitioning.
print "Model Counts: ",modelcounts.toDebugString()
# Step 7 - Display the top 10 models
for (count,model) in modelcounts.map(lambda (model,count): (count,model)).top(10):
print "Model %s (%s)" % (model,count)
# Caching RDDs Exercise Optional Step 9
from pyspark import StorageLevel
models.unpersist()
models.persist(StorageLevel.DISK_ONLY)
models.count()
output ::
iFruit 1 (392)
Sorrento F00L (224)
MeeToo 1.0 (12)
IterationTest.pyspark :: Checkpointing RDDs
# Step 1 - create an RDD of integers
mydata = sc.parallelize([1,2,3,4,5])
# Step 2 - loop 200 times
for i in range(200):
mydata = mydata.map(lambda myInt: myInt + 1)
# Step 3 - collect and display the data in the RDD
for x in mydata.collect(): print x
# Step 4 - show the final RDD
mydata.toDebugString()
# Steps 5,6 - repeat steps 2 - 4 above until you receive an error
# Note: The steps above demonstrated the error without checkpointing
# The steps below are a simple program to iteratively create child RDDs
# from parent RDDs. Use iPython %paste to run.
# Step 9 - enable checkpointing.
sc.setCheckpointDir("checkpoints")
# Step 10 - create an RDD of integers
mydata = sc.parallelize([1,2,3,4,5])
# Steps 11, 12 - Iterate to generate a new RDD which descends from prior RDDs
# resulting in a long lineage
for i in range(1000):
mydata = mydata.map(lambda myInt: myInt + 1)
print "Iteration",i
# Every 10 iterations, checkpoint the RDD, and materialize it to save the checkpoint file
# shorten the lineage
if i % 10 == 0:
print "Checkpoint"
mydata.checkpoint()
mydata.count()
# Step 13 - collect the contents of the RDD to an array and display
for x in mydata.collect(): print x
# Step 14 - Display the lineage (formatted)
for rddstring in mydata.toDebugString().split('\n'): print rddstring.strip()
COUNTJPGs.PY :: APPLICATION
import sys
from pyspark import SparkContext
from pyspark import SparkConf
if __name__ == "__main__":
if len(sys.argv) < 2:
print >> sys.stderr, "Usage: CountJPGs <file>"
exit(-1)
sc = SparkContext()
# Challenge: Configure app name and UI port programmatically
# sconf = SparkConf().setAppName("My Spark App").set("spark.ui.port","4444")
# sc = SparkContext(conf=sconf)
logfile = sys.argv[1]
count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()
print "Number of JPG requests: ", count
COMMAND::
spark-submit CountJPGs.py weblogs/*
spark-submit --master spark://localhost:7077 --name 'Count JPGs' CountJPGs.py weblogs/*
--properties-file myspark.conf
myspark.conf
spark.app.name My Spark App
spark.ui.port 4141
spark.master spark://localhost:7077
localfile.py ::
# -*- coding: utf-8 -*-
from pyspark import SparkContext
textfile = "file:/ap_data/etl/bin/spark/training_materials/sparkdev/data/purplecow.txt"
sc = SparkContext("local", "Test_app")
textdata = sc.textFile(textfile)
#textdata = sc.textFile(textfile).cache()
#################################################
# number of lines that start with 'I' after uppercasing
num_1 = textdata.map(lambda line:line.upper()).filter(lambda line: line.startswith('I')).count()
print "**********************************************************************************************************", num_1
spark-submit localfile.py
countjpgs_2.py::
import sys
from pyspark import SparkContext
from pyspark import SparkConf
if __name__ == "__main__":
print "Counting starting..."
if len(sys.argv) < 2:
print >> sys.stderr, "usage: CountJPGs <file>"
exit(-1)
else:
print "Counting Started"
sc = SparkContext()
logfile = sys.argv[1]
log = sc.textFile(logfile)
logfilter = log.filter(lambda line: '.jpg' in line)
count = logfilter.count()
print "Number of JPG request", count
TargetModels ::
# Read in list of target models from a local file
targetfile = "/home/training/training_materials/sparkdev/data/targetmodels.txt"
targetlist = list(map(lambda l: l.strip(),open(targetfile)))
# broadcast the target list to all workers
targetlistbc = sc.broadcast(targetlist)
# filter out requests from devices not in the target list
# web server log file(s)
logfile="hdfs://localhost/user/training/weblogs/*"
targetreqs = sc.textFile(logfile)\
.filter(lambda line: any(model in line for model in targetlistbc.value) )
targetreqs.take(5)
RequestAccumulator ::
jpgcount = sc.accumulator(0)
htmlcount = sc.accumulator(0)
csscount = sc.accumulator(0)
def countFileType(s):
if '.jpg' in s: jpgcount.add(1)
elif '.html' in s: htmlcount.add(1)
elif '.css' in s: csscount.add(1)
filename="hdfs://localhost/user/training/weblogs/*"
logs = sc.textFile(filename)
logs.foreach(lambda line: countFileType(line))
print 'Request Totals:'
print '.css requests: ', csscount.value
print '.html requests: ', htmlcount.value
print '.jpg requests: ', jpgcount.value
PROJECT ::
line = sc.textFile("hdfs://nameservice1/user/hive/warehouse/ap_cptsph2_pl_dev.db/sff_prod_cptsph2")
line.map(lambda s: s.split(",")).take(5)
prodline = line.filter(lambda l: l.startswith('2'))
/analytics_platform_dev/etl/Colgate/cp_tsp_h/charn/cp_tsp_h/
hdfs dfs -cat /analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item
file = sc.textFile("hdfs://nameservice1/analytics_platform_dev/etl/Colgate/cp_tsp_h/item/cp_tsp_h.item")
file.count()
line = file.map(lambda s: s.split(";"))
fields = line.map(lambda fields: (fields[0],fields[1],fields[2]))
file.map(lambda s: s.split(";")).filter(lambda fields: int(fields[0]) == 2).take(10)
#hdfs://tparhegapd017.nielsen.com:8020/analytics_platform_dev/
How to use PyCharm or IPython with Spark
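One common approach (an assumption, not from the course material) is the findspark package, which makes pyspark importable from a plain IPython or PyCharm interpreter once it is pip-installed and pointed at SPARK_HOME:
import findspark
findspark.init("/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark")   # or rely on the SPARK_HOME env var
from pyspark import SparkContext
sc = SparkContext("local[2]", "ipython_test")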
connect to hive table through pyspark - column separator
import pandas as pd
from pyspark.sql import HiveContext,SQLContext
sqlContext = HiveContext(sc)
df = sqlContext.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)
store_char_pd = pd.DataFrame(df, columns=('store_id','char_abr','char_val'))
sc_p = store_char_pd.pivot(index='store_id', columns='char_abr', values='char_val')
sc_p.to_csv("/ap_data/dev/Basir/sample.csv")
>>> sc_p
<class 'pandas.core.frame.DataFrame'>
sqlc = SQLContext(sc)
sdf = sqlc.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").take(25)
pdf = sqlc.createDataFrame(store_char_pd)
>>> type(pdf)
<class 'pyspark.sql.dataframe.DataFrame'>
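A simpler route to the same pandas pivot, assuming a Spark version where DataFrame.toPandas() is available:
pdf_all = sqlContext.sql("select store_id,char_abr,char_val from ap_us_model_dev.store_char_value limit 100").toPandas()
sc_p2 = pdf_all.pivot(index='store_id', columns='char_abr', values='char_val')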
============================================================================================================================================
readme = sc.textFile("file:/ap_data/etl/bin/spark/labfiles/README.md")
LOGGER ::
You can get the logger from the SparkContext object:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
to turn OFF or set only WARNING logs
import logging
logger = logging.getLogger(__name__)
loggerSpark = logging.getLogger('py4j')
loggerSpark.setLevel('WARNING')
sc logs
sc.setLogLevel("FATAL")
ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
HEADER ::
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
DMLE ::
dmleuser
tempDevUse
ctolab/ctolab
SPARK-SUBMIT ::
export JAVA_HOME="/usr/java/jdk1.7.0_67-cloudera/"
export SPARK_HOME="/opt/cloudera/parcels/CDH-5.5.2-1.cdh5.5.2.p0.4/lib/spark"
export PATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PATH"
export YARN_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
spark-submit --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser --py-files oozie_egg_test.egg "$@"
pyspark --master yarn --deploy-mode client --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python --num-executors 2 --driver-memory 16g --executor-memory 2g --executor-cores 1 --queue dmleuser oozie_egg_test.egg "$@"
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-submit.html
http://blog.appliedinformaticsinc.com/how-to-write-spark-applications-in-python/
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
http://www.slideshare.net/BenjaminBengfort/fast-data-analytics-with-spark-and-python