Getting data into datafusion
is not well documented, especially using serde_json
or serde
data.
This example shows how to convert a serde_json::Value::Array
into a datafusion
DataFrame
, manipulate the dataframe
in datafusion
, then convert it back to serde_json
.
# Cargo.toml
datafusion = "47.0.0"
serde_arrow = { version = "0.13.3", features = ["arrow-55"] }
// `serde_json::Value`
let json = serde_json::json!([{
"date": "2025-06-05",
"test": "test",
"price": 1.01,
}]);
let ctx = SessionContext::new();
let serde_json::Value::Array(json_array) = &json else {
return Err(anyhow::anyhow!("Expected JSON array, got different type"));
};
if json_array.is_empty() {
return Ok(Vec::new());
}
// Configure `TracingOptions` to allow null fields and coerce numbers
let tracing_options = TracingOptions::default()
.allow_null_fields(true)
.coerce_numbers(true);
// Get the schema from actual data, using samples, with `TracingOptions`
let fields = Vec::<FieldRef>::from_samples(json_array, tracing_options)?;
// Convert `serde_json::Value::Array` to `RecordBatch` using `serde_arrow`
let record_batch = serde_arrow::to_record_batch(&fields, &json_array)?;
// Create a DataFrame from the `RecordBatch`
let mut df = ctx.read_batch(record_batch)?;
// Add a new column `new_col` using DataFrame API
df = df.with_column("new_col", lit("test".to_string()))?;
// Execute the DataFrame query
let result_batches = df.collect().await?;
// Convert back to `serde_json` using `serde_arrow`
let all_json_values = result_batches
.into_iter()
.flat_map(|batch| {
serde_arrow::from_record_batch(&batch).unwrap_or_else(|_| Vec::new())
})
.collect::<Vec<serde_json::Value>>();
#[derive(Default, Debug, Clone, Deserialize, Serialize)]
pub struct TestData {
date: String,
test: String,
price: f64,
new_col: String,
}
// Convert the `serde_json::Value` to Vec<TestData>
let test_data: Vec<TestData> =
serde_json::from_value(serde_json::Value::Array(all_json_values))?;
assert_eq!(
test_data,
Vec![
TestData {
date: "2025-06-05".to_string(),
test: "test".to_string(),
price: 1.01,
new_col: "test".to_string(),
},
]
);