
aliyun-emapreduce-demo's Introduction

This project contains the following examples:

MapReduce

  • WordCount: word count

Hive

  • sample.hive: a simple table query

Pig

  • sample.pig: an example of processing OSS data with Pig

Spark

  • SparkPi: compute Pi
  • SparkWordCount: word count
  • LinearRegression: linear regression
  • OSSSample: OSS usage example
  • ONSSample: ONS usage example
  • ODPSSample: ODPS usage example
  • MNSSample: MNS usage example
  • LoghubSample: Loghub usage example

PySpark

  • WordCount: word count

Required resources

Test data (in the data directory):

  • The_Sorrows_of_Young_Werther.txt: can be used as input data for WordCount (MapReduce/Spark)
  • patterns.txt: filter patterns for the WordCount (MapReduce) job
  • u.data: test table data for the sample.hive script
  • abalone: test data for the linear regression algorithm

Dependency JARs (in the lib directory)

  • tutorial.jar: the dependency JAR required by the sample.pig job

Preparation

This project ships some test data; you can simply upload it to OSS and use it. For the other examples, such as ODPS, MNS, ONS, and Loghub, you need to prepare the data yourself as follows:

Basic concepts:

  • OSSURI: oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b/c.txt, used when specifying input and output data sources in a job; it is analogous to hdfs://.
  • The Alibaba Cloud AccessKeyId/AccessKeySecret is the key pair used to access Alibaba Cloud APIs; it can be obtained from the Alibaba Cloud console.
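
For illustration, a minimal sketch of reading an OSSURI path from Spark, assuming the E-MapReduce OSS filesystem support is on the classpath; the bucket, endpoint, and credential values are hypothetical placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object OssUriRead {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("OssUriRead"))
        // Hypothetical OSSURI; substitute your own credentials, bucket, and endpoint.
        val inputPath =
          "oss://<accessKeyId>:<accessKeySecret>@my-bucket.oss-cn-hangzhou.aliyuncs.com/a/b/c.txt"
        val lines = sc.textFile(inputPath)
        println(s"Line count: ${lines.count()}")
        sc.stop()
      }
    }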

Running on a cluster

  • Spark

    • SparkWordCount: spark-submit --class SparkWordCount examples-1.0-SNAPSHOT-shaded.jar <inputPath> <outputPath> <numPartition> (a minimal sketch of this job appears after this list)
      • inputPath: input data path
      • outputPath: output path
      • numPartition: number of partitions for the input RDD
    • SparkPi: spark-submit --class SparkPi examples-1.0-SNAPSHOT-shaded.jar
    • SparkOssDemo: spark-submit --class SparkOssDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <endpoint> <inputPath> <numPartition>
      • accessKeyId: your Alibaba Cloud AccessKeyId
      • accessKeySecret: your Alibaba Cloud AccessKeySecret
      • endpoint: the Alibaba Cloud OSS endpoint
      • inputPath: input data path
      • numPartition: number of partitions for the input RDD
    • SparkRocketMQDemo: spark-submit --class SparkRocketMQDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <consumerId> <topic> <subExpression> <parallelism>
      • accessKeyId: your Alibaba Cloud AccessKeyId
      • accessKeySecret: your Alibaba Cloud AccessKeySecret
      • consumerId: see the Consumer ID documentation
      • topic: each message queue has a topic
      • subExpression: see the message filtering documentation
      • parallelism: the number of receivers used to consume messages from the queue
    • SparkMaxComputeDemo: spark-submit --class SparkMaxComputeDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
      • accessKeyId: your Alibaba Cloud AccessKeyId
      • accessKeySecret: your Alibaba Cloud AccessKeySecret
      • envType: 0 for the public-network environment, 1 for the internal-network environment. Use 0 for local debugging and 1 when running on E-MapReduce.
      • project: see the ODPS quick-start documentation
      • table: see the ODPS terminology documentation
      • numPartitions: number of partitions for the input RDD
    • SparkMNSDemo: spark-submit --class SparkMNSDemo examples-1.0-SNAPSHOT-shaded.jar <queueName> <accessKeyId> <accessKeySecret> <endpoint>
      • queueName: the queue name; see the MNS glossary
      • accessKeyId: your Alibaba Cloud AccessKeyId
      • accessKeySecret: your Alibaba Cloud AccessKeySecret
      • endpoint: the queue's data access endpoint
    • SparkSLSDemo: spark-submit --class SparkSLSDemo examples-1.0-SNAPSHOT-shaded.jar <sls project> <sls logstore> <loghub group name> <sls endpoint> <access key id> <access key secret> <batch interval seconds>
      • sls project: the Log Service project name
      • sls logstore: the logstore name
      • loghub group name: the name of the group that consumes log data in this job; it can be chosen freely. For the same sls project and sls logstore, jobs with the same group name consume the logstore's data cooperatively, while jobs with different group names consume it independently of one another.
      • sls endpoint: see the Log Service endpoint documentation
      • accessKeyId: your Alibaba Cloud AccessKeyId
      • accessKeySecret: your Alibaba Cloud AccessKeySecret
      • batch interval seconds: the batch interval of the Spark Streaming job, in seconds
    • LinearRegression: spark-submit --class LinearRegression examples-1.0-SNAPSHOT-shaded.jar <inputPath> <numPartitions>
      • inputPath: input data path
      • numPartitions: number of partitions for the input RDD
  • PySpark

    • WordCount: spark-submit wordcount.py <inputPath> <outputPath> <numPartition>
      • inputPath: input data path
      • outputPath: output path
      • numPartition: number of partitions for the input RDD
  • MapReduce

    • WordCount: hadoop jar examples-1.0-SNAPSHOT-shaded.jar WordCount -Dwordcount.case.sensitive=true <inputPath> <outputPath> -skip <patternPath>
      • inputPath: input data path
      • outputPath: output path
      • patternPath: the filter-pattern file; data/patterns.txt can be used
  • Hadoop Streaming

    • WordCount: hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-*.jar -file <mapperPyFile> -mapper mapper.py -file <reducerPyFile> -reducer reducer.py -input <inputPath> -output <outputPath>
      • mapperPyFile: the mapper file (a sample mapper is provided)
      • reducerPyFile: the reducer file (a sample reducer is provided)
      • inputPath: input data path
      • outputPath: output path
  • Hive

    • hive -f sample.hive -hiveconf inputPath=<inputPath>
      • inputPath: input data path
  • Pig

    • pig -x mapreduce -f sample.pig -param tutorial=<tutorialJarPath> -param input=<inputPath> -param result=<resultPath>
      • tutorialJarPath: the dependency JAR; lib/tutorial.jar can be used
      • inputPath: input data path
      • resultPath: output path
  • Note:

    • When running on E-MapReduce, upload the test data and dependency JARs to OSS; paths follow the OSSURI definition above.
    • When running on your own cluster, the files can be placed on the machines' local filesystem.
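
For reference, here is a minimal sketch of what a job like SparkWordCount does with the three arguments listed above; this is an illustrative reconstruction, not the exact source in this repository:

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkWordCount {
      def main(args: Array[String]): Unit = {
        // args: <inputPath> <outputPath> <numPartition>
        val Array(inputPath, outputPath, numPartition) = args
        val sc = new SparkContext(new SparkConf().setAppName("SparkWordCount"))
        sc.textFile(inputPath, numPartition.toInt) // read input with the requested partition count
          .flatMap(_.split(" "))                   // split lines into words
          .map(word => (word, 1))
          .reduceByKey(_ + _)                      // sum counts per word
          .saveAsTextFile(outputPath)
        sc.stop()
      }
    }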

Running locally

This section describes how to run the Spark examples locally while accessing Alibaba Cloud data sources such as OSS. For local debugging it is best to use a development tool such as IntelliJ IDEA or Eclipse, especially on Windows; otherwise you would have to set up Hadoop and Spark runtime environments on the Windows machine, which is troublesome.
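
To make the moving parts concrete, here is a minimal sketch of a local-mode run, assuming the aliyun-emapreduce SDK is on the classpath; the fs.oss.* configuration keys shown are illustrative and should be checked against the SDK version you use:

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalOssDebug {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("LocalOssDebug")
          .setMaster("local[4]") // run inside the IDE instead of on a cluster
        val sc = new SparkContext(conf)
        // Hypothetical Hadoop configuration keys for OSS access; verify the
        // exact key names against the SDK documentation for your version.
        sc.hadoopConfiguration.set("fs.oss.accessKeyId", "<yourAccessKeyId>")
        sc.hadoopConfiguration.set("fs.oss.accessKeySecret", "<yourAccessKeySecret>")
        sc.hadoopConfiguration.set("fs.oss.endpoint", "<yourOssEndpoint>")
        val lines = sc.textFile("oss://my-bucket/The_Sorrows_of_Young_Werther.txt")
        println(s"Line count: ${lines.count()}")
        sc.stop()
      }
    }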

  • Intellij IDEA

    • Prerequisites: install IntelliJ IDEA, Maven, the IntelliJ IDEA Maven plugin, Scala, and the IntelliJ IDEA Scala plugin
    • Double-click SparkWordCount.scala to open it
    • Open the job configuration dialog
    • Select SparkWordCount and enter the required job arguments in the arguments field
    • Click "OK"
    • Click the run button to execute the job
    • View the job execution logs
  • Scala IDE for Eclipse

    • Prerequisites: install Scala IDE for Eclipse, Maven, and the Eclipse Maven plugin
    • Import the project
    • Run As > Maven build; the shortcut is "Alt + Shift + X, M". Alternatively, right-click the project name and choose "Run As" > "Maven build"
    • After compilation finishes, right-click the job you want to run and choose "Run Configuration" to open the configuration page
    • On the configuration page, choose Scala Application and configure the job's main class, arguments, and so on
    • Click "Run"
    • View the console output logs

aliyun-emapreduce-demo's People

Contributors

fengshenwu, ferhui, gabriel39, jiangyu, liwenqiang-zijin, powerwu, unclegen, wenxuanguan


aliyun-emapreduce-demo's Issues

tf_fm_on_spark.py : Zip operation fail in dealing with spark dataframe

Hello, I followed the demo at the project path:
/aliyun-emapreduce-demo/src/main/python/deeplearning/tf_fm_on_spark.py

I ran into some problems with the zip operation at lines 137-138 (a screenshot was attached to the original report):

Q1: Are user_sub, genres_sub, and rating_sub still DataFrame Column objects, or NumPy arrays?

Q2: If they are DataFrame columns, will the code at lines 137-138 trigger a collect() operation on the Spark DataFrame?

Q3: I found that the zip operation cannot be applied to a DataFrame Column object in Spark; the error message was attached as a screenshot.

Plugin execution not covered by lifecycle configuration

1. Plugin execution not covered by lifecycle configuration: net.alchim31.maven:scala-maven-plugin:3.1.0:compile (execution: scala-compile-first, phase: process-resources) pom.xml /examples line 280 Maven Project Build Lifecycle Mapping Problem
2. Plugin execution not covered by lifecycle configuration: net.alchim31.maven:scala-maven-plugin:3.1.0:testCompile (execution: scala-test-compile-first, phase: process-test-resources) pom.xml /examples line 287 Maven Project Build Lifecycle Mapping Problem

can't run SparkWordCount

/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/tools.jar:/Users/darrenfantasy/Documents/Lib/scala-2.12.2/lib/scala-library.jar:/Users/darrenfantasy/Documents/Lib/scala-2.12.2/lib/scala-parser-combinators_2.12-1.0.5.jar:/Users/darrenfantasy/Documents/Lib/scala-2.12.2/lib/scala-reflect.jar:/Users/darrenfantasy/Documents/Lib/scala-2.12.2/lib/scala-swing_2.12-2.0.0.jar:/Users/darrenfantasy/Documents/Lib/scala-2.12.2/lib/scala-xml_2.12-1.0.6.jar com.aliyun.emr.example.streaming.HBaseSample
Error: Could not find or load main class com.aliyun.emr.example.streaming.HBaseSample

Process finished with exit code 1

The Maven repository addresses configured in the pom need to be updated

  • central: http://maven.aliyun.com/mvn/repository (releases: true, snapshots: false)
  • snapshots: http://maven.aliyun.com/mvn/repository (releases: false, snapshots: true)
  • oss Maven SNAPSHOT Repository: https://oss.sonatype.org/content/repositories/snapshots/ (releases: false, snapshots: true)

These addresses need to be updated.
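
As of this writing, Alibaba's public Maven mirror is served from https://maven.aliyun.com/repository/public. A hedged sketch of an updated repository entry for the pom (verify the URL against the mirror's current documentation before relying on it):

    <repository>
      <id>central</id>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases><enabled>true</enabled></releases>
      <snapshots><enabled>false</enabled></snapshots>
    </repository>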

mvn install error:Failure to find com.taobao:parent:pom:1.0.2

mvn install

Failed to execute goal on project examples: Could not resolve dependencies for project com.aliyun.emr:examples:jar:1.0: Failed to collect dependencies at com.aliyun.emr:emr-sdk_2.10:jar:1.1.2 -> com.aliyun.openservices:ons-client:jar:1.1.8 -> com.alibaba.rocketmq:rocketmq-client:jar:3.2.5: Failed to read artifact descriptor for com.alibaba.rocketmq:rocketmq-client:jar:3.2.5: Failure to find com.taobao:parent:pom:1.0.2 in http://uk.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of uk.maven.org has elapsed or updates are forced -> [Help 1]

Running FlinkOSSSample fails with "No FileSystem for scheme: oss"

The full error message:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#733480482] with leader session id f45683a0-2a5a-41f8-a5f8-b8a5689136c3.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 448754c1e20e600b4a7e875f09faddbd (Flink Java Job at Thu Nov 29 17:28:08 CST 2018)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:262)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:801)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:180)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
	... 19 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:472)
	at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:248)
	... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'oss'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:102)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
	... 27 more
Caused by: java.io.IOException: No FileSystem for scheme: oss
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:99)
	... 28 more
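
The root cause is that no filesystem implementation for the oss:// scheme is available on Flink's classpath. In recent Flink releases, OSS support is provided by the flink-oss-fs-hadoop plugin and configured along these lines in flink-conf.yaml (the exact key names should be verified against your Flink version; on E-MapReduce the OSS filesystem may instead be wired in through the Hadoop configuration):

    # flink-conf.yaml: illustrative OSS filesystem settings
    fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com   # hypothetical endpoint
    fs.oss.accessKeyId: <yourAccessKeyId>
    fs.oss.accessKeySecret: <yourAccessKeySecret>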

java.lang.IllegalArgumentException: Invalid partition spec.

I do not understand where the error is. (A screenshot was attached to the original report.)

Code:
val partition = "20231218"
val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
val odpsData = odpsOps.readTable(project = project, table = table, partition = partition, transfer = read, numPartition = 50)
println(s"Count (odpsData): ${odpsData.count()}")

MAVEN build problem

My environment:
macOS 10.12.2
java 1.8.0_121
Scala IDE for Eclipse 4.5.0
Scala Library Container 2.10.6

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.aliyun.emr:examples:jar:1.1
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 267, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.122 s
[INFO] Finished at: 2017-03-03T20:21:17+08:00
[INFO] Final Memory: 5M/123M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format <plugin-prefix>:<goal> or <plugin-group-id>:<plugin-artifact-id>[:<plugin-version>]:<goal>. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-classes, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException
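
This particular error simply means mvn was invoked without any goal, so the build never started. Specifying an explicit goal should get past it, for example:

    mvn clean package

(The exact goals depend on what you want to build; clean package is the usual choice for producing the shaded examples JAR.)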
