wmarchive's People

Contributors

ericvaandering, meniluca, mrceyhun, muhammadimranfarooqi, nilsvu, stiegerb, ticoann, vkuznet, yuyiguo

wmarchive's Issues

Don't use json.dump when the response is returned

Just return a Python dict; the REST code will automatically encode it according to the "Accept" header of the request.

When the client decodes the JSON string, it gets a list of strings instead of a list of dictionaries:
['{"status": "ok", "ids": ["b51484a168677e16660ea00250fa8036"], "stype": "mongodb"}']

Large queries with WMArchive

Dear @vkuznet ,

For my project on AI error-logs with @vlimant I now need to locate a large number of
log files: I have ~25,000 workflows (https://vocms0113.cern.ch/actionshistory) and for each
of them I need to locate the logs.
For each workflow I managed to write a spec file that contains the task name and the time
range (with the exact day when it was run), e.g.

{"spec":{"task":"/vlimant_ACDC0_task_HIG-RunIIFall17wmLHEGS-01415__v1_T_180706_002124_986/HIG-RunIIFall17DRPremix-02001_1/HIG-RunIIFall17DRPremix-02001_1MergeAODSIMoutput/HIG-RunIIFall17MiniAODv2-01299_0", "timerange":[20180706,20180706]}, "fields":[]}

With this I am able to locate the unmerged jobs.
However, a single task runs for around ~10 seconds, so if I create 25,000 spec files and run
a query for each of them it will take a lot of time. So I would like to know what might be the most
efficient way to locate that many files. Can I somehow make one large query that efficiently
returns all the files that I need, or give a finer time range to speed up the query?
It would be great to know what possibilities exist for this problem.

Many, many thanks for your help!

Create MongoDBCleanup script

We'll need to create a MongoDBCleanup script that will be responsible for cleaning up "hdfs" docs from MongoDB based on a given lifetime stamp. For example:

mongoCleanup.py --uri=URI --tstamp=3months --status=hdfs

It would be nice to pass tstamp in human-readable form as well as in plain UNIX seconds since the epoch. The human-readable form should support the Nhours, Ndays, Nmonths, Nyears notations, where N is some integer.
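
A minimal sketch of how the human-readable tstamp could be parsed (assuming months = 30 days and years = 365 days; the function name is illustrative):

import re
import time

UNITS = {"hours": 3600, "days": 86400, "months": 30 * 86400, "years": 365 * 86400}

def parse_tstamp(value):
    """Return the cutoff in UNIX seconds for values like '3months' or '12hours',
    or pass through a plain number of seconds since epoch."""
    if value.isdigit():
        return int(value)
    match = re.match(r"^(\d+)(hours|days|months|years)$", value)
    if not match:
        raise ValueError("Unsupported tstamp value: %s" % value)
    num, unit = int(match.group(1)), match.group(2)
    return int(time.time()) - num * UNITS[unit]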

Deploy and test with MongoDB 3.2 back-end

So far I deployed the service with a MongoDB 3.0.2 back-end since this RPM was available from my private repository. A new version of MongoDB came out recently, so we need to rebuild the MongoDB RPM, redeploy WMArchive and test its functionality.

Archive data unit change

@vkuznet, @yuyiguo
Since we applied the following patch to fix the unit (which records seconds for the total performance value instead of the CMSSW-defined milliseconds),

dmwm/WMCore#7189

We need to validate the statistics to determine whether seconds are the correct unit for these data. I see some examples in the tests which have sub-second values, e.g. 60 milliseconds, which will translate to 0 sec.

If much of the data has sub-second values, or values of less than a couple of seconds, milliseconds might be the better unit.

Missing required field PrepID

Hi,

I have a problem running myspark from lxplus.
I work with Jean-Roch and I need to access some error logs.
I tried to reproduce the example in https://github.com/dmwm/WMArchive/wiki/How-to-find-records-on-HDFS-using-pyspark

I logged in from an lxplus node following https://hadoop-user-guide.web.cern.ch/hadoop-user-guide/getstart/client_cvmfs.html, since it is no longer possible to log in via ssh analytix.

However, when I run the example in the twiki:
{"spec":{"task":"/amaltaro_StepChain_ReDigi3_HG1612_WMArchive_161130_192654_9283/DIGI","timerange":[20161130,20161202]}, "fields":[]}

myspark --spec=cond.spec --script=RecordFinder --records-output=records.json

I get an error message that tells me that the PrepID is missing:
I attach the output of myspark below.

It would be great if you could help me to solve this problem.

Many, many thanks in advance,
Best Lukas

[llayer@lxplus130 WMArchive]$ myspark --spec=cond.spec --script=RecordFinder --records-output=records.json
Using spark 2.X
Enable CVMFS ...
Ivy Default Cache set to: /afs/cern.ch/user/l/llayer/.ivy2/cache
The jars for the packages stored in: /afs/cern.ch/user/l/llayer/.ivy2/jars
:: loading settings :: url = jar:file:/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-avro_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found com.databricks#spark-avro_2.11;4.0.0 in central
        found org.slf4j#slf4j-api;1.7.5 in central
        found org.apache.avro#avro;1.7.6 in central
        found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
        found org.codehaus.jackson#jackson-mapper-asl;1.9.13 in central
        found com.thoughtworks.paranamer#paranamer;2.3 in central
        found org.xerial.snappy#snappy-java;1.0.5 in central
        found org.apache.commons#commons-compress;1.4.1 in central
        found org.tukaani#xz;1.0 in central
:: resolution report :: resolve 963ms :: artifacts dl 38ms
        :: modules in use:
        com.databricks#spark-avro_2.11;4.0.0 from central in [default]
        com.thoughtworks.paranamer#paranamer;2.3 from central in [default]
        org.apache.avro#avro;1.7.6 from central in [default]
        org.apache.commons#commons-compress;1.4.1 from central in [default]
        org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
        org.codehaus.jackson#jackson-mapper-asl;1.9.13 from central in [default]
        org.slf4j#slf4j-api;1.7.5 from central in [default]
        org.tukaani#xz;1.0 from central in [default]
        org.xerial.snappy#snappy-java;1.0.5 from central in [default]
        :: evicted modules:
        org.slf4j#slf4j-api;1.6.4 by [org.slf4j#slf4j-api;1.7.5] in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   10  |   0   |   0   |   1   ||   9   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 9 already retrieved (0kB/25ms)
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/cvmfs/sft.cern.ch/lcg/releases/hadoop/2.7.3-b1221/x86_64-centos7-gcc62-opt/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/02/08 09:11:08 INFO SparkContext: Running Spark version 2.2.1
19/02/08 09:11:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/02/08 09:11:09 INFO SparkContext: Submitted application: AvroKeyInputFormat
19/02/08 09:11:09 INFO SecurityManager: Changing view acls to: llayer
19/02/08 09:11:09 INFO SecurityManager: Changing modify acls to: llayer
19/02/08 09:11:09 INFO SecurityManager: Changing view acls groups to: 
19/02/08 09:11:09 INFO SecurityManager: Changing modify acls groups to: 
19/02/08 09:11:09 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(llayer); groups with view permissions: Set(); users  with modify permissions: Set(llayer); groups with modify permissions: Set()
19/02/08 09:11:09 INFO Utils: Successfully started service 'sparkDriver' on port 5001.
19/02/08 09:11:09 INFO SparkEnv: Registering MapOutputTracker
19/02/08 09:11:09 INFO SparkEnv: Registering BlockManagerMaster
19/02/08 09:11:09 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/02/08 09:11:09 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/02/08 09:11:09 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-71770f69-9df2-48f0-96b8-541540e19d03
19/02/08 09:11:09 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
19/02/08 09:11:09 INFO SparkEnv: Registering OutputCommitCoordinator
19/02/08 09:11:09 INFO log: Logging initialized @11312ms
19/02/08 09:11:09 INFO Server: jetty-9.3.z-SNAPSHOT
19/02/08 09:11:09 INFO Server: Started @11410ms
19/02/08 09:11:09 INFO AbstractConnector: Started ServerConnector@220b5902{HTTP/1.1,[http/1.1]}{0.0.0.0:5201}
19/02/08 09:11:09 INFO Utils: Successfully started service 'SparkUI' on port 5201.
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1ea5f333{/jobs,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4c756e2b{/jobs/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@9cf0917{/jobs/job,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5c0adfa3{/jobs/job/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@154de6bd{/stages,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@59dff77e{/stages/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@69ffa2df{/stages/stage,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@63fc12aa{/stages/stage/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@56bcafc0{/stages/pool,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2fd3a3e0{/stages/pool/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6a0d5620{/storage,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@38c6051f{/storage/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3db721{/storage/rdd,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41726ede{/storage/rdd/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@26fada3e{/environment,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1ef26daf{/environment/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3aa8db11{/executors,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5c06ad81{/executors/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@74b7f684{/executors/threadDump,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6db8becd{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@703bb05b{/static,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@346b2582{/,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@48511a93{/api,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4f021b90{/jobs/job/kill,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@27a69ce6{/stages/stage/kill,null,AVAILABLE,@Spark}
19/02/08 09:11:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://188.184.30.74:5201
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/v/valya/public/spark/spark-examples-1.6.0-cdh5.15.1-hadoop2.6.0-cdh5.15.1.jar at spark://188.184.30.74:5001/jars/spark-examples-1.6.0-cdh5.15.1-hadoop2.6.0-cdh5.15.1.jar with timestamp 1549613469967
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar at spark://188.184.30.74:5001/jars/com.databricks_spark-avro_2.11-4.0.0.jar with timestamp 1549613469968
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar at spark://188.184.30.74:5001/jars/org.slf4j_slf4j-api-1.7.5.jar with timestamp 1549613469968
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar at spark://188.184.30.74:5001/jars/org.apache.avro_avro-1.7.6.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar at spark://188.184.30.74:5001/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar at spark://188.184.30.74:5001/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar at spark://188.184.30.74:5001/jars/com.thoughtworks.paranamer_paranamer-2.3.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar at spark://188.184.30.74:5001/jars/org.xerial.snappy_snappy-java-1.0.5.jar with timestamp 1549613469969
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar at spark://188.184.30.74:5001/jars/org.apache.commons_commons-compress-1.4.1.jar with timestamp 1549613469970
19/02/08 09:11:09 INFO SparkContext: Added JAR file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar at spark://188.184.30.74:5001/jars/org.tukaani_xz-1.0.jar with timestamp 1549613469970
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py at file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py with timestamp 1549613470276
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/myspark.py
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar with timestamp 1549613470304
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/com.databricks_spark-avro_2.11-4.0.0.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/com.databricks_spark-avro_2.11-4.0.0.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar with timestamp 1549613470313
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.slf4j_slf4j-api-1.7.5.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar with timestamp 1549613470323
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.avro_avro-1.7.6.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.apache.avro_avro-1.7.6.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar with timestamp 1549613470336
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-core-asl-1.9.13.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.codehaus.jackson_jackson-core-asl-1.9.13.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar with timestamp 1549613470347
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.codehaus.jackson_jackson-mapper-asl-1.9.13.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar with timestamp 1549613470366
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/com.thoughtworks.paranamer_paranamer-2.3.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/com.thoughtworks.paranamer_paranamer-2.3.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar with timestamp 1549613470376
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.xerial.snappy_snappy-java-1.0.5.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.xerial.snappy_snappy-java-1.0.5.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar with timestamp 1549613470426
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.apache.commons_commons-compress-1.4.1.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.apache.commons_commons-compress-1.4.1.jar
19/02/08 09:11:10 INFO SparkContext: Added file file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar at file:/afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar with timestamp 1549613470436
19/02/08 09:11:10 INFO Utils: Copying /afs/cern.ch/user/l/llayer/.ivy2/jars/org.tukaani_xz-1.0.jar to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/org.tukaani_xz-1.0.jar
19/02/08 09:11:10 INFO Executor: Starting executor ID driver on host localhost
19/02/08 09:11:10 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 5101.
19/02/08 09:11:10 INFO NettyBlockTransferService: Server created on 188.184.30.74:5101
19/02/08 09:11:10 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/02/08 09:11:10 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManagerMasterEndpoint: Registering block manager 188.184.30.74:5101 with 2004.6 MB RAM, BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO BlockManager: external shuffle service port = 7337
19/02/08 09:11:10 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 188.184.30.74, 5101, None)
19/02/08 09:11:10 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3a8ae0e6{/metrics/json,null,AVAILABLE,@Spark}
19/02/08 09:11:10 WARN DFSUtil: Namenode for hadalytic remains unresolved for ID p01001532067275.cern.ch.  Check your hdfs-site.xml file to ensure namenodes are configured properly.
19/02/08 09:11:11 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/02/08 09:11:11 INFO EventLoggingListener: Logging events to hdfs:///user/spark/applicationHistory/local-1549613470488
19/02/08 09:11:11 INFO SparkContext: Added file /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py at file:/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py with timestamp 1549613471955
19/02/08 09:11:11 INFO Utils: Copying /afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/PySpark/RecordFinder.py to /tmp/spark-4dc0c089-9e65-4cf0-a8d4-f54426f5dac2/userFiles-3ab4c76a-4051-491f-8b02-b84f7a31e719/RecordFinder.py
19/02/08 09:11:14 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
        at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
        at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/02/08 09:11:14 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 455, in <module>
    main()
  File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 415, in main
    results = run(opts.schema, hdir, opts.script, opts.spec, verbose, opts.rout, opts.yarn)
  File "/afs/cern.ch/user/l/llayer/test_wm/WMArchive/src/python/WMArchive/Tools/myspark.py", line 275, in run
    avro_rdd = ctx.union([ctx.newAPIHadoopFile(f, aformat, akey, awrite, aconv, conf=conf) for f in data_path])
  File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/pyspark.zip/pyspark/context.py", line 675, in newAPIHadoopFile
  File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/cvmfs/sft.cern.ch/lcg/releases/spark/2.2.1-3206b/x86_64-centos7-gcc62-opt/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
        at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
        at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
        at org.apache.spark.api.python.PythonRDD$.newAPIHadoopFile(PythonRDD.scala:570)
        at org.apache.spark.api.python.PythonRDD.newAPIHadoopFile(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroTypeException: Found ns12.name12, expecting ns12.name12, missing required field PrepID
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
        at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
        at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:53)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:207)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

Investigate ElasticSearch back-end

Even though the initial prototype is developed with a MongoDB back-end, we may still need to explore an ElasticSearch one. This task involves deploying ElasticSearch and performing various tests to get benchmark numbers.

Create frontend UI

We may need to provide a front-end UI for WMArchive. Here is the possible functionality:

  • Provide a front page with basic statistics about the service
  • Provide search interface and basic GET interface to fetch some document(s)
  • Provide basic POST interface to upload a document to WMArchive

wmaClient

We need to implement a stand-alone wmaClient tool to place queries to WMArchive.
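
A minimal sketch of such a client, assuming a JSON REST endpoint that accepts {"spec": ..., "fields": ...} queries; the URL, payload layout and (missing) authentication handling are assumptions, not the final design:

import json
import argparse
import requests

def query(url, spec, fields):
    # POST the query spec and ask for a JSON response
    resp = requests.post(url, json={"spec": spec, "fields": fields},
                         headers={"Accept": "application/json"})
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="wmaClient sketch")
    parser.add_argument("--url", default="https://localhost/wmarchive/data")
    parser.add_argument("--spec", help="JSON file with the query spec")
    args = parser.parse_args()
    with open(args.spec) as istream:
        cond = json.load(istream)
    results = query(args.url, cond.get("spec", {}), cond.get("fields", []))
    print(json.dumps(results))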

Average event time

I talked with Dima regarding the WMArchive UI. He was very interested in it. He tried the average event time, which he cares about a lot, and found that we have a negative number for it. See the plot below:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775&jobtype=Merge
@ticoann, is the default value for the average event time negative?

Then he tried this one:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775&jobtype=Processing
It has an average event time of 10 seconds/event, which is the correct value. This plot was made with job_type=Processing.

If one removes the filter (i.e. for all job types), then this is the plot:
https://cmsweb-testbed.cern.ch/wmarchive/web/performance?metrics%5B%5D=jobstate&metrics%5B%5D=cpu.AvgEventTime&axes%5B%5D=host&axes%5B%5D=jobstate&axes%5B%5D=site&start_date=20161009&end_date=20161027&workflow=cerminar_Run2016B-v2-SingleElectron-23Sep2016_8020_161020_183522_4775
It has 1 second/event, which is too small. Could the data aggregation be counting the number of events wrong, or adding the negative times?

Add support for bulk POST requests

WMArchive should support POST requests with single or multiple docs. It will send back the uid of the doc for a single doc, or a list of uids corresponding to the given list of docs. We also need a tool to find the matching between docs and uids.
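
A minimal sketch of the doc/uid matching on the client side, assuming the service returns one uid per submitted doc in the same order (the endpoint and the {"data": [...]} wrapper are illustrative):

import requests

def post_docs(url, docs):
    """POST one or many docs and return (doc, uid) pairs in submission order."""
    resp = requests.post(url, json={"data": docs},
                         headers={"Accept": "application/json"})
    resp.raise_for_status()
    uids = resp.json().get("ids", [])
    # the matching relies on the service preserving the order of the input docs
    return list(zip(docs, uids))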

Integration tests to understand the ExitCodes

We discussed that we need to run 10-100 integration tests to study the ExitCodes. The goal is to understand the exitCodes and which root exitCode causes a workflow to fail. We will run these tests on the testbed, and the workflows should have distinct names so they can be identified.

The problem came up because the exitCodes that were interesting to @jenimal could not be found in the WMArchive DB. She got these errors from WMStats(?) or requestManager(?). She was trying to put information from different sources together to identify the cause/location/severity of one error. @ticoann, please comment on how exitCodes are handled in the other services.

@amaltaro, you have some tests that can be used for this integration test. Could you please start running them and report the workflow names in this issue?

@vkuznet, in addition to the current exitCode plots, should we add a plot that only has the first exitCode for each failed workflow?

Create avro schema out of given JSON file

It would be nice to have a tool which automatically creates an Avro schema out of a given JSON file. For that it should inspect the structure and value types of the JSON and write the schema spec. Here is a possible format for such a script:

avro_schema.py --fin=file.json --fout=file.avsc
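
A minimal sketch of the schema-inference idea; it only covers the basic JSON types, assumes non-empty lists, and is not the final implementation:

import json

def avro_type(value, name):
    """Map a JSON value to an Avro type spec."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):
        return {"type": "record", "name": name,
                "fields": [{"name": key, "type": avro_type(val, "%s_%s" % (name, key))}
                           for key, val in value.items()]}
    if isinstance(value, list):
        return {"type": "array",
                "items": avro_type(value[0], name) if value else "string"}
    return "string"

def json2avsc(fin, fout):
    """Read a JSON document and write the inferred Avro schema."""
    with open(fin) as istream:
        doc = json.load(istream)
    with open(fout, "w") as ostream:
        json.dump(avro_type(doc, "fwjr"), ostream, indent=2)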

Actions needed to have the next release by Nov 30

@vkuznet , @ticoann , @amaltaro

Action #1: Seangchan will talk with Jen on the exitCodes and finalize it this week.
Action #2: Seangchan will check the unit issue and make a final decision.
Action #3: Valentin will check the files with workflow names for the data that Alan uploaded to the WMArchive prod DB that is connected to the cmsweb-testbed UI. The date of the upload is Nov 16. Alan tried to fetch them using the UI for the time period of Nov 16-23, but was unable to find them. Valentin will try again; if he cannot find them, Alan will rerun the tests. If we cannot find the files after rerunning the tests, Valentin will check the cron jobs that handle the data.

Actions #1 and #2 may involve schema changes, so we need to settle them ASAP.

Don't allow duplicate docs to get inserted

An agent could send duplicated docs due to network problems.
One case is when the agent sends a bulk of docs and, although the upload was successful (or partially successful), a temporary network problem occurs right before the agent gets the response; the agent will then try to upload the same documents again.

The metadata field should be unique to each document (it contains time stamps), so it could be used as the validation information.
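
A minimal sketch of how the metadata could back such a check on the MongoDB side, via a unique compound index on identifying meta_data fields (the URI, db/collection names and the chosen fields are illustrative):

from pymongo import MongoClient, ASCENDING
from pymongo.errors import BulkWriteError

coll = MongoClient("mongodb://localhost:27017")["fwjr"]["db"]

# reject re-sent docs at the database level via a unique compound index
coll.create_index([("meta_data.fwjr_id", ASCENDING),
                   ("meta_data.host", ASCENDING),
                   ("meta_data.agent_ver", ASCENDING)],
                  unique=True)

def insert_docs(docs):
    """Insert a bulk of docs; duplicates hit the unique index and are skipped."""
    try:
        coll.insert_many(docs, ordered=False)
    except BulkWriteError:
        pass  # duplicate-key errors from a retransmitted bulk are expected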

Investigate spark streaming approach

We can use Julia's idea of running Spark queries on freshly inserted data in HDFS. This will use fewer resources and aggregate some info on a daily basis. The aggregated info can be fed into a dashboard or any other DB or HDFS. Then we can run another aggregation query over the pre-aggregated daily data.
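
A minimal sketch of the idea with Spark structured streaming, assuming Spark >= 2.4 with built-in Avro support, a fixed schema, and a timestamp field such as meta_data.ts in the records (paths and field names are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("wma-daily-agg").getOrCreate()

# reuse the schema from an already existing day of data
schema = spark.read.format("avro") \
    .load("hdfs:///cms/wmarchive/avro/fwjr/2017/06/30").schema

# watch the landing area for freshly written avro files
stream = spark.readStream.format("avro").schema(schema) \
    .load("hdfs:///cms/wmarchive/avro/fwjr/*/*/*")

# pre-aggregate daily counts; the same pattern works for other metrics
daily = stream.groupBy(window(col("meta_data.ts").cast("timestamp"), "1 day")).count()

# feed the aggregated info to a sink (memory sink here just for illustration)
query = daily.writeStream.outputMode("complete") \
    .format("memory").queryName("daily_agg").start()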

Paola's usecase

Paola requested the data-searching task below.
Given a PrepID and a time frame (a week), she needs to find out:

  1. The workflow names/task names associated with the PrepID.
  2. The failed job type.
  3. The site name where the job was failed.
  4. The exitCode of the failed job.

The definition of a failed job is that the job failed after all the retries; usually agents will try three times. If a job runs successfully after the 3rd try, we will not collect the information for the previous failed tries, and this job is considered a successful job.

We will only collect the final failed try for the above information; the previous tries will not be collected.

Currently, the PrepID info is at the file level. Seangchan and his group are working on getting the PrepID and Campaign info into the top level of the FWJR. We will need to update the WMArchive schema after they put these in production.

Valentin is working on the scripts using the PrepID at the file level. He will provide Paola with a working script for her initial use.

Investigate usage of PySpark

I provided code to use the PySpark APIs and submit a Spark job. Based on the information I have so far, it should be much faster than MR since Spark handles data in memory. We need to investigate how it works and compare it with the MR approach.

Document keys used in WMArchive

Create a JSON file that documents the keys used in WMArchive. The UI will read from this JSON file to show the definition of a metric when the mouse moves over it. @vkuznet, please let me know where in the repository to store this file. I will put the performance keys there first.

Update avro input path

Hi,

When I ran myspark to collect the log
$ myspark --spec=cond.spec --script=RecordFinder --records-output=records.json

I encountered this error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. : org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://analytix/cms/wmarchive/avro/2017/06/30

The correct path should be:
hdfs://analytix/cms/wmarchive/avro/fwjr/2017/06/30

Could you please update this in the source code? Thanks!

Perform memory check for bulk read/write APIs

After yesterday's meeting I was working on code to implement bulk read/write
functionality for WMArchive. It is doable, but I realized that, based on the current
structure of the FWJR and Hadoop's "desire" to deal with large files, we may end up
with a bottleneck on our server. Here is the problem:

  • Kate/Zbigniew said that they (i.e. Hadoop) prefer to write files of large
    size; the number they mentioned was about 256MB.
  • In order to write such a file we need to load/parse N documents.
  • Each FWJR document we deal with is JSON; moreover, it has a deeply nested structure,
    i.e. a dict contains lists of dicts, etc.
  • The Avro schema is also JSON, and in order to write a bulk of docs we need
    to create a JSON structure like {'data': [our_fwjr_docs_here]} (*). We can't write just
    a list of FWJRs since the Avro schema can't consume a list of dicts; it requires
    a name for the record [1] (I think internally it is a static data struct
    defined by the schema, and such a struct should have named attributes).
  • Let's assume that we want to create a large Avro file with N documents where N
    is large (to accommodate Hadoop's desire). Then we'll end up with Python code
    which holds N FWJRs in memory, and we'll need to create the (*) structure,
    which will further increase RAM usage. This is the same effect I dealt with in the
    DBS memory leaks and the same as Marco reported for the CRAB server.

I haven't made any estimates yet, since I don't know N, but we may have a
problem here. On one hand we can't hold too many docs due to potential
RAM usage; on the other, Hadoop "desires" large files.

So, to make a reasonable estimate I need to know N.

From WMArchive twiki I read the following:
"We are currently running 10 WMAgents, each agent has on average about 1M jobs,
each job will produce FWJR report which we need to store in WMArchive. All jobs
are done in parallel, but we expect to have on the order of <1K docs per
seconds feed into WMArchive. The system should sustain such load."

But from this sentence I don't know the daily rate we need to deal with.
Seangchan, can you provide a better estimate? I need to know how many docs
each agent will write on average per day. Once we know that, I can try different
N's, e.g. 1K, 10K, 100K, and measure RAM usage when we load data from
MongoDB into the code which converts them into an Avro file as a chunk.

Then I can have a better idea of which threshold for N we can sustain on our server.
So far, the mongo2hdfs script reads docs from Mongo and writes them (one by one) to
HDFS. To support bulk writes, I need to read N docs from Mongo and keep them in RAM
in order to write the Avro file to HDFS. Of course I can split the last step: write the
N docs into the local file system and then run a separate process to copy that file
into HDFS, but that still does not solve the RAM usage issue.

But the real issue here is the FWJR data structure we use and Python's memory
allocation for it. Since the FWJR has a deeply nested structure and we need
to hold a large chunk of them, again in a dictionary, I can't organize streaming.
This is yet another issue with the FWJR and the storage/look-up approach.
Storage and look-up prefer flat data, while our application relies
on deeply nested dict structures. The real remedy is to modify the FWJR structure
to be flat and then use streaming for writing. The data look-up would benefit
from a flat structure as well, e.g. compare a.b.c.d.e.f data access with a
single-column look-up.

Best,
Valentin.

[1] http://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/avroschemas.html#avro-creatingschemas
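
For what it's worth, here is a minimal sketch of how the bulk conversion could avoid holding the (*) structure in RAM, assuming the fastavro package and a per-document schema; it consumes the MongoDB cursor lazily instead of materializing all N docs:

import json
from fastavro import parse_schema, writer

def mongo2avro(cursor, schema_file, avro_file):
    """Stream docs from a MongoDB cursor into an Avro container file."""
    with open(schema_file) as istream:
        schema = parse_schema(json.load(istream))
    with open(avro_file, "wb") as ostream:
        # writer() iterates the cursor lazily, so only one doc at a time
        # (plus the Avro block buffer) lives in memory
        writer(ostream, schema, (doc for doc in cursor))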

Return list of IDs instead of docs for given query

To reduce the load on the system we may need to return a list of IDs instead of the actual docs for a given query. And maybe prohibit queries with a list of IDs, and rather re-route the user to HDFS so that (s)he can fetch the docs directly.

Resolve hive issue with FWJR

We would like to use the Hive tool to search over our data. The data will reside somewhere on the Hadoop cluster along with a schema which describes the data. Then we want to create an external Hive table which will allow us to search over our data via Hive queries.

To do this task a few steps should be performed:

  • create your own JSON file or take one from https://github.com/dmwm/WMArchive/tree/master/test/data (for reference I'll refer to it as file.json)
  • create an Avro schema file for your JSON file (for reference I'll refer to it as file.avsc)
  • create Avro files from your JSON file and Avro schema file (for reference I'll refer to them as file1,2,3.avro)

The file with the avsc extension is an Avro schema file. To generate the avsc and avro files one can use the json2avsc and json2avro scripts from https://github.com/dmwm/WMArchive

Here are the steps to set up the necessary software:

cd some_dir
git clone git@github.com:dmwm/WMArchive.git
export PYTHONPATH=$PWD/WMArchive/src/python
export PATH=$PATH:$PWD/WMArchive/bin

and this will put all the necessary stuff into your PATH and PYTHONPATH. Then you can run:

# to generate file.avsc out of file.json
json2avsc --fin=file.json --fout=file.avsc

# to generate file.avro out of file.json/file.avsc
json2avro --fin=file.json --schema=file.avsc --fout=file.avro

Once you have generated the avsc and avro files you can put them into HDFS:

# ssh to analytix
# create dir for hive tests
shell# hadoop fs -mkdir /user/valya/test/hive
# create dir for data within hive dir
shell# hadoop fs -mkdir /user/valya/test/hive/data

# copy avsc to hive dir
shell# hadoop fs -cp file.avsc /user/valya/test/hive

# copy avro to hive data dir (I replicate the data multiple times)
shell# hadoop fs -cp file.avro /user/valya/test/hive/data/file1.avro
shell# hadoop fs -cp file.avro /user/valya/test/hive/data/file2.avro

At this point you'll have the avsc and avro files on HDFS. Now we can create the Hive table.

The Hive table can be created as follows:

# start hive on analytix cluster
analytix# hive

# within hive shell invoke the following command 
# please change paths accordingly to your data location
# in this case I used /user/valya/test/hive directory on HDFS
# and arch.db is a file name for my hive DB

create database cms_wm_archive location '/user/valya/test/hive/arch.db';

# please change hdfs path to path where your schema file will reside
# in this case it is under
# /user/valya/test/hive/fwjr_processing.avsc

create external table wmarchive
ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.avro.AvroSerDe"
STORED AS INPUTFORMAT "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"
OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"
location "/user/valya/test/hive/data"
TBLPROPERTIES("avro.schema.url"="hdfs://p01001532965510.cern.ch:9000/user/valya/test/hive/file.avsc");

Once the table is created we can try to query our data via Hive queries; see:
http://thornydev.blogspot.com/2013/07/querying-json-records-via-hive.html
https://docs.treasuredata.com/articles/hive

Develop WMArchive QL to support queries in short and long term storage

To accommodate user queries we need a uniform Query Language to be used in WMArchive regardless of the storage. If we use MongoDB for short-term storage, it provides a JSON-based QL [1]. For HDFS we should leverage Hive [2] or others. If we use the DAS-QL approach, it can be adapted to [1] and [2].
Or we can use different QLs based on the back-end. It is an open issue.

[1] https://docs.mongodb.org/v3.0/tutorial/query-documents/
[2] https://docs.treasuredata.com/articles/hive

Create Mongo2Hdfs migration script

Per the discussion in [1] we'll need to write a script which reads data from MongoDB, extracts the not-yet-injected docs, injects them into HDFS, and changes the status attribute of the document in MongoDB from "mongo" to "hdfs". To change the status of the doc it should use a MongoDB in-place document update, see [2,3,4]. We need to add statuses independently into WMArchive so that this script can read and load them. For the HDFS injection one should use the WMArchive/Storage/HdfsIO.py module.

[1] https://twiki.cern.ch/twiki/bin/view/CMS/WMArchive
[2] https://docs.mongodb.org/v3.0/reference/method/db.collection.update/
[3] https://docs.mongodb.org/v3.0/tutorial/modify-documents/
[4] https://docs.mongodb.org/v3.0/reference/operator/update/set/
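
A minimal sketch of the migration loop, assuming pymongo, a status field that moves from "mongo" to "hdfs", and an HDFS writer object wrapping the HdfsIO-like module (db/collection and field names are illustrative):

from pymongo import MongoClient

def migrate(muri, hdfs_storage, chunk=1000):
    """Read not-yet-injected docs from MongoDB, inject them into HDFS and
    flip their status from "mongo" to "hdfs" with an in-place update."""
    coll = MongoClient(muri)["fwjr"]["db"]
    docs = list(coll.find({"status": "mongo"}).limit(chunk))
    if not docs:
        return 0
    hdfs_storage.write(docs)  # HDFS injection, e.g. via WMArchive/Storage/HdfsIO.py
    coll.update_many({"_id": {"$in": [doc["_id"] for doc in docs]}},
                     {"$set": {"status": "hdfs"}})
    return len(docs)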

Reduce latency in aggregation

We need to speed up the aggregation process and reduce the latency of aggregated record creation for the web UI. The following steps should be done:

  • change migration crontab to write more often to HDFS
  • add hash key into agg docs
  • add hash index into MongoDB
  • change insert to upsert for the MongoDB agg doc insertion (see the sketch after this list)
  • change the interval of agg doc generation and insertion into MongoDB;
    I think it is doable to reduce the latency from 1 day to 1 hour.
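
A minimal sketch of the insert-to-upsert change, assuming pymongo and that each aggregated doc carries the hash key mentioned above (the collection and field names are illustrative):

from pymongo import MongoClient, UpdateOne

def upsert_agg_docs(muri, agg_docs):
    """Upsert aggregated docs keyed by their hash, so re-running the aggregation
    for the same interval overwrites existing records instead of duplicating them."""
    coll = MongoClient(muri)["aggregated"]["performance"]
    ops = [UpdateOne({"hash": doc["hash"]}, {"$set": doc}, upsert=True)
           for doc in agg_docs]
    if ops:
        coll.bulk_write(ops, ordered=False)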

FWJR performance documentation

As discussed before, I need to make sense of the FWJR performance data to present them properly in the visualizations. Could you provide documentation on as much data as possible in the FWJR data structure?

You can refer to this sample FWJR.

  • Most importantly I refer to the data in steps.performance such as this:

    "performance" : {
        "multicore" : {
        },
        "storage" : {
            "writeTotalMB" : 2127.67,
            "readPercentageOps" : 1.2768,
            "readAveragekB" : 4010.8291593,
            "readTotalMB" : 3125.62663,
            "readNumOps" : 625,
            "readCachePercentageOps" : 0,
            "readMBSec" : 0.00019514828571,
            "readMaxMSec" : 611502,
            "readTotalSecs" : 0,
            "writeTotalSecs" : 10169000
        },
        "cpu" : {
            "TotalJobCPU" : 165155,
            "EventThroughput" : 0.0644757,
            "AvgEventTime" : 59.1134,
            "MaxEventTime" : 2066.86,
            "TotalJobTime" : 57181.8,
            "TotalLoopCPU" : 165142,
            "MinEventTime" : 0.205815
        },
        "memory" : {
            "PeakValueRss" : 3350.23,
            "PeakValueVsize" : 4157.39
        }
    }
    
  • The second important part of the FWJR data to document is the exit codes in steps.errors.exitCode, e.g. in this sample FWJR.

Need to pull logs multiple ways and compare them

Last week at the WMArchive meeting it was requested that we pull logs "the old way" by pulling them off the agent:
https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsWorkflowTeamLeaderResponsibilities#Retrieving_log_files_from_failed

and via the instructions at:
https://github.com/dmwm/WMArchive/wiki/Given-a-LFN,-return-a-logArchive-or-logCollect-tarball-location
and looking at what Unified pulls:

to see if we are getting the same information all 3 ways, and if that information is what the sites need to debug workflows:

Add sphinx documentation

A lot of CMS services utilize the Sphinx engine for documentation about the project. It comes from auxiliary files and the code itself. We need to put a similar system in place to auto-generate the WMArchive documentation.
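
A minimal sketch of the docs/conf.py piece that enables auto-generation from the code (paths and theme are assumptions; the .rst stubs themselves would come from sphinx-apidoc):

# docs/conf.py (sketch)
import os
import sys

# make the WMArchive packages importable for autodoc
sys.path.insert(0, os.path.abspath("../src/python"))

project = "WMArchive"
master_doc = "index"
extensions = [
    "sphinx.ext.autodoc",    # pull API docs from the docstrings in the code
    "sphinx.ext.viewcode",   # link the rendered pages back to the source
]
html_theme = "alabaster"     # assumption: any installed theme would do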

UI request from Jen

Jen would like to see the plots below, but we don't have the data for 3; 1 and 2 should be easy to add.

  1. Distribution of the number of errors by time, by sites filtered by workflow name.
  2. ErrorCode changes vs time filtered by workflow.
  3. The number of CPU cores.

Add filter for aggregation to get only the final fwjr

Currently the archive contains all the FWJRs for a job, even if it eventually succeeds.

E.g., below are the meta_data fields extracted from 2 different documents in the WMArchive data. The fwjr_id contains the same prefix, 1234-, which is considered the same job but retried.
It might be useful to only get the aggregated statistics for the final case (in this case "fwjr_id": "1234-1").

"meta_data": {
    ...
    "fwjr_id": "1234-0", 
    "host": "vocms001.cern.ch", 
    "agent_ver": "1.0.16.pre1", 
    ... 
}, 

....
"meta_data": {
    ...
    "fwjr_id": "1234-1", 
    "host": "vocms001.cern.ch", 
    "agent_ver": "1.0.16.pre1", 
    ... 
}, 

We need to make the following assumptions:

  1. Only failed jobs will be retried, so if a job is successful on the first try only one job report is created (with the postfix "-0").
  2. A new FWJR is created on each retry until the job eventually fails or succeeds (the retry count is currently set to 3, but this could be changed, although it is unlikely). So in the current setting the "-2" postfix indicates the last try in the failed case; in the success case it could be any postfix between -0 and -2.
  3. One important thing to consider: fwjr_id alone can't be used. Different documents can have the same fwjr_id, so we need to consider "host" and "agent_ver" as well to look for the same job (see the sketch after the examples below).

The following two documents cannot be combined since they have different agent versions.

"meta_data": {
    ...
    "fwjr_id": "1234-0", 
    "host": "vocms001.cern.ch", 
    "agent_ver": "1.0.16.pre1", 
    ... 
}, 

....
"meta_data": {
    ...
    "fwjr_id": "1234-1", 
    "host": "vocms001.cern.ch", 
    "agent_ver": "1.0.17.pre1", 
    ... 
}, 
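
Under the assumptions above, a minimal sketch of such a filter in plain Python (field names follow the meta_data examples; this is not the actual aggregation code):

def final_fwjrs(docs):
    """Keep only the last retry of each job, where a job is identified by
    (host, agent_ver, fwjr_id prefix) and the retry count is the -N postfix."""
    latest = {}
    for doc in docs:
        meta = doc["meta_data"]
        job_id, _, retry = meta["fwjr_id"].rpartition("-")
        key = (meta["host"], meta["agent_ver"], job_id)
        if key not in latest or int(retry) > latest[key][0]:
            latest[key] = (int(retry), doc)
    return [doc for _, doc in latest.values()]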
