snapview / tungstenite-rs
Lightweight stream-based WebSocket implementation for Rust.
License: Apache License 2.0
The following crafted input
#[test]
fn parse_overflow() {
    let mut raw: Cursor<Vec<u8>> = Cursor::new(vec![
        0b1000_0011, 0b1111_1111, 0b1111_1111, 0b1111_1111,
        0b1111_1111, 0b1111_1111, 0b1111_1111, 0b1111_1111,
        0b1111_1111, 0b1111_1111, 0b0000_0000, 0b0000_0000,
        0b0000_0000, 0b0000_0000,
    ]);
    // Check that it doesn't panic.
    let _ = Frame::parse(&mut raw);
}
results in
---- protocol::frame::frame::tests::parse_overflow stdout ----
thread 'protocol::frame::frame::tests::parse_overflow' panicked at 'attempt to add with overflow', src/protocol/frame/frame.rs:347:19
Code:
if size < length + header_length {
    cursor.set_position(initial);
    return Ok(None)
}
I'm new to Rust and not sure how to serve HTTP together with a WebSocket. I want something minimal to host a simple browser page that uses a WebSocket. I hope to find time to create a simple example, since there is no example of serving plain HTTP using just a TcpStream, along the lines of a very simple Rust HTTP hello world.
Hi, I am trying to find a way to access the internal buffer of a message instead of using Message.into_data(), since that returns a Vec<u8>. I am not sure how tungstenite really works, but I suspect it allocates the data in an internal buffer on the stack. Using Vec<u8> moves the data to the heap and creates unnecessary overhead (I think). My application uses only stack buffers, so I would like to do the same with tungstenite.
TL;DR: Any idea how I can access this internal buffer on the stack?
Thanks.
Now that SIMD intrinsics for x86 have been stabilized, it might be worthwhile to add explicit SIMD to accelerate unmasking. For example, autobahn-python uses SIMD exactly for this.
This Stack Overflow thread contains a bare-bones implementation of XOR unmasking for WebSockets in C with x86 intrinsics.
The SIMDeez crate might be useful for abstracting over instruction widths and providing runtime selection of the SIMD implementation depending on availability. Its description also links to other SIMD crates worth considering.
Things to look out for: unsafe blocks, although this code is inherently fragile anyway.

When a WebSocket connection is closed by the server side, how does a user retrieve the close code and reason?
Would you mind adding a CHANGELOG.md that summarizes the changes?
I can't see a non-blocking alternative to server::accept, e.g. something like try_accept. It doesn't seem possible to build a server that runs in just a single thread.
This is a mandatory part of the spec, and is thus needed to be Web-compatible.
I can imagine this is rather a beginner's question for most of you, so I apologise for bothering you with it.
A simplified version of what I'm trying to do is to extend the server.js example to forward each incoming message to all currently connected users, instead of just the one user that has sent the message.
In my own attempts I always failed at the fact that a mutable reference to the websocket is in use in a never-ending loop that listens for incoming messages using websocket.read_message(). So I wasn't able to have another mutable reference to it in another thread to distribute the messages using websocket.write_message(msg).
Usually, when I need to share mutable variables across threads, I use Arc and Mutex, but in this case that only leads to the websocket being locked forever in the websocket.read_message() loop.
Does anyone have a suggestion on how to solve this problem?
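One way around the double mutable borrow is to not share the socket at all: keep each WebSocket in its own reader thread and move only channel endpoints between threads. Below is a minimal std-only sketch of that fan-out pattern; `broadcast`, `to_hub`, and the other names are hypothetical and no tungstenite types are involved — with tungstenite, each reader thread would forward what `read_message` returns into the hub channel, and each writer thread would own its socket half.

```rust
use std::sync::mpsc;
use std::thread;

// Fan-out sketch: messages go into a central hub channel; a hub thread
// re-sends every message to each per-client channel.
fn broadcast(messages: Vec<&str>, n_clients: usize) -> Vec<Vec<String>> {
    let (to_hub, hub_rx) = mpsc::channel::<String>();
    let mut client_txs = Vec::new();
    let mut client_rxs = Vec::new();
    for _ in 0..n_clients {
        let (tx, rx) = mpsc::channel::<String>();
        client_txs.push(tx);
        client_rxs.push(rx);
    }
    // Hub thread: receive each message once, forward it to every client.
    let hub = thread::spawn(move || {
        for msg in hub_rx {
            for tx in &client_txs {
                tx.send(msg.clone()).unwrap();
            }
        }
        // client_txs are dropped here, which closes the client channels.
    });
    for m in messages {
        to_hub.send(m.to_string()).unwrap();
    }
    drop(to_hub); // close the hub channel so the hub thread exits
    hub.join().unwrap();
    client_rxs.into_iter().map(|rx| rx.iter().collect()).collect()
}

fn main() {
    let received = broadcast(vec!["hi", "all"], 3);
    println!("{:?}", received);
}
```

The point of the design is that no thread ever needs a second mutable reference to the socket: reading, broadcasting, and writing each own their piece exclusively.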
I'm new to Rust, so apologies if I've done something wrong and haven't noticed.
I was using tungstenite-rs with mio and noticed that if messages were sent too quickly, mio sometimes would not trigger to let me know there was a read pending. I found this confusing, as I only received one message while multiple were sent. After some brief debugging I found out that, depending on execution timing, tungstenite-rs sometimes reads the whole contents of the stream but only returns a single message, with no way of knowing how many remaining messages have been buffered.
There's a minimal example here:
https://github.com/justinvreeland/tungstenite-rs-issue-example
If you keep refreshing the JavaScript page, you'll see output from the Rust side that looks like:
msg_1: Text("This is message 1")
No second message
msg_1: Text("This is message 2")
No second message
msg_1: Text("This is message 1")
msg_2: Text("This is message 2")
msg_1: Text("This is message 1")
No second message
msg_1: Text("This is message 2")
No second message
msg_1: Text("This is message 1")
No second message
msg_1: Text("This is message 2")
No second message
msg_1: Text("This is message 1")
No second message
msg_1: Text("This is message 2")
No second message
The behavior I'd expect is "No second message", allowing epoll to wait again, notice that there's pending data, and notify again. However, occasionally tungstenite-rs reads the entire contents rather than just the first message, the notification never triggers again, and we're left with a queued message.
I could read in a loop until I encounter WouldBlock or similar, but it'd be nice if tungstenite-rs could either tell me how many messages it's keeping buffered or only read a single message. Happy to take a shot at a patch if either solution seems acceptable.
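The drain-until-WouldBlock workaround mentioned above can be sketched with a std-only stand-in for the socket. `FakeSocket` and its `read_message` are hypothetical; with tungstenite you would loop on the real `read_message` until it returns an Io error of kind WouldBlock, and only then go back to epoll/mio.

```rust
use std::collections::VecDeque;
use std::io::{Error, ErrorKind};

// Stand-in for a socket whose internal buffer may already hold several
// messages: read_message returns one message per call and WouldBlock once
// the buffer is empty.
struct FakeSocket {
    buffered: VecDeque<String>,
}

impl FakeSocket {
    fn read_message(&mut self) -> Result<String, Error> {
        self.buffered
            .pop_front()
            .ok_or_else(|| Error::new(ErrorKind::WouldBlock, "would block"))
    }
}

// Drain everything that is already buffered before waiting on the poller
// again, so no message is left queued without a readiness notification.
fn drain(socket: &mut FakeSocket) -> Vec<String> {
    let mut out = Vec::new();
    loop {
        match socket.read_message() {
            Ok(msg) => out.push(msg),
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
            Err(e) => panic!("unexpected error: {}", e),
        }
    }
    out
}

fn main() {
    let mut s = FakeSocket {
        buffered: VecDeque::from(vec!["m1".to_string(), "m2".to_string()]),
    };
    println!("{:?}", drain(&mut s));
}
```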
Sometimes I get an issue when connecting to the WebSocket (connection lost). Is there any solution for automatically re-connecting the WebSocket?
For the Web, we need to be able to access the headers of the response to the initial HTTP request.
The RFC defines a "Fail the WebSocket Connection" procedure:
Certain algorithms and specifications require an endpoint to _Fail
the WebSocket Connection_. To do so, the client MUST _Close the
WebSocket Connection_, and MAY report the problem to the user (which
would be especially useful for developers) in an appropriate manner.
Similarly, to do so, the server MUST _Close the WebSocket
Connection_, and SHOULD log the problem.
Grepping the RFC for "Fail the WebSocket Connection" brings up the following cases:
If a nonzero value [for RSV1-3] is received and none of the negotiated extensions defines the meaning of such a nonzero value
If an unknown opcode is received
When an endpoint is to interpret a byte stream as UTF-8 but finds that the byte stream is not, in fact, a valid UTF-8 stream
I've ignored errors in the handshake phase since these should be naturally handled correctly I think.
Currently tungstenite does not follow the RFC; when one of these conditions happens, the error is propagated to the caller but the WebSocket is still usable. Beyond mere conformance, I think making these errors unrecoverable makes sense, since something is obviously wrong; continuing as if nothing happened may turn up garbage.
How can I re-establish the connection to the server? Would it be possible to provide an example?
There is a TODO in the code: "replace this with read_unaligned() as it stabilizes"
https://github.com/snapview/tungstenite-rs/blob/3abe419/src/protocol/frame/mask.rs#L31
read_unaligned() has been stabilized in Rust 1.17.
Also of interest: to_bytes()/from_bytes(), which seem to be the safe equivalent of what the unsafe code is doing here, are going to be stabilized in Rust 1.29, which is the next release.
This is my main.rs:
use std::net::TcpStream;
use std::time;

use native_tls::TlsConnector;
use tungstenite::{client, Message};
use url::Url;

fn test_1() {
    println!("TEST 1");
    let connector = TlsConnector::new().unwrap();
    let stream = TcpStream::connect("echo.websocket.org:443").unwrap();
    let stream = connector.connect("echo.websocket.org", stream).unwrap();
    let (mut socket, _) =
        client(Url::parse("wss://echo.websocket.org").unwrap(), stream).unwrap();
    let inner_stream = socket.get_ref().get_ref();
    inner_stream
        .set_read_timeout(Some(time::Duration::from_millis(1000)))
        .unwrap();
    let mut x = 0;
    while x < 5 {
        match socket.read_message() {
            Ok(t) => println!("ok {} {}", x, t.to_string()),
            Err(e) => println!("error {} {}", x, e),
        }
        if x == 3 {
            socket
                .write_message(Message::Text("ciao".to_string()))
                .unwrap()
        }
        x = x + 1;
    }
}

fn test_2() {
    println!("TEST 2");
    let connector = TlsConnector::new().unwrap();
    let stream = TcpStream::connect("echo.websocket.org:443").unwrap();
    let stream = connector.connect("echo.websocket.org", stream).unwrap();
    let (mut socket, _) =
        client(Url::parse("wss://echo.websocket.org").unwrap(), stream).unwrap();
    let inner_stream = socket.get_ref().get_ref();
    inner_stream.set_nonblocking(true).unwrap();
    let mut x = 0;
    while x < 5 {
        match socket.read_message() {
            Ok(t) => println!("ok {} {}", x, t.to_string()),
            Err(e) => println!("error {} {}", x, e),
        }
        std::thread::sleep(time::Duration::from_secs(1));
        if x == 3 {
            socket
                .write_message(Message::Text("ciao".to_string()))
                .unwrap()
        }
        x = x + 1;
    }
}

fn main() {
    test_1();
    test_2();
}
When I run the above, I expect to get:
TEST 1
error 0 IO error: Resource temporarily unavailable (os error 11)
error 1 IO error: Resource temporarily unavailable (os error 11)
error 2 IO error: Resource temporarily unavailable (os error 11)
error 3 IO error: Resource temporarily unavailable (os error 11)
ok 4 ciao
TEST 2
error 0 IO error: Resource temporarily unavailable (os error 11)
error 1 IO error: Resource temporarily unavailable (os error 11)
error 2 IO error: Resource temporarily unavailable (os error 11)
error 3 IO error: Resource temporarily unavailable (os error 11)
ok 4 ciao
But this is printed:
TEST 1
error 0 IO error: Resource temporarily unavailable (os error 11)
error 1 IO error: Resource temporarily unavailable (os error 11)
error 2 IO error: Resource temporarily unavailable (os error 11)
error 3 IO error: Resource temporarily unavailable (os error 11)
ok 4 ciao
TEST 2
error 0 IO error: Resource temporarily unavailable (os error 11)
error 1 IO error: Resource temporarily unavailable (os error 11)
error 2 IO error: Resource temporarily unavailable (os error 11)
error 3 IO error: Resource temporarily unavailable (os error 11)
error 4 IO error: Resource temporarily unavailable (os error 11)
I'm wondering if this is an issue or just the expected behavior.
Hey,
I know you probably don't want to let users do this, but would you possibly be interested in optionally letting the user accept invalid certs when connecting?
I have used:
let connector = TlsConnector::builder().danger_accept_invalid_certs(true).build()?;
Instead of:
let connector = TlsConnector::builder().build()?;
in the TLS encryption module (client.rs). This allows me to connect to my WebSocket server while testing, without having to worry about setting up certificates.
InputBuffer currently uses a growable Vec. It appends new data and drains old data.
The drain part incurs a memmove of the Vec's contents. Since memory copies take up the majority of the runtime, I think the library should try to avoid them when possible. In this case the library can use another data structure: the usual one would be a growable ring buffer, so that the drain would not incur a copy; only growing/shrinking the buffer would.
Luckily, the standard library already has an implementation of a growable ring buffer, VecDeque. So I think it'd be worthwhile to try it and see if my theory is true :)
One thing we need to be able to do is read directly into the buffer, again to avoid copies. For Vec it's implemented with the help of bytes::BufMut::bytes_mut, but BufMut is not implemented for VecDeque. I opened tokio-rs/bytes#94 - maybe it's possible.
[Side note: the implementation of InputBuffer::read_from is in fact not technically safe, because it is actually illegal in Rust to do Read::read into an uninitialized buffer. That's because a Read implementation is technically free to read the buffer that's passed into it, and reading uninitialized bytes is illegal in Rust. I'm not sure how much I care about that, though - a Read implementation which does something like that would be pretty odd.]
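The proposed swap can be sketched in a few lines of std-only code; `append_from` and `take_front` are hypothetical names, not the InputBuffer API. Note that this sketch reads into a stack chunk first and then copies into the VecDeque, so it sidesteps the BufMut question rather than answering it — but it shows the append-at-back/consume-at-front shape where the deque avoids the tail memmove.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

// Append incoming bytes at the back of the ring buffer.
fn append_from<R: Read>(buf: &mut VecDeque<u8>, reader: &mut R) -> io::Result<usize> {
    let mut chunk = [0u8; 4096];
    let n = reader.read(&mut chunk)?;
    buf.extend(chunk[..n].iter().copied());
    Ok(n)
}

// Consume parsed bytes from the front; unlike Vec::drain(..n), this does not
// memmove the remaining tail down to index 0.
fn take_front(buf: &mut VecDeque<u8>, n: usize) -> Vec<u8> {
    let n = n.min(buf.len());
    buf.drain(..n).collect()
}

fn main() {
    let mut buf = VecDeque::new();
    let mut src: &[u8] = b"frame bytes";
    append_from(&mut buf, &mut src).unwrap();
    println!("{:?}", take_front(&mut buf, 5));
}
```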
Is there a way to accept a connection from a non-blocking TCP listener (like those of mio)? As far as I know, either I call accept on a blocking TcpStream, or I can use WebSocket::from_raw_socket, but creating a websocket this way won't handle any handshake messages.
Thanks!
Hi, sorry for the possibly newbie questions; I am quite new to Rust, but I have a lot of experience working with WebSockets in JS. I would appreciate it if you could answer a few questions.
1. I am not sure how the threading works, but do I understand correctly that a new thread will be created for each new connection? And how many connections do you think (approximately) a simple 1-core, 2 GB RAM server can handle?
2. How can I create a broadcast system, where I send messages to all connected WebSockets (or maybe only to a specific one)?
3. What about using multiple cores? Would it automatically scale across all available CPUs, and would it be able to communicate between WebSockets connected on different CPUs? (JS has problems with that, so you have to implement such a system manually.)
4. Are you planning to add more examples and benchmarks? I would really like to see some benchmarks against different Rust WebSocket implementations (maybe not just Rust).
The library looks cool, but it does not have enough information for newcomers like me. Thank you :)
I'm using mio and tungstenite, and it would be nice to have a way to discard the handle when being refused a handshake:
match mid.handshake() {
    Ok(x) => *space = SocketSpace::Connected(x),
    Err(HandshakeError::Interrupted(mid)) => *space = SocketSpace::Connecting(mid),
    Err(HandshakeError::Failure(err)) => {
        // there is no way to get the stream back out of `mid` here
        discard(mid.stream /* ?! halp */)
    }
}
Clippy notes the following errors:
error: handle written amount returned or use `Write::write_all` instead
--> src/protocol/frame/frame.rs:411:13
|
411 | try!(w.write(&headers));
| ^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[deny(unused_io_amount)] on by default
= help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.212/index.html#unused_io_amount
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
error: handle written amount returned or use `Write::write_all` instead
--> src/protocol/frame/frame.rs:417:13
|
417 | try!(w.write(&headers));
| ^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.212/index.html#unused_io_amount
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
error: handle written amount returned or use `Write::write_all` instead
--> src/protocol/frame/frame.rs:434:13
|
434 | try!(w.write(&headers));
| ^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.212/index.html#unused_io_amount
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
error: handle written amount returned or use `Write::write_all` instead
--> src/protocol/frame/frame.rs:440:13
|
440 | try!(w.write(&mask));
| ^^^^^^^^^^^^^^^^^^^^^
|
= help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.212/index.html#unused_io_amount
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
error: handle written amount returned or use `Write::write_all` instead
--> src/protocol/frame/frame.rs:443:9
|
443 | try!(w.write(&self.payload));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: for further information visit https://rust-lang-nursery.github.io/rust-clippy/v0.0.212/index.html#unused_io_amount
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
This seems right to me. If the W: Write is assumed to never have short writes, I don't think that's a valid assumption, since Frame::format is public API and therefore W could be anything.
As for the suggestion to use write_all, I'm not sure whether that's OK for an asynchronous Write, so I'm just logging the issue here.
How does one handle HTTP redirects? I am trying to connect to the Gitter API with no success:
https://github.com/shmutalov/gitter-rs/blob/develop-streaming/faye/src/lib.rs#L71
On reading, this error can be returned by: https://github.com/snapview/tungstenite-rs/blob/master/src/protocol/frame/mod.rs#L165
I wonder how to interpret this. It seems the buffer is created with limit usize::max_value(), which must be huge. When can this happen? What should we do when it does?
Is the socket still operational?
Should we try to send a close frame, or just consider it sudden death?
Is there a way to trigger this error in order to test it?
I had a hard time digging through InputBuffer trying to figure this out.
Hi!
I am building a game server, and I am passing the auth token as a handshake header. I am checking the token during the handshake to reduce the impact of DoS attacks and to simplify the logic afterwards (the user is auth'ed, no need of a state machine). However, I would like to have access to the token after the handshake, or to the socket address in the callback.
Is that currently possible?
Thanks for the great project.
After looking at the documentation and examples, it seems very straightforward to send a simple text or binary message from a server. For instance, after establishing a connection, one can write
websocket.write_message(tungstenite::Message::text("foo".into())).unwrap();
I was expecting that sending a ping would be similar, something like
websocket.write_message(tungstenite::Message::ping()).unwrap();
However, the Message enum does not have a method for constructing ping messages, so that does not work. Is there some other way to send simple ping messages? I see that one can send a ping frame with tungstenite::protocol::frame::Frame::ping(), but that seems much lower level; it would be fairly verbose to turn that back into a message to send, wouldn't it?
The use case here is sending automatic pings from a server for keep-alive purposes. I'm attempting to port a program from JavaScript to Rust; the JS version used ClusterWS, which has a startAutoPing configuration setting. I'm attempting to implement something similar via tungstenite, but am having difficulty sending ping messages.
I'd greatly appreciate any help you can offer. Thanks!
In the following code, on 32-bit architectures,
let mut data = Vec::with_capacity(length as usize);
if length > 0 {
    unsafe {
        try!(cursor.read_exact(data.bytes_mut()));
        data.advance_mut(length as usize);
    }
}
the length variable is u64 (and can have the full range), while usize is u32, so the casts will unintentionally truncate if length is outside the u32 range.
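A possible fix, sketched with std only: usize::try_from surfaces the overflow as an error instead of silently truncating the way `length as usize` does on 32-bit targets. The helper name `payload_len` is hypothetical, not the crate's API.

```rust
use std::convert::TryFrom;

// Checked conversion: rejects a frame length that does not fit in usize
// (e.g. a u64 above u32::MAX on a 32-bit target) instead of truncating it
// and under-allocating the payload buffer.
fn payload_len(length: u64) -> Result<usize, &'static str> {
    usize::try_from(length).map_err(|_| "frame length does not fit in usize")
}

fn main() {
    println!("{:?}", payload_len(1024));
}
```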
For the callback, something nice you can do is make the accept function take Into<Option<Callback>>; then you can pass the callback in directly and it will get converted to an Option via let cb = cb.into().
Example:
fn print<T: Into<Option<&'static str>>>(str: T) {
    let str = str.into();
    match str {
        Some(str) => println!("{}", str),
        None => {}
    }
}

fn main() {
    print("hello");
    print(None);
}
https://play.rust-lang.org/?gist=ea5e776718d5338a0c0744e9e8e7d997&version=stable
That's all.
Following somewhat from the discussion here, but I opened a separate issue because I really wonder whether this does what its author intended.
/// Read a message from the provided stream, if possible.
///
/// This function sends pong and close responses automatically.
/// However, it never blocks on write.
pub fn read_message<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
where
    Stream: Read + Write,
{
    // Do not read from already closed connections.
    self.state.check_active()?;
    loop {
        // Since we may get ping or close, we need to reply to the messages even during read.
        // Thus we call write_pending() but ignore its blocking.
        self.write_pending(stream).no_block()?;
        // If we get here, either write blocks or we have nothing to write.
        // Thus if read blocks, just let it return WouldBlock.
        if let Some(message) = self.read_message_frame(stream)? {
            trace!("Received message {}", message);
            return Ok(message);
        }
    }
}
The loop seems like an iffy control structure here. After looking through the methods it calls, if I'm not mistaken, it can only loop when a fragmented message is being read. However, the comment suggests it will deal with responding to ping and close.
So this will not respond to ping and close directly, but only on the next read. That's inconvenient for client code, especially when using it through tokio-tungstenite, which means not being able to call read_message directly when a ping or close is returned here.
It also means that write_pending is called in a loop while a fragmented message is being read. That doesn't seem very useful. I would think that at the very least the call to write_pending should be before the loop, if we really want to only respond to close/ping from the last read.
Nicer would be to respond to close/ping directly. It's not so clear to me what the best solution is. Something like this comes to mind:
// This loops so we continue reading frames when a fragmented message is coming in.
// There are 4 conditions that break from this loop:
// - errors
// - ConnectionClosed
// - nothing to read anymore (WouldBlock)
// - a complete message was read
loop {
    if let Some(message) = self.read_message_frame(stream)? {
        trace!("Received message {}", message);
        match message {
            // Since we may get ping or close, we need to reply to the messages even during read.
            // Thus we call write_pending() but ignore its blocking.
            Close(..) | Ping(..) => self.write_pending(stream).no_block()?,
            _ => {}
        };
        return Ok(message);
    }
}
The problem here is that write_pending might encounter an error. In that case, we have taken the message out of the underlying stream, but we will never return it; it just gets lost and we return only the error.
There are several possible solutions to this problem, like storing the error on self and returning it on the next read. I'm not sure what would be best.
if let Some(pong) = replace(&mut self.pong, None) {
    self.send_one_frame(pong)?;
}
// If we have any unsent frames, send them.
while let Some(data) = self.send_queue.pop_front() {
    self.send_one_frame(data)?;
}
If self.send_one_frame(pong) or self.send_one_frame(data) returns Err(_), then self.pong, or the frame that was popped off the front of self.send_queue, will be lost and never sent.
Cargo lets users manage multiple related crates from a single repository, and this makes development of closely-related crates much easier. See serde, url, and webrender for examples of such projects.
How can I read from a websocket the way try_recv() works on a std Receiver, so that in my loop I don't have to wait for the client to send a message before I can send one to the client?
Hello, I'm trying to connect to a ws server, but I receive a strange error: Http(200)
Connection code:
let req = Request {
    url: url::Url::parse("wss://cupis.winline.ru/?client=site").unwrap(),
    extra_headers: Some(vec![
        ("Origin".into(), "https://winline.ru".into()),
    ]),
};
let (mut ws_stream, _) = connect_async(req).await.expect("Failed to connect to winline ws");
Trace:
[2019-10-23T19:58:14Z TRACE mio::poll] registering with poller
[2019-10-23T19:58:14Z TRACE mio::sys::unix::kqueue] registering; token=Token(0); interests=Readable | Writable | Error | Hup
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::handshake] Setting ctx when starting handshake
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::client] Client handshake initiated.
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:91 Write.write
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:96 Write.with_context write -> poll_write
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::handshake] Setting context in handshake
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::handshake] Setting context in handshake
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE tungstenite::handshake::machine] Doing handshake round.
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:71 Read.read
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:45 AllowStd.with_context
[2019-10-23T19:58:14Z TRACE tokio_tungstenite::compat] /Users/a.mayorsky/.cargo/git/checkouts/tokio-tungstenite-42c59291ea04b35c/0c27799/src/compat.rs:76 Read.with_context read -> poll_read
[2019-10-23T19:58:14Z TRACE mio::poll] deregistering handle with poller
thread 'main' panicked at 'Failed to connect to winline ws: Io(Custom { kind: Other, error: "" })', src/libcore/result.rs:1165:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Hello! I was tinkering around and wondering how to implement a timeout for a pong message: that is, after a ping is sent, having a maximum amount of time within which the pong must be received, after which the connection is torn down. It looks like right now no action is taken once a pong message is received.
I was wondering, is there perhaps another method to currently implement this? If not, do y'all have thoughts on how this might be implemented?
For the Web, we need to be able to pass additional headers in the initial HTTP request.
In the server I've been working on, the load profile currently has thousands/millions of connected websockets, but they're all idle. One particular constraint so far has been memory, and we're thinking that a possible optimization would be to omit the InputBuffer for idle connections. In theory, with an async server there's no need to actually allocate an input buffer until the socket itself is readable.
I was curious if y'all had thought about this before? I think this could possibly be built externally with just Frame and other low-level bits, but ideally we could reuse the WebSocket logic itself without having to reimplement various portions.
In a similar vein to #76, I would like to have a look at this part of WebSocketContext::write_pending:
// If we're closing and there is nothing to send anymore, we should close the connection.
if self.role == Role::Server && !self.state.can_read() {
    // The underlying TCP connection, in most normal cases, SHOULD be closed
    // first by the server, so that it holds the TIME_WAIT state and not the
    // client (as this would prevent it from re-opening the connection for 2
    // maximum segment lifetimes (2MSL), while there is no corresponding
    // server impact as a TIME_WAIT connection is immediately reopened upon
    // a new SYN with a higher seq number). (RFC 6455)
    self.state = WebSocketState::Terminated;
    Err(Error::ConnectionClosed)
} else {
    Ok(())
}
Note that right now the code still does a check on WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged, but that's exactly what can_read covers, namely that we received a close frame from the remote, so I cleaned that up a bit.
This code mixes 2 concerns:
- returning ConnectionClosed to indicate the closed state to client code
- flushing pending frames in write_pending
What causes this, I think, is that there is no state in WebSocketState to indicate that the close handshake is finished when the remote initiated it. CloseAcknowledged is only used when we initiated. We know that when we receive an initial close frame from the remote, we will have queued a reply, so this kind of bumps it to terminated, but only for the server role.
It doesn't really make sense to have CloseAcknowledged only count when we initiated the close handshake. The WebSocket protocol mainly distinguishes a state where the handshake is finished, regardless of who initiated it. I think it's just the way it is because, given the design of tungstenite, it's kind of hard to do this right. We know when we queue a close response, but we never know when we actually sent it out. It seems hard to fix this without creating extra overhead on each call to write_pending, similar to how it's done for pong messages, for example.
There are a few problems:
- The documentation of AlreadyClosed claims that you will get AlreadyClosed if you try to send after receiving a Message::Close. For servers, we can see here that we will actually return ConnectionClosed if the user tries to send after receiving a close. Integration test: already_closed.rs.txt.
- ConnectionClosed is returned too soon, because we shouldn't drop the underlying connection yet.

In the current code, there is one thread per websocket connection, right? Are there plans to use mio or something similar to be able to have many client connections on a single thread? Feel free to close this if it's completely nonsensical.
… with #63. This would allow deduplicating the url crate in Firefox. Thanks!
This method contains the following loop:
while !self.out_buffer.is_empty() {
let len = stream.write(&self.out_buffer)?;
self.out_buffer.drain(0..len);
}
However the documentation of std::io::Write::write
states:
If the return value is Ok(n) then it must be guaranteed that n <= buf.len(). A return value of 0 typically means that the underlying object is no longer able to accept bytes and will likely not be able to in the future as well, or that the buffer provided is empty.
If anything ever returns Ok(0) here, this hangs in an infinite loop. A length of 0 is, I think, sometimes returned to signal an unexpected EOF. This loop should check for that and probably bubble up an error instead of looping infinitely.
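A minimal sketch of the fix (not the actual tungstenite code; write_out and its signature are stand-ins) that bails out with a WriteZero error instead of spinning:

```rust
use std::io::{self, Write};

// Flush the output buffer, erroring out if the stream stops accepting
// bytes instead of looping forever on `Ok(0)`.
fn write_out<S: Write>(stream: &mut S, out_buffer: &mut Vec<u8>) -> io::Result<()> {
    while !out_buffer.is_empty() {
        let len = stream.write(&out_buffer[..])?;
        if len == 0 {
            // Per the Write docs, 0 means the writer can likely no
            // longer accept bytes; surface that as an error.
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to write the whole buffer",
            ));
        }
        out_buffer.drain(0..len);
    }
    Ok(())
}
```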
I would like to be able to use tungstenite to add websocket support to an existing web-app, using the same tcp listener for both WS and regular HTTP requests.
This means that both web-server frameworks and WS libraries need to be able to interoperate. Some pieces are already in place: hyper, for example, supports HTTP upgrade in a generic way, which allows servers to perform the websocket handshake; and tokio-tungstenite provides WebSocketStream::from_raw_socket, which can take over after the handshake. An example of this can be found in gotham-rs/gotham#354.
The missing bit is the handshake itself. tungstenite does HTTP parsing itself from the byte stream, which cannot be used in a framework-based app: once the route handler is invoked, the request has already been parsed, and the response typically must be produced as some high-level `Response` object too, rather than as a stream of bytes.
So the question is: would it make sense to extend tungstenite to support this use case? Ideally, the user would supply request headers in a high-level form and receive a result indicating whether the websocket connection can be initiated and what the response should be.
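A sketch of what such a header-level handshake helper could look like. The function name and types are hypothetical, not a real tungstenite API; the GUID is the one fixed by RFC 6455, and the actual SHA-1 hashing of the returned string (whose base64-encoded digest becomes Sec-WebSocket-Accept) is elided since the standard library has no SHA-1:

```rust
// RFC 6455's magic GUID; Sec-WebSocket-Accept = base64(SHA-1(key + GUID)).
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Validate already-parsed request headers and, on success, return the
/// string that must be SHA-1 hashed to produce the accept key.
fn check_upgrade(headers: &[(&str, &str)]) -> Result<String, &'static str> {
    let get = |name: &str| {
        headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|&(_, v)| v)
    };
    match get("Upgrade") {
        Some(v) if v.eq_ignore_ascii_case("websocket") => {}
        _ => return Err("missing `Upgrade: websocket` header"),
    }
    match get("Sec-WebSocket-Key") {
        Some(key) => Ok(format!("{}{}", key, WS_GUID)),
        None => Err("missing `Sec-WebSocket-Key` header"),
    }
}
```

The point is that the framework keeps ownership of request parsing and response construction; the library only needs to see headers.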
native-tls does not support everything needed to be Web-compatible (AFAIK), so if we want to use tungstenite in Servo we need openssl support.
Do I need to use a Read + Write object with tungstenite::server::accept, as in #11 (comment)? Does anyone have sample code for that?
I want to implement ping timeouts but still be able to use websocket.read_message() which blocks.
The RFC describes two uses for pong messages:
Tungstenite automatically replies to pings, and since the RFC defines that the reply's application data should be identical to the ping's, there's not much the user can add to that. For that reason, AFAICT, a user should never manually respond to a ping.
However, the user can send pongs as unidirectional heartbeats. The problem is that right now, by storing these on self.pong, they can override replies to pings and potentially carry different application data, and so not constitute a valid response to a ping.
It's pretty unwieldy for client code to avoid this problem. When using tungstenite directly, it can be avoided by following the reception of a ping with calls to write_pending until it no longer returns WouldBlock, to be sure the ping has been answered, and only then sending a manual pong.
When using it through tokio-tungstenite, or other code that splits the reader and writer into separate tasks, things get complicated: the reader that received the ping now has to tell the writer to call write_pending before sending the next pong, and through tokio-tungstenite write_pending isn't exactly accessible.
I feel it would be better to document that a user should never respond to a ping manually, and to treat user-created pong messages like any other queued message. They don't need to jump the queue; the RFC only says that replies to pings should be sent as fast as practical.
That would avoid this problem altogether.
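The suggested behaviour can be sketched like this. Frame and SendQueue are stand-ins, not tungstenite's types; the point is that automatic ping replies and user messages share one FIFO, so a heartbeat can never overwrite a pending ping reply:

```rust
use std::collections::VecDeque;

// Simplified frame type for the sketch.
#[derive(Debug, PartialEq)]
enum Frame {
    Pong(Vec<u8>),
    Text(String),
}

#[derive(Default)]
struct SendQueue {
    frames: VecDeque<Frame>,
}

impl SendQueue {
    /// Automatic reply: RFC 6455 requires the pong to echo the ping's
    /// application data, so it is queued verbatim.
    fn on_ping(&mut self, payload: Vec<u8>) {
        self.frames.push_back(Frame::Pong(payload));
    }

    /// User messages (including heartbeat pongs) are queued like any
    /// other frame; nothing jumps the queue or gets overwritten.
    fn send(&mut self, frame: Frame) {
        self.frames.push_back(frame);
    }
}
```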
Hi. The slack crate uses this crate for websockets. Every now and then I get stuck in read_message and it never returns.
Either that, or it returns a binary message (which we ignore): https://github.com/slack-rs/slack-rs/blob/master/src/lib.rs#L205
The server end of the system thinks there is no one connected, though, so it seems to me that we're in some zombie state. Is this expected to be possible? We used to use a different websocket-rs and had to set a manual timeout on the underlying socket. Is this something you would expect we have to do? I was expecting ping/pong to take care of this, but apparently it doesn't.
The stack trace below shows the wait in read:
#0 0x00007fa984221102 in recv () from /lib/x86_64-linux-gnu/libpthread.so.0
#1 0x00007fa9855ad42f in _$LT$std..net..tcp..TcpStream$u20$as$u20$std..io..Read$GT$::read::h08a6bc50c3293175 ()
at /checkout/src/libstd/sys/unix/net.rs:161
#2 0x00007fa9854e3053 in std::panicking::try::do_call::h569c007b8386635e ()
#3 0x00007fa9855bd2cb in __rust_maybe_catch_panic () at /checkout/src/libpanic_unwind/lib.rs:98
#4 0x00007fa9854e3ad6 in openssl::ssl::bio::bread::h20a49d7450378b36 ()
#5 0x00007fa98491c139 in BIO_read () from /lib/x86_64-linux-gnu/libcrypto.so.1.0.0
#6 0x00007fa984c391e1 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.0.0
#7 0x00007fa984c3a41d in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.0.0
#8 0x00007fa984c375e0 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.0.0
#9 0x00007fa985139667 in tungstenite::input_buffer::InputBuffer::read_from::h95abdbaba330b42c ()
#10 0x00007fa98515dc73 in slack::RtmClient::run::hcef610b3e818eb3e ()
#11 0x00007fa9852042d5 in sirbotalot::main::hfa1e56b4aa13694e ()
#12 0x00007fa9855b5fe6 in std::panicking::try::do_call::h689a21caeeef92aa () at /checkout/src/libstd/panicking.rs:454
#13 0x00007fa9855bd2cb in __rust_maybe_catch_panic () at /checkout/src/libpanic_unwind/lib.rs:98
#14 0x00007fa9855b6a97 in std::rt::lang_start::hf63d494cb7dd034c () at /checkout/src/libstd/panicking.rs:433
#15 0x00007fa983c5c7ed in __libc_start_main () from /lib/x86_64-linux-gnu/libc.so.6
#16 0x00007fa985134f1d in _start ()
Would it be possible to publish a new version of the crate? I have code that depends on da85989, and I can't publish my crate until there's a new version of this crate (and of tokio-tungstenite that uses it).
Thank you!
I'm trying to get buffering right in my code, and I ended up reading the code of WebSocketContext::write_message. Maybe I'm missing something, but the docs say:
/// WebSocket will buffer a configurable number of messages at a time, ...
However, there is only one call to self.send_queue.push_back(frame), at the end of the method:
self.send_queue.push_back(frame);
self.write_pending(stream)
As you can see, it's immediately followed by self.write_pending(stream). Looking at write_pending, it does seem to try to send all messages out, even if the queue is not full.
So I wonder: does every message get flushed immediately? If so, that seems to contradict the documentation.
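The behaviour described above can be modelled in a few lines (names are stand-ins, not tungstenite's code): each send pushes one frame and then drains the whole queue, so with a blocking stream the queue is empty after every send and a "number of buffered messages" limit never comes into play:

```rust
use std::collections::VecDeque;
use std::io::Write;

/// Push a frame, then immediately flush the entire queue (the
/// write_pending equivalent drains everything, not just the excess).
fn send<S: Write>(
    queue: &mut VecDeque<Vec<u8>>,
    stream: &mut S,
    frame: Vec<u8>,
) -> std::io::Result<()> {
    queue.push_back(frame);
    while let Some(front) = queue.pop_front() {
        stream.write_all(&front)?;
    }
    Ok(())
}
```

Under this model, the queue only accumulates messages when the stream returns WouldBlock, which would explain the apparent contradiction with the docs.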
Frame::parse() accepts the input to be parsed as the following type: &mut Cursor<Vec<u8>>.
First, the use of a vector instead of a slice is confusing. A slice should be sufficient, since the function is not expected to append anything to the input; unless it has an undocumented side effect, this should be a slice. The use of a vector actually forces a superfluous copy in something I'm currently writing.
Second, the use of &mut Cursor is not obvious to me. If the Cursor is there to mark some bytes as already processed, then the function should just accept &mut [u8] and reduce the length of the input slice, like the write! macro from the standard library does.
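The suggested shape could look like the sketch below, using a shrinking &mut &[u8] the way impl Read for &[u8] works. Header and the short-unmasked-frame-only logic are simplifications for illustration, not tungstenite's actual parser:

```rust
// Simplified header for frames with a 7-bit payload length and no mask.
#[derive(Debug, PartialEq)]
struct Header {
    fin: bool,
    opcode: u8,
    payload_len: u8,
}

/// Parse a minimal frame header. On success, the input slice is
/// advanced past the two consumed header bytes; `None` means more
/// data is needed.
fn parse_header(input: &mut &[u8]) -> Option<Header> {
    if input.len() < 2 {
        return None;
    }
    let header = Header {
        fin: (input[0] & 0b1000_0000) != 0,
        opcode: input[0] & 0b0000_1111,
        payload_len: input[1] & 0b0111_1111,
    };
    // Shrink the slice to mark the bytes as consumed.
    *input = &input[2..];
    Some(header)
}
```

This avoids both the superfluous Vec copy and the Cursor bookkeeping: the caller just keeps the remaining slice.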