gnieh / fs2-data
streaming data parsing and transformation library
Home Page: https://fs2-data.gnieh.org
License: Apache License 2.0
The developer of stork has discontinued their work on the project.
In the announcement, several projects are mentioned that are worth investigating:
When encoding large integers to bytes and back, the CBOR (de)serialisation doesn't reproduce the input, because bignum tags are handled only when encoding, not when decoding:
import cats.effect.SyncIO
import fs2.*
import fs2.data.cbor.*
import fs2.data.cbor.high.*
val value = CborValue.Integer(BigInt("-739421513997118914047232662242593364"))
// round-trip: serialise to binary CBOR, then parse the bytes back
val out = Stream(value).covary[SyncIO].through(toBinary).through(values).compile.onlyOrError.unsafeRunSync()
println(value) // Integer(-739421513997118914047232662242593364)
println(out) // Tagged(3,ByteString(ByteVector(16 bytes, 0x008e684b9913062199945815948c0a53)))
Scastie: https://scastie.scala-lang.org/VuVAwL2wQiqrcWiSeyCIeg
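For reference, RFC 8949 §3.4.3 defines tag 2 as an unsigned bignum over the byte string n, and tag 3 as the negative value -1 - n. A minimal sketch of the missing decode-side handling, assuming the Tagged(tag, value) and ByteString(bytes) shapes printed above (untagBignum is an illustrative name):
import fs2.data.cbor.high.CborValue

// sketch only: turn bignum-tagged byte strings back into integers,
// per RFC 8949 §3.4.3 (tag 2: unsigned bignum n, tag 3: -1 - n)
def untagBignum(value: CborValue): CborValue = value match {
  case CborValue.Tagged(2L, CborValue.ByteString(bytes)) =>
    CborValue.Integer(BigInt(1, bytes.toArray))
  case CborValue.Tagged(3L, CborValue.ByteString(bytes)) =>
    CborValue.Integer(-1 - BigInt(1, bytes.toArray))
  case other => other
}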
Currently there are three ways to build JSON selectors:
They all work pretty well but, depending on the context, might not be the best suited:
A fourth way would be a small DSL, allowing one to write selectors in Scala more conveniently than using the constructors directly. As an inspiration, one can think of the circe ACursor class.
This DSL can offer a convenient, pure way to build selectors without any extra dependency.
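A minimal sketch of what such a DSL could look like (root, field, iterate, and compile are illustrative names modeled after circe's cursor navigation, not a settled API):
import fs2.data.json.selector._

// select field "a" at the root, iterate over the array it holds,
// and keep field "b" of every element
val sel = root.field("a").iterate.field("b").compile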
The CSV module was the first one written in this project and suffers from a really cluttered and surprising API. For the 1.0 release, let's rework it completely to make it easier to use.
When deriving shapeless implicits for very wide case classes, sometimes the compiler fails with a stack overflow error. One technique to avoid this is to nest the case classes, which is isomorphic to the flattened structure.
Something like this:
case class Sub(
    @CsvName("first-name") firstName: String,
    @CsvName("the-age") age: Int
)

case class Whole(
    @CsvName("last-name") lastName: String,
    @CsvEmbed sub: Sub
)
// last-name, first-name, the-age
// flintstone, fred, 42
Context: https://gitter.im/fs2-data/general?at=5f159b91a28d973192e7d07a
Doobie supports a similar structure.
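Until such an annotation exists, a hand-written sketch shows how the embedded decoding could compose; this assumes the CsvRow[String] alias and the row.as accessor used elsewhere on this page, and both instances are illustrative:
import fs2.data.csv._

// decode the nested part from the full row
implicit val subDecoder: CsvRowDecoder[Sub, String] =
  (row: CsvRow[String]) =>
    for {
      firstName <- row.as[String]("first-name")
      age       <- row.as[Int]("the-age")
    } yield Sub(firstName, age)

// decode the outer part, delegating to the nested decoder on the same row
implicit val wholeDecoder: CsvRowDecoder[Whole, String] =
  (row: CsvRow[String]) =>
    for {
      lastName <- row.as[String]("last-name")
      sub      <- subDecoder(row)
    } yield Whole(lastName, sub)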
Hi! Are there any plans in place to migrate to Cats Effect 3 / fs2 3.x? If there are any blockers, it would be best to catch them before the RCs are released. Asking for typelevel/cats-effect#1330 :)
Weaver snapshots are available:
CellDecoder and CellEncoder have implicit instances for common java.time formats in CSVs, but there's no way to use custom formats yet. This could be exposed on the companion objects (non-implicit methods), but it should provide a safe and sane signature despite the exceptions involved in date parsing through java.time.
Inspired from a question on Gitter: https://matrix.to/#/!GUjuKyYmWZJiiqOfRW:gitter.im/$8WXkYg1Q5SQvNHAwAId14j3_wvoA5DvI9GqZxbNNBnI?via=gitter.im&via=matrix.org
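A minimal sketch of such a non-implicit factory, assuming the CellDecoder single-method shape and the DecoderError constructor used elsewhere on this page (localDateDecoder is an illustrative name):
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import fs2.data.csv._

// build a CellDecoder for a user-supplied format, turning java.time's
// exception into a DecoderResult instead of letting it escape
def localDateDecoder(formatter: DateTimeFormatter): CellDecoder[LocalDate] =
  new CellDecoder[LocalDate] {
    def apply(cell: String): DecoderResult[LocalDate] =
      try Right(LocalDate.parse(cell, formatter))
      catch {
        case e: DateTimeParseException =>
          Left(new DecoderError(s"cannot parse date '$cell': ${e.getMessage}"))
      }
  }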
Based on the tools developed in #369, add support for a subset of jq rewritings.
Maybe have a look at https://github.com/scalacenter/sbt-version-policy
Consider an ill-formed file such as this (spaces around \t added for readability; assume they aren't there):
user-generated-text \t some-number
First line is ok \t 1
Unescaped\t raw tab in here \t 2
Note that the last line has an extra tab character because the source of the file did not properly escape data from user input.
Currently what happens is that a CsvRowDecoder coincidentally fails: for this one schema, the "shifted" column ends up putting raw tab in here into the some-number header column, which uses a CellDecoder[Int]. But if our CellDecoder had instead been expecting to receive a string there, I believe it would pass without error.
When this condition happens (row.length != headers.length), the entire stream should abort with an error, rather than continuing to emit potentially malformed results. This check would also catch the case where a user-input \n in the middle of an unescaped field would produce nonsense lines.
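A minimal sketch of that check, written over raw rows as NonEmptyList[String] rather than the library's row type (enforceWidth and RowSizeError are illustrative names):
import cats.data.NonEmptyList
import fs2._

// illustrative error type carrying the offending row index
case class RowSizeError(expected: Int, actual: Int, index: Long)
    extends Exception(s"row $index has $actual cells, expected $expected")

// fail the whole stream as soon as a row's width differs from the header's
def enforceWidth[F[_]: RaiseThrowable](
    headerSize: Int): Pipe[F, NonEmptyList[String], NonEmptyList[String]] =
  _.zipWithIndex.flatMap { case (row, idx) =>
    if (row.size == headerSize) Stream.emit(row)
    else Stream.raiseError[F](RowSizeError(headerSize, row.size, idx))
  }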
It can be interesting in a JSON stream to get only tokens corresponding to values matching a given filter. jq has a nice query language, from which a subset can be borrowed to implement these filters.
Currently, when a JSON selector fails, only the immediate context is given. Inspired by the circe HCursor class, which records the history traversed from the document root, the selector pipe could add this information when selecting tokens, in order for the user to understand better and more easily where the problem occurred.
Rendering a stream of XML events to a formatted string (with indentation and newlines) can be useful for printing some pieces of data or writing them into files.
The result of rendering a stream in a formatted manner should roughly look like this:
<a>
<b>
SomeText
</b>
</a>
instead of
<a><b>SomeText</b></a>
Right now the workaround is to use a scala-xml sink and render the result. Obviously this implies an additional dependency.
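A minimal sketch of such a pipe, written as a scan over the public XmlEvent ADT (prettyEvents is an illustrative name; attributes and qualified-name prefixes are elided for brevity):
import fs2._
import fs2.data.xml._

// track the element depth and prefix each event with two spaces per level
def prettyEvents[F[_]]: Pipe[F, XmlEvent, String] =
  _.mapAccumulate(0) {
    case (depth, e: XmlEvent.StartTag) =>
      (depth + 1, ("  " * depth) + s"<${e.name.local}>\n")
    case (depth, e: XmlEvent.EndTag) =>
      (depth - 1, ("  " * (depth - 1)) + s"</${e.name.local}>\n")
    case (depth, t: XmlEvent.XmlString) =>
      (depth, ("  " * depth) + t.s + "\n")
    case (depth, _) =>
      (depth, "") // prolog, comments, etc. left out of this sketch
  }.map(_._2)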
scan on the fs2 stream
For Scala Steward to pick up the migrations automatically, we need to add them to a list in the Steward repo – should do that together with the final 1.0 release.
I'm using fs2-data-xml for a library that I'd like to cross-publish to Scala.js.
It seems that it should be possible given the dependencies.
I may try it myself but I don't know much about using mill.
Tests are problematic though, as they use files, and that would be harder on Scala.js.
The roundtrip tests in http4s-fs2-data are flaky, they sometimes fail on escaped entities:
https://github.com/http4s/http4s-fs2-data/actions/runs/5514890884/job/14930647806?pr=97 (only &)
and https://github.com/http4s/http4s-fs2-data/actions/runs/5481925805/job/14845710701 (combined with a lt).
It's not yet clear whether the input is actually invalid XML or fs2-data handles it incorrectly, to be investigated.
Recently, we hit an issue where our code that used .key.[] emitted no elements in its stream, because .key was incorrect - no such key was in the json.
I'd like to have a mode for filters that can error out if the key being filtered isn't present.
Currently one can use JSON selectors to only keep some values or to transform a JSON value at the selected position into another one. Other interesting use cases would be:
This can be achieved by adding the following Pipes:
def transformF[F[_], Json](sel: Selector, f: Json => F[Json]): Pipe[F, Token, Token]
def transformOpt[F[_], Json](sel: Selector, f: Json => Option[Json]): Pipe[F, Token, Token]
This would make diagnosis dramatically easier for people using the library.
This is only a nice-to-have request :) the question was raised in http4s/http4s-scala-xml#25 (review)
I do wonder how the performance compares to SAX.
The CSV row update function does not add a column to the row if it wasn't present. In some use cases one might want to add a column (the same way we can delete one).
It should be possible to have a behavior that replaces, or adds if not present. The behavior is defined as:
When performing this in the stream, it should be up to the caller to ensure that the resulting stream is valid (i.e., all rows have the same columns in the result).
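A minimal sketch of the intended semantics, over plain header/value lists rather than the library's row type (setOrAdd is an illustrative name):
// replace the value under `header` if present, otherwise append a new column
def setOrAdd(headers: List[String], values: List[String],
             header: String, value: String): (List[String], List[String]) =
  headers.indexOf(header) match {
    case -1 => (headers :+ header, values :+ value) // add if not present
    case i  => (headers, values.updated(i, value))  // replace otherwise
  }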
I'm using the fs2.data.csv.lowlevel.rows pipe to parse CSV files. I wanted to provide meaningful error information to my users and was hoping that I could use the line information from the RowF class. However, the provided line number may be incorrect if the CSV file contains empty lines. I understand that the rows pipe skips empty lines, but I didn't expect that the line numbers of subsequent lines would not correspond to the line numbers in the file. I'm not sure whether this is the intended behaviour.
Example:
import fs2._
import fs2.data.csv._
import cats.implicits._
val input = """A,B,C
|D,E,F
|
|G,H,I
|
|J,K,L
|""".stripMargin
Stream.emit(input)
.covary[Fallible]
.through(lowlevel.rows[Fallible, String]())
.map(r => s"""${r.line.orEmpty}: ${r.values.toList.mkString}""")
.compile
.toList
// actual result : Right(List(1: ABC, 2: DEF, 3: GHI, 4: JKL))
// my expectation : Right(List(1: ABC, 2: DEF, 4: GHI, 6: JKL))
fs2-data version: 1.6.1
Some patterns come up often when using the library; the website should have a section for these.
Among common patterns:
It would also be nice to add a simple way to submit new ones to the website, maybe using PR templates.
Version: 1.3.0
Scala: 3.1.1-RC
Example:
import cats.effect._
import cats.syntax.all._
import fs2._
import fs2.io.file._
import fs2.data.csv
object TestApp extends IOApp.Simple {
override def run: IO[Unit] =
Files[IO]
.readAll(Path("some_valid_csv_path"))
.through(text.utf8.decode)
.through(csv.lowlevel.rows())
.evalTap { row =>
IO(println(s"[1] ${row.line}"))
}
.through(csv.lowlevel.headers[IO, String])
.evalTap { row =>
IO(println(s"[2] ${row.line}"))
}
.compile
.drain
}
Actual output:
[1] Some(1)
[1] Some(2)
[2] None
[1] Some(3)
[2] None
[1] Some(4)
[2] None
[1] Some(6)
[2] None
...
Expected output:
[1] Some(1)
[1] Some(2)
[2] Some(2)
[1] Some(3)
[2] Some(3)
[1] Some(4)
[2] Some(4)
[1] Some(6)
[2] Some(6)
...
Based on the tools introduced by #369, add XQuery support.
A use case for fs2-data might be to generate JSON data in a streaming manner rather than transform parsed data. In this case it can be interesting to have pipes to wrap the produced data into some array/object structure in a safe way (i.e. while ensuring the produced sequence of tokens is valid).
This is based on a question asked on gitter.
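A minimal sketch of such a wrapping pipe, assuming every input value is already a well-formed token sequence (wrapInArray is an illustrative name):
import fs2._
import fs2.data.json._

// surround the produced values with array delimiters so the whole
// output stream is one valid JSON array
def wrapInArray[F[_]]: Pipe[F, Token, Token] =
  tokens => Stream.emit(Token.StartArray) ++ tokens ++ Stream.emit(Token.EndArray)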
I ran into an issue where reading in a JSON file of file paths would sometimes duplicate text when running it through tokens. The following code, when run with the attached sample text, reproduces the issue. With a chunkSize of 55, the second token is parsed as \pathpath\to\some\other\file.txt instead of \path\to\some\other\file.txt.
import cats.effect.IO
import fs2.data.json._ // provides the tokens pipe

val chunkSize = 55
val stream: fs2.Stream[IO, Byte] = fs2.io.readInputStream(IO(getClass.getResourceAsStream("/Sample.txt")), chunkSize)
stream.map(_.toChar)
.through(tokens[IO, Char])
.map((token: Token) => {
println(s"token: ${token}")
token
}).compile.toList.unsafeRunSync()
I added some code to print out the chunks to see where they start and stop and found that when the chunks end with an escape character the text just before it gets duplicated on the next chunk.
val chunkSize = 55
val stream: fs2.Stream[IO, Byte] = fs2.io.readInputStream(IO(getClass.getResourceAsStream("/Sample.txt")), chunkSize)
stream.map(_.toChar).chunkN(chunkSize)
.map((chunk: Chunk[Char]) => {
println(s"chunk: ${chunk.toList.toString()}")
chunk
})
.flatMap(c => fs2.Stream.chunk(c))
.through(tokens[IO, Char])
.map((token: Token) => {
println(s"token: ${token}")
token
}).compile.toList.unsafeRunSync()
I tried to use the fs2-data-csv library to parse some large files. Reading them into the RowF case class was quite fast, but I got poor performance during the cell decoding stage. It occurred for both the semi-auto and custom-written decoders. I've located a code path in the RowF.as[T] function where it recreates the byHeader value for each field that is extracted. I could not think of a fix for it without breaking API changes. I'm happy to provide a pull request if we can find a suitable solution.
A temporary workaround for me was to create a wrapper that caches the byHeader value, and to provide a custom decoder. Here's an example:
class CachedHeaderRow(val origin: csv.RowF[Some, String])(implicit hasHeaders: HasHeaders[Some, String]) {
  // computed once per row instead of once per extracted field
  private lazy val byHeader = origin.headers.get.toList.zip(origin.values.toList).toMap

  def as[T](header: String)(implicit decoder: CellDecoder[T]): DecoderResult[T] =
    byHeader.get(header) match {
      case Some(v) => decoder(v)
      case None    => Left(new DecoderError(s"unknown field $header"))
    }
}
This decreased the runtime of the reading, parsing, and decoding pipeline for 1 000 000 rows with 30 cells from ~100 seconds to ~10 seconds.
Similar to what exists for JSON, add a way to transform stream of XML events into XML ASTs, and back.
With PRs like #31 source code will need some migrations when version 1.0 is released. Let's provide scalafix migrations for this to ease people's life.
Via http4s/http4s-scala-xml#25 (comment).
//> using scala "3.1.2"
//> using lib "org.gnieh::fs2-data-xml-scala::1.4.1"
import cats.effect.*
import fs2.*
import scala.xml.*
val xml = """<Ẵ줐샃뗧饜孫 悊頃ふ퉞="ꨍ邭䋒ừ" 듸괎:ʿक턻뽜="촏"/>"""
object App extends IOApp.Simple {
def run = for
_ <- IO(XML.loadString(xml)) *> IO.println("scala-xml works")
_ <- Stream.emit(xml).covary[IO].through(fs2.data.xml.events()).compile.drain *> IO.println("fs2-data works")
yield ()
}
scala-xml works
fs2.data.xml.XmlException: character 'ʿ' cannot start a NCName
at fs2.data.xml.internals.EventParser$.fail$1$$anonfun$1(EventParser.scala:40)
at fs2.Pull$$anon$2.cont(Pull.scala:183)
at fs2.Pull$BindBind.cont(Pull.scala:701)
at fs2.Pull$ContP.apply(Pull.scala:649)
at fs2.Pull$ContP.apply$(Pull.scala:648)
at fs2.Pull$Bind.apply(Pull.scala:657)
at fs2.Pull$Bind.apply(Pull.scala:657)
at fs2.Pull$.go$1$$anonfun$1(Pull.scala:1207)
at fs2.Pull$.interruptGuard$1$$anonfun$1(Pull.scala:933)
at get @ fs2.internal.Scope.openScope(Scope.scala:281)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Pull$.goCloseScope$1$$anonfun$1$$anonfun$3(Pull.scala:1187)
at update @ fs2.internal.Scope.releaseChildScope(Scope.scala:227)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at modify @ fs2.internal.Scope.close(Scope.scala:262)
at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:162)
at flatMap @ fs2.Pull$.goCloseScope$1$$anonfun$1(Pull.scala:1188)
at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:160)
at flatMap @ fs2.Pull$.goCloseScope$1(Pull.scala:1195)
at get @ fs2.internal.Scope.openScope(Scope.scala:281)
Magnolia instance derivation is preferable to shapeless induction for a few reasons:
Adding this kind of derivation in a new submodule would be a great win for making the library more beginner-friendly and useful to fs2+typelevel-stack newcomers.
Very excited about the enhanced XML support in 1.4.0 :) I've been experimenting with it in http4s/http4s-scala-xml#25 and running into trouble with non-UTF encodings. FTR I'm no expert in these things :)
For example this request:
Content-Type: application/xml
<?xml version="1.0" encoding="iso-8859-1"?><hello name="Günther"/>
as used in this test:
https://github.com/http4s/http4s-scala-xml/blob/1ca64f2ab7ef500d384d2ec5f8caf88df600e6a6/scala-xml/src/test/scala/org/http4s/scalaxml/ScalaXmlSuite.scala#L198-L209
Furthermore the RFC specifies:
Since the charset parameter is not provided in the Content-Type
header and there is no overriding BOM, conformant XML processors must
treat the "iso-8859-1" encoding as authoritative. Conformant XML-
unaware MIME processors should make no assumptions about the
character encoding of the XML MIME entity.
https://datatracker.ietf.org/doc/html/rfc7303#section-8.3
I'm not sure if there is a way to support this without an XML parser that operates directly on bytes instead of chars/strings 😕 any thoughts? Thanks!
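For what it's worth, XML 1.0 Appendix F describes how the encoding can be detected from the raw bytes before any decoding happens. A minimal sketch of that sniffing step (sniffEncoding is an illustrative name; actually reading the encoding= pseudo-attribute is left out):
import java.nio.charset.{Charset, StandardCharsets}

// inspect the first bytes for a BOM or the ASCII bytes of "<?xml"
def sniffEncoding(prefix: Array[Byte]): Charset =
  prefix.take(4).map(_ & 0xff).toList match {
    case 0xef :: 0xbb :: 0xbf :: _         => StandardCharsets.UTF_8    // UTF-8 BOM
    case 0xfe :: 0xff :: _                 => StandardCharsets.UTF_16BE // UTF-16 BOM
    case 0xff :: 0xfe :: _                 => StandardCharsets.UTF_16LE // UTF-16 BOM
    case 0x3c :: 0x3f :: 0x78 :: 0x6d :: _ =>
      // "<?xml" in an ASCII-compatible encoding: decode the declaration
      // with a Latin-1 superset and honour its encoding= pseudo-attribute
      StandardCharsets.ISO_8859_1
    case _ => StandardCharsets.UTF_8 // the XML default
  }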
In some scenarios, it is interesting to extract textual data based on some regular expression.
This can be an interesting fs2-data module that handles the corner cases (e.g. when the match spans several elements and/or chunks).
Using a non-backtracking regular expression implementation (such as re2) can help us tackle this problem in a streaming context.
This PhD thesis can be interesting in this context.
The RFC defines conversion back and forth to/from JSON values. By leveraging the Builder/Tokenizer concept, we can add a module that allows for parsing/serializing CBOR streams to/from JSON values.