streams's People

Contributors

anirothan, biboudis, cloudroutine, dsyme, eiriktsarpalis, forki, gusty, kurtschelfthout, latkin, moodmosaic, palladin, varon

streams's Issues

Equivalent to scan?

I could not find scan. Am I missing some other method that does the same thing under a different name?
I would like to implement, say, a running sum, so that the input stream { 1; 2; 3; 4; 5 } emits { 1; 3; 6; 10; 15 }. It needs to behave both like map (emitting data as it arrives) and like fold (passing state along).
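
For reference, the behaviour described here matches Seq.scan in FSharp.Core. The sketch below uses only Seq, not Streams (whether Streams has an equivalent is exactly the open question), and is just meant to pin down the expected output. Seq.scan also emits the initial state, so that first element is dropped with Seq.tail.

```fsharp
// Running sum with FSharp.Core only; no Streams API is assumed here.
let runningSum =
    seq { 1 .. 5 }
    |> Seq.scan (+) 0   // emits 0; 1; 3; 6; 10; 15 (initial state first)
    |> Seq.tail         // drop the initial state
    |> Seq.toList

printfn "%A" runningSum // [1; 3; 6; 10; 15]
```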

ParStream.filter followed by ParStream.mapi gives incorrect results when iterated

I believe ParStream.iter needs a check along the lines of if stream.PreserveOrdering then...

Repro:

ParStream.ofArray [| 101 .. 201 |] 
  |> ParStream.filter (fun x -> x % 2 = 0) 
  |> ParStream.mapi (fun i x -> (i,x)) 
  |> ParStream.iter (printfn "%A")

gives

(25, 126)
(1, 102)
(51, 152)
(3, 104)
(27, 128)
(53, 154)
(5, 106)
(29, 130)
(31, 132)
(7, 108)
(55, 156)
(33, 134)
(9, 110)
(57, 158)
(35, 136)
(11, 112)
(59, 160)
(37, 138)
(13, 114)
(15, 116)
(39, 140)
(61, 162)
(17, 118)
(41, 142)
(63, 164)
(19, 120)
(43, 144)
(65, 166)
(21, 122)
(45, 146)
(23, 124)
(47, 148)
(67, 168)
(75, 176)
(49, 150)
(69, 170)
(77, 178)
(71, 172)
(79, 180)
(73, 174)
(81, 182)
(83, 184)
(85, 186)
(87, 188)
(89, 190)
(91, 192)
(93, 194)
(95, 196)
(97, 198)
(99, 200)

Double disposal

While looking over the code I noticed this. There seems to be a double disposal here and here.

As far as I understand it, if the TryAdvance() returns false, then it is the responsibility of the owner of the iterator to then call Dispose() - the disposal shouldn't be called inside TryAdvance()

This is actually fixed in this work at this point by just deleting the disposal logic in the tryAdvance(), though it could be fixed directly instead.

Please make SourceType "RequireQualifiedAccess"

This discriminated union pollutes the F# namespace taking the names "ResizeArray", "Seq" and "Array"

type SourceType =
    | Array
    | ResizeArray
    | Seq

For example, after opening Nessos.Streams the code

let results = ResizeArray()

no longer compiles. Please add [<RequireQualifiedAccess>] to this union type.
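
A minimal sketch of the requested change, using a local copy of the union rather than the one shipped in Nessos.Streams. With the attribute applied, the cases have to be written as SourceType.Array and so on, and the unqualified name ResizeArray resolves to the FSharp.Core abbreviation again.

```fsharp
// Local stand-in for the library's union, with the attribute added.
[<RequireQualifiedAccess>]
type SourceType =
    | Array
    | ResizeArray
    | Seq

// Cases must now be qualified with the type name...
let source = SourceType.ResizeArray

// ...so this refers to System.Collections.Generic.List<'T> as usual.
let results = ResizeArray<int>()
results.Add 42
printfn "%A %d" source results.Count
```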

Support IObservable<'T>

It might be interesting to be able to convert a Stream to an IObservable<'T>, i.e.

module Stream =
  let toObservable (stream:Stream<'T>) : System.IObservable<'T> =
     { new System.IObservable<'T> with 
        member __.Subscribe(observer:System.IObserver<'T>) =
           let (Stream streamf) = stream
           let (bulk, _) = streamf (fun value -> observer.OnNext value; true) 
           bulk ()
           observer.OnCompleted()
           { new System.IDisposable with member __.Dispose() = ()}
     }

let obs = [|1..10|] |> Stream.ofArray |> Stream.toObservable
obs.Subscribe(fun x -> printfn "%d" x)

Additional operators

I wonder if this fits the semantics of Streams, but additional operators could be useful like:

find/pick/tryFind/tryPick/head/headOrDefault/etc

Otherwise we have to resort to transferring the entire stream to an array and then doing such operation...

Adding SIMD support

I've been going through the Streams code trying to figure out how to add SIMD support. For example, this SIMD-enhanced fold performs very well compared to an inlined version of the core library fold.

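   static member inline SIMDFold folder combiner (start:'T) (values : 'T[]) =
        let mutable i = 0
        // Every lane of the accumulator starts at 'start', so 'start' should be
        // the identity of the operation (e.g. 0 for a sum).
        let mutable v = Vector<'T>(start)
        // <= so that the last full vector is included when the length is an
        // exact multiple of Vector<'T>.Count. Leftover tail elements (fewer than
        // Vector<'T>.Count) are still not processed by this version.
        while i <= values.Length - Vector<'T>.Count do
            v <- folder v (Vector<'T>(values,i))
            i <- i + Vector<'T>.Count
        // Combine the lanes of the vector accumulator into a scalar result.
        i <- 0
        let mutable result = start
        while i < Vector<'T>.Count do
            result <- combiner result v.[i]
            i <- i+1
        result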
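
As a concrete instance of the same idea, here is a sketch of a SIMD sum specialised to float[], using only System.Numerics.Vector<float>. The name simdSum is made up for this example and is not part of Streams. Unlike the generic fold, it can handle the leftover tail elements, because for a plain sum the scalar operation is known.

```fsharp
open System.Numerics

// Hypothetical helper, not a Streams API: vectorised sum over a float array.
let simdSum (values: float[]) =
    let count = Vector<float>.Count
    let mutable acc = Vector<float>.Zero
    let mutable i = 0
    // Add full vectors while at least 'count' elements remain.
    while i <= values.Length - count do
        acc <- acc + Vector<float>(values, i)
        i <- i + count
    // Collapse the lanes of the accumulator into a scalar.
    let mutable result = 0.0
    for lane in 0 .. count - 1 do
        result <- result + acc.[lane]
    // Add the tail elements that did not fill a whole vector.
    while i < values.Length do
        result <- result + values.[i]
        i <- i + 1
    result

printfn "%f" (simdSum [| 1.0 .. 10.0 |])
```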

Adding support to streams has a few considerations:

  • I think it would only make sense to allow it with Arrays and maybe ResizeArrays
  • The elements in the array have to be valid SIMD types
  • I'd love for it to be available in parallel and non parallel streams
  • Can it be added to Streams and ParStreams in some way? Or should there be a separate SIMDStream and ParSIMDStream?
  • Can we handle a mix of scalar and vector operations on the stream? (example below)

Ideally, given an array of floats called values, we would want to be able to do something like:

values
|> SIMDStream.simdMap (fun e -> e*e)  //operations on Vector<float>s
|> SIMDStream.map (fun e -> if e < 5.0 then 0.0 else 3.0)  //scalar operations on floats
|> SIMDStream.simdSum

This way people could mix and match SIMD operations with scalar ones, as is sometimes necessary.

I'm going through the streams code trying to see how this could be done, and it isn't entirely clear. Some parts of the composed function would need to operate on a Vector, while others would need to iterate Vector.Count times to operate on the individual elements of the array?

If there is interest in this I'd love to contribute but need some guidance as I don't fully understand how the streams work yet.

ParStream.foldBy does not preserve order when requested to

When the following pipeline is processed:

let map =
[(1, 1);(2, 2);(2, 3);(1, 4)]
|> ParStream.ofSeq
|> ParStream.ordered
|> ParStream.foldBy fst (fun l x -> x :: l) (fun x _ -> x) (fun () -> [])
|> ParStream.toSeq
|> Map.ofSeq
|> Map.map (fun _ -> Seq.map snd)

The results are unstable, sometimes I get:

> map.[1];;
val it : seq<int> = seq [4; 1]
> map.[2];;
val it : seq<int> = seq [2; 3]

On other evaluations I get:

> map.[1];;
val it : seq<int> = seq [4; 1]
> map.[2];;
val it : seq<int> = seq [3; 2]

With the given pipeline I would expect to always get the second version.
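
To make that expectation concrete, here is a sequential reference written with FSharp.Core only (Seq.groupBy keeps elements in input order within each group). It is a sketch of the result I would expect from the ordered pipeline, not a description of how ParStream.foldBy is implemented.

```fsharp
// Sequential reference for the expected result; no ParStream involved.
let expected =
    [(1, 1); (2, 2); (2, 3); (1, 4)]
    |> Seq.groupBy fst
    |> Seq.map (fun (key, items) ->
        // Same folder as in the pipeline: prepend each element in input order.
        let folded = items |> Seq.fold (fun l x -> x :: l) []
        key, List.map snd folded)
    |> Map.ofSeq

printfn "%A" expected.[1] // [4; 1]
printfn "%A" expected.[2] // [3; 2]
```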

Consider moving Streams to use simpler, single-threaded Cancellation token

As far as I can tell stream iteration is inherently single threaded in the sense that, like Seq, the Iterable and Iterator objects are accessed linearly.

In this setting, using CancellationTokenSource has a cost, because its accesses are built to be thread-safe and are ultimately synchronized through a volatile field. So I experimented with switching to simpler, single-threaded Stream cancellation tokens. The branch and diff are here: dsyme/Streams@hide-reprs...dsyme:no-cts-for-streams (this builds on #31 and #30). In some places the code also seems to become simpler (since we effectively switch between Boolean flags and cancellation tokens anyway), unless there's something I've missed.

Note that no disposal is needed on these since they don't have wait handles
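
A minimal sketch of what such a single-threaded token could look like. The type name SimpleCancellation is hypothetical and is not taken from the branch; it only illustrates replacing the synchronized, volatile flag with a plain mutable field that has nothing to dispose.

```fsharp
/// Hypothetical single-threaded cancellation flag (illustration only).
type SimpleCancellation() =
    let mutable cancelled = false
    member __.IsCancellationRequested = cancelled
    member __.Cancel() = cancelled <- true

// A consumer loop that stops as soon as cancellation is requested.
let sumUntilCancelled () =
    let token = SimpleCancellation()
    let mutable sum = 0
    let mutable i = 0
    while not token.IsCancellationRequested && i < 100 do
        sum <- sum + i
        i <- i + 1
        if i = 10 then token.Cancel() // e.g. what a take 10 would do
    sum

printfn "%d" (sumUntilCancelled ()) // 45
```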

Raw iteration performance seems to improve by around 5-10%, based on the runStream() sample in Streams.fs.

If you think this is a good idea I can submit a PR.

After:

Real: 00:00:02.286, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.299, CPU: 00:00:02.296, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.325, CPU: 00:00:02.296, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.257, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.225, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.252, CPU: 00:00:02.250, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.350, CPU: 00:00:02.328, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.207, CPU: 00:00:02.218, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.218, CPU: 00:00:02.218, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.208, CPU: 00:00:02.187, GC gen0: 1, gen1: 0, gen2: 0

Before:

Real: 00:00:02.384, CPU: 00:00:02.375, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.490, CPU: 00:00:02.484, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.362, CPU: 00:00:02.343, GC gen0: 0, gen1: 0, gen2: 0

Generators

Hi, I'm trying to use the library with a stream coming from a sensor, such as an accelerometer, and do some computation on it. How can I use the library for that?

Streams too eager?

Is it by design that the following throws an IndexOutOfRangeException?

    let refs = [| 1; 2; 3; 4 |]
    let pages = [| [| 0; 3 |]; [| 0; 5 |]; [| 0; 1 |]|]
                |> Stream.ofArray
                |> Stream.flatMap Stream.ofArray
                |> Stream.map (fun ref -> refs.[ref])
                |> Stream.take 3
                |> Stream.toArray
    test <@ [| 1; 4; 1 |] = pages @>
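
For comparison, the same pipeline written against Seq from FSharp.Core is lazy enough that the out-of-range index 5 is never reached: Seq.take 3 stops pulling after the third element. This is only a sketch of the behaviour I was expecting, using Seq instead of Stream.

```fsharp
let refs = [| 1; 2; 3; 4 |]

// Same data and steps as above, with Seq in place of Stream.
let pages =
    [| [| 0; 3 |]; [| 0; 5 |]; [| 0; 1 |] |]
    |> Seq.collect Seq.ofArray
    |> Seq.map (fun r -> refs.[r])
    |> Seq.take 3          // only indices 0, 3, 0 are ever looked up
    |> Seq.toArray

printfn "%A" pages // [|1; 4; 1|]
```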

build.cmd fails if only F# 4.0/VS 2015 installed

It appears this repo has a hard dependency on FSharp.Core 4.3.1.0 being GACed.

4.3.1.0 is binplaced by VS 2015/F# 4.0, but not added to the GAC. Only the latest (4.4.0.0) is GACed.

If the latest FSharp.Core is not the desired dependency that's fine, but it might be good to document that and/or print a helpful error message during build/test if the required version can't be loaded.

As it stands tests just blow up with

System.IO.FileLoadException: Could not load file or assembly 'FSharp.Core, Version=4.3.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
File name: 'FSharp.Core, Version=4.3.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' ---> System.IO.FileLoadException: Could not load file or assembly 'FSharp.Core, Version=4.3.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
File name: 'FSharp.Core, Version=4.3.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a'

Dev branch?

There seems to be quite a bit of work done on the dev branch which hasn't been brought over to master. Any particular reason why this is happening? /cc @palladin @anirothan
