Workload Archive project
Just use a Python dict; the REST code will automatically encode it depending on the "Accept" header of the request.
When the client decodes the JSON string, it gets a list of strings instead of a list of dictionaries:
['{"status": "ok", "ids": ["b51484a168677e16660ea00250fa8036"], "stype": "mongodb"}']
We need to find a way to inject data into Hive tables. Below are a few resources to help:
Dear @vkuznet,
for my project on AI error-logs with @vlimant I now need to locate a large number of
log files, i.e. I have ~25,000 workflows (https://vocms0113.cern.ch/actionshistory) and for each
of them I need to locate the logs.
I managed to write, for each of the workflows, a spec file that contains the task name and the time-
range (with the exact day when it was run), e.g.
{"spec":{"task":"/vlimant_ACDC0_task_HIG-RunIIFall17wmLHEGS-01415__v1_T_180706_002124_986/HIG-RunIIFall17DRPremix-02001_1/HIG-RunIIFall17DRPremix-02001_1MergeAODSIMoutput/HIG-RunIIFall17MiniAODv2-01299_0", "timerange":[20180706,20180706]}, "fields":[]}
With this I am able to locate the unmerged jobs.
However, for one task this runs for around ~10 seconds, so if I create 25,000 spec files and run
a query for each of them, it will take a lot of time. So I would like to know what the most
efficient way to locate that many files might be. Can I somehow make one large query that efficiently
returns all the files that I need, or give a finer time-range to speed up the query?
It would be great to know what possibilities exist for this problem.
Many, many thanks for your help!
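For reference, a minimal sketch of how such per-workflow spec files could be generated in bulk; the input list of (task, day) pairs is a placeholder and not part of WMArchive:

import json

# hypothetical input: a list of {"task": "...", "day": "YYYYMMDD"} entries,
# e.g. extracted from https://vocms0113.cern.ch/actionshistory
with open("workflows.json") as istream:
    workflows = json.load(istream)

for idx, wflow in enumerate(workflows):
    day = int(wflow["day"])
    spec = {"spec": {"task": wflow["task"], "timerange": [day, day]}, "fields": []}
    with open("spec_%05d.json" % idx, "w") as ostream:
        json.dump(spec, ostream)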
We'll need to create a MongoDBCleanup script which will be responsible for cleaning up "hdfs" docs from MongoDB based on a given lifetime stamp. For example:
mongoCleanup.py --uri=URI --tstamp=3months --status=hdfs
It would be nice to pass tstamp in a human-readable form as well as plain UNIX seconds since epoch. The human-readable form should support the Nhours, Ndays, Nmonths, Nyears notations, where N is some integer.
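A minimal sketch of the tstamp parsing logic; the function name and the exact set of notations are assumptions, not an existing WMArchive API:

import re
import time

def parse_tstamp(value):
    "Convert Nhours/Ndays/Nmonths/Nyears or plain UNIX seconds into a threshold in seconds since epoch"
    units = {"hours": 3600, "days": 86400, "months": 30 * 86400, "years": 365 * 86400}
    match = re.match(r"^(\d+)(hours|days|months|years)$", value)
    if match:
        number, unit = int(match.group(1)), match.group(2)
        return time.time() - number * units[unit]
    return float(value)  # otherwise assume plain UNIX seconds since epoch

# example: clean up docs whose time stamp is older than 3 months
threshold = parse_tstamp("3months")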
The following response was returned, but the status code was 200:
['{"status": "Not supported, expect "data", "query" attributes in your request", "data": []}']
So far I have deployed the service with the MongoDB 3.0.2 back-end, since this RPM was available from my private repository. A new version of MongoDB came out recently, and we need to rebuild the MongoDB RPM, redeploy WMArchive and test its functionality.
@vkuznet, @yuyiguo
Since we applied the following patch to fix the unit (which reports seconds for the total performance value instead of the CMSSW-defined milliseconds),
we need to validate whether seconds is the correct unit for these data. I see some examples in the tests with sub-second values, e.g. 60 milliseconds, which would be truncated to 0 seconds.
If much of the data has sub-second values, or values of only a couple of seconds, milliseconds might be the better unit.
Hi,
I have a problem running myspark from lxplus.
I work with Jean-Roch and I need to access some error logs.
I tried to reproduce the example in https://github.com/dmwm/WMArchive/wiki/How-to-find-records-on-HDFS-using-pyspark
I logged in from an lxplus node following https://hadoop-user-guide.web.cern.ch/hadoop-user-guide/getstart/client_cvmfs.html, since it is no longer possible to log in via ssh analytix.
However, when I run the example in the twiki:
{"spec":{"task":"/amaltaro_StepChain_ReDigi3_HG1612_WMArchive_161130_192654_9283/DIGI","timerange":[20161130,20161202]}, "fields":[]}
myspark --spec=cond.spec --script=RecordFinder --records-output=records.json
I get an error message that tells me that the PrepID is missing:
I attach the output of myspark below.
It would be great if you could help me to solve this problem.
Many, many thanks in advance,
Best Lukas
[llayer@lxplus130 WMArchive]$ myspark --spec=cond.spec --script=RecordFinder --records-output=records.json
Using spark 2.X
Enable CVMFS ...
Ivy Default Cache set to: /afs/cern.ch/user/l/llayer/.ivy2/cache
The jars for the packages stored in: /afs/cern.ch/user/l/llayer/.ivy2/jars
:: loading settings :: url = jar:file:/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.databricks#spark-avro_2.11;4.0.0 in central
found org.slf4j#slf4j-api;1.7.5 in central
found org.apache.avro#avro;1.7.6 in central
found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
found com.thoughtworks.paranamer#paranamer;2.3 in central
found org.xerial.snappy#snappy-java;1.0.5 in central
found org.apache.commons#commons-compress;1.4.1 in central
found org.tukaani#xz;1.0 in central
:: resolution report :: resolve 963ms :: artifacts dl 38ms
:: modules in use:
com.databricks#spark-avro_2.11;4.0.0 from central in [default]
com.thoughtworks.paranamer#paranamer;2.3 from central in [default]
org.apache.avro#avro;1.7.6 from central in [default]
org.apache.commons#commons-compress;1.4.1 from central in [default]
org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
org.codehaus.jackson#jackson-mapper-asl;1.9.13 from central in [default]
org.slf4j#slf4j-api;1.7.5 from central in [default]
org.tukaani#xz;1.0 from central in [default]
org.xerial.snappy#snappy-java;1.0.5 from central in [default]
:: evicted modules:
org.slf4j#slf4j-api;1.6.4 by [org.slf4j#slf4j-api;1.7.5] in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 10 | 0 | 0 | 1 || 9 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 9 already retrieved (0kB/25ms)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/cvmfs/sft.cern.ch/lcg/releases/hadoop/2.7.3-b1221/x86_64-centos7-gcc62-opt/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/02/08 09:11:08 INFO SparkContext: Running Spark version 2.2.1
19/02/08 09:11:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/02/08 09:11:09 INFO SparkContext: Submitted application: AvroKeyInputFormat
19/02/08 09:11:09 INFO SecurityManager: Changing view acls to: llayer
19/02/08 09:11:09 INFO SecurityManager: Changing modify acls to: llayer
19/02/08 09:11:09 INFO SecurityManager: Changing view acls groups to:
19/02/08 09:11:09 INFO SecurityManager: Changing modify acls groups to:
19/02/08 09:11:09 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(llayer); groups with view permissions: Set(); users with modify permissions: Set(llayer); groups with modify permissions: Set()
19/02/08 09:11:09 INFO Utils: Successfully started service 'sparkDriver' on port 5001.
19/02/08 09:11:09 INFO SparkEnv: Registering MapOutputTracker
19/02/08 09:11:09 INFO SparkEnv: Registering BlockManagerMaster
19/02/08 09:11:09 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/02/08 09:11:09 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/02/08 09:11:09 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-71770f69-9df2-48f0-96b8-541540e19d03
19/02/08 09:11:09 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
19/02/08 09:11:09 INFO SparkEnv: Registering OutputCommitCoordinator
19/02/08 09:11:09 INFO log: Logging initialized @11312ms
19/02/08 09:11:09 INFO Server: jetty-9.3.z-SNAPSHOT
19/02/08 09:11:09 INFO Server: Started @11410ms
19/02/08 09:11:09 INFO AbstractConnector: Started ServerConnector@220b5902{HTTP/1.1,[http/1.1]}{0.0.0.0:5201}
19/02/08 09:11:09 INFO Utils: Successfully started service 'SparkUI' on port 5201.
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1ea5f333{/jobs,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4c756e2b{/jobs/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@9cf0917{/jobs/job,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5c0adfa3{/jobs/job/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@154de6bd{/stages,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@59dff77e{/stages/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@69ffa2df{/stages/stage,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@63fc12aa{/stages/stage/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@56bcafc0{/stages/pool,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2fd3a3e0{/stages/pool/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6a0d5620{/storage,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@38c6051f{/storage/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3db721{/storage/rdd,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41726ede{/storage/rdd/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@26fada3e{/environment,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1ef26daf{/environment/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3aa8db11{/executors,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5c06ad81{/executors/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@74b7f684{/executors/threadDump,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6db8becd{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@703bb05b{/static,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@346b2582{/,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@48511a93{/api,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4f021b90{/jobs/job/kill,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@27a69ce6{/stages/stage/kill,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://188.184.30.74:5201
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/v/valya/public/spark/spark-examples-1.6.0-cdh5.15.1-hadoop2.6.0-cdh5.15.1.jar at spark://188.184.30.74:5001/jars/spark-examples-1.6.0-cdh5.15.1-hadoop2.6.0-cdh5.15.1.jar with timestamp 1549613469967
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar at spark://188.184.30.74:5001/jars/com.databricks_spark-avro_2.11-4.0.0.jar with timestamp 1549613469968
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar at spark://188.184.30.74:5001/jars/org.slf4j_slf4j-api-1.7.5.jar with timestamp 1549613469968
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar at spark://188.184.30.74:5001/jars/org.apache.avro_avro-1.7.6.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar at spark://188.184.30.74:5001/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar at spark://188.184.30.74:5001/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar at spark://188.184.30.74:5001/jars/com.thoughtworks.paranamer_paranamer-2.3.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar at spark://188.184.30.74:5001/jars/org.xerial.snappy_snappy-java-1.0.5.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar at spark://188.184.30.74:5001/jars/org.apache.commons_commons-compress-1.4.1.jar with timestamp 1549613469970
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar at spark://188.184.30.74:5001/jars/org.tukaani_xz-1.0.jar with timestamp 1549613469970
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py at file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py with timestamp 1549613470276
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/myspark.py
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar with timestamp 1549613470304
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/com.databricks_spark-avro_2.11-4.0.0.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar with timestamp 1549613470313
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.slf4j_slf4j-api-1.7.5.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar with timestamp 1549613470323
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.apache.avro_avro-1.7.6.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar with timestamp 1549613470336
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.codehaus.jackson_jackson-core-asl-1.9.13.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar with timestamp 1549613470347
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar with timestamp 1549613470366
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/com.thoughtworks.paranamer_paranamer-2.3.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar with timestamp 1549613470376
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.xerial.snappy_snappy-java-1.0.5.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar with timestamp 1549613470426
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.apache.commons_commons-compress-1.4.1.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar with timestamp 1549613470436
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.tukaani_xz-1.0.jar
19/02/08 09:11:10 INFO Executor: Starting executor ID driver on host localhost
19/02/08 09:11:10 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 5101.
19/02/08 09:11:10 INFO NettyBlockTransferService: Server created on 188.184.30.74:5101
19/02/08 09:11:10 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/02/08 09:11:10 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManagerMasterEndpoint: Registering block manager 188.184.30.74:5101 with 2004.6 MB RAM, BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManager: external shuffle service port = 7337
19/02/08 09:11:10 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3a8ae0e6{/metrics/json,null,AVAILABLE,@Spark}
19/02/08 09:11:10 WARN DFSUtil: Namenode for hadalytic remains unresolved for ID p01001532067275.cern.ch. Check your hdfs-site.xml file to ensure namenodes are configured properly.
19/02/08 09:11:11 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/02/08 09:11:11 INFO EventLoggingListener: Logging events to hdfs:///user/spark/applicationHistory/local-1549613470488
19/02/08 09:11:11 INFO SparkContext: Added file /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py at file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py with timestamp 1549613471955
19/02/08 09:11:11 INFO Utils: Copying /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/RecordFinder.py
19/02/08 09:11:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/02/08 09:11:14 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 455, in <module>
main()
File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 415, in main
results = run(opts.schema, hdir, opts.script, opts.spec, verbose, opts.rout, opts.yarn)
File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 275, in run
avro_rdd = ctx.union([ctx.newAPIHadoopFile(f, aformat, akey, awrite, aconv, conf=conf) for f in data_path])
File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/pyspark.zip/pyspark/context.py", line 675, in newAPIHadoopFile
File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopFile(PythonRDD.scala:570)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
For better maintenance we need to store our data on HDFS in a hierarchical structure, e.g. /path/YYYY/MM. We need to add this to the HdfsIO module.
Even though the initial prototype was developed with a MongoDB back-end, we may still need to explore an Elasticsearch one. This task involves deploying Elasticsearch and performing various tests to get benchmark numbers.
We may need to provide a front-end UI for WMArchive. Here is the possible functionality:
We need to implement a stand-alone wmaClient tool to place queries to WMArchive.
I talked with Dima regarding the WMArchive UI. He was very interested in it. He tried the average event time, which he cares a lot about, and found that we have a negative number for it. See the plot below:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775&jobtype=Merge
@ticoann, is the default value for average event time negative?
Then he tried this one:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775&jobtype=Processing
It shows an average event time of 10 seconds/event, which is the correct value. This plot was made with job_type=Processing.
If one removes the filter (i.e. includes all job types), then this is the plot:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775
It shows 1 second/event, which is too small. Could the data aggregation be counting the number of events incorrectly, or adding in the negative times?
WMArchive should support POST requests with single or multiple docs. It will send back the uid of the doc for a single doc, or a list of uids corresponding to the given list of docs. We also need a tool to match docs to uids.
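A minimal sketch of the client side, assuming (not guaranteed by the issue above) that the returned uids preserve the order of the submitted docs so the two lists can be zipped together; the endpoint URL and certificate paths are placeholders:

import json
import requests  # third-party HTTP client, used here only for illustration

docs = [{"task": "/a/b/c"}, {"task": "/x/y/z"}]  # toy documents
resp = requests.post("https://cmsweb.cern.ch/wmarchive/data",  # placeholder endpoint
                     data=json.dumps({"data": docs}),
                     headers={"Content-Type": "application/json", "Accept": "application/json"},
                     cert=("usercert.pem", "userkey.pem"))  # placeholder auth
uids = resp.json().get("ids", [])  # cf. the example response shown earlier on this page
matches = list(zip(docs, uids))    # match docs to uids by submission order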
An alternative approach to MR jobs would be to use the Spark platform [1]. We should investigate further whether this approach would be valuable for us.
[1] https://spark.apache.org/docs/0.9.1/python-programming-guide.html
We need to develop a deployment procedure.
Dependencies:
[1] https://avro.apache.org/
[2] http://crs4.github.io/pydoop/installation.html
[3] https://hadoopy.readthedocs.org/en/latest/
We discussed that we need to run 10-100 integration tests to study the ExitCode. The goal is to understand the exitCode and which root exitCode caused a workflow to fail. We will run these tests on the testbed, and the workflows should have distinct names so they can be identified.
The problem came up because the exitCodes that @jenimal was interested in could not be found in the WMArchive DB. She got these errors from WMStats(?) or requestManager(?). She was trying to put information from different sources together to identify the cause/location/severity of an error. @ticoann, please comment on how exitCodes are handled in other services.
@amaltaro, you have some tests that can be used for this integration test. Could you please start running them and report the workflow names in this issue?
@vkuznet In addition to the current exitCode plots, should we add a plot that only has the first exitCode for each failed workflow?
We need to extend the MongoIO module to do the following:
[1] https://github.com/dmwm/DAS/blob/master/src/python/DAS/core/das_son_manipulator.py
Based on the Hadoop large-file requirements, we probably need another tool which will write a large chunk of docs into an Avro file on the local file system. This can be done in the following way:
https://gist.github.com/vkuznet/27506907e9ff8a36790d
Then we can resolve the problem of a large memory footprint when creating bulk documents.
It would be nice to have a tool which automatically creates an Avro schema out of a given JSON file. For that it should inspect the structure and value types of the JSON and write the schema spec. Here is a possible format for such a script:
avro_schema.py --fin=file.json --fout=file.avsc
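A minimal sketch of how such schema inference could work for flat JSON records; nested dicts and lists would need recursion, which is omitted here, and all names are illustrative:

import json

def infer_type(value):
    "Map a JSON value to a simple Avro type (flat records only)"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"

def json_to_avsc(fin, fout, name="record0"):
    "Read a flat JSON document and write a matching Avro schema"
    with open(fin) as istream:
        doc = json.load(istream)
    fields = [{"name": key, "type": infer_type(val)} for key, val in doc.items()]
    schema = {"type": "record", "name": name, "namespace": "wma", "fields": fields}
    with open(fout, "w") as ostream:
        json.dump(schema, ostream, indent=2)

json_to_avsc("file.json", "file.avsc")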
Based on the #7 ticket we can now write the mapper/reducer implementation for MR jobs. Assigning this to Benjamin.
@vkuznet, @ticoann, @amaltaro
Action #1: Seangchan will talk with Jen about the exitCode and finalize it this week.
Action #2: Seangchan will check the unit issue and make a final decision.
Action #3: Valentin will check the files with the workflow names for the data that Alan uploaded to the WMArchive production DB connected to the cmsweb-testbed UI. The upload date is Nov 16. Alan tried to fetch them using the UI for the time period Nov 16-23 but was unable to find them. Valentin will try again; if they cannot be found, Alan will rerun the tests. If we still cannot find the files after rerunning the tests, Valentin will check the cron jobs that handle the data.
Actions #1 and #2 may involve schema changes, so we need to settle them ASAP.
The agent could send duplicated docs due to network problems.
One case is when the agent sends a bulk doc: even though the update was successful (or partially successful), if there is a temporary network problem right before the agent gets the response, the agent will try to upload the same documents again.
The metadata field should be unique to each document (it contains time stamps), so it could be used as the validation information.
We can use Julia's idea of running Spark queries on data freshly inserted into HDFS. This will use fewer resources and aggregate some info on a daily basis. The aggregated info can be fed into the dashboard, any other DB, or HDFS. Then we can run another aggregation query over the pre-aggregated daily data.
Luca pointed out that our file size goes above 256 MB on HDFS; this causes a two-block allocation instead of one and is therefore inefficient. We need to adjust the size in the migration script to be slightly less than 256 MB.
Paola requested the data-searching task below:
Given a PrepID and a time frame (a week), she needs to find out:
The definition of a failed job is that the job failed after all the retries; usually agents will try three times. If a job runs successfully after the 3rd try, we will not collect the information for the previous failed tries, and this job is considered a successful job.
We will only collect the final failed try for the above information; the previous tries will not be collected.
Currently, the PrepID info is at the file level. Seangchan and his group are working on getting the PrepID and Campaign info into the top level of the FWJR. We will need to update the WMArchive schema after they put these in production.
Valentin is working on the scripts using the PrepID at the file level. He will provide Paola with a working script for her initial use.
I provided code to use the PySpark APIs and submit a Spark job. Based on the information I have so far, it should be much faster than MR since Spark handles data in memory. We need to investigate how it works and compare it with the MR approach.
Create a JSON file that documents the keys used in WMArchive. The UI will read from this JSON file to show the definition of the metrics on mouse-over. @vkuznet, please let me know where in the repository to store this file. I will first put the performance keys there.
The pydoop [1] library provides native support for map-reduce jobs written in Python. I would suggest writing a wrapper layer in WMArchive which will execute given Map/Reduce classes. The latter will be written by users.
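A minimal sketch of what the user-facing Map/Reduce contract could look like; the class and method names are illustrative, not an existing WMArchive interface:

class MapReduce(object):
    """User-supplied MR logic; the WMArchive wrapper layer would plug these
    methods into pydoop (or another MR runner) and feed them FWJR records."""
    def __init__(self, spec=None):
        self.spec = spec or {}

    def mapper(self, record):
        "Return True if the given FWJR record matches the spec"
        task = record.get("task", "")
        return task.startswith(self.spec.get("task", ""))

    def reducer(self, records):
        "Aggregate the matched records, e.g. count them per host"
        counts = {}
        for rec in records:
            host = rec.get("meta_data", {}).get("host", "unknown")
            counts[host] = counts.get(host, 0) + 1
        return counts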
Hi,
When I ran myspark to collect the logs:
$ myspark --spec=cond.spec --script=RecordFinder --records-output=records.json
I encountered this error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. : org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://analytix/cms/wmarchive/avro/2017/06/30
The correct path should be:
hdfs://analytix/cms/wmarchive/avro/fwjr/2017/06/30
Could you please update this in the source code? Thanks!
After yesterday's meeting I was working on code to implement bulk read/write
functionality for WMArchive. It is doable, but I realized that, based on the current
structure of the FWJR and Hadoop's "desire" to deal with large files, we may end up
with a bottleneck on our server. Here is the problem:
I haven't made any estimates yet, since I don't know N, but we may have a
problem here. On one hand we can't hold too many docs due to potential
RAM usage; on the other, Hadoop "desires" large files.
So, to make a reasonable estimate I need to know N.
From WMArchive twiki I read the following:
"We are currently running 10 WMAgents, each agent has on average about 1M jobs,
each job will produce FWJR report which we need to store in WMArchive. All jobs
are done in parallel, but we expect to have on the order of <1K docs per
seconds feed into WMArchive. The system should sustain such load."
But from this sentence I don't know what daily rate we need to deal with.
Seangchan, can you provide a better estimate? I need to know how many docs
each agent will write on average per day. Once we know that, I can try different
N's, e.g. 1K, 10K, 100K, and measure RAM usage when we load data from
MongoDB into the code which converts them into an Avro file as a chunk.
Then I will have a better idea of which threshold for N we can sustain on our server.
So far, the mongo2hdfs script reads docs from Mongo and writes them (one-by-one) to
HDFS. To support bulk writes, I need to read N docs from Mongo and keep them in RAM
in order to write an Avro file to HDFS. Of course I can split the last step, write the
N docs into the local file system, and then run a separate process to copy that file
into HDFS, but that still does not solve the RAM usage issue.
But the real issue here is the FWJR data structure we use and the Python memory
allocation for it. Since the FWJR has a deeply nested structure and we need
to hold a large chunk of them in a dictionary, I can't organize streaming.
There is yet another issue with the FWJR and the storage/look-up approach.
Storage and look-up prefer flat data, while our application relies
on deeply nested dict structures. The real remedy is to make the FWJR structure
flat and then use streaming for writing. Data look-up will benefit
from a flat structure as well, e.g. compare a.b.c.d.e.f data access with a
single-column look-up.
Best,
Valentin.
[1] http://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/avroschemas.html#avro-creatingschemas
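For illustration, a minimal sketch of the chunked approach under discussion, assuming pymongo and the fastavro library; the connection URI, database/collection names and schema file are placeholders, and N is the chunk size to be tuned against RAM usage:

import itertools
import json
import fastavro
from pymongo import MongoClient

N = 1000  # chunk size to be tuned against RAM usage

client = MongoClient("mongodb://localhost:8230")   # placeholder URI
coll = client["fwjr"]["db"]                        # placeholder db/collection names
with open("fwjr.avsc") as istream:
    schema = fastavro.parse_schema(json.load(istream))

cursor = coll.find()  # optionally filter for docs not yet migrated
for idx in itertools.count():
    chunk = list(itertools.islice(cursor, N))      # hold at most N docs in RAM
    if not chunk:
        break
    with open("chunk_%05d.avro" % idx, "wb") as ostream:
        fastavro.writer(ostream, schema, chunk)
    # the resulting files can then be copied to HDFS, e.g. via `hadoop fs -put`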
To reduce the load on the system we may need to return a list of IDs instead of the actual docs for a given query. And maybe prohibit queries by a list of IDs, and instead re-route the user to HDFS so that (s)he can fetch them directly.
We would like to use the Hive tool to search over our data. The data will reside somewhere on the Hadoop cluster along with a schema which describes the data. Then we want to create an external Hive table which will allow us to search over our data via Hive queries.
A few steps should be performed to do this task:
The file with the avsc extension is an Avro schema file. To generate avsc and avro files one can use the json2avsc and json2avro scripts from https://github.com/dmwm/WMArchive
Here are the steps to set up the necessary software:
cd some_dir
git clone git@github.com:dmwm/WMArchive.git
export PYTHONPATH=$PWD/WMArchive/src/python
export PATH=$PATH:$PWD/WMArchive/bin
and this will put all necessary stuff into your PATH and PYTHONPATH. Then you can run:
# to generate file.avsc out of file.json
json2avsc --fin=file.json --fout=file.avsc
# to generate file.avro out of file.json/file.avsc
json2avro --fin=file.json --schema=file.avsc --fout=file.avro
Once you generate avsc and avro files you can put them into HDFS
# ssh to analytix
# create dir for hive tests
shell# hadoop fs -mkdir /user/valya/test/hive
# create dir for data within hive dir
shell# hadoop fs -mkdir /user/valya/test/hive/data
# copy avsc to hive dir
shell# hadoop fs -cp file.avsc /user/valya/test/hive
# copy avro to hive data dir; I replicated the data multiple times
shell# hadoop fs -cp file.avro /user/valya/test/hive/data/file1.avro
shell# hadoop fs -cp file.avro /user/valya/test/hive/data/file2.avro
At this step you'll have the avsc and avro files on HDFS. Now we can create the Hive table.
The hive table can be created as following:
# start hive on analytix cluster
analytix# hive
# within hive shell invoke the following command
# please change paths accordingly to your data location
# in this case I used /user/valya/test/hive directory on HDFS
# and arch.db is a file name for my hive DB
create database cms_wm_archive location '/user/valya/test/hive/arch.db';
# please change hdfs path to path where your schema file will reside
# in this case it is under
# /user/valya/test/hive/fwjr_processing.avsc
create external table wmarchive
ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.avro.AvroSerDe"
STORED AS INPUTFORMAT "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"
location "/user/valya/test/hive/data"
TBLPROPERTIES("avro.schema.url"="hdfs://p01001532965510.cern.ch:9000/user/valya/test/hive/file.avsc");
Once the table is created we can try to query our data via Hive queries; see:
http://thornydev.blogspot.com/2013/07/querying-json-records-via-hive.html
https://docs.treasuredata.com/articles/hive
We need to perform stress tests, i.e. inject data into HDFS, create the external table, set up Hive indexes and run Hive queries. All instructions can be found here: https://github.com/dmwm/WMArchive/wiki/Mongo-to-HDFS-Hive-migration
duplicate
To accommodate user queries we need a uniform Query Language to be used in WMArchive regardless of the storage back-end. If we use MongoDB as short-term storage, it provides a JSON-based QL [1]. For HDFS we should leverage Hive [2] or others. If we use the DAS-QL approach, it can be adapted to [1] and [2].
Or we can use different QLs depending on the back-end. It is an open issue.
[1] https://docs.mongodb.org/v3.0/tutorial/query-documents/
[2] https://docs.treasuredata.com/articles/hive
Per the discussion in [1] we'll need to write a script which will read data from MongoDB, extract the not-yet-injected docs, inject them into HDFS, and change the status attribute of each document in MongoDB from "mongo" to "hdfs". To change the status of a doc it should use a MongoDB in-place document update, see [2,3,4]. We need to add statuses independently into WMArchive such that this script can read and load them. For HDFS injection one should use the WMArchive/Storage/HdfsIO.py module.
[1] https://twiki.cern.ch/twiki/bin/view/CMS/WMArchive
[2] https://docs.mongodb.org/v3.0/reference/method/db.collection.update/
[3] https://docs.mongodb.org/v3.0/tutorial/modify-documents/
[4] https://docs.mongodb.org/v3.0/reference/operator/update/set/
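A minimal sketch of that migration loop, assuming pymongo; the connection URI, database/collection names and the HdfsIO class/method names are assumptions to be checked against the actual WMArchive code:

from pymongo import MongoClient
from WMArchive.Storage.HdfsIO import HdfsStorage   # class name is an assumption

client = MongoClient("mongodb://localhost:8230")    # placeholder URI
coll = client["fwjr"]["db"]                         # placeholder db/collection names
hdfs = HdfsStorage("hdfsio:/path/on/hdfs")          # placeholder HDFS URI

# read docs not yet injected into HDFS, inject them, then flip their status in place
for doc in coll.find({"status": "mongo"}):
    hdfs.write([doc])                               # write API is an assumption
    coll.update_one({"_id": doc["_id"]}, {"$set": {"status": "hdfs"}})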
We need to speed up the aggregation process and reduce the latency of aggregated-record creation for the web UI. The following steps should be done:
As discussed before, I need to make sense of the FWJR performance data to present them properly in the visualizations. Could you provide documentation on as much data as possible in the FWJR data structure?
You can refer to this sample FWJR.
Most importantly, I refer to the data in steps.performance, such as this:
"performance" : {
"multicore" : {
},
"storage" : {
"writeTotalMB" : 2127.67,
"readPercentageOps" : 1.2768,
"readAveragekB" : 4010.8291593,
"readTotalMB" : 3125.62663,
"readNumOps" : 625,
"readCachePercentageOps" : 0,
"readMBSec" : 0.00019514828571,
"readMaxMSec" : 611502,
"readTotalSecs" : 0,
"writeTotalSecs" : 10169000
},
"cpu" : {
"TotalJobCPU" : 165155,
"EventThroughput" : 0.0644757,
"AvgEventTime" : 59.1134,
"MaxEventTime" : 2066.86,
"TotalJobTime" : 57181.8,
"TotalLoopCPU" : 165142,
"MinEventTime" : 0.205815
},
"memory" : {
"PeakValueRss" : 3350.23,
"PeakValueVsize" : 4157.39
}
}
The second important part of the FWJR data to document is the exit codes in steps.errors.exitCode, e.g. in this sample FWJR.
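For orientation, a minimal sketch of pulling these fields out of one FWJR document, assuming the structure shown above with steps as a list of dicts:

def performance_and_errors(fwjr):
    "Collect steps.performance metrics and steps.errors exit codes from one FWJR dict"
    metrics, exit_codes = [], []
    for step in fwjr.get("steps", []):
        perf = step.get("performance", {})
        if perf:
            metrics.append(perf)
        for error in step.get("errors", []):
            exit_codes.append(error.get("exitCode"))
    return metrics, exit_codes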
Last week at the WMArchive meeting it was requested that we pull logs "the old way" by pulling them off the agent:
https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsWorkflowTeamLeaderResponsibilities#Retrieving_log_files_from_failed
and via the instructions at:
https://github.com/dmwm/WMArchive/wiki/Given-a-LFN,-return-a-logArchive-or-logCollect-tarball-location
and by looking at what Unified pulls,
to see whether we are getting the same information all 3 ways, and whether that information is what the sites need to debug workflows.
A lot of CMS services utilize the Sphinx engine for documentation about the project. It is generated from auxiliary files and the code itself. We need to put a similar system in place to auto-generate the WMArchive documentation.
Jen would like to see the plots below, but we don't have data for #3. #1 and #2 should be easy to add.
Currently the archive contains all the FWJRs for a job, even if it is eventually successful.
I.e., the documents below show the meta_data fields (extracted from 2 different documents in the WMArchive data). The fwjr_id values contain the same prefix, 1234-, which is considered the same job but retried.
It might be useful to only produce the aggregated statistics from the final case (in this case "fwjr_id": "1234-1").
"meta_data": {
...
"fwjr_id": "1234-0",
"host": "vocms001.cern.ch",
"agent_ver": "1.0.16.pre1",
...
},
....
"meta_data": {
...
"fwjr_id": "1234-1",
"host": "vocms001.cern.ch",
"agent_ver": "1.0.16.pre1",
...
},
We need to make the following assumption.
The following two documents cannot be combined since they have different agent versions.
"meta_data": {
...
"fwjr_id": "1234-0",
"host": "vocms001.cern.ch",
"agent_ver": "1.0.16.pre1",
...
},
....
"meta_data": {
...
"fwjr_id": "1234-1",
"host": "vocms001.cern.ch",
"agent_ver": "1.0.17.pre1",
...
},
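A minimal sketch of selecting the final retry per job under these assumptions (the fwjr_id prefix identifies the job, the numeric suffix is the retry count, and docs with different agent_ver are never combined):

def final_retries(docs):
    "Group docs by (job prefix, agent_ver) and keep only the highest retry in each group"
    latest = {}
    for doc in docs:
        meta = doc.get("meta_data", {})
        prefix, _, retry = meta.get("fwjr_id", "").rpartition("-")
        if not retry.isdigit():
            continue  # skip docs without a parsable fwjr_id
        key = (prefix, meta.get("agent_ver"))
        if key not in latest or int(retry) > latest[key][0]:
            latest[key] = (int(retry), doc)
    return [doc for _, doc in latest.values()]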