

Lordworms commented on August 16, 2024

Starting to work on this one now; I was delayed by the Substrait one.


Lordworms commented on August 16, 2024

take


Lordworms commented on August 16, 2024

While working on this one, I added a test like this:

#[tokio::test]
async fn roundtrip_physical_plan_copy_to_sql_options() -> Result<()> {
    // Create a new SessionContext
    let ctx = SessionContext::new();
    let file_type = format_as_file_type(Arc::new(CsvFormatFactory::new()));
    // Create a CSV scan as input
    let input = create_csv_scan(&ctx).await?;
    let plan = LogicalPlan::Copy(CopyTo {
        input: Arc::new(input),
        output_url: "test.csv".to_string(),
        partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        file_type,
        options: Default::default(),
    });

    // Convert the logical plan to a physical plan
    let planner = DefaultPhysicalPlanner::default();
    let physical_plan = planner.create_physical_plan(&plan, &ctx.state()).await?;
    roundtrip_test(physical_plan)
}

I found that the physical plan roundtrip test seems like it should already pass, since the FileFormatFactory is automatically translated into a FileFormat here:

LogicalPlan::Copy(CopyTo {
    input,
    output_url,
    file_type,
    partition_by,
    options: source_option_tuples,
}) => {
    let input_exec = children.one()?;
    let parsed_url = ListingTableUrl::parse(output_url)?;
    let object_store_url = parsed_url.object_store();

    let schema: Schema = (**input.schema()).clone().into();

    // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
    // from the schema of the RecordBatch being written. This allows COPY statements to specify only
    // the column name rather than column name + explicit data type.
    let table_partition_cols = partition_by
        .iter()
        .map(|s| (s.to_string(), arrow_schema::DataType::Null))
        .collect::<Vec<_>>();

    let keep_partition_by_columns = match source_option_tuples
        .get("execution.keep_partition_by_columns")
        .map(|v| v.trim())
    {
        None => session_state.config().options().execution.keep_partition_by_columns,
        Some("true") => true,
        Some("false") => false,
        Some(value) => {
            return Err(DataFusionError::Configuration(format!(
                "provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"",
                value
            )))
        }
    };

    // Set file sink related options
    let config = FileSinkConfig {
        object_store_url,
        table_paths: vec![parsed_url],
        file_groups: vec![],
        output_schema: Arc::new(schema),
        table_partition_cols,
        overwrite: false,
        keep_partition_by_columns,
    };

    let sink_format = file_type_to_format(file_type)?
        .create(session_state, source_option_tuples)?;

    sink_format
        .create_writer_physical_plan(input_exec, session_state, config, None)
        .await?
}
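The keep_partition_by_columns match in the excerpt above can be pulled out into a small std-only function, which makes its rules easy to check in isolation: the value is trimmed, only "true" and "false" are accepted, and a missing key falls back to the session default. This is a sketch, not DataFusion code; the function name is hypothetical and a plain String error stands in for DataFusionError::Configuration.

```rust
use std::collections::HashMap;

/// Hypothetical helper mirroring the planner's match on
/// "execution.keep_partition_by_columns". A String error stands in for
/// DataFusionError::Configuration.
pub fn parse_keep_partition_by_columns(
    options: &HashMap<String, String>,
    session_default: bool,
) -> Result<bool, String> {
    match options
        .get("execution.keep_partition_by_columns")
        .map(|v| v.trim())
    {
        // Key absent: use the session-level configuration value.
        None => Ok(session_default),
        // Only these two exact (trimmed) spellings are accepted.
        Some("true") => Ok(true),
        Some("false") => Ok(false),
        // Anything else is an error rather than a silent default.
        Some(value) => Err(format!(
            "provided value for 'execution.keep_partition_by_columns' was not recognized: \"{}\"",
            value
        )),
    }
}
```

For example, an options map containing " false " yields Ok(false) regardless of the session default, while "yes" is rejected.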

So I'm confused about how to actually add a test here. I'd appreciate your guidance, @alamb.


alamb commented on August 16, 2024


Do you mean that the test you wrote above passes without any code changes? Maybe @devinjdangelo has some ideas.


devinjdangelo commented on August 16, 2024

The format factories can optionally carry modified default write options. This is only used for DataFrame::write_* methods, so that they do not need to pass strings to modify write behavior.

pub struct CsvFormatFactory {
    options: Option<CsvOptions>,
}
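To make the options-carrying factory concrete, here is a std-only mock of the idea. It is not DataFusion's API: CsvOptions is cut down to two fields, the defaults are assumed, and resolve is a hypothetical helper that assumes carried options take precedence over the session defaults. It shows why dropping the options during serde matters: a factory that comes back as new() silently falls back to the defaults.

```rust
/// Hypothetical stand-in for CsvOptions; the real struct has many more fields.
#[derive(Clone, Debug, PartialEq)]
pub struct CsvOptions {
    pub delimiter: u8,
    pub has_header: bool,
}

impl Default for CsvOptions {
    // Assumed defaults for this sketch: comma-delimited, with a header row.
    fn default() -> Self {
        CsvOptions { delimiter: b',', has_header: true }
    }
}

/// Same shape as the struct quoted above: the factory may carry options.
#[derive(Clone, Debug, PartialEq)]
pub struct CsvFormatFactory {
    pub options: Option<CsvOptions>,
}

impl CsvFormatFactory {
    /// No carried options (the string-options path, e.g. SQL COPY).
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Carried options, as a DataFrame::write_* call could construct.
    pub fn new_with_options(options: CsvOptions) -> Self {
        Self { options: Some(options) }
    }

    /// Options a writer would end up using. Assumption for this sketch:
    /// carried options replace the session defaults wholesale.
    pub fn resolve(&self, session_defaults: &CsvOptions) -> CsvOptions {
        self.options.clone().unwrap_or_else(|| session_defaults.clone())
    }
}
```

With a pipe-delimited CsvOptions carried by the factory, resolve returns the pipe delimiter; after a lossy roundtrip to CsvFormatFactory::new(), it returns the comma default instead.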

The existing SerDe implementation does not handle the case when the format factory carries options itself. This will prevent systems like Ballista from correctly passing around options in its DataFrame::write_* methods. Here are the lines that are not fully implemented:

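    // ... (other methods of the codec's LogicalExtensionCodec impl omitted)

    fn try_decode_file_format(
        &self,
        __buf: &[u8],
        __ctx: &SessionContext,
    ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> {
        // Ignores the buffer and always returns a factory with no options.
        Ok(Arc::new(CsvFormatFactory::new()))
    }

    fn try_encode_file_format(
        &self,
        __buf: &[u8],
        __node: Arc<dyn FileFormatFactory>,
    ) -> datafusion_common::Result<()> {
        // Writes nothing, so any options carried by the factory are lost.
        Ok(())
    }
}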

You can see that the encoding method does not actually write any bytes, and the decoding method just initializes a default struct, ignoring any written bytes.
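For comparison, here is a minimal sketch of an encoder/decoder pair that does preserve the options. Everything in it is hypothetical. The mock types stand in for the real ones. The two-byte layout stands in for whatever the real fix would use, presumably the existing protobuf messages in datafusion-proto. The encoder takes &mut Vec<u8>, because the quoted __buf: &[u8] is an immutable slice with nowhere to write.

```rust
/// Hypothetical, cut-down stand-ins for the real types.
#[derive(Clone, Debug, PartialEq)]
pub struct CsvOptions {
    pub delimiter: u8,
    pub has_header: bool,
}

#[derive(Clone, Debug, PartialEq)]
pub struct CsvFormatFactory {
    pub options: Option<CsvOptions>,
}

/// Encode the factory's carried options into `buf`.
/// Hypothetical layout: nothing for `None`, otherwise
/// [delimiter, has_header as 0/1].
pub fn encode_file_format(factory: &CsvFormatFactory, buf: &mut Vec<u8>) {
    if let Some(opts) = &factory.options {
        buf.push(opts.delimiter);
        buf.push(opts.has_header as u8);
    }
}

/// Decode bytes written by `encode_file_format`, rejecting anything else.
pub fn decode_file_format(buf: &[u8]) -> Result<CsvFormatFactory, String> {
    match buf {
        // Empty buffer: the factory carried no options.
        [] => Ok(CsvFormatFactory { options: None }),
        [delimiter, header] if *header <= 1 => Ok(CsvFormatFactory {
            options: Some(CsvOptions {
                delimiter: *delimiter,
                has_header: *header == 1,
            }),
        }),
        other => Err(format!("unexpected encoded CSV format factory: {:?}", other)),
    }
}
```

Decoding an empty buffer as "no carried options" also keeps bytes produced by the current no-op encoder readable.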


Lordworms commented on August 16, 2024


Yes, that's why I'm confused: the factory is converted into a file format internally.
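One hedged reading of why the physical-plan test passes unchanged: per the planner excerpt earlier in the thread, the physical plan holds the FileFormat created from the factory, so a physical roundtrip never serializes the factory itself. A test that catches the bug probably has to roundtrip the factory, carrying non-default options, through the file-format encode/decode path. The sketch uses hypothetical mock types and a simplified FormatCodec trait (not DataFusion's LogicalExtensionCodec). LossyCodec mimics the quoted behaviour: it writes nothing and decodes to a default.

```rust
/// Hypothetical, cut-down stand-ins for the real types.
#[derive(Clone, Debug, PartialEq)]
pub struct CsvOptions {
    pub delimiter: u8,
}

#[derive(Clone, Debug, PartialEq)]
pub struct CsvFormatFactory {
    pub options: Option<CsvOptions>,
}

/// Hypothetical, simplified codec interface for this sketch only.
pub trait FormatCodec {
    fn encode(&self, factory: &CsvFormatFactory, buf: &mut Vec<u8>);
    fn decode(&self, buf: &[u8]) -> CsvFormatFactory;
}

/// Mimics the quoted try_encode/try_decode_file_format behaviour.
pub struct LossyCodec;

impl FormatCodec for LossyCodec {
    // Writes nothing.
    fn encode(&self, _factory: &CsvFormatFactory, _buf: &mut Vec<u8>) {}

    // Ignores the buffer and returns a factory without options.
    fn decode(&self, _buf: &[u8]) -> CsvFormatFactory {
        CsvFormatFactory { options: None }
    }
}

/// The property a test should assert: the factory, options included,
/// survives encode followed by decode.
pub fn factory_roundtrips(codec: &dyn FormatCodec, factory: &CsvFormatFactory) -> bool {
    let mut buf = Vec::new();
    codec.encode(factory, &mut buf);
    codec.decode(&buf) == *factory
}
```

A factory without options passes this check even with LossyCodec, which would explain why a default-options test (like the one above, built from CsvFormatFactory::new()) cannot expose the problem; only a factory with non-default options fails it.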


Lordworms commented on August 16, 2024


Sure, I'll take a look then.

