Coursera课程Big Data Integration and Processing-Final Project Spark答案
Final Project是利用Spark读取tweet文档,并做相应的分析。这个题目我前后共花费了两周的时间,在spark里艰难探索,最后发现其实并没有想象的那么难。所以还是打算把答案分享出来,供在此题中艰难探索的同志们参考一下。
# Import and create a new SQLContext
from pyspark.sql import SQLContext
sc=SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
第一个坑,在Coursera给的ipynb文件里,是没有第二行的,所以直接运行会导致sc not found之类的提示。我查了半天论坛,试了好几种方法,最后发现其实只需要加上中间这行,就能够正常运行了,不需要重装pyspark什么的。
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')
# Convert each line into a pair of words
#country_lines.take(3)
words=country_lines.map(lambda line:line.split(","))
words.take(3)
这一步,就要将countryline转为(国家,国家缩写)的形式。我一开始做的时候,是参考了week2时候,莎士比亚文集的做法,以空格作为split, 并且用了flatmap,导致了结果大错特错。
这里稍微解释一下map。sc.textFile读取的文件格式,是将每一行视为一个单元块。而单元块里只会视为一个整体的字符串。例如country_list这个RDD,读取之后的形式应该是类似一个n1的数据组:
‘Afghanistan, AFG’
‘Albania, ALB’
…
而我们要做的,是要将每一行,拆分为两列。map函数的作用,就是对每一个单元块进行相同的操作, 有点类似pandas里的纵向apply函数。因此,如果我们map了一个split函数,就能将RDD结构转换为n2数据组:
‘Afghanistan‘, ‘ AFG’
‘Albania‘, ‘ALB’
…
就是我们想要的结果。
而flatmap函数,则是在对所有的单元格map了一个函数之后,再展成1*m的一维结构。
# Convert each pair of words into a tuple
country_tuples=words.map(lambda x :x)
在week2的题目中,这里map里的函数是lambda x:(x,1),意思是给每一个词都匹配一个数字1,是为了方便后续的统计。但此题中,由于我们在上面已经做了split的这个动作,因此在这一步仅仅是做一个赋值的动作,从words赋值到country_tuples里。
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)
结果如下:
root
|-- country: string (nullable = true)
|-- code: string (nullable = true)
[Row(country='Afghanistan', code=' AFG'),
Row(country='Albania', code=' ALB'),
Row(country='Algeria', code=' ALG')]
# Read tweets CSV file into RDD of lines
tweets_lines=sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweet.csv')
# Clean the data: some tweets are empty. Remove the empty tweets using filter()
import json
def js(a):
return json.loads(a)
tweets_dict=tweets_lines.map(lambda x: js(x))
clean_tweets=tweets_dict.filter(lambda x:x['tweet_text']!='')
clean_tweets.first()
困扰我最久的一个坑。这个问题的难点在于,如何定位到tweet_text。我看论坛中的答案,很多人都是用tweet_lines.filter(lambda x:len(x)>0),但是这是将整条line视为一个整体去判断是否有空。但是,tweet.csv这个文件,他实际的结构是一个复杂的字典,结果如下:
{'_id': {'$oid': '578ffa8e7eb9513f4f55a935'},
'coordinates': None,
'retweet_count': 0,
'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
'tweet_ID': '755891629932675072',
'tweet_followers_count': 461,
'tweet_mentioned_count': 1,
'tweet_text': 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING https://t.co/BFnV6jfkBL',
'user': { 'CreatedAt': {'$date': '2011-12-27T09:04:01.000Z'},
'FavouritesCount': 5223,
'FollowersCount': 461,
'FriendsCount': 619,
'Location': '501',
'UserId': 447818090},
'user_name': 'koteras'}
我认为题干中要求的“some tweets are empty”,指的应该是tweet_text这个内容是empty的才对。但是,又由于rdd在读取的时候,会将这整条line看成一个字符串,因此我需要做的是从字符串正确提取tweet_text的内容。
我一开始用了本办法,用一连串的split去定位提取。但是这么做的问题是,无法map到整个数据中,可能某些行并不满足split的条件。最后,我找到的办法就是用json这个包去解析,将字符串转为字典。这样之后,就能用字典的Key‘tweet_text’去定位推特内容了。
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweets_text=clean_tweets.map(lambda x:x['tweet_text'])
tweets_text_all=tweets_text.flatMap(lambda x:x.split(" "))
tweets_text_count=tweets_text_all.map(lambda x:(x,1))
counts=tweets_text_count.reduceByKey(lambda a, b:(a+b))
counts.take(5)
下一个困扰我很久的坑。一开始我以为此题的目的,是为了统计每一条推特的字数,之后又要和country_tuple匹配,因此可能还需要Location里的数据作为两表连接点。这里面绕了太多弯路。最后我才发现,是审题出现了问题。
题目实际的要求是:统计所有推特文中,各个国家被提及的次数。
因此,我们的思路也就变得很简单了,把所有的tweet_text展开成行,分割成不同的词,类似字数week2中的字数统计一样,每个词后面匹配一个数字1,再用reduceByKey,统计各个词出现的次数。
这样一来,我们得到的结果,就是所有推特文中每个词对应的频数。结果如下:
('', 2772),
('https://t.co/fQftAwGAad', 1),
('https://t.co/kjl4XvCHEM', 1),
('mobile', 1),
('#FridayNightTouchdown', 1)]
接着,将这个tuple转换成dataframe就可以了:
# Create the DataFrame of tweet word counts
wordDF = sqlContext.createDataFrame(counts, ["word", "no"])
wordDF.printSchema()
wordDF.take(3)
结果如下
root
|-- word: string (nullable = true)
|-- no: long (nullable = true)
[Row(word='', no=2772),
Row(word='https://t.co/fQftAwGAad', no=1),
Row(word='https://t.co/kjl4XvCHEM', no=1)]
下一步,将词频dataframe和countryDF做一个join,就可以得到匹配的结果。
# Join the country and tweet data frames (on the appropriate column)
merge=countryDF.join(wordDF,countryDF['country']==wordDF['word'],"left")
merge.take(5)
#merge.printSchema()
结果如下:
[Row(country='Cyprus', code=' CYP', word=None, no=None),
Row(country='Luxembourg', code=' LUX', word=None, no=None),
Row(country='Bulgaria', code=' BUL', word=None, no=None),
Row(country='Guinea-Bissau', code=' GNB', word=None, no=None),
Row(country='Thailand', code=' THA', word='Thailand', no=1)]
最后的四个问题:
# Question 1: number of distinct countries mentioned
tmp=merge.where(merge['no'].isNull()==False)
tmp.distinct().count()
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
tmp.groupby().sum('no').collect()
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
d=tmp.orderBy(desc("no"))
d.take(3)
# Table 2: counts for Wales, Iceland, and Japan.
c=tmp.where(tmp['country'].isin(['Wales','Kenya','Netherlands']))
c.collect()
总而言之,绕弯路不一定是坏事,至少绕的过程也让我对spark有更深入的了解了。
如有疑问,随时欢迎交流~虽然我也不是很懂,都还在摸索。
上一篇: Lake Counting
下一篇: Apache配置虚拟目录和多主机头