dust-dds's People

Contributors

jrebelo, martinetp, stkimmer

dust-dds's Issues

DurationKind incompatible with json, ron deserialization

I'm trying to save/load some QoS parameters in a text file for configuration purposes. I noticed that DurationKind has a custom Serialize/Deserialize impl that causes issues when trying to deserialize it from a JSON or RON string. Here's what I mean:

use dust_dds::infrastructure::time::{Duration, DurationKind};

#[derive(serde::Serialize, serde::Deserialize, Debug)]
enum AltDurationKind {
    Finite(Duration),
    Infinite,
}

#[test]
fn duration_kind_serde() -> anyhow::Result<()> {
    let duration_kind = DurationKind::Finite(Duration::new(0, 0));
    let alt_duration_kind = AltDurationKind::Finite(Duration::new(0, 0));

    let serialized = bincode::serialize(&duration_kind)?;
    println!("bincode serialized normal: {serialized:?}");
    let result = bincode::deserialize::<DurationKind>(&serialized);
    println!("deserialized: {result:?}");

    let serialized = bincode::serialize(&alt_duration_kind)?;
    println!("bincode serialized alt: {serialized:?}");
    let result = bincode::deserialize::<AltDurationKind>(&serialized);
    println!("deserialized: {result:?}");

    let serialized = serde_json::to_string(&duration_kind)?;
    println!("json serialized normal: {serialized:?}");
    let result = serde_json::from_str::<DurationKind>(&serialized);
    println!("deserialized: {result:?}");

    let serialized = serde_json::to_string(&alt_duration_kind)?;
    println!("json serialized alt: {serialized:?}");
    let result = serde_json::from_str::<AltDurationKind>(&serialized);
    println!("deserialized: {result:?}");

    let serialized = ron::to_string(&duration_kind)?;
    println!("ron serialized normal: {serialized:?}");
    let result = ron::from_str::<DurationKind>(&serialized);
    println!("deserialized: {result:?}");

    let serialized = ron::to_string(&alt_duration_kind)?;
    println!("ron serialized alt: {serialized:?}");
    let result = ron::from_str::<AltDurationKind>(&serialized);
    println!("deserialized: {result:?}");

    Ok(())
}

Output:

bincode serialized normal: [0, 0, 0, 0, 0, 0, 0, 0]
deserialized: Ok(Finite(Duration { sec: 0, nanosec: 0 }))
bincode serialized alt: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
deserialized: Ok(Finite(Duration { sec: 0, nanosec: 0 }))
json serialized normal: "{\"sec\":0,\"nanosec\":0}"
deserialized: Err(Error("invalid type: map, expected DurationKind", line: 1, column: 0))
json serialized alt: "{\"Finite\":{\"sec\":0,\"nanosec\":0}}"
deserialized: Ok(Finite(Duration { sec: 0, nanosec: 0 }))
ron serialized normal: "(sec:0,nanosec:0)"
deserialized: Err(SpannedError { code: ExpectedInteger, position: Position { line: 1, col: 2 } })
ron serialized alt: "Finite((sec:0,nanosec:0))"
deserialized: Ok(Finite(Duration { sec: 0, nanosec: 0 }))

I don't understand serde well enough to say how you could make DurationKind serialize and deserialize properly in all formats. I think it has something to do with how serde handles enums.

The only suggestion I have is to replace DurationKind with just Duration and use a constant for infinite duration instead, like RustDDS does, but I realize this might seem less idiomatic than an enum.
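
For illustration, the sentinel-constant alternative suggested above could look roughly like the sketch below. The Duration type here is a hypothetical stand-in, not the dust_dds type, and the sentinel value (maximum seconds and maximum fraction) is an assumption modelled on the infinite durations visible in the RustDDS output further down this page. A plain struct like this would presumably serialize the same way in every format, since no enum is involved.

```rust
// Hypothetical stand-in for a DDS duration; NOT the dust_dds type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Duration {
    sec: i32,
    nanosec: u32,
}

impl Duration {
    // Assumed sentinel: the largest representable value means "infinite".
    const INFINITE: Duration = Duration {
        sec: i32::MAX,
        nanosec: u32::MAX,
    };

    fn new(sec: i32, nanosec: u32) -> Self {
        Duration { sec, nanosec }
    }

    // True only for the sentinel value.
    fn is_infinite(&self) -> bool {
        *self == Self::INFINITE
    }
}

fn main() {
    let finite = Duration::new(1, 0);
    println!("{:?} infinite? {}", finite, finite.is_infinite());
    println!(
        "{:?} infinite? {}",
        Duration::INFINITE,
        Duration::INFINITE.is_infinite()
    );
}
```

The trade-off is the one already mentioned: callers have to remember to check is_infinite(), whereas an enum forces the distinction at compile time.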

EDIT: I've just noticed that ReliabilityQosPolicyKind (and possibly others?) has the same problem when deserializing with serde_json, but ron seems to work OK.

failed to resolve: could not find `implementation` in the crate root when using actor_interface

dust_dds_derive v0.8.2.
https://docs.rs/dust_dds_derive/0.8.2/dust_dds_derive/attr.actor_interface.html

use dust_dds::{
    dds_async::{topic::TopicAsync, topic_listener::TopicListenerAsync},
    infrastructure::status::InconsistentTopicStatus,
};
use dust_dds_derive::actor_interface;
pub struct TopicListenerActor {
    listener: Option<Box<dyn TopicListenerAsync + Send>>,
}

#[actor_interface]
impl TopicListenerActor {
    async fn on_inconsistent_topic(
        &mut self,
        the_topic: TopicAsync,
        status: InconsistentTopicStatus,
    ) {
        if let Some(l) = &mut self.listener {
            l.on_inconsistent_topic(the_topic, status).await
        }
    }
}

fn main() {
    println!("Hello, world!");
}

$ cargo run
   Compiling dust_dds_examples v0.1.0 
error[E0433]: failed to resolve: could not find `implementation` in the crate root
  --> src/main.rs:10:1
   |
10 | #[actor_interface]
   | ^^^^^^^^^^^^^^^^^^ could not find `implementation` in the crate root
   |
   = note: this error originates in the attribute macro `actor_interface` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0433`.
error: could not compile `dust_dds_examples` (bin "dust_dds_examples") due to previous error

Remove Sync trait bound for create_datareader, AnyDataReaderListener impl

I needed to create a DataReader with a listener that holds a Box<dyn SerialPort> as returned from this function (SerialPort implies Send but not Sync). At first I couldn't because the create_datareader function requires the listener trait object to be Sync. Then I tried deleting the Sync bound in my local patch of dust_dds, which required also removing it from this AnyDataReaderListener impl. Then dust_dds and my project compiled and ran fine with the SerialPort listener.

Is there a good reason for this bound or could it be removed?
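
To make the question concrete, here is a minimal sketch using only the standard library. The Listener trait, CountingListener and run_listener are hypothetical names and do not come from dust_dds. The listener holds a Cell, which makes it Send but not Sync, much like the serial port handle described above, and it can still be moved to another thread and used there.

```rust
use std::cell::Cell;
use std::thread;

// Hypothetical listener trait; stands in for a data reader listener.
trait Listener {
    fn on_data_available(&mut self);
    fn calls(&self) -> u32;
}

// `Cell` makes this type Send but not Sync.
struct CountingListener {
    calls: Cell<u32>,
}

impl Listener for CountingListener {
    fn on_data_available(&mut self) {
        self.calls.set(self.calls.get() + 1);
    }

    fn calls(&self) -> u32 {
        self.calls.get()
    }
}

// Only `Send` is required: the listener is moved to one thread and used there.
// With a `Send + Sync` bound, passing a CountingListener would not compile.
fn run_listener(mut listener: Box<dyn Listener + Send>, events: u32) -> u32 {
    thread::spawn(move || {
        for _ in 0..events {
            listener.on_data_available();
        }
        listener.calls()
    })
    .join()
    .expect("listener thread panicked")
}

fn main() {
    let listener = Box::new(CountingListener { calls: Cell::new(0) });
    println!("listener was called {} times", run_listener(listener, 3));
}
```

A Sync bound only becomes necessary when the same listener is reached through shared references from several threads at once. Whether dust_dds does that internally is exactly what this issue asks.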

If it could be removed in this case, there may be other similar cases in the code base where it could be removed as well.

Cannot start a runtime from within a runtime

Dust DDS does not work inside a tokio main:

use common::dds_message::aos::transport::cyclone::DDSMessage;
use dust_dds::{
    domain::domain_participant_factory::DomainParticipantFactory,
    infrastructure::{listeners::NoOpListener, qos::QosKind, status::NO_STATUS},
};
#[tokio::main]
async fn main() {
    let domain_id = 0;
    let participant_factory = DomainParticipantFactory::get_instance();

    let participant = participant_factory
        .create_participant(domain_id, QosKind::Default, NoOpListener::new(), NO_STATUS)
        .unwrap();
}
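
The message in the title is what tokio reports when a blocking entry point such as block_on is called on a thread that is already driving a runtime. One possible workaround, assuming the synchronous Dust DDS API only needs to run on a thread outside the runtime, is to make the blocking calls on a plain OS thread. The sketch below uses only the standard library; run_off_runtime is a hypothetical helper, and the closure stands in for the create_participant call.

```rust
use std::thread;

// Hypothetical helper: runs a blocking closure on a fresh OS thread and
// returns its result, so the blocking call does not execute on the caller's thread.
fn run_off_runtime<T, F>(blocking_call: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(blocking_call)
        .join()
        .expect("blocking thread panicked")
}

fn main() {
    // The closure stands in for the synchronous `create_participant(...)` call.
    let domain_id = run_off_runtime(|| 0);
    println!("participant created for domain {domain_id}");
}
```

Note that join() still blocks the calling thread until the closure finishes, so this only suits short setup calls. The dds_async module that appears in another report on this page may be a more natural fit for async programs.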

Option Serialization Support

Would it be possible to add serialization support for Option types?
Are there any existing mechanisms for working with Option types in DDS messages as of now?
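
Until there is an answer, one workaround commonly used in DDS data models without optional members is to carry such a field as a sequence with at most one element. The sketch below shows the conversion in both directions; option_to_seq and seq_to_option are hypothetical helper names, and it is an assumption that a Vec field is acceptable in the message type.

```rust
// Encode an optional value as a sequence holding zero or one element.
fn option_to_seq<T>(value: Option<T>) -> Vec<T> {
    value.into_iter().collect()
}

// Decode: take the first element if present, ignoring any unexpected extras.
fn seq_to_option<T>(seq: Vec<T>) -> Option<T> {
    seq.into_iter().next()
}

fn main() {
    let on_wire = option_to_seq(Some(42u32));
    println!("encoded: {on_wire:?}");
    println!("decoded: {:?}", seq_to_option(on_wire));
    println!("empty decodes to: {:?}", seq_to_option(Vec::<u32>::new()));
}
```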

Better configuration

In general, the configuration of DustDDS does not scale and is hard to work with. I'm opening this issue to discuss possible alternatives.

Inconsistent documentation on interfaces:
The docs mention that you can pass an array of interface names as a configuration option: [string, null].
However, the implementation only accepts a single String value or null.

Env variables do not scale well:
Environment variables are only OK for a small number of config options and get cumbersome with a large number of values.

Possible solutions:

  • RTI Connext uses XML
  • Eclipse CycloneDDS also uses XML

I think a TOML based configuration file would fit well into the rustacean philosophy. Then the configuration file path could be provided like DUST_DDS_CONFIGURATION=/etc/dds/config.toml.
I'd be glad to hear the opinion of the maintainers! 🙂
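
As a rough sketch of the lookup side of this proposal, the snippet below reads the suggested DUST_DDS_CONFIGURATION variable and turns it into an optional file path. resolve_config_path is a hypothetical helper, and treating an empty value as "not set" is an assumption. Parsing the TOML itself is left out because it would need an external crate.

```rust
use std::env;
use std::path::PathBuf;

// Hypothetical helper: turn the raw variable value into a path, treating an
// empty or missing value as "no configuration file".
fn resolve_config_path(raw: Option<String>) -> Option<PathBuf> {
    raw.filter(|value| !value.trim().is_empty())
        .map(PathBuf::from)
}

fn main() {
    let path = resolve_config_path(env::var("DUST_DDS_CONFIGURATION").ok());
    match path {
        Some(path) => println!("would load configuration from {}", path.display()),
        None => println!("no configuration file set, using defaults"),
    }
}
```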

Comparison to RustDDS

Hi! This looks like a really cool project! Thanks for sharing it!

I did notice that there's another Rust implementation of DDS, RustDDS, and I'm trying to decide which one to use. I'd like to suggest that you clarify the important technical and non-technical differences between these projects so that developers like me know how to choose. I've seen some projects post an FAQ right on their main README for visibility to newcomers.

Specifically, my questions right now are:

  • What is motivating this development effort separate from RustDDS? Why not just contribute to that project, or a fork?
  • What are S2E Systems' long-term support intentions for this project?
  • What is the currently implemented set of DDS features, what's not implemented, and what's planned to be implemented?

I also noticed that y'all haven't had any issues yet, so I thought I'd kick things off for you 😄

Receive messages from other topics when communicating with cyclonedds

Based on dust_subscriber and cyclonedds_publisher.

When I create two readers with different topics:

use dust_dds::{
    domain::domain_participant::DomainParticipant,
    domain::domain_participant_factory::DomainParticipantFactory,
    infrastructure::{
        listeners::NoOpListener,
        qos::{DataReaderQos, QosKind},
        qos_policy::{
            DurabilityQosPolicy, DurabilityQosPolicyKind, ReliabilityQosPolicy,
            ReliabilityQosPolicyKind,
        },
        status::{StatusKind, NO_STATUS},
        time::{Duration, DurationKind},
        wait_set::{Condition, WaitSet},
    },
    subscription::sample_info::{ANY_INSTANCE_STATE, ANY_SAMPLE_STATE, ANY_VIEW_STATE},
    subscription::subscriber::Subscriber,
};

mod hello_world {
    include!("target/idl/hello_world.rs");
}

fn create_reader_task(
    topic_name: String,
    participant: &DomainParticipant,
    subscriber: &Subscriber,
) {
    println!("Begin to create reader task, topic[{topic_name}]");
    let topic = participant
        .find_topic::<hello_world::HelloWorldType>(&topic_name, Duration::new(120, 0))
        .unwrap();

    let reader_qos = DataReaderQos {
        reliability: ReliabilityQosPolicy {
            kind: ReliabilityQosPolicyKind::Reliable,
            max_blocking_time: DurationKind::Finite(Duration::new(1, 0)),
        },
        durability: DurabilityQosPolicy {
            kind: DurabilityQosPolicyKind::TransientLocal,
        },
        ..Default::default()
    };
    let reader = subscriber
        .create_datareader::<hello_world::HelloWorldType>(
            &topic,
            QosKind::Specific(reader_qos),
            NoOpListener::new(),
            NO_STATUS,
        )
        .unwrap();
    let topic_name_clone = topic_name.to_string();
    std::thread::spawn(move || {
        loop {
            let sample_result = reader.take_next_sample();
            if let Err(error) = sample_result {
                std::thread::sleep(std::time::Duration::from_millis(100));
                continue;
            }
            let message = sample_result.unwrap().data().unwrap();
            println!(
                "[{}] receive message, message[{:?}]",
                topic_name_clone, message
            );
            if topic_name_clone != message.msg {
                println!(
                    "========Topic confustion, reader_topic[{}] message_topic[{}]",
                    topic_name_clone, message.msg
                );
            }
            std::thread::sleep(std::time::Duration::from_millis(100));
        } // loop
    });
    println!("Succ to create reader task, topic[{topic_name}]");
}

fn main() {
    let domain_id = 0;
    let participant_factory = DomainParticipantFactory::get_instance();

    let participant = participant_factory
        .create_participant(domain_id, QosKind::Default, NoOpListener::new(), NO_STATUS)
        .unwrap();

    let subscriber = participant
        .create_subscriber(QosKind::Default, NoOpListener::new(), NO_STATUS)
        .unwrap();
    let topics = vec!["t1", "t2"];
    for topic_name in topics {
        create_reader_task(topic_name.to_string(), &participant, &subscriber);
    }

    // Sleep to allow sending acknowledgements
    loop {
        std::thread::sleep(std::time::Duration::from_secs(2));
    }
}

And add two writers:

#include <stdbool.h> /* true */
#include <stdio.h>   /* printf */
#include <unistd.h>  /* sleep */
#include "ddsc/dds.h"
#include "HelloWorld.h"

int main(int argc, char *argv[])
{
	printf("Begin to create participant\n");
	const dds_entity_t participant = dds_create_participant(0, NULL /*qos*/, NULL /*listener*/);
	if (participant < 0)
	{
		DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
	}

	// topic : t1
	printf("Begin to create writer t1\n");
	const char *topic_name = "t1";
	const dds_entity_t topic = dds_create_topic(participant, &HelloWorldType_desc, topic_name, NULL /*qos*/, NULL /*listener*/);
	if (topic < 0)
	{
		DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
	}
	dds_qos_t *qos = dds_create_qos();
	dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
	dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);

	const dds_entity_t data_writer = dds_create_writer(participant, topic, qos, NULL /*listener*/);
	if (data_writer < 0)
	{
		DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-data_writer));
	}
	// topic : t2
	printf("Begin to create writer t2\n");
	const char *topic_name_2 = "t2";
	const dds_entity_t topic_2 = dds_create_topic(participant, &HelloWorldType_desc, topic_name_2, NULL /*qos*/, NULL /*listener*/);
	if (topic_2 < 0)
	{
		DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic_2));
	}
	dds_qos_t *qos_2 = dds_create_qos();
	dds_qset_reliability(qos_2, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
	dds_qset_durability(qos_2, DDS_DURABILITY_TRANSIENT_LOCAL);

	const dds_entity_t data_writer_2 = dds_create_writer(participant, topic_2, qos_2, NULL /*listener*/);
	if (data_writer_2 < 0)
	{
		DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-data_writer_2));
	}
	uint32_t id = 0;
	printf("Begin to write msg\n");
	while (true)
	{
		HelloWorldType msg_1 = {id, "t1"};
		printf("Succ to write msg, id[%d], msg[%s]\n", msg_1.id, msg_1.msg);
		dds_write(data_writer, &msg_1);
		HelloWorldType msg_2 = {id, "t2"};
		printf("Succ to write msg, id[%d], msg[%s]\n", msg_2.id, msg_2.msg);
		dds_write(data_writer_2, &msg_2);
		id++;
		sleep(1);
	}
}

HelloWorld.idl:

struct HelloWorldType
{
	unsigned long id;
	string msg;
};

The messages received by the readers get mixed up: the t1 reader receives messages that were written to t2.

dust_dds_reader:

    Finished dev [unoptimized + debuginfo] target(s) in 2.59s
     Running `target/debug/dust_dds_subscriber`
Begin to create reader task, topic[t1]
Succ to create reader task, topic[t1]
Begin to create reader task, topic[t2]
Succ to create reader task, topic[t2]
[t2] receive message, message[HelloWorldType { id: 28, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 29, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 29, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 30, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 30, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 31, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 31, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 32, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 32, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 33, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 33, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 34, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 34, msg: "t2" }]
[t1] receive message, message[HelloWorldType { id: 35, msg: "t2" }]
========Topic confustion, reader_topic[t1] message_topic[t2]
[t2] receive message, message[HelloWorldType { id: 35, msg: "t2" }]

cyclonedds_writer:

Begin to create participant
Begin to create writer t1
Begin to create writer t2
Begin to write msg
Succ to write msg, id[0], msg[t1]
Succ to write msg, id[0], msg[t2]
Succ to write msg, id[1], msg[t1]
Succ to write msg, id[1], msg[t2]
Succ to write msg, id[2], msg[t1]
Succ to write msg, id[2], msg[t2]
Succ to write msg, id[3], msg[t1]
Succ to write msg, id[3], msg[t2]
Succ to write msg, id[4], msg[t1]
Succ to write msg, id[4], msg[t2]
Succ to write msg, id[5], msg[t1]
Succ to write msg, id[5], msg[t2]
Succ to write msg, id[6], msg[t1]
Succ to write msg, id[6], msg[t2]
Succ to write msg, id[7], msg[t1]
Succ to write msg, id[7], msg[t2]
Succ to write msg, id[8], msg[t1]
....

  • If I only use cyclonedds or rust_dds for writers and readers, everything is ok.
  • It also works if I create two participants.

Inconsistent Behavior with DataWriterQos "HistoryQosPolicyKind::KeepLast" in a Publisher-Subscriber Scenario

I have encountered a puzzling issue while working with a data writer configured with a specific Quality of Service (QoS) policy.
Here's my QoS configuration:

let writer_qos = DataWriterQos {
    reliability: ReliabilityQosPolicy {
        kind: ReliabilityQosPolicyKind::Reliable,
        max_blocking_time: DurationKind::Finite(Duration::new(1, 0)),
    },
    durability: DurabilityQosPolicy {
        kind: DurabilityQosPolicyKind::TransientLocal,
    },
    history: HistoryQosPolicy {
        kind: HistoryQosPolicyKind::KeepLast(1),
    },
    ..Default::default()
};

My setup involves a data writer that writes data 10 times, and then a data reader is initiated with the same QoS settings. Surprisingly, I observe the following behavior:

When the HistoryQosPolicyKind is set to KeepLast(1), I receive the first and the last message.
If I modify the HistoryQosPolicy to KeepLast(2), I receive the first, the second, and the last messages.

I expected to only receive the last message when the HistoryQosPolicyKind is set to KeepLast(1). Why am I receiving both the first and last messages in this case?
Furthermore, when I set the HistoryQosPolicyKind to KeepLast(2), I anticipate receiving only the last two messages, not the first, second, and last messages. Is this behavior in line with the expected outcome?

Additional Context:
I'm using the hello_world_publisher with the same message ID for all the messages, only changing the message content during each write operation.
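
For reference, the behaviour expected above can be written down as a small model: KeepLast(depth) retains at most the last depth samples per instance, so a late-joining reader should see only those. The sketch below is a hypothetical cache using only the standard library, not the dust_dds implementation. KeepLastHistory and its methods are illustrative names, and the u32 key stands in for the message ID.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical per-instance history cache; NOT the dust_dds implementation.
struct KeepLastHistory {
    depth: usize,
    samples: HashMap<u32, VecDeque<String>>,
}

impl KeepLastHistory {
    fn new(depth: usize) -> Self {
        assert!(depth > 0, "KeepLast depth must be at least 1");
        KeepLastHistory {
            depth,
            samples: HashMap::new(),
        }
    }

    // Store a sample for the instance `key`, evicting the oldest one when full.
    fn write(&mut self, key: u32, data: &str) {
        let queue = self.samples.entry(key).or_default();
        if queue.len() == self.depth {
            queue.pop_front();
        }
        queue.push_back(data.to_string());
    }

    // Samples a late-joining reader would be expected to see for `key`.
    fn history(&self, key: u32) -> Vec<String> {
        self.samples
            .get(&key)
            .map(|queue| queue.iter().cloned().collect())
            .unwrap_or_default()
    }
}

fn main() {
    let mut cache = KeepLastHistory::new(1);
    for i in 0..10 {
        // Same key every time, as in the report: only the content changes.
        cache.write(0, &format!("message {i}"));
    }
    println!("{:?}", cache.history(0)); // prints ["message 9"]
}
```

With a depth of 2 the same ten writes leave "message 8" and "message 9", which matches the expectation stated in the report rather than the observed first/second/last pattern.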

Interoperability with FastDDS

I'm trying to test interoperability with FastDDS on Windows 11. You already test interop with CycloneDDS, right?

My test so far goes like this:

  1. Download the FastDDS Shapes Demo app.
    • This requires filling out a form, but is pretty easy.
  2. Run the contained bin/ShapesDemo.exe.
  3. In the GUI, click "Publish" and accept the default settings to create a red square publisher.
  4. Run the test programs below.

My Dust DDS test program is as follows, using version 0.4.0:

use dust_dds::{
    domain::domain_participant_factory::DomainParticipantFactory,
    infrastructure::{error::DdsResult, qos::QosKind},
};
use std::{thread, time::Duration};

fn main() -> DdsResult<()> {
    let participant = DomainParticipantFactory::get_instance().create_participant(
        0,
        QosKind::Default,
        None,
        &[],
    )?;
    loop {
        println!(
            "{:?}",
            participant
                .get_discovered_topics()?
                .into_iter()
                .map(|topic_handle| participant.get_discovered_topic_data(topic_handle).unwrap())
                .collect::<Vec<_>>()
        );
        thread::sleep(Duration::from_secs(1));
    }
}

The result is empty:

[]
[]
...

For comparison, I tried the same thing using RustDDS:

use std::{thread, time::Duration};
use anyhow::Result;
use rustdds::DomainParticipant;

fn main() -> Result<()> {
    let participant = DomainParticipant::new(0)?;
    loop {
        println!("{:?}", participant.discovered_topics());
        thread::sleep(Duration::from_secs(1));
    }
}

and I get the following output:

[]
[]
[]
[DiscoveredTopicData { updated_time: 2023-09-19T20:28:12.684058600Z, topic_data: TopicBuiltinTopicData { key: None, name: "Square", type_name: "ShapeType", durability: Some(Volatile), deadline: Some(Deadline(Duration { seconds: 2147483647, fraction: 4294967295 })), latency_budget: Some(LatencyBudget { duration: Duration { seconds: 0, fraction: 0 } }), liveliness: Some(Automatic { lease_duration: Duration { seconds: 2147483647, fraction: 4294967295 } }), reliability: Some(Reliable { max_blocking_time: Duration { seconds: 0, fraction: 429496730 } }), lifespan: Some(Lifespan { duration: Duration { seconds: 2147483647, fraction: 4294967295 } }), destination_order: Some(ByReceptionTimestamp), presentation: Some(Presentation { access_scope: Instance, coherent_access: false, ordered_access: false }), history: None, resource_limits: None, ownership: Some(Shared) } }]
...

So I'm wondering if there's an incompatibility or if I'm just using the Dust DDS API wrong?
