
Using `serde_json` or `serde` data in `datafusion`

Getting data into `datafusion` is not well documented, especially from `serde_json` or `serde` data.

This example shows how to convert a `serde_json::Value::Array` into a `datafusion` DataFrame, manipulate it with the DataFrame API, and convert the result back to `serde_json`.

# Cargo.toml
[dependencies]
datafusion = "47.0.0"
serde_arrow = { version = "0.13.3", features = ["arrow-55"] }
# also used by the snippets below
anyhow = "1"
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
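
The snippets below assume roughly these imports (the `SchemaLike` trait must be in scope for `from_samples`; adjust the paths to your crate layout):

use datafusion::{arrow::datatypes::FieldRef, prelude::*};
use serde::{Deserialize, Serialize};
use serde_arrow::schema::{SchemaLike, TracingOptions};
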
// `serde_json::Value`
let json = serde_json::json!([{
    "date": "2025-06-05",
    "test": "test",
    "price": 1.01,
}]);

let ctx = SessionContext::new();

let serde_json::Value::Array(json_array) = &json else {
    return Err(anyhow::anyhow!("Expected JSON array, got different type"));
};

if json_array.is_empty() {
    return Ok(Vec::new());
}

// Configure `TracingOptions` to allow null fields and coerce numbers
let tracing_options = TracingOptions::default()
    .allow_null_fields(true)
    .coerce_numbers(true);

// Get the schema from actual data, using samples, with `TracingOptions`
let fields = Vec::<FieldRef>::from_samples(json_array, tracing_options)?;

// Convert `serde_json::Value::Array` to `RecordBatch` using `serde_arrow`
let record_batch = serde_arrow::to_record_batch(&fields, &json_array)?;

// Create a DataFrame from the `RecordBatch`
let mut df = ctx.read_batch(record_batch)?;

// Add a new column `new_col` using DataFrame API
df = df.with_column("new_col", lit("test".to_string()))?;

// Execute the DataFrame query
let result_batches = df.collect().await?;

// Convert back to `serde_json` using `serde_arrow`
let all_json_values = result_batches
    .into_iter()
    .flat_map(|batch| {
        serde_arrow::from_record_batch(&batch).unwrap_or_else(|_| Vec::new())
    })
    .collect::<Vec<serde_json::Value>>();

#[derive(Default, Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct TestData {
    date: String,
    test: String,
    price: f64,
    new_col: String,
}

// Convert the `serde_json::Value` to Vec<TestData>
let test_data: Vec<TestData> =
    serde_json::from_value(serde_json::Value::Array(all_json_values))?;

assert_eq!(
    test_data,
    vec![
        TestData {
            date: "2025-06-05".to_string(),
            test: "test".to_string(),
            price: 1.01,
            new_col: "test".to_string(),
        },
    ]
);
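
If you prefer SQL to the DataFrame API, you can register the batch as a named table instead of calling `read_batch`; a minimal sketch (the table name `prices` and the query are illustrative):

// Register the RecordBatch as an in-memory table
ctx.register_batch("prices", record_batch)?;

// `sql` returns a DataFrame, so the collect/deserialize steps above still apply
let df = ctx.sql("SELECT *, 'test' AS new_col FROM prices").await?;
let result_batches = df.collect().await?;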

Or you can use this `datafusion_ext` helper:

// src/utils/datafusion_ext.rs
use anyhow::Error;
use datafusion::{arrow::datatypes::FieldRef, dataframe::DataFrame, prelude::*};
use serde_arrow::schema::{SchemaLike, TracingOptions};

pub trait JsonValueExt {
    /// Converts a `serde_json::Value::Array` into a `datafusion::dataframe`
    fn to_df(&self) -> Result<DataFrame, Error>;
}

impl JsonValueExt for serde_json::Value {
    fn to_df(&self) -> Result<DataFrame, Error> {
        let ctx = SessionContext::new();

        let Self::Array(json_array) = self else {
            return Err(anyhow::anyhow!(
                "Expected `serde_json::Value::Array`, got different type"
            ));
        };

        if json_array.is_empty() {
            return Err(anyhow::anyhow!("Empty `serde_json::Value::Array` provided"));
        }

        let tracing_options = TracingOptions::default()
            .allow_null_fields(true)
            .coerce_numbers(true);

        let fields = Vec::<FieldRef>::from_samples(json_array, tracing_options)?;
        let record_batch = serde_arrow::to_record_batch(&fields, &json_array)?;

        let df = ctx.read_batch(record_batch)?;

        Ok(df)
    }
}

#[async_trait::async_trait]
pub trait DataFrameExt {
    /// Collects a `datafusion::dataframe` and deserializes it to a Vec of the
    /// specified type
    async fn to_vec<T>(&self) -> Result<Vec<T>, Error>
    where
        T: serde::de::DeserializeOwned;
}

#[async_trait::async_trait]
impl DataFrameExt for DataFrame {
    async fn to_vec<T>(&self) -> Result<Vec<T>, Error>
    where
        T: serde::de::DeserializeOwned,
    {
        let result_batches = self.clone().collect().await?;

        let all_json_values = result_batches
            .into_iter()
            .flat_map(|batch| serde_arrow::from_record_batch(&batch).unwrap_or_else(|_| Vec::new()))
            .collect::<Vec<serde_json::Value>>();

        let typed_result: Vec<T> =
            serde_json::from_value(serde_json::Value::Array(all_json_values))?;

        Ok(typed_result)
    }
}
use crate::utils::datafusion_ext::{DataFrameExt, JsonValueExt};

let json = serde_json::json!([{
    "date": "2025-06-05",
    "test": "test",
    "price": 1.01,
}]);

let mut df = json.to_df()?;

df = df.with_column("new_col", lit("test".to_string()))?;

#[derive(Default, Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct TestData {
    date: String,
    test: String,
    price: f64,
    new_col: String,
}

let test_data = df.to_vec::<TestData>().await?;

assert_eq!(
    test_data,
    vec![
        TestData {
            date: "2025-06-05".to_string(),
            test: "test".to_string(),
            price: 1.01,
            new_col: "test".to_string(),
        },
    ]
);

Deploying `Attic` Nix Binary Cache With Docker Compose, for local use and CI

Server Install

Install Docker and Docker Compose.
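
A minimal sketch using Docker's convenience script (fine for a test box; prefer your distro's packages for long-lived hosts):

curl -fsSL https://get.docker.com | sh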

Example docker-compose.yaml

services:
  attic:
    container_name: attic
    image: ghcr.io/zhaofengli/attic:latest
    command: ["-f", "/attic/server.toml"]
    restart: unless-stopped
    ports:
      - 8080:8080
    networks:
      attic:
      pgattic:
    volumes:
      - ./server.toml:/attic/server.toml
      - attic-data:/attic/storage
    env_file:
      - prod.env
    depends_on:
      pgattic:
        condition: service_healthy
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "wget --no-verbose --tries=1 --spider http://attic:8080 || exit 1",
        ]
      interval: 15s
      timeout: 10s
      retries: 10
      start_period: 15s
    deploy:
      resources:
        reservations:
          cpus: 1.0

  pgattic:
    container_name: pgattic
    image: postgres:17.6-alpine
    restart: unless-stopped
    ports:
      - 5432:5432
    networks:
      pgattic:
    volumes:
      - postgres-data:/var/lib/postgresql/data
    env_file:
      - prod.env
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U $${POSTGRES_USER} -d $${POSTGRES_DB}"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  attic-data:
  postgres-data:

networks:
  attic:
  pgattic:

Example server.toml

listen = "[::]:8080"

[database]
url = "postgres://attic:attic@pgattic:5432/attic_prod"

[storage]
type = "local"
path = "/attic/storage"

[chunking]
nar-size-threshold = 65536
min-size = 16384
avg-size = 65536
max-size = 262144

[compression]
type = "zstd"

[garbage-collection]
interval = "12 hours"
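
The `local` storage above keeps NARs on the Docker volume. Attic also supports S3-compatible object storage; a sketch, with region, bucket, and endpoint as placeholders:

[storage]
type = "s3"
region = "<region>"
bucket = "<bucket name>"
endpoint = "https://<s3 endpoint>"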

Example prod.env

POSTGRES_DB=attic_prod
POSTGRES_USER=attic
POSTGRES_PASSWORD=attic
DATABASE_URL=postgres://attic:attic@localhost:5432/attic_prod
ATTIC_SERVER_TOKEN_HS256_SECRET_BASE64="<openssl rand 64 | base64 -w0>"
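
The HS256 secret placeholder above is literal: generate it once and paste the output in:

openssl rand 64 | base64 -w0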

Example Traefik Labels

traefik:
  # ...
  command:
    # ...
    - "--entrypoints.websecure.transport.respondingTimeouts.readTimeout=0s"

attic:
  # ...
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.attic.rule=Host(`nix.example.com`)"
    - "traefik.http.routers.attic.entrypoints=websecure"
    - "traefik.http.routers.attic.tls.certresolver=myhttpchallenge"

    - "traefik.http.routers.attic-http.rule=Host(`nix.example.com`)"
    - "traefik.http.routers.attic-http.entrypoints=web"
    - "traefik.http.routers.attic-http.service=attic"

    - "traefik.http.services.attic.loadbalancer.server.port=8080"
    - "traefik.docker.network=<network name>"

Cloudflare

If you are using Cloudflare, make the subdomain DNS only (grey cloud); proxied uploads are subject to Cloudflare's request size and timeout limits.

Create the Token

docker compose up

docker exec -it attic sh -c 'atticadm make-token \
  --sub "<your username here>" \
  --validity "10y" \
  --pull "*" --push "*" \
  --create-cache "*" --configure-cache "*" \
  --configure-cache-retention "*" --destroy-cache "*" \
  --delete "*" \
  -f "/attic/server.toml"'

Check if it works

If everything is working, https://nix.example.com should respond with `attic push`.
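
A quick check from any machine:

curl https://nix.example.com
# should print: attic push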

Client Install

Install `pkgs.attic-client`, e.g. as below.
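
On NixOS, a sketch using system packages (on other systems, `nix profile install nixpkgs#attic-client` works too):

environment.systemPackages = with pkgs; [ attic-client ];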

Make sure your user is trusted:

nix.settings = {
  trusted-users = [
    "root"
    "<your username here>"
  ];
};
# then login to attic
attic login <pick a name for server> https://nix.example.com <token from the Create the Token step>

# create a cache to push to
attic cache create <cache name>

# use the cache
attic use <cache name>

# pushing to the cache
attic push <cache name> /nix/store/*/
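
On non-NixOS systems, the equivalent of the `trusted-users` setting above is a line in `/etc/nix/nix.conf` (restart the nix daemon after editing):

trusted-users = root <your username here>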

GitHub Actions Install

Add the token from the Create the Token step as a repository secret named ATTIC_TOKEN, at https://github.com/<username>/<repo>/settings/secrets/actions
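
Or set it from the CLI with `gh`, assuming you are inside the repository checkout:

gh secret set ATTIC_TOKEN --body "<token from the Create the Token step>"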

name: nix

on:
  pull_request:
    branches: [main]
  push:
  schedule:
    - cron: 0 0 * * 1

concurrency:
  group: ${{ github.workflow }}-${{ github.head_ref && github.ref || github.run_id }}
  cancel-in-progress: true

env:
  CARGO_TERM_COLOR: always

jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: nixbuild/nix-quick-install-action@v32
        with:
          nix_conf: |
            keep-env-derivations = true
            keep-outputs = true

      # For caching the attic package in github actions storage
      - name: Restore Nix store cache
        id: cache-nix-restore
        uses: nix-community/cache-nix-action/restore@v6
        with:
          primary-key: nix-${{ runner.os }}-${{ hashFiles('**/*.nix', '**/flake.lock') }}
          restore-prefixes-first-match: nix-${{ runner.os }}-

      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client login <pick a name for server> https://nix.example.com ${{ secrets.ATTIC_TOKEN }} || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache create <cache name> || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache configure <cache name> -- --priority 30 || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client use <cache name> || true

      # For caching the attic package in github actions storage
      - run: nix build -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build
      - name: Save Nix store cache
        id: cache-nix-save
        uses: nix-community/cache-nix-action/save@v6
        with:
          primary-key: nix-${{ runner.os }}-${{ hashFiles('**/*.nix', '**/flake.lock') }}
          gc-max-store-size-linux: 2G
          purge: true
          purge-prefixes: nix-${{ runner.os }}-
          purge-created: 0
          purge-last-accessed: 0
          purge-primary-key: never

      # `nix-fast-build` is faster than `nix flake check` in my testing
      # - name: check
      #   run: |
      #     nix flake check --all-systems

      # `--attic-cache` will fail if the cache is down
      # - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --attic-cache <cache name> --no-nom --skip-cached
      - name: check
        run: |
          nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --no-nom --skip-cached

      # Paths will be invalid if tests fail, need to push all other paths
      - name: Push to attic
        if: always()
        run: |
          nix shell nixpkgs/nixos-unstable#findutils nixpkgs/nixos-unstable#util-linux nixpkgs/nixos-unstable#coreutils -c bash -c '
            valid_paths=$(find /nix/store -maxdepth 1 -type d -name "*-*" | \
              head -1000 | \
              xargs -I {} -P $(nproc) sh -c "nix path-info \"\$1\" >/dev/null 2>&1 && echo \"\$1\"" _ {} | \
              tr "\n" " ")

            if [ -n "$valid_paths" ]; then
              for i in {1..10}; do
                nix run nixpkgs/nixos-unstable#attic-client push <cache name> $valid_paths && break || [ $i -eq 10 ] || sleep 5
              done
            fi
          '

GitHub Actions Install, with a matrix for each derivation

name: crane

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]
  schedule:
    - cron: 0 0 * * 1

concurrency:
  group: ${{ github.workflow }}-${{ github.head_ref && github.ref || github.run_id }}
  cancel-in-progress: true

env:
  CARGO_TERM_COLOR: always

jobs:
  check-dependencies:
    name: check-dependencies
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
      actions: write

    strategy:
      matrix:
        system: [x86_64-linux]
        check-type: [my-server, my-crate-fmt, my-crate-toml-fmt]

    steps:
      - uses: actions/checkout@v5
      - uses: nixbuild/nix-quick-install-action@v32
        with:
          nix_conf: |
            keep-env-derivations = true
            keep-outputs = true

      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client login <pick a name for server> https://nix.example.com ${{ secrets.ATTIC_TOKEN }} || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache create <cache name> || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache configure <cache name> -- --priority 30 || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client use <cache name> || true

      - run: nix build -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build

      - name: check
        run: |
          nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --flake ".#checks.$(nix eval --raw --impure --expr builtins.currentSystem).${{ matrix.check-type }}" --no-nom --skip-cached

      - name: Push to attic
        if: always()
        run: |
          for i in {1..10}; do
            nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client push <cache name> /nix/store/*/ && break || [ $i -eq 10 ] || sleep 5
          done

  check-matrix:
    name: check-matrix
    needs: check-dependencies
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
      actions: write

    strategy:
      fail-fast: false
      matrix:
        system: [x86_64-linux]
        check-type: [my-crate-clippy, my-crate-nextest]

    steps:
      - uses: actions/checkout@v5
      - uses: nixbuild/nix-quick-install-action@v32
        with:
          nix_conf: |
            keep-env-derivations = true
            keep-outputs = true

      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client login <pick a name for server> https://nix.example.com ${{ secrets.ATTIC_TOKEN }} || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache create <cache name> || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache configure <cache name> -- --priority 30 || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client use <cache name> || true

      - name: check
        run: |
          nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --flake ".#checks.$(nix eval --raw --impure --expr builtins.currentSystem).${{ matrix.check-type }}" --no-nom --skip-cached

      - name: Push to attic
        if: always()
        run: |
          nix shell nixpkgs/nixos-unstable#findutils nixpkgs/nixos-unstable#util-linux nixpkgs/nixos-unstable#coreutils -c bash -c '
            valid_paths=$(find /nix/store -maxdepth 1 -type d -name "*-*" | \
              head -1000 | \
              xargs -I {} -P $(nproc) sh -c "nix path-info \"\$1\" >/dev/null 2>&1 && echo \"\$1\"" _ {} | \
              tr "\n" " ")

            if [ -n "$valid_paths" ]; then
              for i in {1..10}; do
                nix run nixpkgs/nixos-unstable#attic-client push <cache name> $valid_paths && break || [ $i -eq 10 ] || sleep 5
              done
            fi
          '

Forgejo Actions Install

See the available runner images for the `runs-on` image.

name: nix

on:
  pull_request:
    branches: [main]
  push:
  schedule:
    - cron: 0 0 * * 1

env:
  CARGO_TERM_COLOR: always
  NIX_CONFIG: "experimental-features = nix-command flakes"

jobs:
  check-dependencies:
    name: check-dependencies
    runs-on: nix
    permissions:
      contents: read
      id-token: write
      actions: write

    steps:
      # Add secrets.ATTIC_TOKEN here https://forgejo.example.com/user/settings/actions/secrets
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client login <pick a name for server> https://nix.example.com ${{ secrets.ATTIC_TOKEN }} || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache create <cache name> || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client cache configure <cache name> -- --priority 30 || true
      - run: nix run -I nixpkgs=channel:nixos-unstable nixpkgs#attic-client use <cache name> || true

      - name: Install Node.js
        run: |
          mkdir -p ~/.local/bin
          nix build -I nixpkgs=channel:nixos-unstable nixpkgs#nodejs_24 -o ~/.local/nodejs
          ln -sf ~/.local/nodejs/bin/node ~/.local/bin/node
          ln -sf ~/.local/nodejs/bin/npm ~/.local/bin/npm
          echo "$HOME/.local/bin" >> $GITHUB_PATH

      - uses: actions/checkout@v5

      - run: nix build -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build

      - name: check
        run: |
          nix run -I nixpkgs=channel:nixos-unstable nixpkgs#nix-fast-build -- --no-nom --skip-cached

      - name: Push to attic
        if: always()
        run: |
          nix shell nixpkgs/nixos-unstable#findutils nixpkgs/nixos-unstable#util-linux nixpkgs/nixos-unstable#coreutils -c bash -c '
            valid_paths=$(find /nix/store -maxdepth 1 -type d -name "*-*" | \
              head -1000 | \
              xargs -I {} -P $(nproc) sh -c "nix path-info \"\$1\" >/dev/null 2>&1 && echo \"\$1\"" _ {} | \
              tr "\n" " ")

            if [ -n "$valid_paths" ]; then
              for i in {1..10}; do
                nix run nixpkgs/nixos-unstable#attic-client push <cache name> $valid_paths && break || [ $i -eq 10 ] || sleep 5
              done
            fi
          '

LLM Inference Benchmarks - Apple M4 Max 48GB 16 Core 16-inch using LM Studio

| Size (B) | Speed (T/s) | Model | Type | Quant | Spec Dec (B) | Spec Quant |
| --- | --- | --- | --- | --- | --- | --- |
| 1.5 | 282 | qwen 2.5 | MLX | 4 | - | - |
| 1.5 | 76 | qwen 2.5 | MLX | 8 | - | - |
| 7 | 70 | qwen 2.5 | GGUF | Q4_K_M | - | - |
| 7 | 101 | qwen 2.5 | MLX | 4 | - | - |
| 7 | 58 | qwen 2.5 | MLX | 8 | - | - |
| 12 | 35 | wayfarer | GGUF | Q6_K | - | - |
| 12 | 65 | wayfarer | MLX | 4 | - | - |
| 12 | 45 | wayfarer | MLX | 6 | - | - |
| 12 | 36 | wayfarer | MLX | 8 | - | - |
| 14 | 36 | qwen 2.5 | GGUF | Q4_K_M | - | - |
| 14 | 52 | qwen 2.5 | MLX | 4 | - | - |
| 14 | 55 | qwen 2.5 | MLX | 4 | 1.5 | 4 |
| 14 | 30 | qwen 2.5 | MLX | 8 | - | - |
| 24 | 35 | mistral small 3 | MLX | 4 | - | - |
| 32 | 18 | qwen 2.5 | GGUF | Q4_K_M | - | - |
| 32 | 23 | qwen 2.5 | MLX | 4 | - | - |
| 32 | 30 | qwen 2.5 | MLX | 4 | 1.5 | 4 |
| 32 | 34 | qwen 2.5 | MLX | 4 | 1.5 | 8 |
| 32 | 26 | qwen 2.5 r1 | MLX | 4 | 1.5 | 4 |
| 32 | 33 | qwen 2.5 coder | MLX | 4 | 1.5 | 4 |
| 32 | 31 | qwen 2.5 coder | MLX | 4 | 3 | 4 |
| 32 | 25 | qwq | MLX | 3 | - | - |
| 32 | 24 | qwq | MLX | 4 | - | - |
| 32 | 18 | qwq | MLX | 4 | 1.5 | 4 |
| 32 | 22 | qwq | MLX | 4 | 1.5 | 8 |
| 32 | 16 | qwq | MLX | 4 | 7 | 4 |
| 32 | 16 | qwq | MLX | 4 | 7 | 8 |
| 32 | 16 | qwq | MLX | 6 | - | - |
| 32 | 16 | qwq | MLX | 6 | 1.5 | 4 |
| 32 | 16 | qwq | MLX | 6 | 1.5 | 8 |
| 70 | 12 | wayfarer large | GGUF | Q2_K_S | - | - |
| 70 | 15 | wayfarer large | MLX | 3 | - | - |
| 30 - A3 | 93 | qwen 3 | MLX | 4 | - | - |
| 30 - A3 | 76 | qwen 3 | MLX | 4 | 1.7 | 4 |
| 30 - A3 | 81 | qwen 3 | MLX | 6 | - | - |
| 30 - A3 | 70 | qwen 3 | MLX | 6 | 1.7 | 4 |
| 30 - A3 | 70 | qwen 3 | MLX | 8 | - | - |
| 32 | 22 | qwen 3 | MLX | 4 | - | - |
| 32 | 26 | qwen 3 | MLX | 4 | 1.7 | 4 |
| 24 | 18 | Devstral Small 2507 | MLX | 8 | - | - |
| 20 | 124 | gpt oss | MLX | 5 | - | - |
| 30 | 88 | qwen 3 coder | MLX | 5 | - | - |
| 30 | 60 | qwen 3 next | MLX | 3 | - | - |

MLX convert and upload to Hugging Face

https://huggingface.co/docs/hub/en/mlx

https://huggingface.co/mlx-community

git clone git@github.com:NexVeridian/NexVeridian-web.git

just uv

just mlx_create "Qwen/QwQ-32B" "4 6 8" "/Users/elijahmcmorris/.cache/lm-studio/models" "mlx-community" false false
# or
uv venv
uv pip install huggingface_hub hf_transfer mlx_lm
uv run huggingface-cli login

uv run mlx_lm.convert --hf-path Qwen/QwQ-32B -q --q-bits 4 --upload-repo mlx-community/QwQ-32B-4bit --mlx-path /Users/elijahmcmorris/.cache/lm-studio/models/mlx-community/QwQ-32B-4bit

or use https://huggingface.co/spaces/mlx-community/mlx-my-repo
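
To sanity-check a converted model before pushing it anywhere, `mlx_lm` also ships a small generation CLI; a sketch, assuming the `--mlx-path` from the convert step above:

uv run mlx_lm.generate --model /Users/elijahmcmorris/.cache/lm-studio/models/mlx-community/QwQ-32B-4bit --prompt "hello"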

LLM Settings.md

Qwen 3

| Temp | Min P | Top P | Top K | Repeat P |
| --- | --- | --- | --- | --- |
| 0.6 | 0.00 | 0.95 | 20 | - |

Qwen 3 /no_think

| Temp | Min P | Top P | Top K | Repeat P |
| --- | --- | --- | --- | --- |
| 0.7 | 0.00 | 0.80 | 20 | 1.5 |