当前位置:首页 > 代码 > 正文

spark代码包说明(spark编程指南)

admin 发布:2022-12-19 16:12 146


本篇文章给大家谈谈spark代码包说明,以及spark编程指南对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Spark从入门到精通3:Spark全分布模式的安装和配置

Spark的安装模式一般分为三种:1.伪分布模式:即在一个节点上模拟一个分布式环境,master和worker共用一个节点,这种模式一般用于开发和测试Spark程序;2.全分布模式:即真正的集群模式,master和worker部署在不同的节点之上,一般至少需要3个节点(1个master和2个worker),这种模式一般用于实际的生产环境;3.HA集群模式:即高可用集群模式,一般至少需要4台机器(1个主master,1个备master,2个worker),这种模式的优点是在主master宕机之后,备master会立即启动担任master的职责,可以保证集群高效稳定的运行,这种模式就是实际生产环境中多采用的模式。本小节来介绍Spark的全分布模式的安装和配置。

安装介质:

jdk-8u162-linux-x64.tar.gz 提取码:2bh8

hadoop-2.7.3.tar.gz 提取码:d4g2

scala-2.12.6.tgz 提取码:s2ly

spark-2.1.0-bin-hadoop2.7.tgz 提取码:5kcf

准备3台Linux主机,按照下面的步骤在每台主机上执行一遍,设置成如下结果:

安装Linux操作系统比较简单,这里不再详细。参考:《 Linux从入门到精通1:使用 VMware Workstation 14 Pro 安装 CentOS 7 详细图文教程 》

编辑hosts配置文件:# vi /etc/hosts,追加3行:

测试主机名是否可用:

(1)使用ssh-keygen工具生成秘钥对:

(2)将生成的公钥发给三台主机:master、slave1、slave2:

(3)测试秘钥认证是否成功:

由于各个主机上的时间可能不一致,会导致执行Spark程序出现异常,因此需要同步各个主机的时间。在实际生成环境中,一般使用时间服务器来同步时间,但是搭建时间服务器相对较为复杂。这里介绍一种简单的方法来快速同步每台主机主机的时间。我们知道,使用date命令可以设置主机的时间,因此这里使用putty的插件MTPuTTY来同时向每一台主机发送date命令,以到达同步时间的目的。

(1)使用MTPuTTY工具连接三台主机,点击MTPuTTY工具的Tools菜单下的“Send script…”子菜单,打开发送脚本工具窗口。

(2)输入命令:date -s 2018-05-28,然后回车(注意:一定要回车,否则只发送不执行),在下面服务器列表中选择要同步的主机,然后点击“Send script”,即可将时间同步为2018-05-28 00:00:00。

使用winscp工具将JDK安装包 jdk-8u144-linux-x64.tar.gz 上传到/root/tools/目录中,该目录是事先创建的。

进入/root/tools/目录,将jdk安装包解压到/root/training/目录中,该目录也是事先创建的。

使用winscp工具将Hadoop安装包 hadoop-2.7.3.tar.gz 上传到master节点的/root/tools/目录中,该目录是事先创建的。

进入/root/tools/目录,将hadoop安装包解压到/root/training/目录中,该目录也是事先创建的。

进入Hadoop配置文件目录:

(1) 配置hadoop-env.sh文件:

(2) 配置hdfs-site.xml文件:

(3) 配置core-site.xml文件:

(4) 配置mapred-site.xml文件:

将模板文件mapred-site.xml.template拷贝一份重命名为mapred-site.xml然后编辑:

(5) 配置yarn-site.xml文件:

(6) 配置slaves文件:

将master上配置好的Hadoop安装目录分别复制给两个从节点slave1和slave2,并验证是否成功。

第一次启动需要输入yes继续。

启动成功后,使用jps命令查看各个节点上开启的进程:

使用命令行查看HDFS的状态:

使用浏览器查看HDFS的状态:

使用浏览器查看YARN的状态:

(1) 在HDFS上创建输入目录/input:

(2) 将本地数据文件data.txt上传至该目录:

(3) 进入到Hadoop的示例程序目录:

(4) 执行示例程序中的Wordcount程序,以HDFS上的/input/data.txt作为输入数据,输出结果存放到HDFS上的/out/wc目录下:

(5) 查看进度和结果:

可以通过终端打印出来的日志信息知道执行进度:

执行结束后可以在HDFS上的/out/wc目录下查看是否有_SUCCESS标志文件来判断是否执行成功。

如果执行成功,可以在输出目录下看到_SUCCESS标志文件,且可以在part-r-00000文件中查看到wordcount程序的结果:

由于Scala只是一个应用软件,只需要安装在master节点即可。

使用winscp工具将Scala安装包上传到master节点的/root/tools目录下:

进入/root/tools目录,将Scala安装包解压到安装目录/root/training/:

将Scala的家目录加入到环境变量PATH中:

使环境变量生效:

输入scala命令,如下进入scala环境,则证明scala安装成功:

我们先在master节点上配置好参数,再分发给两个从节点slave1和slave2。

使用winscp工具将Spark安装包上传到master节点的/root/tools目录下:

进入/root/tools目录,将Spark安装包解压到安装目录/root/training/下:

注意:由于Spark的命令脚本和Hadoop的命令脚本有冲突(比如都有start-all.sh和stop-all.sh等),

所以这里需要注释掉Hadoop的环境变量,添加Spark的环境变量:

按Esc:wq保存退出,使用source命令使配置文件立即生效:

进入Spark的配置文件目录下:

(1) 配置spark-env.sh文件:

(2) 配置slaves文件:

将master上配置好的Spark安装目录分别复制给两个从节点slave1和slave2,并验证是否成功。

启动后查看每个节点上的进程:

使用浏览器监控Spark的状态:

使用spark-shell命令进入SparkContext(即Scala环境):

启动了spark-shell之后,可以使用4040端口访问其Web控制台页面(注意:如果一台机器上启动了多个spark-shell,即运行了多个SparkContext,那么端口会自动连续递增,如4041,4042,4043等等):

注意:由于我们将Hadoop从环境变量中注释掉了,这时只能手动进入到Hadoop的sbin目录停止Hadoop:

Spark中常用的端口总结:

如何构建第一个Spark项目代码

操作系统

Window7/Mac

IDE

IntelliJ IDEA Community Edition 14.1.6

下载地址

JDK 1.8.0_65

下载地址

Scala 2.11.7

下载地址

其它环境

Spark:1.4.1

下载地址

Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

New Project

使用Maven模型创建一个Scala项目

填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目

项目创建完成后,目录结构如下

4.为项目添加JDK以及Scala SDK

点击File-Project Structure,在SDKS和Global Libraries中为项目配置环境。

至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

MyRouteBuild

MyRouteMain

这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka._

import org.apache.spark.SparkConf

object DirectKafkaWordCount {

def main(args: Array[String]) {

if (args.length 2) {

System.err.println("...")

System.exit(1)

}

//StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")

val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics

val topicsSet = topics.split(",").toSet

val kafkaParams = Map[String, String]("metadata.broker.list" - brokers)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print

val lines = messages.map(_._2)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _)

wordCounts.print()

// Start the computation

ssc.start()

ssc.awaitTermination()

}

}

将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。

至此Spark处理代码已经编写完成。

修改pom.xml,为项目打包做准备

pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-core_2.10/artifactId

version1.4.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-streaming-kafka_2.10/artifactId

version1.4.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-streaming_2.10/artifactId

version1.4.1/version

/dependency

!-- scala --

dependency

groupIdorg.scala-lang/groupId

artifactIdscala-library/artifactId

version2.10.4/version

/dependency

除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:

plugins

!-- Plugin to create a single jar that includes all dependencies --

plugin

artifactIdmaven-assembly-plugin/artifactId

version2.4/version

configuration

descriptorRefs

descriptorRefjar-with-dependencies/descriptorRef

/descriptorRefs

/configuration

executions

execution

idmake-assembly/id

phasepackage/phase

goals

goalsingle/goal

/goals

/execution

/executions

/plugin

plugin

groupIdorg.apache.maven.plugins/groupId

artifactIdmaven-compiler-plugin/artifactId

version2.0.2/version

configuration

source1.7/source

target1.7/target

/configuration

/plugin

plugin

groupIdnet.alchim31.maven/groupId

artifactIdscala-maven-plugin/artifactId

executions

execution

idscala-compile-first/id

phaseprocess-resources/phase

goals

goaladd-source/goal

goalcompile/goal

/goals

/execution

execution

idscala-test-compile/id

phaseprocess-test-resources/phase

goals

goaltestCompile/goal

/goals

/execution

/executions

/plugin

/plugins

pom.xml文件修改完成后,即可开始maven打包,操作如图:

点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package

Spark作业提交

在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。

将jar包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用spark-submit把任务提交到Yarn集群,即可看到运行结果。

如何使用spark将程序提交任务到yarn-Spark-about云开发

使用脚本提交

1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。

2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.

3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR

1.yarn-standalone方式提交到yarn

在${SPARK_HOME}下面执行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

./bin/spark-class org.apache.spark.deploy.yarn.Client \

--jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

--class org.apache.spark.examples.SparkPi \

--args yarn-standalone \

--num-workers 3 \

--master-memory 2g \

--worker-memory 2g \

--worker-cores 1

复制代码

2. yarn-client 方式提交到yarn

在${SPARK_HOME}下面执行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

./bin/run-example org.apache.spark.examples.SparkPi yarn-client

复制代码

二、使用程序提交

1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报

org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制

复制代码

错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。

2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。

3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR

比如我的设置为向提交任务主机~/.bashrc里面添加:

export SPARK_JAR=

export SPARK_YARN_APP_JAR=

复制代码

file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。

其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。

4.程序中加入hadoop、yarn、依赖。

注意,如果引入了hbase依赖,需要这样配置

dependency

groupIdorg.apache.hbase/groupId

artifactIdhbase-thrift/artifactId

version${hbase.version}/version

exclusions

exclusion

groupIdorg.apache.hadoop/groupId

artifactIdhadoop-mapreduce-client-jobclient/artifactId

/exclusion

exclusion

groupIdorg.apache.hadoop/groupId

artifactIdhadoop-client/artifactId

/exclusion

/exclusions

/dependency

复制代码

然后再加入

dependency

groupIdorg.ow2.asm/groupId

artifactIdasm-all/artifactId

version4.0/version

/dependency

复制代码

否则会报错:

IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class

复制代码

异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar

5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。

6.编写程序

代码示例:

package com.sdyc.ndspark.sys;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.ArrayList;

import java.util.List;

/**

* Created with IntelliJ IDEA.

* User: zarchary

* Date: 14-1-19

* Time: 下午6:23

* To change this template use File | Settings | File Templates.

*/

public class ListTest {

public static void main(String[] args) throws Exception {

SparkConf sparkConf = new SparkConf();

sparkConf.setAppName("listTest");

//使用yarn模式提交

sparkConf.setMaster("yarn-client");

JavaSparkContext sc = new JavaSparkContext(sparkConf);

ListString listA = new ArrayListString();

listA.add("a");

listA.add("a");

listA.add("b");

listA.add("b");

listA.add("b");

listA.add("c");

listA.add("d");

JavaRDDString letterA = sc.parallelize(listA);

JavaPairRDDString, Integer letterB = letterA.map(new PairFunctionString, String, Integer() {

@Override

public Tuple2String, Integer call(String s) throws Exception {

return new Tuple2String, Integer(s, 1);

}

});

letterB = letterB.reduceByKey(new Function2Integer, Integer, Integer() {

public Integer call(Integer i1, Integer i2) {

return i1 + i2;

}

});

//颠倒顺序

JavaPairRDDInteger, String letterC = letterB.map(new PairFunctionTuple2String, Integer, Integer, String() {

@Override

public Tuple2Integer, String call(Tuple2String, Integer stringIntegerTuple2) throws Exception {

return new Tuple2Integer, String(stringIntegerTuple2._2, stringIntegerTuple2._1);

}

});

JavaPairRDDInteger, ListString letterD = letterC.groupByKey();

// //false说明是降序

JavaPairRDDInteger, ListString letterE = letterD.sortByKey(false);

System.out.println("========" + letterE.collect());

System.exit(0);

}

}

复制代码

代码中master设置为yar-client表明了是使用提交到yarn.

关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。

以上弄完之后就可以运行程序了。

运行后会看到yarn的ui界面出现:

正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:

13247 org.apache.spark.deploy.yarn.WorkerLauncher

复制代码

这是spark的工作进程。

如果接收到异常为:

WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

复制代码

出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报

WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。

所以请检查主机名和IP是否配置正确。

我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。

科普Spark,Spark是什么,如何使用Spark

科普Spark,Spark是什么,如何使用Spark

1.Spark基于什么算法的分布式计算(很简单)

2.Spark与MapReduce不同在什么地方

3.Spark为什么比Hadoop灵活

4.Spark局限是什么

5.什么情况下适合使用Spark

什么是Spark

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比

Spark的中间数据放到内存中,对于迭代运算效率更高。

Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。

Spark比Hadoop更通用

Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。

这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。

不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。

容错性

在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。

可用性

Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)

由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。

运行模式

本地模式

Standalone模式

Mesoes模式

yarn模式

Spark生态系统

Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。

Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。

Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

End.

spark代码包说明的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于spark编程指南、spark代码包说明的信息别忘了在本站进行查找喔。

版权说明:如非注明,本站文章均为 AH站长 原创,转载请注明出处和附带本文链接;

本文地址:http://ahzz.com.cn/post/12253.html


取消回复欢迎 发表评论:

分享到

温馨提示

下载成功了么?或者链接失效了?

联系我们反馈

立即下载