nessos / streams
A lightweight F#/C# library for efficient functional-style pipelines on streams of data.
Home Page: http://nessos.github.io/Streams/
License: Other
I could not find scan. Am I missing some other method that does the same thing under a different name?
I would like to implement, say, a running sum, so that for the input stream { 1; 2; 3; 4; 5 } it emits { 1; 3; 6; 10; 15 }. So it needs to behave both like map (emitting data as it arrives) and like fold (passing state along).
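A minimal sketch of the requested scan semantics, assuming a simplified push-based stream representation. The PushStream type and the ofArray, scan and toArray functions below are hypothetical illustrations, not the Nessos.Streams API: each incoming element updates the running state, and the new state is pushed downstream immediately.

```fsharp
// Hypothetical push-based stream: the producer calls the consumer for each element;
// the consumer returns false to ask the producer to stop early.
type PushStream<'T> = ('T -> bool) -> unit

let ofArray (xs: 'T[]) : PushStream<'T> =
    fun k ->
        let mutable i = 0
        while i < xs.Length && k xs.[i] do
            i <- i + 1

// Like map, emits one value per input as it arrives; like fold, threads a state along.
// Unlike Seq.scan, the initial state itself is not emitted.
let scan (folder: 'S -> 'T -> 'S) (state: 'S) (source: PushStream<'T>) : PushStream<'S> =
    fun k ->
        let acc = ref state
        source (fun x ->
            acc.Value <- folder acc.Value x
            k acc.Value)

let toArray (source: PushStream<'T>) : 'T[] =
    let buffer = ResizeArray<'T>()
    source (fun x -> buffer.Add x; true)
    buffer.ToArray()

let runningSum = [| 1; 2; 3; 4; 5 |] |> ofArray |> scan (+) 0 |> toArray
// runningSum = [| 1; 3; 6; 10; 15 |]
```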
I believe ParStream.iter needs a check along the lines of if stream.PreserveOrdering then...
Repro:
ParStream.ofArray [| 101 .. 201 |]
|> ParStream.filter (fun x -> x % 2 = 0)
|> ParStream.mapi (fun i x -> (i,x))
|> ParStream.iter (printfn "%A")
gives
(25, 126)
(1, 102)
(51, 152)
(3, 104)
(27, 128)
(53, 154)
(5, 106)
(29, 130)
(31, 132)
(7, 108)
(55, 156)
(33, 134)
(9, 110)
(57, 158)
(35, 136)
(11, 112)
(59, 160)
(37, 138)
(13, 114)
(15, 116)
(39, 140)
(61, 162)
(17, 118)
(41, 142)
(63, 164)
(19, 120)
(43, 144)
(65, 166)
(21, 122)
(45, 146)
(23, 124)
(47, 148)
(67, 168)
(75, 176)
(49, 150)
(69, 170)
(77, 178)
(71, 172)
(79, 180)
(73, 174)
(81, 182)
(83, 184)
(85, 186)
(87, 188)
(89, 190)
(91, 192)
(93, 194)
(95, 196)
(97, 198)
(99, 200)
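One possible shape for a fix, sketched outside the library: when ordering is requested, the per-element work can still run in parallel, but the side-effecting action passed to iter should run sequentially over results kept in source order. iterOrdered is a hypothetical helper built on Array.Parallel.map, not the actual ParStream implementation; a real fix would presumably branch on stream.PreserveOrdering instead.

```fsharp
// Hypothetical helper, not ParStream's implementation.
// work runs in parallel; action runs on the calling thread in source order.
let iterOrdered (work: 'T -> 'U) (action: 'U -> unit) (source: 'T[]) : unit =
    // Array.Parallel.map stores each result at its input's index,
    // so results keep the source order even though work runs concurrently.
    let results = Array.Parallel.map work source
    Array.iter action results

[| 101 .. 201 |]
|> Array.filter (fun x -> x % 2 = 0)
|> iterOrdered (fun x -> x * x) (printfn "%d")
```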
I could use this one right now
While looking over the code I noticed this: there seems to be a double disposal here and here.
As far as I understand it, if TryAdvance() returns false, it is the responsibility of the iterator's owner to call Dispose(); the disposal shouldn't happen inside TryAdvance().
This is actually fixed in this work at this point by simply deleting the disposal logic from tryAdvance(), though it could also be fixed directly.
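To illustrate the contract described above, here is a hypothetical iterator (RangeIterator is made up for this example, not a type from the library) whose TryAdvance only reports whether another element is available, while the owning loop disposes it exactly once in a finally block.

```fsharp
open System

// Hypothetical iterator for illustration only: TryAdvance never disposes.
type RangeIterator(count: int) =
    let mutable current = -1
    let mutable disposals = 0
    member __.Current = current
    member __.Disposals = disposals
    // Returns false once the range is exhausted; cleanup is left to the owner.
    member __.TryAdvance() =
        if current + 1 < count then
            current <- current + 1
            true
        else
            false
    interface IDisposable with
        member __.Dispose() = disposals <- disposals + 1

// The owner drives the iterator and disposes it exactly once, even if iteration throws.
let sumAll (count: int) =
    let it = new RangeIterator(count)
    let mutable total = 0
    try
        while it.TryAdvance() do
            total <- total + it.Current
    finally
        (it :> IDisposable).Dispose()
    total, it.Disposals
```

An explicit try/finally is used instead of use here only so that the disposal count can be inspected after cleanup has run.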
https://nessos.github.io/Streams/reference/index.html returns a 404 if you follow the link from the front page. It would also be a good idea to link to the docs from this repo (perhaps under the title?).
This discriminated union pollutes the F# namespace by taking the names "ResizeArray", "Seq" and "Array":
type SourceType =
    | Array
    | ResizeArray
    | Seq
For example, after opening Nessos.Streams the code
let results = ResizeArray()
no longer compiles. Could you please add [<RequireQualifiedAccess>] to this union type?
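With the attribute applied, the cases have to be written as SourceType.Array and so on, and ResizeArray once again resolves to the FSharp.Core abbreviation for System.Collections.Generic.List<'T>. A minimal sketch of the proposed change (describe is a hypothetical function added only to show qualified access):

```fsharp
[<RequireQualifiedAccess>]
type SourceType =
    | Array
    | ResizeArray
    | Seq

// ResizeArray once again refers to System.Collections.Generic.List<'T>
let results = ResizeArray<int>()
results.Add 42

// The union cases are still available, but only through the type name
let describe (source: SourceType) =
    match source with
    | SourceType.Array -> "array"
    | SourceType.ResizeArray -> "resize array"
    | SourceType.Seq -> "seq"
```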
Please make this Require
To have full parity with the Seq module, it would be good to have Stream.averageBy:
https://fsharp.github.io/fsharp-core-docs/reference/fsharp-collections-seqmodule.html#averageBy
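Seq.averageBy requires the projected type to support addition, a generic zero and division by an int, and it throws an ArgumentException on empty input. A hypothetical sketch with the same constraints, written as a single pass over a seq<'T> standing in for a stream (this is not the Nessos.Streams API):

```fsharp
// Hypothetical averageBy: one pass, summing projections and counting elements.
let inline averageBy (projection: 'T -> ^U) (source: seq<'T>) : ^U =
    let mutable sum = LanguagePrimitives.GenericZero< ^U >
    let mutable count = 0
    for x in source do
        sum <- sum + projection x
        count <- count + 1
    if count = 0 then invalidArg "source" "The input sequence was empty."
    LanguagePrimitives.DivideByInt< ^U > sum count

let avg = averageBy (fun (x: int) -> float x) [ 1; 2; 3; 4 ]
// avg = 2.5
```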
It might be interesting to be able to convert a Stream to an IObservable<'T>, e.g.:
module Stream =
    let toObservable (stream: Stream<'T>) : System.IObservable<'T> =
        { new System.IObservable<'T> with
            member __.Subscribe(observer: System.IObserver<'T>) =
                let (Stream streamf) = stream
                // push each element to the observer; returning true keeps the stream going
                let (bulk, _) = streamf (fun value -> observer.OnNext value; true)
                bulk ()
                // signal that no more elements will follow
                observer.OnCompleted()
                // the whole stream has already been pushed, so there is nothing to unsubscribe from
                { new System.IDisposable with member __.Dispose() = () } }
let obs = [| 1 .. 10 |] |> Stream.ofArray |> Stream.toObservable
obs.Subscribe(fun x -> printfn "%d" x) |> ignore
I wonder whether this fits the semantics of Streams, but additional operators could be useful, such as:
find/pick/tryFind/tryPick/head/headOrDefault/etc.
Otherwise we have to resort to transferring the entire stream into an array and then performing such operations...
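Such operators seem to fit a push model naturally: the consumer can return false to stop the producer as soon as a match is found, so nothing needs to be buffered. A minimal sketch on a hypothetical PushStream representation (tryFind, tryPick and tryHead here are illustrations, not the library's own functions):

```fsharp
type PushStream<'T> = ('T -> bool) -> unit

let ofArray (xs: 'T[]) : PushStream<'T> =
    fun k ->
        let mutable i = 0
        while i < xs.Length && k xs.[i] do
            i <- i + 1

// Stops the source at the first match instead of materialising the whole stream.
let tryFind (predicate: 'T -> bool) (source: PushStream<'T>) : 'T option =
    let result = ref None
    source (fun x ->
        if predicate x then
            result.Value <- Some x
            false // stop iteration
        else
            true)
    result.Value

// tryPick generalises tryFind: the chooser both tests and projects.
let tryPick (chooser: 'T -> 'U option) (source: PushStream<'T>) : 'U option =
    let result = ref None
    source (fun x ->
        match chooser x with
        | Some _ as found ->
            result.Value <- found
            false
        | None -> true)
    result.Value

let tryHead (source: PushStream<'T>) : 'T option = tryFind (fun _ -> true) source
```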
and therefore should be removed
As per this short Twitter conversation, it would be nice to be able to control the number of threads used by ParStream.
As was suggested on Twitter, something like this seems to be the way to go:
ParStream.withDegreeOfParallelism : ParStream<'T> -> int -> ParStream<'T>
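On .NET, a cap like this is commonly enforced through ParallelOptions.MaxDegreeOfParallelism. A hypothetical sketch of how an execution helper could honour a requested degree (iterWithDegreeOfParallelism is an illustrative name, not part of ParStream, and ParStream's own scheduling may work differently):

```fsharp
open System.Threading.Tasks

// Hypothetical helper: runs action over the source using at most `degree` concurrent workers.
let iterWithDegreeOfParallelism (degree: int) (action: 'T -> unit) (source: 'T[]) : unit =
    if degree < 1 then invalidArg "degree" "The degree of parallelism must be at least 1."
    let options = ParallelOptions(MaxDegreeOfParallelism = degree)
    Parallel.ForEach(source, options, System.Action<'T>(action)) |> ignore
```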
There is a hidden dependency on FSharp.Core that is not automatically resolved when consuming this library from C#. At least Streams.CSharp should explicitly state this dependency.
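I've been going through the Streams code trying to figure out how to add SIMD support. For example, this SIMD-enhanced fold performs very well compared to an inlined version of the core library's fold:
open System.Numerics
type SIMD =
    // folder combines whole vectors; combiner folds single values into the scalar result.
    // Assumes start is the identity of the operation (e.g. 0 for addition), since it seeds every lane.
    static member inline SIMDFold folder combiner (start: 'T) (values: 'T[]) =
        let width = Vector<'T>.Count
        let mutable i = 0
        let mutable v = Vector<'T>(start)
        // <= so that the last complete chunk is also processed
        while i <= values.Length - width do
            v <- folder v (Vector<'T>(values, i))
            i <- i + width
        let mutable result = start
        // reduce the vector lanes to a scalar
        for lane in 0 .. width - 1 do
            result <- combiner result v.[lane]
        // fold the tail elements that did not fill a whole vector
        while i < values.Length do
            result <- combiner result values.[i]
            i <- i + 1
        result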
Adding support to streams has a few considerations:
Ideally, say we had an array of floats, values.
We would want to be able to do something like:
values
|> SIMDStream.simdMap (fun e -> e * e) //operations on Vector<float>s
|> SIMDStream.map (fun e -> if e < 5.0 then 0.0 else 3.0) //scalar operations on floats
|> SIMDStream.simdSum
This way people could mix and match SIMD operations with scalar ones, as sometimes has to be done.
I'm going through the Streams code trying to see how this could be done, and it isn't entirely clear. Some parts of the composed function would need to operate on a Vector, while others would need to iterate Vector.Count times to operate on individual elements of the array?
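To make the mixing concrete, here is a hand-fused sketch of what such a composed pipeline might compile down to, assuming an array of floats: the vector stage runs once per Vector<float> chunk, and the scalar stage then iterates over the lanes of that chunk. fusedSum is a hypothetical name, the final partial chunk is zero-padded (padded lanes are ignored), and the sum is accumulated as a scalar here for simplicity; a Streams implementation would need to generate something similar rather than this exact code.

```fsharp
open System.Numerics

// Hypothetical hand-fused kernel for: values |> simdMap vectorOp |> map scalarOp |> sum
// vectorOp works on whole Vector<float> chunks; scalarOp works on one lane at a time.
let fusedSum (vectorOp: Vector<float> -> Vector<float>) (scalarOp: float -> float) (values: float[]) : float =
    let width = Vector<float>.Count
    let buffer = Array.zeroCreate<float> width
    let mutable total = 0.0
    let mutable i = 0
    while i < values.Length do
        let valid = min width (values.Length - i)
        let chunk =
            if valid = width then
                Vector<float>(values, i)
            else
                // zero-pad the final partial chunk so vectorOp can still run on it
                Array.fill buffer 0 width 0.0
                Array.blit values i buffer 0 valid
                Vector<float>(buffer)
        let mapped = vectorOp chunk
        // scalar stage: visit only the lanes that hold real input
        for lane in 0 .. valid - 1 do
            total <- total + scalarOp mapped.[lane]
        i <- i + width
    total
```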
If there is interest in this I'd love to contribute but need some guidance as I don't fully understand how the streams work yet.
When the following pipeline is processed:
let map =
    [(1, 1); (2, 2); (2, 3); (1, 4)]
    |> ParStream.ofSeq
    |> ParStream.ordered
    |> ParStream.foldBy fst (fun l x -> x :: l) (fun x _ -> x) (fun () -> [])
    |> ParStream.toSeq
    |> Map.ofSeq
    |> Map.map (fun _ -> Seq.map snd)
the results are unstable. Sometimes I get:
> map.[1];;
val it : seq<int> = seq [4; 1]
> map.[2];;
val it : seq<int> = seq [2; 3]
On some other evaluation I get:
> map.[1];;
val it : seq<int> = seq [4; 1]
> map.[2];;
val it : seq<int> = seq [3; 2]
With the given pipeline I would expect to always get the second version.
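For reference, folding the same input sequentially in source order, with the same folder (fun l x -> x :: l), gives the deterministic result expected above. A minimal sketch using only List and Map from FSharp.Core (expected is an illustrative name):

```fsharp
let input = [ (1, 1); (2, 2); (2, 3); (1, 4) ]

// Fold each pair into the list for its key, in source order, prepending like the issue's folder.
let expected =
    input
    |> List.fold (fun (acc: Map<int, (int * int) list>) ((key, _) as pair) ->
        let existing = defaultArg (Map.tryFind key acc) []
        Map.add key (pair :: existing) acc) Map.empty
    |> Map.map (fun _ pairs -> List.map snd pairs)

// expected.[1] = [4; 1] and expected.[2] = [3; 2]
```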
As far as I can tell stream iteration is inherently single threaded in the sense that, like Seq, the Iterable and Iterator objects are accessed linearly.
In this setting, using CancellationTokenSource has a cost, because it is designed for multi-threaded access and is ultimately synchronized via a volatile field. So I experimented with switching to simpler, single-threaded Stream cancellation tokens. The branch and diff are here: dsyme/Streams@hide-reprs...dsyme:no-cts-for-streams (this builds on #31 and #30). In some cases the code seems simpler anyway (since we effectively switch between Boolean flags and cancellation tokens already), unless there's something I've missed.
Note that no disposal is needed on these, since they don't have wait handles.
Raw iteration performance seems to improve by around 5-10%, based on the runStream() sample in Streams.fs.
If you think this is a good idea, I can submit a PR.
After:
Real: 00:00:02.286, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.299, CPU: 00:00:02.296, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.325, CPU: 00:00:02.296, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.257, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.225, CPU: 00:00:02.234, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.252, CPU: 00:00:02.250, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.350, CPU: 00:00:02.328, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.207, CPU: 00:00:02.218, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.218, CPU: 00:00:02.218, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.208, CPU: 00:00:02.187, GC gen0: 1, gen1: 0, gen2: 0
Before:
Real: 00:00:02.384, CPU: 00:00:02.375, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.490, CPU: 00:00:02.484, GC gen0: 0, gen1: 0, gen2: 0
Real: 00:00:02.362, CPU: 00:00:02.343, GC gen0: 0, gen1: 0, gen2: 0
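A minimal sketch of what such a single-threaded token could look like, assuming only one thread ever sets and reads it. StreamCancellation and iterUntilCancelled are hypothetical names for illustration, not the types in the linked branch: the flag is a plain mutable field with no volatile access and no wait handle, so there is nothing to dispose.

```fsharp
// Hypothetical single-threaded cancellation flag; not safe to share across threads.
type StreamCancellation() =
    let mutable cancelled = false
    member __.Cancel() = cancelled <- true
    member __.IsCancellationRequested = cancelled

// The producer checks the plain flag before pushing each element.
let iterUntilCancelled (token: StreamCancellation) (action: int -> unit) (source: int[]) =
    let mutable i = 0
    while i < source.Length && not token.IsCancellationRequested do
        action source.[i]
        i <- i + 1

let token = StreamCancellation()
let seen = ResizeArray<int>()
iterUntilCancelled token (fun x -> seen.Add x; if x = 3 then token.Cancel()) [| 1 .. 10 |]
// seen contains 1, 2, 3
```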
Hi, I'm trying to use the library with a stream coming from a sensor, such as an accelerometer, and do some computation on it. How can I use the library for this?
Is it by design that the following throws an IndexOutOfRangeException?
let refs = [| 1; 2; 3; 4 |]
let pages =
    [| [| 0; 3 |]; [| 0; 5 |]; [| 0; 1 |] |]
    |> Stream.ofArray
    |> Stream.flatMap Stream.ofArray
    |> Stream.map (fun ref -> refs.[ref])
    |> Stream.take 3
    |> Stream.toArray
test <@ [| 1; 4; 1 |] = pages @>
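Since take 3 only needs the first three flattened elements (indices 0, 3 and 0), the fourth index, 5, should never reach refs.[ref]. One plausible cause is that the stop signal raised by take is not propagated out of the inner streams created by flatMap. A minimal sketch on a hypothetical push-based representation (not the library's code) in which flatMap forwards that signal, so the pipeline stops before mapping 5:

```fsharp
type PushStream<'T> = ('T -> bool) -> unit

let ofArray (xs: 'T[]) : PushStream<'T> =
    fun k ->
        let mutable i = 0
        while i < xs.Length && k xs.[i] do
            i <- i + 1

let map (f: 'T -> 'U) (source: PushStream<'T>) : PushStream<'U> =
    fun k -> source (fun x -> k (f x))

// Propagates the downstream "stop" signal out of every inner stream.
let flatMap (f: 'T -> PushStream<'U>) (source: PushStream<'T>) : PushStream<'U> =
    fun k ->
        let go = ref true
        source (fun x ->
            f x (fun y ->
                go.Value <- k y
                go.Value)
            go.Value)

// Signals "stop" right after forwarding the n-th element, so no (n+1)-th is produced.
let take (n: int) (source: PushStream<'T>) : PushStream<'T> =
    fun k ->
        if n > 0 then
            let count = ref 0
            source (fun x ->
                count.Value <- count.Value + 1
                k x && count.Value < n)

let toArray (source: PushStream<'T>) : 'T[] =
    let buffer = ResizeArray<'T>()
    source (fun x -> buffer.Add x; true)
    buffer.ToArray()

let refs = [| 1; 2; 3; 4 |]
let pages =
    [| [| 0; 3 |]; [| 0; 5 |]; [| 0; 1 |] |]
    |> ofArray
    |> flatMap ofArray
    |> map (fun r -> refs.[r])
    |> take 3
    |> toArray
// pages = [| 1; 4; 1 |], and refs.[5] is never evaluated
```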
It appears this repo has a hard dependency on FSharp.Core 4.3.1.0 being GACed.
4.3.1.0 is binplaced by VS 2015/F# 4.0, but not added to the GAC. Only the latest (4.4.0.0) is GACed.
If the latest FSharp.Core is not the desired dependency that's fine, but it might be good to document that and/or print a helpful error message during build/test if the required version can't be loaded.
As it stands tests just blow up with
System.IO.FileLoadException: Could not load file or assembly 'FSharp.Core, Version=4.3.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
File name: 'FSharp.Core, Version=4.3.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' ---> System.IO.FileLoadException: Could not load file or assembly 'FSharp.Core, Version=4.3.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)
File name: 'FSharp.Core, Version=4.3.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a'
There seems to be quite a bit of work done on the dev branch which hasn't been brought over to master. Any particular reason why this is happening? /cc @palladin @anirothan
Would it be possible to add support for flows beyond a linear pipeline, by being able to create a flow graph like in:
http://akka.io/news/2014/09/12/akka-streams-0.7-released.html
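As an illustration of the smallest step beyond a linear pipeline, here is a hypothetical fan-out junction, similar in spirit to the Broadcast stage in Akka Streams graphs, written against a simplified push-based stream type (neither broadcast nor PushStream is part of the library): one pass over the source feeds two independent consumers.

```fsharp
type PushStream<'T> = ('T -> bool) -> unit

let ofArray (xs: 'T[]) : PushStream<'T> =
    fun k ->
        let mutable i = 0
        while i < xs.Length && k xs.[i] do
            i <- i + 1

// Hypothetical fan-out junction: one pass over the source feeds two consumers.
// Each consumer returns false once it needs no more input; the source stops when both have.
let broadcast (left: 'T -> bool) (right: 'T -> bool) (source: PushStream<'T>) : unit =
    let leftActive = ref true
    let rightActive = ref true
    source (fun x ->
        if leftActive.Value then leftActive.Value <- left x
        if rightActive.Value then rightActive.Value <- right x
        leftActive.Value || rightActive.Value)

let total = ref 0
let evens = ResizeArray<int>()
let addToTotal (x: int) =
    total.Value <- total.Value + x
    true
let collectEvens (x: int) =
    if x % 2 = 0 then evens.Add x
    true
ofArray [| 1 .. 6 |] |> broadcast addToTotal collectEvens
// total.Value = 21 and evens contains 2, 4, 6
```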