pyspark访问hive数据实战

直接进行spark开发需要去学习scala,为了降低数据分析师的学习成本,决定前期先试用sparkSQL,能够让计算引擎无缝从MR切换到spark,现在主要使用pyspark访问hive数据。

直接进行spark开发需要去学习scala,为了降低数据分析师的学习成本,决定前期先试用sparkSQL,能够让计算引擎无缝从MR切换到spark,现在主要使用pyspark访问hive数据。

数据分析都是直接使用hive脚本进行调用,随着APP用户行为和日志数据量的逐渐累积,跑每天的脚本运行需要花的时间越来越长,虽然进行了sql优化,但是上spark已经提上日程。

直接进行spark开发需要去学习scala,为了降低数据分析师的学习成本,决定前期先试用sparkSQL,能够让计算引擎无缝从MR切换到spark,现在主要使用pyspark访问hive数据。

以下是安装配置过程中的详细步骤:

1.安装spark

需要先安装JDK和scala,这不必多说,由于现有hadoop集群版本是采用的2.6.3,所以spark版本是下载的稳定版本spark-1.4.0-bin-hadoop2.6.tgz

我是先在一台机器上完成了Spark的部署,Master和Slave都在一台机器上。注意要配置免秘钥ssh登陆。

1.1 环境变量配置

  1. exportJAVA_HOME=/usr/jdk1.8.0_73
  2. exportHADOOP_HOME=/usr/hadoop
  3. exportHADOOP_CONF_DIR=/usr/hadoop/etc/hadoop
  4. exportSCALA_HOME=/usr/local/scala-2.11.7
  5. exportSPARK_HOME=/home/hadoop/spark_folder/spark-1.4.0-bin-hadoop2.6
  6. exportSPARK_MASTER_IP=127.0.0.1
  7. exportSPARK_MASTER_PORT=7077
  8. exportSPARK_MASTER_WEBUI_PORT=8099
  9. exportSPARK_WORKER_CORES=3//每个Worker使用的CPU核数
  10. exportSPARK_WORKER_INSTANCES=1//每个Slave中启动几个Worker实例
  11. exportSPARK_WORKER_MEMORY=10G//每个Worker使用多大的内存
  12. exportSPARK_WORKER_WEBUI_PORT=8081//Worker的WebUI端口号
  13. exportSPARK_EXECUTOR_CORES=1//每个Executor使用使用的核数
  14. exportSPARK_EXECUTOR_MEMORY=1G//每个Executor使用的内存
  15. exportHIVE_HOME=/home/hadoop/hive
  16. exportSPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
  17. exportLD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native

1.2 配置slaves

  1. cpslaves.templateslaves
  2. vislaves添加以下内容:localhost

1.3 启动master和slave

  1. cd$SPARK_HOME/sbin/
  2. ./start-master.sh
  3. 启动日志位于$SPARK_HOME/logs/目录,访问http://localhost:8099,即可看到Spark的WebUI界面
  4. 执行./bin/spark-shell,打开Scala到Spark的连接窗口

2.SparkSQL与Hive的整合

  1. 拷贝$HIVE_HOME/conf/hive-site.xml和hive-log4j.properties到$SPARK_HOME/conf/
  2. 在$SPARK_HOME/conf/目录中,修改spark-env.sh,添加
  3. exportHIVE_HOME=/home/hadoop/hive
  4. exportSPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
  5. 另外也可以设置一下Spark的log4j配置文件,使得屏幕中不打印额外的INFO信息(如果不想受干扰可设置为更高):
  6. log4j.rootCategory=WARN,console
  7. 进入$SPARK_HOME/bin,执行./spark-sql–masterspark://127.0.0.1:7077进入spark-sqlCLI:
  8. [hadoop@hadoopspark]$bin/spark-sql--help
  9. Usage:./bin/spark-sql[options][clioption]
  10. CLIoptions:
  11. -d,--define<keykey=value>Variablesubsitutiontoapplytohive
  12. commands.e.g.-dA=Bor--defineA=B
  13. --database<databasename>Specifythedatabasetouse
  14. -e<quoted-query-string>SQLfromcommandline
  15. -f<filename>SQLfromfiles
  16. -h<hostname>connectingtoHiveServeronremotehost
  17. --hiveconf<propertyproperty=value>Usevalueforgivenproperty
  18. --hivevar<keykey=value>Variablesubsitutiontoapplytohive
  19. commands.e.g.--hivevarA=B
  20. -i<filename>InitializationSQLfile
  21. -p<port>connectingtoHiveServeronportnumber
  22. -S,--silentSilentmodeininteractiveshell
  23. -v,--verboseVerbosemode(echoexecutedSQLtothe
  24. console)

需要注意的是CLI不是使用JDBC连接,所以不能连接到ThriftServer;但可以配置conf/hive-site.xml连接到hive的metastore,然后对hive数据进行查询。下面我们接着说如何在python中连接hive数据表查询。

3.配置pyspark和示例代码

3.1 配置pyspark

  1. 打开/etc/profile:
  2. #PythonPath将Spark中的pySpark模块增加的Python环境中
  3. exportPYTHONPATH=/opt/spark-hadoop/python
  4. source/etc/profile

执行./bin/pyspark ,打开Python到Spark的连接窗口,确认没有报错。

打开命令行窗口,输入python,Python版本为2.7.6,如图所示,注意Spark暂时不支持Python3。输入import pyspark不报错,证明开发前工作已经完成。

3.2 启动ThriftServer

启动ThriftServer,使之运行在spark集群中:

sbin/start-thriftserver.sh --master spark://localhost:7077 --executor-memory 5g

ThriftServer可以连接多个JDBC/ODBC客户端,并相互之间可以共享数据。

3.3 请求示例

查看spark官方文档说明,spark1.4和2.0对于sparksql调用hive数据的API变化并不大。都是用sparkContext 。

pyspark访问hive数据实战

  1. frompysparkimportSparkConf,SparkContext
  2. frompyspark.sqlimportHiveContext
  3. conf=(SparkConf()
  4. .setMaster("spark://127.0.0.1:7077")
  5. .setAppName("Myapp")
  6. .set("spark.executor.memory","1g"))
  7. sc=SparkContext(conf=conf)
  8. sqlContext=HiveContext(sc)
  9. my_dataframe=sqlContext.sql("Selectcount(1)fromlogs.fmnews_dim_where")
  10. my_dataframe.show()

返回结果:

pyspark访问hive数据实战

运行以后在webUI界面看到job运行详情。

pyspark访问hive数据实战

4.性能比较

截取了接近一个月的用户行为数据,数据大小为2G,总共接近1600w条记录。

为了测试不同sql需求情况下的结果,我们选取了日常运行的2类sql:

1.统计数据条数:

  1. selectcount(1)fromfmnews_user_log2;

2.统计用户行为:

  1. SELECTdevice_id,min_timeFROM
  2. (SELECTdevice_id,min(import_time)min_timeFROMfmnews_user_log2
  3. GROUPBYdevice_id)a
  4. WHEREfrom_unixtime(int(substr(min_time,0,10)),'yyyy-MM-dd')='2017-03-02';

3. 用户行为分析:

  1. selectcasewhenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'06:00'and'07:59'then1
  2. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'08:00'and'09:59'then2
  3. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'10:00'and'11:59'then3
  4. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'12:00'and'13:59'then4
  5. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'14:00'and'15:59'then5
  6. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'16:00'and'17:59'then6
  7. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'18:00'and'19:59'then7
  8. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'20:00'and'21:59'then8
  9. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'22:00'and'23:59'then9
  10. else0endfmnews_time_type,count(distinctdevice_id)device_count,count(1)click_count
  11. fromfmcm.fmnews_user_log2
  12. wherefrom_unixtime(int(substr(import_time,0,10)),'yyyy-MM-dd')='2017-03-02'
  13. groupbycasewhenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'06:00'and'07:59'then1
  14. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'08:00'and'09:59'then2
  15. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'10:00'and'11:59'then3
  16. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'12:00'and'13:59'then4
  17. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'14:00'and'15:59'then5
  18. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'16:00'and'17:59'then6
  19. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'18:00'and'19:59'then7
  20. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'20:00'and'21:59'then8
  21. whenfrom_unixtime(int(substr(fmnews_time,0,10)),'HH:mm')between'22:00'and'23:59'then9
  22. else0end;

第一条sql的执行结果对比:hive 35.013 seconds

pyspark访问hive数据实战

第一条sql的执行结果对比:sparksql 1.218 seconds

pyspark访问hive数据实战

第二条sql的执行结果对比:hive 78.101 seconds

pyspark访问hive数据实战

第二条sql的执行结果对比:sparksql 8.669 seconds

pyspark访问hive数据实战

第三条sql的执行结果对比:hive 101.228 seconds

pyspark访问hive数据实战

第三条sql的执行结果对比:sparksql 14.221 seconds

pyspark访问hive数据实战

可以看到,虽然没有官网吹破天的100倍性能提升,但是根据sql的复杂度来看10~30倍的效率还是可以达到的。

不过这里要注意到2个影响因子:

1. 我们数据集并没有采取全量,在数据量达到TB级别两者的差距应该会有所减小。同时sql也没有针对hive做优化。

2. spark暂时是单机(内存足够)并没有搭建集群,hive使用的hadoop集群有4台datanode。

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月6日 03:41
下一篇 2023年5月6日 03:42

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信