Getting data into DataFusion is not well documented, especially when starting from serde_json or serde data.
This example shows how to convert a serde_json::Value::Array
into a datafusion
DataFrame
, manipulate the dataframe
in datafusion
, then convert it back to serde_json
.
# Cargo.toml
datafusion = "47.0.0"
serde_arrow = { version = "0.13.3", features = ["arrow-55"] }
// `serde_json::Value`
let json = serde_json::json!([{
"date": "2025-06-05",
"test": "test",
"price": 1.01,
}]);
let ctx = SessionContext::new();
let serde_json::Value::Array(json_array) = &json else {
return Err(anyhow::anyhow!("Expected JSON array, got different type"));
};
if json_array.is_empty() {
return Ok(Vec::new());
}
// Configure `TracingOptions` to allow null fields and coerce numbers
let tracing_options = TracingOptions::default()
.allow_null_fields(true)
.coerce_numbers(true);
// Get the schema from actual data, using samples, with `TracingOptions`
let fields = Vec::<FieldRef>::from_samples(json_array, tracing_options)?;
// Convert `serde_json::Value::Array` to `RecordBatch` using `serde_arrow`
let record_batch = serde_arrow::to_record_batch(&fields, &json_array)?;
// Create a DataFrame from the `RecordBatch`
let mut df = ctx.read_batch(record_batch)?;
// Add a new column `new_col` using DataFrame API
df = df.with_column("new_col", lit("test".to_string()))?;
// Execute the DataFrame query
let result_batches = df.collect().await?;
// Convert back to `serde_json` using `serde_arrow`
let all_json_values = result_batches
.into_iter()
.flat_map(|batch| {
serde_arrow::from_record_batch(&batch).unwrap_or_else(|_| Vec::new())
})
.collect::<Vec<serde_json::Value>>();
#[derive(Default, Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct TestData {
date: String,
test: String,
price: f64,
new_col: String,
}
// Convert the `serde_json::Value` to Vec<TestData>
let test_data: Vec<TestData> =
serde_json::from_value(serde_json::Value::Array(all_json_values))?;
assert_eq!(
test_data,
vec![
TestData {
date: "2025-06-05".to_string(),
test: "test".to_string(),
price: 1.01,
new_col: "test".to_string(),
},
]
);
Or you can use this datafusion_ext
// src/utils/datafusion_ext.rs
use anyhow::Error;
use datafusion::{arrow::datatypes::FieldRef, dataframe::DataFrame, prelude::*};
use serde_arrow::schema::{SchemaLike, TracingOptions};
pub trait JsonValueExt {
/// Converts a `serde_json::Value::Array` into a `datafusion::dataframe`
///
/// # Errors
/// Fails if the value is not a JSON array, if the array is empty, or if
/// schema inference / record-batch conversion fails.
fn to_df(&self) -> Result<DataFrame, Error>;
}
impl JsonValueExt for serde_json::Value {
    /// Builds an in-memory `DataFrame` from a `serde_json::Value::Array`.
    ///
    /// # Errors
    /// Returns an error if `self` is not a JSON array, if the array is
    /// empty (no schema can be inferred), or if schema tracing or
    /// record-batch conversion fails.
    fn to_df(&self) -> Result<DataFrame, Error> {
        // A throwaway session is enough to register a single batch.
        let ctx = SessionContext::new();
        let Self::Array(json_array) = self else {
            return Err(anyhow::anyhow!(
                "Expected `serde_json::Value::Array`, got different type"
            ));
        };
        // An empty array carries no schema information, so fail early.
        if json_array.is_empty() {
            return Err(anyhow::anyhow!("Empty `serde_json::Value::Array` provided"));
        }
        // Infer the Arrow schema from the actual data; allow null fields
        // and coerce mixed integer/float columns to a common numeric type.
        let tracing_options = TracingOptions::default()
            .allow_null_fields(true)
            .coerce_numbers(true);
        let fields = Vec::<FieldRef>::from_samples(json_array, tracing_options)?;
        // `json_array` is already a reference — no extra borrow needed.
        let record_batch = serde_arrow::to_record_batch(&fields, json_array)?;
        Ok(ctx.read_batch(record_batch)?)
    }
}
#[async_trait::async_trait]
pub trait DataFrameExt {
/// Collects a `datafusion::dataframe` and deserializes it to a Vec of the
/// specified type
///
/// NOTE(review): implementations materialize the full result set in
/// memory before deserializing — confirm this is acceptable for large
/// frames.
async fn to_vec<T>(&self) -> Result<Vec<T>, Error>
where
T: serde::de::DeserializeOwned;
}
#[async_trait::async_trait]
impl DataFrameExt for DataFrame {
    /// Executes the `DataFrame` and deserializes every row into `Vec<T>`.
    ///
    /// # Errors
    /// Returns an error if query execution, record-batch-to-JSON
    /// conversion, or final deserialization fails — conversion errors are
    /// propagated instead of being silently replaced with an empty batch
    /// (which would drop rows without any signal to the caller).
    async fn to_vec<T>(&self) -> Result<Vec<T>, Error>
    where
        T: serde::de::DeserializeOwned,
    {
        // `collect` consumes the DataFrame, so execute a clone.
        let result_batches = self.clone().collect().await?;
        // Accumulate rows batch by batch, propagating the first failure.
        let mut all_json_values = Vec::new();
        for batch in &result_batches {
            let values: Vec<serde_json::Value> = serde_arrow::from_record_batch(batch)?;
            all_json_values.extend(values);
        }
        let typed_result: Vec<T> =
            serde_json::from_value(serde_json::Value::Array(all_json_values))?;
        Ok(typed_result)
    }
}
use utils::datafusion_ext::{DataFrameExt, JsonValueExt};
let json = serde_json::json!([{
"date": "2025-06-05",
"test": "test",
"price": 1.01,
}]);
let mut df = json.to_df()?;
df = df.with_column("new_col", lit("test".to_string()))?;
#[derive(Default, Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct TestData {
date: String,
test: String,
price: f64,
new_col: String,
}
let test_data = df.to_vec::<TestData>().await?;
assert_eq!(
test_data,
vec![
TestData {
date: "2025-06-05".to_string(),
test: "test".to_string(),
price: 1.01,
new_col: "test".to_string(),
},
]
);
2025-05-06
:: tags: #CI #cache #docker #github-actions #nix
Server Install
Install docker and docker compose
Example docker-compose.yaml
services:
attic:
container_name: attic
image: ghcr.io/zhaofengli/attic:latest
command: ["-f", "/attic/server.toml"]
restart: unless-stopped
ports:
- 8080:8080
networks:
attic:
db:
volumes:
- ./server.toml:/attic/server.toml
- attic-data:/attic/storage
env_file:
- prod.env
depends_on:
db:
condition: service_healthy
healthcheck:
test:
[
"CMD-SHELL",
"wget --no-verbose --tries=1 --spider http://attic:8080 || exit 1",
]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
db:
container_name: db
image: postgres:17.2-alpine
restart: unless-stopped
ports:
- 5432:5432
networks:
db:
volumes:
- postgres-data:/var/lib/postgresql/data
env_file:
- prod.env
healthcheck:
test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}"]
interval: 10s
timeout: 5s
retries: 5
volumes:
attic-data:
postgres-data:
networks:
attic:
db:
Example server.toml
listen = "[::]:8080"
[database]
url = "postgres://attic:attic@db:5432/attic_prod"
[storage]
type = "local"
path = "/attic/storage"
[chunking]
nar-size-threshold = 65536
min-size = 16384
avg-size = 65536
max-size = 262144
[compression]
type = "zstd"
[garbage-collection]
interval = "12 hours"
Example prod.env
POSTGRES_DB=attic_prod
POSTGRES_USER=attic
POSTGRES_PASSWORD=attic
DATABASE_URL=postgres://attic:attic@localhost:5432/attic_prod
ATTIC_SERVER_TOKEN_HS256_SECRET_BASE64="<openssl rand 64 | base64 -w0>"
Example Traefik Labels
traefik:
# ...
command:
# ...
- "--entrypoints.websecure.transport.respondingTimeouts.readTimeout=0s"
attic:
# ...
labels:
- "traefik.enable=true"
- "traefik.http.routers.attic.rule=Host(`nix.example.com`)"
- "traefik.http.routers.attic.entrypoints=websecure"
- "traefik.http.routers.attic.tls.certresolver=myhttpchallenge"
- "traefik.http.services.attic.loadbalancer.server.port=8080"
- "traefik.http.routers.attic-http.middlewares=redirect-to-https"
- "traefik.docker.network=<network name>"
Cloudflare
If you are using cloudflare make the subdomain DNS only
Create the Token
docker compose up
docker exec -it attic sh -c 'atticadm make-token --sub "{{<your username here>}}" --validity "10y" --pull "*" --push "*" --create-cache "*" --configure-cache "*" --configure-cache-retention "*" --destroy-cache "*" --delete "*" -f "/attic/server.toml"'
Check if it works
If it is working, visiting nix.example.com should say `attic push`
Client Install
Install pkg.attic-client
make sure your user is trusted
nix.settings = {
trusted-users = [
"root"
"<your username here>"
];
};
# then login to attic
attic login <pick a name for server> https://nix.example.com <token from just create_token>
# create a cache to push to
attic cache create <cache name>
# use the cache
attic use <cache name>
# pushing to the cache
attic push <cache name> /nix/store/*/
Github Actions Install
Add the token created by the create_token step above, named ATTIC_TOKEN, to your repository secrets at https://github.com/<username>/<repo>/settings/secrets/actions
steps:
- uses: actions/checkout@v3
- uses: nixbuild/nix-quick-install-action@master
with:
nix_conf: |
keep-env-derivations = true
keep-outputs = true
# For caching the attic package in github actions storage
- name: Restore Nix store cache
id: cache-nix-restore
uses: nix-community/cache-nix-action/restore@v6
with:
primary-key: nix-${{ runner.os }}-${{ hashFiles('**/*.nix', '**/flake.lock') }}
restore-prefixes-first-match: nix-${{ runner.os }}-
- run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client login <pick a name for server> https://nix.example.com ${{ secrets.ATTIC_TOKEN }} || true
- run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache create <cache name> || true
- run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client use <cache name> || true
# For caching the attic package in github actions storage
- run: nix build -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build
- name: Save Nix store cache
id: cache-nix-save
uses: nix-community/cache-nix-action/save@v6
with:
primary-key: nix-${{ runner.os }}-${{ hashFiles('**/*.nix', '**/flake.lock') }}
gc-max-store-size-linux: 2G
purge: true
purge-prefixes: nix-${{ runner.os }}-
purge-created: 0
purge-last-accessed: 0
purge-primary-key: never
# `nix-fast-build` is faster than `nix flake check` in my testing
# - run: nix flake check --all-systems
# `--attic-cache` will fail if the cache is down
# - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --attic-cache <cache name> --no-nom --skip-cached
- run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --no-nom --skip-cached
- run: |
for i in {1..10}; do
nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client push <cache name> /nix/store/*/ && break || [ $i -eq 10 ] || sleep 5
done
2025-05-06
:: tags: #benchmarks #llm #llm-benchmarks #lm-studio
Size (B) | Speed (T/s) | Model | Type | Quant | Spec Dec (B) | Spec Quant |
---|
1.5 | 282 | qwen 2.5 | MLX | 4 | - | - |
1.5 | 76 | qwen 2.5 | MLX | 8 | - | - |
7 | 70 | qwen 2.5 | GGUF | Q4_K_M | - | - |
7 | 101 | qwen 2.5 | MLX | 4 | - | - |
7 | 58 | qwen 2.5 | MLX | 8 | - | - |
12 | 35 | wayfarer | GGUF | Q6_K | - | - |
12 | 65 | wayfarer | MLX | 4 | - | - |
12 | 45 | wayfarer | MLX | 6 | - | - |
12 | 36 | wayfarer | MLX | 8 | - | - |
14 | 36 | qwen 2.5 | GGUF | Q4_K_M | - | - |
14 | 52 | qwen 2.5 | MLX | 4 | - | - |
14 | 55 | qwen 2.5 | MLX | 4 | 1.5 | 4 |
14 | 30 | qwen 2.5 | MLX | 8 | - | - |
24 | 35 | mistral small 3 | MLX | 4 | - | - |
32 | 18 | qwen 2.5 | GGUF | Q4_K_M | - | - |
32 | 23 | qwen 2.5 | MLX | 4 | - | - |
32 | 30 | qwen 2.5 | MLX | 4 | 1.5 | 4 |
32 | 30 | qwen 2.5 | MLX | 4 | 1.5 | 4 |
32 | 34 | qwen 2.5 | MLX | 4 | 1.5 | 8 |
32 | 26 | qwen 2.5 r1 | MLX | 4 | 1.5 | 4 |
32 | 33 | qwen 2.5 coder | MLX | 4 | 1.5 | 4 |
32 | 31 | qwen 2.5 coder | MLX | 4 | 3 | 4 |
32 | 25 | qwq | MLX | 3 | - | - |
32 | 24 | qwq | MLX | 4 | - | - |
32 | 18 | qwq | MLX | 4 | 1.5 | 4 |
32 | 22 | qwq | MLX | 4 | 1.5 | 8 |
32 | 16 | qwq | MLX | 4 | 7 | 4 |
32 | 16 | qwq | MLX | 4 | 7 | 8 |
32 | 16 | qwq | MLX | 6 | - | - |
32 | 16 | qwq | MLX | 6 | 1.5 | 4 |
32 | 16 | qwq | MLX | 6 | 1.5 | 8 |
70 | 12 | wayfarer large | GGUF | Q2_K_S | - | - |
70 | 15 | wayfarer large | MLX | 3 | - | - |
30 - A3 | 93 | qwen 3 | MLX | 4 | - | - |
30 - A3 | 76 | qwen 3 | MLX | 4 | 1.7 | 4 |
30 - A3 | 81 | qwen 3 | MLX | 6 | - | - |
30 - A3 | 70 | qwen 3 | MLX | 6 | 1.7 | 4 |
30 - A3 | 70 | qwen 3 | MLX | 8 | - | - |
32 | 22 | qwen 3 | MLX | 4 | - | - |
32 | 26 | qwen 3 | MLX | 4 | 1.7 | 4 |
mlx convert and upload to huggingface
https://huggingface.co/docs/hub/en/mlx
https://huggingface.co/mlx-community
git clone [email protected]:NexVeridian/NexVeridian-web.git
uv venv
uv pip install huggingface_hub hf_transfer mlx_lm
uv run huggingface-cli login
just mlx_create "Qwen/QwQ-32B" "4 6 8" "/Users/elijahmcmorris/.cache/lm-studio/models" "mlx-community" "false"
# or
uv run mlx_lm.convert --hf-path Qwen/QwQ-32B -q --q-bits 4 --upload-repo mlx-community/QwQ-32B-4bit --mlx-path /Users/elijahmcmorris/.cache/lm-studio/models/mlx-community/QwQ-32B-4bit
or use https://huggingface.co/spaces/mlx-community/mlx-my-repo
LLM Settings.md
Qwen 3
Temp | Min P | Top P | Top K | Repeat P |
---|
0.6 | 0.00 | 0.95 | 20 | - |
Qwen 3 /no_think
Temp | Min P | Top P | Top K | Repeat P |
---|
0.7 | 0.00 | 0.80 | 20 | 1.5 |
QWQ
Temp | Min P | Top P | Repeat P |
---|
0.7 | 0.05 | 0.95 | - |
1.5 | 0.10 | 1.00 | - |
Prompt Template
{%- if tools %}
{{- '<|im_start|>system\n' }}
{%- if messages[0].role == 'system' %}
{{- messages[0].content + '\n\n' }}
{%- endif %}
{{- "# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>" }}
{%- for tool in tools %}
{{- "\n" }}
{{- tool | tojson }}
{%- endfor %}
{{- "\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call><|im_end|>\n" }}
{%- else %}
{%- if messages[0].role == 'system' %}
{{- '<|im_start|>system\n' + messages[0].content + '<|im_end|>\n' }}
{%- endif %}
{%- endif %}
{%- set ns = namespace(multi_step_tool=true, last_query_index=messages|length - 1) %}
{%- for message in messages[::-1] %}
{%- set index = (messages|length - 1) - loop.index0 %}
{%- set tool_start = "<tool_response>" %}
{%- set tool_start_length = tool_start|length %}
{%- set start_of_message = message.content[:tool_start_length] %}
{%- set tool_end = "</tool_response>" %}
{%- set tool_end_length = tool_end|length %}
{%- set start_pos = (message.content|length) - tool_end_length %}
{%- if start_pos < 0 %}
{%- set start_pos = 0 %}
{%- endif %}
{%- set end_of_message = message.content[start_pos:] %}
{%- if ns.multi_step_tool and message.role == "user" and not(start_of_message == tool_start and end_of_message == tool_end) %}
{%- set ns.multi_step_tool = false %}
{%- set ns.last_query_index = index %}
{%- endif %}
{%- endfor %}
{%- for message in messages %}
{%- if (message.role == "user") or (message.role == "system" and not loop.first) %}
{{- '<|im_start|>' + message.role + '\n' + message.content + '<|im_end|>' + '\n' }}
{%- elif message.role == "assistant" %}
{%- set content = message.content %}
{%- set reasoning_content = '' %}
{%- if message.reasoning_content is defined and message.reasoning_content is not none %}
{%- set reasoning_content = message.reasoning_content %}
{%- else %}
{%- if '</think>' in message.content %}
{%- set content = (message.content.split('</think>')|last).lstrip('\n') %}
{%- set reasoning_content = (message.content.split('</think>')|first).rstrip('\n') %}
{%- set reasoning_content = (reasoning_content.split('<think>')|last).lstrip('\n') %}
{%- endif %}
{%- endif %}
{%- if loop.index0 > ns.last_query_index %}
{%- if loop.last or (not loop.last and reasoning_content) %}
{{- '<|im_start|>' + message.role + '\n<think>\n' + reasoning_content.strip('\n') + '\n</think>\n\n' + content.lstrip('\n') }}
{%- else %}
{{- '<|im_start|>' + message.role + '\n' + content }}
{%- endif %}
{%- else %}
{{- '<|im_start|>' + message.role + '\n' + content }}
{%- endif %}
{%- if message.tool_calls %}
{%- for tool_call in message.tool_calls %}
{%- if (loop.first and content) or (not loop.first) %}
{{- '\n' }}
{%- endif %}
{%- if tool_call.function %}
{%- set tool_call = tool_call.function %}
{%- endif %}
{{- '<tool_call>\n{"name": "' }}
{{- tool_call.name }}
{{- '", "arguments": ' }}
{%- if tool_call.arguments is string %}
{{- tool_call.arguments }}
{%- else %}
{{- tool_call.arguments | tojson }}
{%- endif %}
{{- '}\n</tool_call>' }}
{%- endfor %}
{%- endif %}
{{- '<|im_end|>\n' }}
{%- elif message.role == "tool" %}
{%- if loop.first or (messages[loop.index0 - 1].role != "tool") %}
{{- '<|im_start|>user' }}
{%- endif %}
{{- '\n<tool_response>\n' }}
{{- message.content }}
{{- '\n</tool_response>' }}
{%- if loop.last or (messages[loop.index0 + 1].role != "tool") %}
{{- '<|im_end|>\n' }}
{%- endif %}
{%- endif %}
{%- endfor %}
{%- if add_generation_prompt %}
{{- '<|im_start|>assistant\n' }}
{%- if enable_thinking is defined and enable_thinking is false %}
{{- '<think>\n\n</think>\n\n' }}
{%- endif %}
{%- endif %}