Git Product home page Git Product logo

datasplash's Introduction

Datasplash

Clojars Project

cljdoc badge

Clojure API for a more dynamic Google Cloud Dataflow and (not really battle tested) any other Apache Beam backend.

Usage

API docs

You can also see ports of the official Dataflow examples in the datasplash.examples namespace.

Here is the classic word count example.

ℹ️ You will need to run (compile 'datasplash.examples) every time you make a change.

(ns datasplash.examples
  (:require [clojure.string :as str]
            [datasplash.api :as ds]
            [datasplash.options :refer [defoptions]])
  (:gen-class))

(defn tokenize
  [^String l]
  (remove empty? (.split (str/trim l) "[^a-zA-Z']+")))

(defn count-words
  [p]
  (ds/->> :count-words p
          (ds/mapcat tokenize {:name :tokenize})
          (ds/frequencies)))

(defn format-count
  [[k v]]
  (format "%s: %d" k v))

(defoptions WordCountOptions
  {:input {:default "gs://dataflow-samples/shakespeare/kinglear.txt"
           :type String}
   :output {:default "kinglear-freqs.txt" :type String}
   :numShards {:default 0 :type Long}})

(defn -main
  [& str-args]
  (let [p (ds/make-pipeline WordCountOptions str-args)
        {:keys [input output numShards]} (ds/get-pipeline-options p)]
    (->> p
         (ds/read-text-file input {:name "King-Lear"})
         (count-words)
         (ds/map format-count {:name :format-count})
         (ds/write-text-file output {:num-shards numShards})
         (ds/run-pipeline))))

Run it from the repl

Locally on your machine using a DirectRunner:

(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--input=sometext.txt" "--output=out-freq.txt" "--numShards=1")

Remotely on Google Cloud using a DataflowRunner:

You should have properly configured your Google Cloud account and Dataflow access from your machine.

(in-ns 'datasplash.examples)
(clojure.core/compile 'datasplash.examples)
(-main "--project=my-project"
       "--runner=DataflowRunner"
       "--gcpTempLocation=gs://bucket/tmp"
       "--input=gs://apache-beam-samples/shakespeare/kinglear.txt"
       "--output=gs://bucket/outputs/kinglear-freq.txt"
       "--numShards=1")

Run it as a standalone program

Datasplash needs to be AOT compiled, so you should prepare an uberjar and run from your main entry like so:

java -jar my-dataflow-job-uber.jar [beam-args]

Caveats

  • Due to the way the code is loaded when running in distributed mode, you may get some exceptions about unbound vars, especially when using instances with a high number of cpu. They will not however cause the job to fail and are of no consequences. They are caused by the need to prep the Clojure runtime when loading the class files in remote instances and some tricky business with locks and require.
  • If you have to write your own low-level ParDo objects (you shouldn't), wrap all your code in the safe-exec macro to avoid issues with unbound vars. Any good idea about finding a better way to do this would be greatly appreciated!
  • If some of the UserCodeException as seen in the cloud UI are mangled and missing the relevant part of the Clojure source code, this is due to a bug with the way the sdk mangles stacktraces in Clojure. In this case look for ClojureRuntimeException in the logs to find the original unaltered stacktrace.
  • Beware of using Clojure 1.9: proxy results are not Serializable anymore, so you cannot use anywhere in your pipeline Clojure code that uses proxy. Use Java shim for these objects instead.
  • If you see something like java.lang.ClassNotFoundException: Options you probably forgot to compile your namespace.
  • Whenever you need to check some spec in user code, you will have to first require those specs because they may not be loaded in your Clojure runtime. But don't use (require) because it's not thread safe. See [this issue] for a workaround.
  • If you see a java.io.IOException: No such file or directory when invoking compile, make sure there is a directory in your project root that matches the value of *compile-path* (default classes).

About compression libraries

The Beam Java SDK does not pull in the Zstd library by default, so it is the user's responsibility to declare an explicit dependency on zstd-jni. Attempts to read or write .zst files without this library loaded will result in NoClassDefFoundError at runtime.

The Beam Java SDK does not pull in the required libraries for LZOP compression by default, so it is the user's responsibility to declare an explicit dependency on io.airlift:aircompressor and com.facebook.presto.hadoop:hadoop-apache2. Attempts to read or write .lzo files without those libraries loaded will result in a NoClassDefFoundError at runtime.

See Apache Beam Compression enum for details.

License

Copyright © 2015-2024 Oscaro.com

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

datasplash's People

Contributors

bfontaine avatar domparry avatar edporras avatar ericfode avatar fiv0 avatar hiteki avatar jasonmm avatar jeremy-lc avatar jprudent avatar kawas44 avatar m-faucon avatar m-lce avatar neuromantik33 avatar ngrunwald avatar quan-nh avatar rolt avatar simply-gh avatar xfthhxk avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

datasplash's Issues

Apache Beam support

Hey guys,

Just ran my first job on Dataflow using datasplash today, and it worked great. Thanks for the library!

Any plans to migrate to Beam? Any idea how big a change it would be to datasplash?

Cheers!

`sliding-windows` vs `SlidingWindows`

The sliding windows in this Java code work as I would expect them to. Each window is one file that contains the elements from that window.

The sliding windows in this Clojure code behave differently. Each window is split across multiple files with each file containing a portion of the elements from that window.

The two pipelines are intended to behave identically, and, with the exception of the output files, do appear to behave identically.

Reading the code for sliding-windows I can't see what would cause the difference. Is sliding-windows intended to behave differently from SlidingWindows? If so, what is the proper way to duplicate the functionality of SlidingWindows?

Exception when running example in Readme

First of all thanks for the library! The datasplash API and examples make Beam very approachable. I was trying out the example in the Readme and got the following error.

*e
#error {
 :cause ".via() is required"
 :via
 [{:type java.lang.IllegalArgumentException
   :message ".via() is required"
   :at [org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions checkArgument "Preconditions.java" 122]}]
 :trace
 [[org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions checkArgument "Preconditions.java" 122]
  [org.apache.beam.sdk.io.FileIO$Write expand "FileIO.java" 1212]
  [org.apache.beam.sdk.io.FileIO$Write expand "FileIO.java" 778]
  [org.apache.beam.sdk.Pipeline applyInternal "Pipeline.java" 537]
  [org.apache.beam.sdk.Pipeline applyTransform "Pipeline.java" 471]
  [org.apache.beam.sdk.values.PCollection apply "PCollection.java" 357]
  [jdk.internal.reflect.GeneratedMethodAccessor105 invoke nil -1]
  [jdk.internal.reflect.DelegatingMethodAccessorImpl invoke "DelegatingMethodAccessorImpl.java" 43]
  [java.lang.reflect.Method invoke "Method.java" 566]
  [clojure.lang.Reflector invokeMatchingMethod "Reflector.java" 167]
  [clojure.lang.Reflector invokeInstanceMethod "Reflector.java" 102]
  [datasplash.core$tapply invokeStatic "core.clj" 432]
  [datasplash.core$tapply invoke "core.clj" 428]
  [datasplash.core$apply_transform invokeStatic "core.clj" 452]
  [datasplash.core$apply_transform invoke "core.clj" 445]
  [datasplash.core$write_text_file invokeStatic "core.clj" 1168]
  [datasplash.core$write_text_file invoke "core.clj" 1149]

Discussion on exclude-from-index option for datastore entities

Hi datasplashers!

I have a change in my forked version of the lib. It's for datastore, when creating entities.

(defn- make-ds-entity-builder
  [raw-values {:keys [exclude-from-index] :as options}]
  (let [excluded-set (into #{} (map name exclude-from-index))
        ^Entity$Builder entity-builder (Entity/newBuilder)]
    (doseq [[v-key v-val] raw-values]
      (.put (.getMutableProperties entity-builder)
            (if (keyword? v-key) (name v-key) v-key)
            (let [^Value$Builder val-builder (make-ds-value-builder v-val)]
              (-> val-builder
                  (cond->
                      (or (excluded-set (name v-key))
                          (and (string? v-val) (> (alength (.getBytes v-val)) 1500)))
                      (.setExcludeFromIndexes true))
                  (.build)))))
    entity-builder))

Currently you can pass in a set of fields to be excluded from indexes, but this only works for top level entities.

So I have added a check to look for strings that are larger than the allowed 1500 bytes, and those are also excluded from indexes.

Perhaps I can add this to a PR. But I'm thinking it should be opt in? Perhaps a flag that can be passed in.

I have also implemented a change that will exclude indexing on nested entities, but this excludes any entity with a name that is included in the exclude-from-index option in the options map. I could submit a PR for this too if preferred.

Would love to hear any thoughts from active users of the library.

write-json-file regression

@domparry bad news, I've ran into a problem with the output generated in that objects are getting stringified. ex., generating a pcoll of

[{:id "23" :vals #{{:id "41" :views 342} {:id "52" :views 41}}}
 {:id "15" :vals #{{:id "44" :views 4} {:id "492" :views 59}}}]

results in this when calling write-json-file:

{"id":"23","vals":"#{{:id \"52\", :views 41} {:id \"41\", :views 342}}"}
{"id":"15","vals":"#{{:id \"492\", :views 59} {:id \"44\", :views 4}}"}

At first I was baffled how the test was not catching this and then I realized… I never added a write-json-file test 🤦‍♂️😭

I'll take a look at this tomorrow or over the weekend.

Separate out non-core APIs from api.clj

I'd like to propose restructuring the api resource so that optional dependencies such as bq and pubsub not be included in api.clj. That would allow those of us not using those subsystems from having to pull in those dependencies while still having access to the syntactic sugar offered there.

Alternatively, include the syntactic sugar directly in core.clj.

swap out cheshire for jsonista

I'm considering swapping out cheshire for jsonista (https://github.com/metosin/jsonista). It offers significantly faster parsing to and from json, and still uses jackson under the covers.

Would there be any objections? I'm keen to do this on our fork, but don't want to diverge too much from datasplash.

java.io.IOException No such file or directory

When compiling

(in-ns 'foo.bar)
(compile 'foo.bar)

I keep encountering the following error:

1. Caused by java.io.IOException
   No such file or directory

       UnixFileSystem.java:   -2  java.io.UnixFileSystem/createFileExclusively
                 File.java: 1024  java.io.File/createNewFile
             Compiler.java: 7672  clojure.lang.Compiler/writeClassFile
             Compiler.java: 4670  clojure.lang.Compiler$ObjExpr/compile
             Compiler.java: 4106  clojure.lang.Compiler$FnExpr/parse
             Compiler.java: 7105  clojure.lang.Compiler/analyzeSeq
             Compiler.java: 6789  clojure.lang.Compiler/analyze
             Compiler.java: 7095  clojure.lang.Compiler/analyzeSeq
             Compiler.java: 6789  clojure.lang.Compiler/analyze
             Compiler.java: 6745  clojure.lang.Compiler/analyze
             Compiler.java: 3820  clojure.lang.Compiler$InvokeExpr/parse
             Compiler.java: 7109  clojure.lang.Compiler/analyzeSeq
             Compiler.java: 6789  clojure.lang.Compiler/analyze
             Compiler.java: 6745  clojure.lang.Compiler/analyze
             Compiler.java: 7726  clojure.lang.Compiler/compile1
             Compiler.java: 7721  clojure.lang.Compiler/compile1
             Compiler.java: 7798  clojure.lang.Compiler/compile
                   RT.java:  411  clojure.lang.RT/compile
                   RT.java:  457  clojure.lang.RT/load
                   RT.java:  424  clojure.lang.RT/load
                  core.clj: 6126  clojure.core/load/fn
                  core.clj: 6125  clojure.core/load
                  core.clj: 6109  clojure.core/load
               RestFn.java:  408  clojure.lang.RestFn/invoke
                  core.clj: 5908  clojure.core/load-one
                  core.clj: 6136  clojure.core/compile/fn
                  core.clj: 6136  clojure.core/compile
                  core.clj: 6128  clojure.core/compile

No matching method found: withNumQuerySplit for class com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$Read

Hi there,

Seems to be a typo in read-datastore-raw when passing in option :num-query-split. When invoking like this:

(dts/read-datastore-raw {:name "Read Events from Datastore"
                                             :project-id dataset
                                             :query (make-event-query interactionId)
                                             :namespace namespace
                                             :num-query-split 1})

I got the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: No matching method found: withNumQuerySplit for class com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$Read, compiling:(/private/var/folders/mc/6vrbpxbs60q4hwdrp8wm89380000gn/T/form-init7020878441071113286.clj:1:125)
	at clojure.lang.Compiler.load(Compiler.java:7391)
	at clojure.lang.Compiler.loadFile(Compiler.java:7317)
	at clojure.main$load_script.invokeStatic(main.clj:275)
	at clojure.main$init_opt.invokeStatic(main.clj:277)
	at clojure.main$init_opt.invoke(main.clj:277)
	at clojure.main$initialize.invokeStatic(main.clj:308)
	at clojure.main$null_opt.invokeStatic(main.clj:342)
	at clojure.main$null_opt.invoke(main.clj:339)
	at clojure.main$main.invokeStatic(main.clj:421)
	at clojure.main$main.doInvoke(main.clj:384)
	at clojure.lang.RestFn.invoke(RestFn.java:421)
	at clojure.lang.Var.invoke(Var.java:383)
	at clojure.lang.AFn.applyToHelper(AFn.java:156)
	at clojure.lang.Var.applyTo(Var.java:700)
	at clojure.main.main(main.java:37)
Caused by: java.lang.IllegalArgumentException: No matching method found: withNumQuerySplit for class com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1$Read
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:53)
	at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
	at datasplash.datastore$read_datastore_raw.invokeStatic(datastore.clj:30)
	at datasplash.datastore$read_datastore_raw.invoke(datastore.clj:21)
	at fulliautomatix.fix_events$fetch_and_fix_events.invokeStatic(fix_events.clj:65)
	at fulliautomatix.fix_events$fetch_and_fix_events.invoke(fix_events.clj:59)
	at clojure.core$partial$fn__4759.invoke(core.clj:2515)
	at clojure.core$map$fn__4785.invoke(core.clj:2644)
	at clojure.lang.LazySeq.sval(LazySeq.java:40)
	at clojure.lang.LazySeq.seq(LazySeq.java:49)
	at clojure.lang.RT.seq(RT.java:521)
	at clojure.core$seq__4357.invokeStatic(core.clj:137)
	at clojure.core$map$fn__4785.invoke(core.clj:2637)
	at clojure.lang.LazySeq.sval(LazySeq.java:40)
	at clojure.lang.LazySeq.seq(LazySeq.java:49)
	at clojure.lang.RT.seq(RT.java:521)
	at clojure.core$seq__4357.invokeStatic(core.clj:137)
	at clojure.core$dorun.invokeStatic(core.clj:3024)
	at clojure.core$doall.invokeStatic(core.clj:3039)
	at clojure.core$doall.invoke(core.clj:3039)
	at fulliautomatix.fix_events$apply_transformations_to_pipeline.invokeStatic(fix_events.clj:90)
	at fulliautomatix.fix_events$apply_transformations_to_pipeline.invoke(fix_events.clj:86)
	at fulliautomatix.fix_events$run_main.invokeStatic(fix_events.clj:102)
	at fulliautomatix.fix_events$run_main.invoke(fix_events.clj:93)
	at fulliautomatix.core$run_fix_events.invokeStatic(core.clj:15)
	at fulliautomatix.core$run_fix_events.invoke(core.clj:13)
	at fulliautomatix.core$_main.invokeStatic(core.clj:32)
	at fulliautomatix.core$_main.doInvoke(core.clj:26)
	at clojure.lang.RestFn.invoke(RestFn.java:439)
	at clojure.lang.Var.invoke(Var.java:388)
	at user$eval5.invokeStatic(form-init7020878441071113286.clj:1)
	at user$eval5.invoke(form-init7020878441071113286.clj:1)
	at clojure.lang.Compiler.eval(Compiler.java:6927)
	at clojure.lang.Compiler.eval(Compiler.java:6917)
	at clojure.lang.Compiler.load(Compiler.java:7379)
	... 14 more

Looking at the documentation, I think the method to be called is: withNumQuerySplits <- note the "s" at the end.

I'll submit a PR to fix this.

Issues with boolean serialization

The test that illustrates this issue is the following (and adapted version of the first map-test)

(testing "bad map-fn serialization"
    (let [p (sut/make-pipeline [])
          input (-> [{:a 1} {:b 2} {:c 3}]
                    (sut/generate-input p))
          my-bool false
          rslt (sut/map (fn [x]
                          (if my-bool
                            x
                            {:random :value}))
                        {:name :map-w-sys
                         :initialize-fn (fn [] {:init 10})}
                        input)]

      (is (str/starts-with? (.getName rslt) "map-w-sys"))
      (is (-> (PAssert/that rslt)
              (.containsInAnyOrder [{:a 1 }
                                    {:b 2 }
                                    {:c 3 }])))

      (sut/wait-pipeline-result (sut/run-pipeline p))))

This test currently passes on master (it shouldn't). The problem seems to be the my-bool serialization. Replacing it with a literal value of false in the map fn makes the test fail (as expected).

I think the problem has to do with the anonymous fn serialization. my-bool is out scope when deserialized and executed on some worker. The question is how we should deal with it.

PubSub message Attributes

Sorry, another question from me. I seem to only get the contents of the PubSub "data" payload in the pubsub message. Is there a way to get the Attributes too?

About clj-kondo integration

Hello,

PR #145 raises concerns about clj-kondo integration. Let's talk about this matter here to know what to implement.

A - For Datasplash users

As a library, Datasplash exposes functions & macro that may raise Kondo warnings. We may/should provide a Kondo config using Kondo extension mechanism. This "opt-in" mechanism allow our users to import "if they want/need" our packaged Kondo config to ease Kondo understanding of our code.

B - For Datasplash developers

As a Clojure project, Datasplash follow coding practices that Kondo can help enforce. We may/should set a Kondo config for CI pipeline and set a lint stage which fails on warnings.

What can we do?

I think that we should provide a config for our users

  • Create a Kondo config and code in resources/clj-kondo.exports/datasplash/datasplash/

I do not think that we need to integrate Kondo to our pipeline

But it is not something complicated and it can help code quality (But what about Clj-Fmt then?).

  • Add clj-kondo as a dev dependency
  • Create a CI Kondo config .clj-kondo/ci-config.edn and define project linting rules
  • Configure Github Worflow to run clj-kondo and fail on warnings

I do think that we should remove the current Kondo config file

This user config file should be ignored and everything user related under .clj-kondo as well. This config file should be edited as needed by developers themself.

  • Remove the current .clj-kondo/config.edn
  • Configure .gitignore to ignore everything under .clj-kondo/

What do you think?

Regards
Kawas

Different behavior of output for DataflowRunner vs. DirectRunner

Not sure if this is an issue with datasplash or Beam, but web search isn't revealing anything. I'm seeing different behavior for output when I run locally via DirectRunner vs. on Dataflow when using sliding windows. All works as expected on Dataflow, but locally, there is not out, either with ds/write-edn-file or ps/write-to-pubsub. Input in this case is via ps/read-from-pubsub. If I just run locally from a bounded input soure, it works as expected locally. No errors that I can see, have tried looking through logs with DEBUG enabled and enabled the java default thread exception handler. The pipeline functions are being called

I'm hoping there's something obvious I'm overlooking here. My current code is kind of a mess, if an example is needed I'll try to put together a simple repro.

One hint: I'm using ds/combine with the definition below. When things work, :extract and :combine get called, not so in the non-working case.

(defn map-stats
  [ks]
  (ds/combine-fn
   {:reduce (fn [acc v]
              (merge-with reduce-stats acc (select-keys v ks)))
    :extract (fn [acc]
               (println "EXTRACT")
               (reduce (fn [x [k v]] (assoc x k (extract-stats v))) {} acc))
    :combine (fn [& args] (println "COMBINE") (combine-stats-by-keys ks args))
    :init (constantly (->> ks
                           (map (fn [k] [k [0 0 Long/MAX_VALUE Long/MIN_VALUE 0]]))
                           (into {})))}))

1.9.0 deprecated

Hi, I know you're active on 2.0 of dataslpash, but I see that 1.9.0 SDK has been deprecated due to a bug. They have released 1.9.1, according to: https://cloud.google.com/dataflow/release-notes/release-notes-java-1?hl=en_US&_ga=2.175618658.-1572301745.1495804842

I get this notification in the job sidebar currently on 1.9.0:

Google Cloud Dataflow SDK for Java 1.9.0
 This version of the SDK is deprecated.
 There is a data loss bug in this version of the SDK.

Any chance of a quick bump in the deps? I'm happy to submit a PR if someone is willing to merge it?

Kind Regards,
Dom

Example doesn't work

Hey, I am new for both Apache Beam and DataFlow, and I've tried run datasplash.examples, but it raise an error message.

My command to run:

lein run -- word-count --input=gs://dataflow-samples/shakespeare/kinglear.txt --output=./results.txt

Error message:

Compiling 6 source files to /Users/xingxing/PXN/datasplash/target/classes
Exception in thread "main" Syntax error compiling at (/private/var/folders/dr/gn78q20n3kl8xmjz8_9kbz0r0000gn/T/form-init2152568073736720306.clj:1:125).
        at clojure.lang.Compiler.load(Compiler.java:7647)
        at clojure.lang.Compiler.loadFile(Compiler.java:7573)
        at clojure.main$load_script.invokeStatic(main.clj:452)
        at clojure.main$init_opt.invokeStatic(main.clj:454)
        at clojure.main$init_opt.invoke(main.clj:454)
        at clojure.main$initialize.invokeStatic(main.clj:485)
        at clojure.main$null_opt.invokeStatic(main.clj:519)
        at clojure.main$null_opt.invoke(main.clj:516)
        at clojure.main$main.invokeStatic(main.clj:598)
        at clojure.main$main.doInvoke(main.clj:561)
        at clojure.lang.RestFn.applyTo(RestFn.java:137)
        at clojure.lang.Var.applyTo(Var.java:705)
        at clojure.main.main(main.java:37)
Caused by: java.lang.IllegalArgumentException: .via() is required
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
        at org.apache.beam.sdk.io.FileIO$Write.expand(FileIO.java:1212)
        at org.apache.beam.sdk.io.FileIO$Write.expand(FileIO.java:778)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)
        at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:102)
        at datasplash.core$tapply.invokeStatic(core.clj:432)
        at datasplash.core$tapply.invoke(core.clj:428)
        at datasplash.core$apply_transform.invokeStatic(core.clj:452)
        at datasplash.core$apply_transform.invoke(core.clj:445)
        at datasplash.core$write_text_file.invokeStatic(core.clj:1168)
        at datasplash.core$write_text_file.invoke(core.clj:1149)
        at datasplash.examples$run_word_count.invokeStatic(examples.clj:53)
        at datasplash.examples$run_word_count.invoke(examples.clj:45)
        at datasplash.examples$_main.invokeStatic(examples.clj:340)
        at datasplash.examples$_main.doInvoke(examples.clj:336)
        at clojure.lang.RestFn.invoke(RestFn.java:439)
        at clojure.lang.Var.invoke(Var.java:393)
        at user$eval140.invokeStatic(form-init2152568073736720306.clj:1)
        at user$eval140.invoke(form-init2152568073736720306.clj:1)
        at clojure.lang.Compiler.eval(Compiler.java:7176)
        at clojure.lang.Compiler.eval(Compiler.java:7166)
        at clojure.lang.Compiler.load(Compiler.java:7635)
        ... 12 more

Library size and AOT compilation

Hello,

As of the latest Datasplash version (0.7.16), the jar size is 3.3 Mb.

This size is mainly due to AOT compilation of few classes pulled by the project :main entry. This main entry is just to run code exemples and we already describe in the README how to run them from the repl.

So, we could remove the project :main entry definition and avoid any AOT compilation, which will have the effect of reducing the jar size to only a few Kb (~67 Kb).

If I understand correctly, Datasplash needs to be AOT compiled only in our users' projects, but the lib itself does not need it, right?

Templates / ValueProvider NullPointerException

Hi I've been able to become productive with this library and love its Clojuery API. I was hoping to create a template to work with Cloud Dataflow, however Beam requires using ValueProvider getter/setters for the PipelineOptions interface.

I get a NullPointerException when using ValueProvider as the type in defoptions. Has anyone attempted this or perhaps there is an example to reference? Things work fine if the type is String.

Thanks!

(defoptions DataFetchOptions
  {
   :dataset       {:type        org.apache.beam.sdk.options.ValueProvider
                   :description "dataset"}})

(defn pipeline
  [args]
  (let [p    (ds/make-pipeline DataFetchOptions args)
        opts (ds/get-pipeline-options p)]
,,,,))

Error

java.lang.NullPointerException
PipelineOptionsFactory.java: 1637  org.apache.beam.sdk.options.PipelineOptionsFactory/isCollectionOrArrayOfAllowedTypes
PipelineOptionsFactory.java: 1573  org.apache.beam.sdk.options.PipelineOptionsFactory/parseObjects
PipelineOptionsFactory.java:  111  org.apache.beam.sdk.options.PipelineOptionsFactory/access$400
PipelineOptionsFactory.java:  294  org.apache.beam.sdk.options.PipelineOptionsFactory$Builder/as
                  core.clj:  974  datasplash.core/make-pipeline*
                  core.clj:  955  datasplash.core/make-pipeline*
                  core.clj:  984  datasplash.core/make-pipeline*
                  core.clj:  955  datasplash.core/make-pipeline*

Boolean issue

Hi ! I am currently having some issues with some of my pipelines regarding the boolean fields.
For example, i have :

(let [p (ds/make-pipeline {})]
    (ds/->> :log-values p
            (ds/generate-input [{:deleted false} {:deleted true}])
            (ds/map #(log/error %) {:name "do-log"}))
    (ds/run-pipeline p))
;; => {:deleted true}
;; => {:deleted true}

Does anyone have the same problem and/or has anyone found some way to bypass the issue ?

Thanks ;)

Why does (ds/map inc) work in the REPL, but (ds/map (fn [x] (* x x))) does not?

(->> (ds/make-pipeline nil)
     (ds/generate-input [1 2 3])
     (ds/map inc)
     (ds/write-edn-file "output.txt")
     (ds/run-pipeline))
=>
[direct-runner-worker] INFO org.apache.beam.sdk.io.WriteFiles - Opening writer for write operation TextWriteOperation{tempDirectory=/Users/r631004/projects/bimu/datasplash-examples/.temp-beam-2017-10-300_17-05-00-16/, windowedWrites=false}
[direct-runner-worker] INFO org.apache.beam.sdk.io.WriteFiles - Opening writer for write operation TextWriteOperation{tempDirectory=/Users/r631004/projects/bimu/datasplash-examples/.temp-beam-2017-10-300_17-05-00-16/, windowedWrites=false}
[direct-runner-worker] INFO org.apache.beam.sdk.io.WriteFiles - Opening writer for write operation TextWriteOperation{tempDirectory=/Users/r631004/projects/bimu/datasplash-examples/.temp-beam-2017-10-300_17-05-00-16/, windowedWrites=false}
[direct-runner-worker] INFO org.apache.beam.sdk.io.WriteFiles - Finalizing write operation TextWriteOperation{tempDirectory=/Users/r631004/projects/bimu/datasplash-examples/.temp-beam-2017-10-300_17-05-00-16/, windowedWrites=false}.
#object[org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult 0x73b8774a "org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult@73b8774a"]
(->> (ds/make-pipeline nil)
     (ds/generate-input [1 2 3])
     (ds/map (fn [x] (* x x)))
     (ds/write-edn-file "output.txt")
     (ds/run-pipeline))
=>
ClassNotFoundException hsq.datasplash_examples$eval1950$fn__1951  java.net.URLClassLoader.findClass (URLClassLoader.java:381)

Project required including beam-runners-core-java

To get my project working, I had to explicitly include [org.apache.beam/beam-runners-core-java "2.8.0"] after upgrading from datasplash 0.5.3 to 0.6.3.

My project looks like

(ns my-pipeline.core
  (:gen-class)
  (:require [cheshire.core :as json]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]
            [datasplash.api :as datasplash]
            [datasplash.bq :as bq]
            [datasplash.core :as core]
            [datasplash.pubsub :as pubsub]))

# Actual work pulled out to keep it compact
(defn clean-data [m] m)

(defn log-data [m]
  (let [out (java.io.StringWriter.)]
    (pp/pprint m out)
    (log/info (.toString out)))
  m)

(datasplash/defoptions Options
  {:pubsubTopic {:type String
                 :description "Input PubSub topic"}
   :bigqueryTable {:type String
                   :description "Output BigQuery table as PROJECT:DATASET.TABLE"}
   :tempLocation {:type String}})

(defn -main
  [& args]
  (let [pipeline (datasplash/make-pipeline Options args {:streaming true})
        {:keys [pubsubTopic bigqueryTable]} (datasplash/get-pipeline-options pipeline)]
    (->> pipeline
         (pubsub/read-from-pubsub pubsubTopic {:kind :topic})
         (datasplash/map #(json/decode % true))
         (datasplash/map clean-data)
         (datasplash/map log-data)
         (bq/write-bq-table bigqueryTable {:create-disposition :never
                                           :write-disposition :append
                                           :retry-policy :retry-transient})
         (datasplash/run-pipeline)))

Strive for "equivalent" semantics of `datasplash.api` fns and `clojure.core` fns

I think it would be nice if the datasplash.api functions have kind of the same semantics as the clojure.core ones as it otherwise makes debugging things at the repl extremely painful.
As an example.

  (def data1 {:a (int 1)})
  (def data2 {:a (long 1)})

  (= data1 data2)
  ;; => true

  (clojure.set/intersection #{data1} #{data2})
  ;; => #{{:a 1}}

  (let [p (ds/make-pipeline {})
        input1 (ds/generate-input [data1] p)
        input2 (ds/generate-input [data2] p)
        _ (ds/->> :intersect-pipeline
                  (ds/intersect-distinct {:name :intersect} input1 input2)
                  (ds/write-json-file "test-output" {}))]
    (-> (ds/run-pipeline p)
        (ds/wait-pipeline-result)))

The last pipeline produces no results (it will when changing data2 to {:a (int 1)}. The problem is that if there is need to compare, intersect or group-by a lot of data, it is first needed to make all the rows comparable (with something like clojure.walk ) which can be very expensive.

make-pipeline does not handle collections for pipeline options

if you specify :jdk-add-open-modules ["java.base/java.io=ALL-UNNAMED" "java.base/sun.nio.ch=ALL-UNNAMED"] the generated pipeline cli option will be "--jdkAddOpenModules=[java.base/java.io=ALL-UNNAMED java.base/sun.io.ch=ALL-UNNAMED"] instead of "--jdkAddOpenModules=java.base/java.io=ALL-UNNAMED" and "--jdkAddOpenModules=java.base/sun.nio.ch=ALL-UNNAMED" per
this.

Processes are locking

We have observed that some of our processes are getting stuck when dynamically requiring namespaces in a distributed mode although we are using the safe-exec macro provided by datasplash.

unable to deserialize datasplash.core.proxy$com.google.cloud.dataflow.sdk.transforms.DoFn

Hi there. I'm getting an exception trying to run the following:

If I remove the block below ;; log the message, then it submits successfully and runs. I can see it processing pubsub messages. I was trying to write these to a BigQuery table, but getting errors. I then thought I'd try to see what the structure of the messages was, and used this bit of code from your example to try to log the messages. Am I doing something wrong or is there an issue here?

(defn apply-transformations-to-pipeline
 [read-subscription outputTable pipeline]

(->> pipeline

      ;; Read the messages from PubSub
      (ps/read-from-pubsub read-subscription {:name "read-interactions-from-pubsub"})

      ;; log the message

      (ds/map (fn [message]
                (do
                 (log/info (str "Got message:\n" message))))
              {:name "log-message"})))

Stacktrace below:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize datasplash.core.proxy$com.google.cloud.dataflow.sdk.transforms.DoFn$ff19274a@730bea0, compiling:(/private/var/folders/mc/6vrbpxbs60q4hwdrp8wm89380000gn/T/form-init1605582858210262277.clj:1:125)
	at clojure.lang.Compiler.load(Compiler.java:7391)
	at clojure.lang.Compiler.loadFile(Compiler.java:7317)
	at clojure.main$load_script.invokeStatic(main.clj:275)
	at clojure.main$init_opt.invokeStatic(main.clj:277)
	at clojure.main$init_opt.invoke(main.clj:277)
	at clojure.main$initialize.invokeStatic(main.clj:308)
	at clojure.main$null_opt.invokeStatic(main.clj:342)
	at clojure.main$null_opt.invoke(main.clj:339)
	at clojure.main$main.invokeStatic(main.clj:421)
	at clojure.main$main.doInvoke(main.clj:384)
	at clojure.lang.RestFn.invoke(RestFn.java:421)
	at clojure.lang.Var.invoke(Var.java:383)
	at clojure.lang.AFn.applyToHelper(AFn.java:156)
	at clojure.lang.Var.applyTo(Var.java:700)
	at clojure.main.main(main.java:37)
Caused by: java.lang.IllegalArgumentException: unable to deserialize datasplash.core.proxy$com.google.cloud.dataflow.sdk.transforms.DoFn$ff19274a@730bea0
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:76)
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91)
	at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:720)
	at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:678)
	at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:596)
	at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:563)
	at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:558)
	at datasplash.core$map_op$make_map_op__5352.invoke(core.clj:500)
	at fulliautomatix.core$apply_transformations_to_pipeline.invokeStatic(core.clj:33)
	at fulliautomatix.core$apply_transformations_to_pipeline.invoke(core.clj:24)
	at fulliautomatix.core$_main.invokeStatic(core.clj:89)
	at fulliautomatix.core$_main.doInvoke(core.clj:79)
	at clojure.lang.RestFn.invoke(RestFn.java:397)
	at clojure.lang.Var.invoke(Var.java:375)
	at user$eval5.invokeStatic(form-init1605582858210262277.clj:1)
	at user$eval5.invoke(form-init1605582858210262277.clj:1)
	at clojure.lang.Compiler.eval(Compiler.java:6927)
	at clojure.lang.Compiler.eval(Compiler.java:6917)
	at clojure.lang.Compiler.load(Compiler.java:7379)
	... 14 more
Caused by: java.lang.ClassNotFoundException: fulliautomatix.core$apply_transformations_to_pipeline$fn__45
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:73)
	... 32 more

ParDo question/clarification/example

Hello, I'm rather new to Dataflow and have tried this library out for some data processing. It's awesome and thank you! I have a question, can you clarify how/have some examples of how to use the raw pardo-fn within a Clojure program? How is this different from say just using map?

Examples (including README) are outdated, will not run on GCP Dataflow

I'm just trying out a few of the examples seem outdated and/or buggy.

In the current PubSub example, I see a few issues

  • DataflowPipelineRunner should be DataflowRunner or dataflow
  • PubSubOptions, when passed to ds/make-pipeline (PubSubOptions is unused in example currently), throws:
    Property [project] is marked with contradictory annotations. Found [[Default.String(value=project-name-example) on PubSubOptions#getProject()], [Default.InstanceFactory(value=class org.apache.beam.sdk.extensions.gcp.options.GcpOptions$DefaultProjectFactory) on org.apache.beam.runners.dataflow.options.DataflowPipelineOptions#getProject()], [Default.InstanceFactory(value=class org.apache.beam.sdk.extensions.gcp.options.GcpOptions$DefaultProjectFactory) on org.apache.beam.sdk.extensions.gcp.options.GcpOptions#getProject()]].
  • project value is null when calling from command-line and passing the argument correctly: Syntax error (IllegalArgumentException) compiling at (/private/var/folders/y6/vb53_tq92019mpdmwkjdff600000gn/T/form-init9184696066443347895.clj:1:125). Illegal project name specified in Pubsub subscription: null

Path to README example's input file has been updated:
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L162

0.6.5 won't run on Java 8 anymore

Hi there,

I'm getting the following when trying to use 0.6.5 on Java 8:

Execution error (UnsupportedClassVersionError) at java.lang.ClassLoader/defineClass1 (ClassLoader.java:-2).
datasplash/fns/ClojureDoFn has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

Was the new version compiled with Java 11?

Release new version

Are you able to release the current version on the github master? We are working with a project that needs to use side-outputs. Currently we can manage by pulling the repo as a git dependency but it would be nice to have it on clojars. We would also be happy to contribute in the future.

Best, Martin / Unacast

clojure.lang.ExceptionInfo: Unfreezable type: class com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables$

Hi there,

I have some strange behaviour, and I'm wondering if anyone has run into this or can see something obvious...

I have a pipeline that runs 100%, pulling the same dataset using InProcessPipelineRunner, but when I push to DataflowPipelineRunner I get the exception below. It seems nippy is not able to serialize the data when running in the cloud, but does so perfectly locally. This is in a using a combin call with the following:

answer-data     (->> pipeline
                             (fetch-kind :Sales :answer)
                             (ds/map transform-answers {:name "transform-and-add-key"})
                             (ds/map-kv (fn [{:keys [interactionId answers]}]
                                          [interactionId answers]))
                             (ds/combine
                              (ds/sfn (fn [answers] answers))
                              {:scope :per-key})
                             (ds/map (fn [[interactionId answers]]
                                       {:interactionId interactionId
                                        :answers (vec answers)}))

I've successfully run the code and checkout the output up to the point past the map-kv, and the correct output is given for all the data.

2017-08-06T19:23:04.010Z: Error:   (e60b7ed2b9fb314e): java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element '[[{:questionId "age", :answer "28"}, {:questionId "gender", :answer "Female"}, {:questionId "income", :answer "4000"}, {:questionId "smoker", :answer "No"}]]' with coder 'CustomCoder$ff19274a'.
	at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
	at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
	at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
	at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
	at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:203)
	at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.processElement(GroupAlsoByWindowsAndCombineDoFn.java:190)
	at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
	at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
	at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
	at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
	at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
	at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
	at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
	at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
	at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:285)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:221)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:171)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Unable to encode element '[[{:questionId "age", :answer "28"}, {:questionId "gender", :answer "Female"}, {:questionId "income", :answer "4000"}, {:questionId "smoker", :answer "No"}]]' with coder 'CustomCoder$ff19274a'.
	at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:170)
	at datasplash.core.proxy$com.google.cloud.dataflow.sdk.coders.CustomCoder$ff19274a.getEncodedElementByteSize(Unknown Source)
	at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:185)
	at datasplash.core.proxy$com.google.cloud.dataflow.sdk.coders.CustomCoder$ff19274a.registerByteSizeObserver(Unknown Source)
	at com.google.cloud.dataflow.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:211)
	at com.google.cloud.dataflow.sdk.coders.IterableLikeCoder.registerByteSizeObserver(IterableLikeCoder.java:60)
	at com.google.cloud.dataflow.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:158)
	at com.google.cloud.dataflow.sdk.coders.KvCoder.registerByteSizeObserver(KvCoder.java:42)
	at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:641)
	at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:552)
	at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:351)
	at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
	at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
	at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
	at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
	... 25 more
Caused by: clojure.lang.ExceptionInfo: Unfreezable type: class com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables$2 {:type com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables$2, :as-str "#object[com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables$2 0x69e1dde3 \"[[{:questionId \\\"age\\\", :answer \\\"28\\\"}, {:questionId \\\"gender\\\", :answer \\\"Female\\\"}, {:questionId \\\"income\\\", :answer \\\"4000\\\"}, {:questionId \\\"smoker\\\", :answer \\\"No\\\"}]]\"]"}
	at clojure.core$ex_info.invokeStatic(core.clj:4617)
	at clojure.core$ex_info.invoke(core.clj:4617)
	at taoensso.nippy$throw_unfreezable.invokeStatic(nippy.clj:720)
	at taoensso.nippy$throw_unfreezable.invoke(nippy.clj:718)
	at taoensso.nippy$fn__4162.invokeStatic(nippy.clj:924)
	at taoensso.nippy$fn__4162.invoke(nippy.clj:905)
	at taoensso.nippy$fn__3965$G__3960__3972.invoke(nippy.clj:314)
	at taoensso.nippy$fn__4000.invokeStatic(nippy.clj:331)
	at taoensso.nippy$fn__4000.invoke(nippy.clj:316)
	at taoensso.nippy$fn__3983$G__3978__3990.invoke(nippy.clj:315)
	at taoensso.nippy$freeze_to_out_BANG_.invokeStatic(nippy.clj:728)
	at taoensso.nippy$freeze_to_out_BANG_.invoke(nippy.clj:725)
	at datasplash.core$make_nippy_coder$fn__5385.invoke(core.clj:365)
	at datasplash.core.proxy$com.google.cloud.dataflow.sdk.coders.CustomCoder$ff19274a.encode(Unknown Source)
	at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:167)
	... 39 more```

Error: `java.lang.IllegalArgumentException: unable to deserialize datasplash.fns.ClojureDoFn`

I want to window some data that does not have a timestamp. I'm trying to follow this example code to add a timestamp to that data. I believe I need to use pardo to accomplish this. However, I am getting the following error:

Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize datasplash.fns.ClojureDoFn@34bddf43
[..snip..]
Caused by: java.lang.ClassNotFoundException: dataflow_test.core$add_timestamp

The code is:

(defn add-timestamp
  [ctx]
  (pp/pprint (type ctx)))

(defn -main
  [& args]
  (let [pipeline (ds/make-pipeline DataflowPipelineOptions [] pipeline-options)]
    (->> pipeline
         (ds/read-text-file (env :input-filename) {:name "read-file"})
         (ds/pardo add-timestamp {:name "add-timestamp"})
         (ds/write-json-file (env :output-filename) {:name       "write-file"
                                                     :num-shards 1
                                                     :suffix     ".json"}))))

What is the proper way to use pardo? Or, is there a better way to add a new field to data?

README Example Does Not Create Output File

When running the example from the README with lein run I expected a file named "kinglear-freqs.txt" to be created in the current directory, but no file was created.

I tried running the example with lein run -- --output=out.txt and no file was created in the current directory or any subdirectories.

I also tried running it with a custom input file. Regardless of the value of the --input argument the pipeline ran without error, but no output file was created.

how to do cider-repl development?

Hi does datasplash support cider/emacs/nrepl style interactive development?

I tried to dynamically eval my datasplash pipelines, but defn's got stale byte code over time. If you have any ideas how to accomplish this would be great to know, avoid the compile cycles.

Seems like most of the big data pipelines don't have a great repl experience, closest I saw is powderkeg https://github.com/HCADatalab/powderkeg

API doc not working

The link to the API docs seems to be broken.

I'm keen to take a look as I'm considering using datasplash for a project

Read from PubSub topic #10 seems to be a breaking change.

I get the following compilation error now when switching to the new version:

Exception in thread "main" clojure.lang.ExceptionInfo: Wrong type of :kind for pubsub [null], should be either :subscription or :topic {:kind nil},

Oddly, using Uncast version 0.4.2 (where I think this code comes from, it works just fine. I'm invoking the call like this:

 (->> pipeline

      (ps/read-from-pubsub read-subscription {:name "Read events from pubsub"}))

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.