SPARK安装:
访问 Spark 下载页面 ,并选择最新版本的 Spark 直接下载,当前的最新版本是 2.4.2 。下载好之后需要解压缩到安装文件夹中,如 /opt 目录下:
tar -xzf spark-2.4.2-bin-hadoop2.7.tgz mv spark-2.4.2-bin-hadoop2.7/opt/spark-2.4.2
为了能在终端中直接打开 Spark 的 shell 环境,需要配置相应的环境变量,配置环境到 ~/.bashrc 中:
sudo gedit ~/.bashrc export SPARK_HOME=/opt/spark-2.4.2export PATH=$SPARK_HOME/bin:$PATH export <a href="https://www.linuxidc.com/topicnews.aspx?tid=17" target="_blank" title="Python">Python</a>PATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
配置完成后,shell 中输入spark-shell 或 pyspark 就可以进入到 Spark 的交互式编程环境中,前者是进入Scala交互式环境,后者是进入Python交互式环境。
SPARK-SHELL使用方法:
首先参照”HDFS创建用户“为自己创建一个hdfs账户。然后直接使用pyspark就可以启动python版本的spark-shell。
在进行示例操作前,建议先使用“HDFS上传/下载文件夹”中的命令创建一个可以操作的文件。
HDFS创建用户:
默认情况下,hdfs只有一个用户,用户名是hdfs,这个用户相当于root用户,拥有最高权限。为了方便管理,不建议使用这个用户,建议为自己创建一个用户目录,这样互不影响。首先确保当前shell是你自己的用户。然后输入:
sudo -u hdfs hadoop fs -mkdir /user/$USER sudo -u hdfs hadoop fs -chown $USER /user/$USER
这样就在hdfs的/user/目录下创建了一个你的目录,你所有hdfs命令当前路径就是指这个路径。常见命令如下:
HDFS上传文件:
hadoop fs -put /本地电脑的路径 /hdfs路径
如:
echo "123" > test_put hadoop fs -put test_put .
这样就上传到/user/$User/test_put上了。
查看文件:
hadoop fs -ls /hdfs路径
如:
hadoop fs -ls hadoop fs -ls . hadoop fs -ls /user/$User
HDFS下载文件:
hadoop fs -get /hdfs路径 /本地路径
如:
hadoop fs -get test_put test_put_from_hdfs
HDFS删除文件:
hadoop fs -rm /hdfs路径
如:
hadoop fs -rm test_put
你再使用hadoop fs -ls 就看不到刚才上传的文件了。
HDFS上传/下载文件夹:
与上传文件完全相同,该命令支持上传/下载文件夹
如:
mkdir test_put_dir && echo "123" > test_put_dir/1 && echo "456" > test_put_dir/2 hadoop fs -put test_put_dir . hadoop fs -ls hadoop fs -ls test_put_dir
就能看到你上传的文件夹以及文件夹内容。
hadoop fs -get test_put_dir test_put_dir_from_hdfs
就可以下载下来
把整个文件夹当作一个文件下载:
通常spark命令跑出来的文件是并行跑的,所以一个文件会按文件块分配在不同机器上,文件块又拼凑成part,看起来就像是把一个超大文件按照文件行拆分为part-0000到part-0100这样的。所以有时候你跑完的东西希望当作一个文件处理这时候就需要把这个文件夹当作一个文件下载,命令是:
hadoop fs -getmerge test_put_dir test_put_onefile
HDFS移动文件/文件夹:
hadoop fs -mv /hdfs路径 /新的hdfs路径
HDFS删除文件夹:
hadoop fs -rmr /hdfs路径
如:
hadoop fs -rmr test_put_dir
HDFS其余命令:
其余命令可以参考hadoop fs -help去查看。
#读取文件: file_rdd = sc.textFile("test_put_dir") #查看文件的第一行: file_rdd.first() #手动创建一个RDD list_rdd = sc.parallelize([7,7,8]) #执行action和tranformation #map,为每一行的数据执行一个函数,可以是加减乘除,也可以是分词、词性标注。 list_rdd_map_1 = list_rdd.map(lambda x:(x,1)) #reduceByKey,根据数据key进行聚合得到key,value_list。然后对value_list一个接一个执行函数输出一个值。 list_rdd_num_count = list_rdd_map_1.reduceByKey(lambda value1,value2: (value1+value2)) #将rdd收集到当前spark-shell所在机器变成一个变量(谨慎操作,如果文件过大会导致OOM,你之前跑的就白跑了) list_rdd_num_count_local = list_rdd_num_count.collect() #缓存rdd(当这个rdd会再次使用,并且是很久以后再次使用,才使用缓存操作。) list_rdd_num_count.cache() #存储到hdfs list_rdd_map_1.saveAsTextFile("list_rdd_map_1") #统计数目 list_rdd_map_1.count() #其余操作可以参考一些手册常见操作有map/reduce/reduceByKey/groupByKey/flatMap/filter/join