欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Coursera课程Big Data Integration and Processing-Final Project Spark答案

程序员文章站 2022-07-14 20:54:03
...

Final Project是利用Spark读取tweet文档,并做相应的分析。这个题目我前后共花费了两周的时间,在spark里艰难探索,最后发现其实并没有想象的那么难。所以还是打算把答案分享出来,供在此题中艰难探索的同志们参考一下。

# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sc=SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

第一个坑,在Coursera给的ipynb文件里,是没有第二行的,所以直接运行会导致sc not found之类的提示。我查了半天论坛,试了好几种方法,最后发现其实只需要加上中间这行,就能够正常运行了,不需要重装pyspark什么的。

# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

# Convert each line into a pair of words
#country_lines.take(3)
words=country_lines.map(lambda line:line.split(","))
words.take(3)

这一步,就要将countryline转为(国家,国家缩写)的形式。我一开始做的时候,是参考了week2时候,莎士比亚文集的做法,以空格作为split, 并且用了flatmap,导致了结果大错特错。

这里稍微解释一下map。sc.textFile读取的文件格式,是将每一行视为一个单元块。而单元块里只会视为一个整体的字符串。例如country_list这个RDD,读取之后的形式应该是类似一个n1的数据组:
‘Afghanistan, AFG’
‘Albania, ALB’

而我们要做的,是要将每一行,拆分为两列。map函数的作用,就是对每一个单元块进行相同的操作, 有点类似pandas里的纵向apply函数。因此,如果我们map了一个split函数,就能将RDD结构转换为n
2数据组:
‘Afghanistan‘, ‘ AFG’
‘Albania‘, ‘ALB’

就是我们想要的结果。

而flatmap函数,则是在对所有的单元格map了一个函数之后,再展成1*m的一维结构。

# Convert each pair of words into a tuple
country_tuples=words.map(lambda x :x)

在week2的题目中,这里map里的函数是lambda x:(x,1),意思是给每一个词都匹配一个数字1,是为了方便后续的统计。但此题中,由于我们在上面已经做了split的这个动作,因此在这一步仅仅是做一个赋值的动作,从words赋值到country_tuples里。

# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

结果如下:

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)

[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]
# Read tweets CSV file into RDD of lines
tweets_lines=sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweet.csv')
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
import json
def js(a):
    return json.loads(a)

tweets_dict=tweets_lines.map(lambda x: js(x))
clean_tweets=tweets_dict.filter(lambda x:x['tweet_text']!='')
clean_tweets.first()

困扰我最久的一个坑。这个问题的难点在于,如何定位到tweet_text。我看论坛中的答案,很多人都是用tweet_lines.filter(lambda x:len(x)>0),但是这是将整条line视为一个整体去判断是否有空。但是,tweet.csv这个文件,他实际的结构是一个复杂的字典,结果如下:

{'_id': {'$oid': '578ffa8e7eb9513f4f55a935'},
 'coordinates': None,
 'retweet_count': 0,
 'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
 'tweet_ID': '755891629932675072',
 'tweet_followers_count': 461,
 'tweet_mentioned_count': 1,
 'tweet_text': 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'user': {  'CreatedAt': {'$date': '2011-12-27T09:04:01.000Z'},
  			'FavouritesCount': 5223,
  			'FollowersCount': 461,
  			'FriendsCount': 619,
  			'Location': '501',
  			'UserId': 447818090},
		    'user_name': 'koteras'}

我认为题干中要求的“some tweets are empty”,指的应该是tweet_text这个内容是empty的才对。但是,又由于rdd在读取的时候,会将这整条line看成一个字符串,因此我需要做的是从字符串正确提取tweet_text的内容。
我一开始用了本办法,用一连串的split去定位提取。但是这么做的问题是,无法map到整个数据中,可能某些行并不满足split的条件。最后,我找到的办法就是用json这个包去解析,将字符串转为字典。这样之后,就能用字典的Key‘tweet_text’去定位推特内容了。

# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)


tweets_text=clean_tweets.map(lambda x:x['tweet_text'])
tweets_text_all=tweets_text.flatMap(lambda x:x.split(" "))
tweets_text_count=tweets_text_all.map(lambda x:(x,1))
counts=tweets_text_count.reduceByKey(lambda a, b:(a+b))
counts.take(5)

下一个困扰我很久的坑。一开始我以为此题的目的,是为了统计每一条推特的字数,之后又要和country_tuple匹配,因此可能还需要Location里的数据作为两表连接点。这里面绕了太多弯路。最后我才发现,是审题出现了问题。

题目实际的要求是:统计所有推特文中,各个国家被提及的次数

因此,我们的思路也就变得很简单了,把所有的tweet_text展开成行,分割成不同的词,类似字数week2中的字数统计一样,每个词后面匹配一个数字1,再用reduceByKey,统计各个词出现的次数。

这样一来,我们得到的结果,就是所有推特文中每个词对应的频数。结果如下:

('', 2772),
 ('https://t.co/fQftAwGAad', 1),
 ('https://t.co/kjl4XvCHEM', 1),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1)]

接着,将这个tuple转换成dataframe就可以了:

# Create the DataFrame of tweet word counts
wordDF = sqlContext.createDataFrame(counts, ["word", "no"])
wordDF.printSchema()
wordDF.take(3)

结果如下

root
 |-- word: string (nullable = true)
 |-- no: long (nullable = true)

[Row(word='', no=2772),
 Row(word='https://t.co/fQftAwGAad', no=1),
 Row(word='https://t.co/kjl4XvCHEM', no=1)]

下一步,将词频dataframe和countryDF做一个join,就可以得到匹配的结果。

# Join the country and tweet data frames (on the appropriate column)

merge=countryDF.join(wordDF,countryDF['country']==wordDF['word'],"left")
merge.take(5)
#merge.printSchema()

结果如下:

[Row(country='Cyprus', code=' CYP', word=None, no=None),
 Row(country='Luxembourg', code=' LUX', word=None, no=None),
 Row(country='Bulgaria', code=' BUL', word=None, no=None),
 Row(country='Guinea-Bissau', code=' GNB', word=None, no=None),
 Row(country='Thailand', code=' THA', word='Thailand', no=1)]

最后的四个问题:

# Question 1: number of distinct countries mentioned
tmp=merge.where(merge['no'].isNull()==False)
tmp.distinct().count()
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
tmp.groupby().sum('no').collect()
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
d=tmp.orderBy(desc("no"))
d.take(3)
# Table 2: counts for Wales, Iceland, and Japan.
c=tmp.where(tmp['country'].isin(['Wales','Kenya','Netherlands']))
c.collect()

总而言之,绕弯路不一定是坏事,至少绕的过程也让我对spark有更深入的了解了。

如有疑问,随时欢迎交流~虽然我也不是很懂,都还在摸索。

相关标签: Big Data