rusqlite_gpkg/arrow/
reader.rs

1use std::sync::Arc;
2
3use arrow_array::{ArrayRef, RecordBatchReader};
4use arrow_schema::{FieldRef, SchemaRef};
5use geoarrow_array::{GeoArrowArray, builder::WkbBuilder};
6
7use crate::{
8    ColumnSpec, Gpkg, GpkgError, GpkgLayer, gpkg::gpkg_geometry_to_wkb_bytes,
9    ogc_sql::sql_select_features,
10};
11
12/// Iterator that yields Arrow `RecordBatch`es of features from a GeoPackage layer.
13///
14/// `ArrowGpkgReader` is the primary entry point for Arrow users. Construct it with
15/// [`ArrowGpkgReader::new`] and then iterate to receive batches of features.
16/// The reader holds a prepared `rusqlite::Statement`, so it borrows the `Gpkg`
17/// that created it and must not outlive that `Gpkg`.
18pub struct ArrowGpkgReader<'a> {
19    stmt: rusqlite::Statement<'a>,
20    property_columns: Vec<ColumnSpec>,
21    srs_id: u32,
22    batch_size: usize,
23    offset: u32,
24    end_or_invalid_state: bool,
25    schema_ref: SchemaRef,
26}
27
28impl<'a> ArrowGpkgReader<'a> {
29    /// Create a new Arrow reader for a layer.
30    ///
31    /// Why this takes a `&Gpkg` instead of a `Read` implementor:
32    /// - `rusqlite` opens databases from a path, not a streaming `Read`.
33    /// - the prepared `Statement` held by this reader borrows the `Gpkg`'s
34    ///   connection, so the `Gpkg` must live outside this struct to define the
35    ///   statement's lifetime.
36    pub fn new(gpkg: &'a Gpkg, layer_name: &str, batch_size: u32) -> crate::error::Result<Self> {
37        let layer = gpkg.get_layer(layer_name)?;
38        let columns = layer.property_columns.iter().map(|spec| spec.name.as_str());
39        let sql = sql_select_features(
40            &layer.layer_name,
41            &layer.geometry_column,
42            &layer.primary_key_column,
43            columns,
44            Some(batch_size),
45        );
46
47        let stmt = gpkg.conn.prepare(&sql)?;
48        Ok(Self::new_inner(stmt, &layer, batch_size))
49    }
50
51    pub(crate) fn new_inner(
52        stmt: rusqlite::Statement<'a>,
53        layer: &GpkgLayer,
54        batch_size: u32,
55    ) -> Self {
56        let schema_ref = Self::construct_arrow_schema(
57            &layer.property_columns,
58            &layer.geometry_column,
59            &layer.srs_id.to_string(),
60        );
61
62        Self {
63            stmt,
64            batch_size: batch_size as usize,
65            property_columns: layer.property_columns.clone(),
66            srs_id: layer.srs_id.clone(),
67            offset: 0,
68            end_or_invalid_state: false,
69            schema_ref,
70        }
71    }
72
73    fn construct_arrow_schema(
74        property_columns: &[ColumnSpec],
75        geometry_column: &str,
76        srs_id: &str,
77    ) -> SchemaRef {
78        let mut fields: Vec<FieldRef> = property_columns
79            .iter()
80            .map(|col| {
81                let field = match col.column_type {
82                    crate::ColumnType::Boolean => {
83                        arrow_schema::Field::new(&col.name, arrow_schema::DataType::Boolean, true)
84                    }
85                    crate::ColumnType::Varchar => {
86                        arrow_schema::Field::new(&col.name, arrow_schema::DataType::Utf8, true)
87                    }
88                    crate::ColumnType::Double => {
89                        arrow_schema::Field::new(&col.name, arrow_schema::DataType::Float64, true)
90                    }
91                    crate::ColumnType::Integer => {
92                        arrow_schema::Field::new(&col.name, arrow_schema::DataType::Int64, true)
93                    }
94                    crate::ColumnType::Geometry => {
95                        wkb_geometry_field(&col.name, srs_id.to_string())
96                    }
97                };
98
99                Arc::new(field)
100            })
101            .collect();
102
103        fields.push(Arc::new(wkb_geometry_field(
104            geometry_column,
105            srs_id.to_string(),
106        )));
107
108        Arc::new(arrow_schema::Schema::new(fields))
109    }
110
111    fn create_record_batch_builder(&self) -> GpkgRecordBatchBuilder {
112        let builders: Vec<GpkgArrayBuilder> =
113            self.property_columns
114                .iter()
115                .map(|col| match col.column_type {
116                    crate::ColumnType::Boolean => GpkgArrayBuilder::Boolean(
117                        arrow_array::builder::BooleanBuilder::with_capacity(self.batch_size),
118                    ),
119                    crate::ColumnType::Varchar => GpkgArrayBuilder::Varchar(
120                        arrow_array::builder::StringBuilder::with_capacity(
121                            self.batch_size,
122                            8 * self.batch_size,
123                        ),
124                    ),
125                    crate::ColumnType::Double => GpkgArrayBuilder::Double(
126                        arrow_array::builder::Float64Builder::with_capacity(self.batch_size),
127                    ),
128                    crate::ColumnType::Integer => GpkgArrayBuilder::Integer(
129                        arrow_array::builder::Int64Builder::with_capacity(self.batch_size),
130                    ),
131                    crate::ColumnType::Geometry => GpkgArrayBuilder::Geometry(
132                        wkb_geometry_builder(self.srs_id.to_string(), self.batch_size),
133                    ),
134                })
135                .collect();
136
137        GpkgRecordBatchBuilder {
138            schema_ref: self.schema_ref.clone(),
139            builders,
140            geo_builder: wkb_geometry_builder(self.srs_id.to_string(), self.batch_size),
141        }
142    }
143
144    // This doesn't advance the offset.
145    fn get_record_batch(&mut self) -> crate::error::Result<arrow_array::RecordBatch> {
146        let mut builders = self.create_record_batch_builder();
147        let mut rows = self.stmt.query([self.offset])?;
148        while let Some(row) = rows.next()? {
149            builders.push(row)?;
150        }
151
152        builders.finish()
153    }
154}
155
156impl<'a> Iterator for ArrowGpkgReader<'a> {
157    type Item = Result<arrow_array::RecordBatch, arrow_schema::ArrowError>;
158
159    fn next(&mut self) -> Option<Self::Item> {
160        if self.end_or_invalid_state {
161            return None;
162        }
163
164        let result = self.get_record_batch();
165
166        let features = match result {
167            Ok(features) => features,
168            Err(e) => {
169                // I don't know in what case some error happens, but I bet it's unrecoverable.
170                self.end_or_invalid_state = true;
171                return Some(Err(e.into()));
172            }
173        };
174
175        // If the result is less than the batch size, it means it reached the end.
176        let result_size = features.num_rows();
177        if result_size < self.batch_size as usize {
178            self.end_or_invalid_state = true;
179            if result_size == 0 {
180                return None;
181            }
182        }
183
184        self.offset += result_size as u32;
185
186        Some(Ok(features))
187    }
188}
189
190impl<'a> RecordBatchReader for ArrowGpkgReader<'a> {
191    fn schema(&self) -> SchemaRef {
192        self.schema_ref.clone()
193    }
194}
195
196enum GpkgArrayBuilder {
197    Boolean(arrow_array::builder::BooleanBuilder),
198    Varchar(arrow_array::builder::StringBuilder),
199    Double(arrow_array::builder::Float64Builder),
200    Integer(arrow_array::builder::Int64Builder),
201    // Note: Since WkbBuilder doesn't implement ArrayBuilder trait, we cannot use Box<dyn ArrayBuilder> to unify this
202    Geometry(WkbBuilder<i32>),
203}
204
205impl GpkgArrayBuilder {
206    fn push(&mut self, value: rusqlite::types::Value) -> crate::error::Result<()> {
207        match (self, value) {
208            // null
209            (GpkgArrayBuilder::Boolean(builder), rusqlite::types::Value::Null) => {
210                builder.append_null();
211            }
212            (GpkgArrayBuilder::Varchar(builder), rusqlite::types::Value::Null) => {
213                builder.append_null();
214            }
215            (GpkgArrayBuilder::Double(builder), rusqlite::types::Value::Null) => {
216                builder.append_null();
217            }
218            (GpkgArrayBuilder::Integer(builder), rusqlite::types::Value::Null) => {
219                builder.append_null();
220            }
221            (GpkgArrayBuilder::Geometry(builder), rusqlite::types::Value::Null) => {
222                builder.push_wkb(None).unwrap();
223            }
224            // non-null value
225            (GpkgArrayBuilder::Boolean(builder), rusqlite::types::Value::Integer(i)) => {
226                builder.append_value(i == 1);
227            }
228            (GpkgArrayBuilder::Varchar(builder), rusqlite::types::Value::Text(t)) => {
229                builder.append_value(t);
230            }
231            (GpkgArrayBuilder::Double(builder), rusqlite::types::Value::Real(f)) => {
232                builder.append_value(f);
233            }
234            (GpkgArrayBuilder::Integer(builder), rusqlite::types::Value::Integer(i)) => {
235                builder.append_value(i);
236            }
237            (GpkgArrayBuilder::Geometry(builder), rusqlite::types::Value::Blob(b)) => {
238                let wkb_bytes = gpkg_geometry_to_wkb_bytes(&b)?;
239                builder
240                    .push_wkb(Some(wkb_bytes))
241                    .map_err(|e| GpkgError::Message(format!("{e:?}")))?;
242            }
243            _ => unreachable!(),
244        }
245
246        Ok(())
247    }
248}
249
250struct GpkgRecordBatchBuilder {
251    pub(crate) schema_ref: SchemaRef,
252    pub(crate) builders: Vec<GpkgArrayBuilder>,
253    pub(crate) geo_builder: WkbBuilder<i32>,
254}
255
256impl GpkgRecordBatchBuilder {
257    pub(crate) fn push(&mut self, row: &rusqlite::Row<'_>) -> crate::error::Result<()> {
258        let n = self.builders.len();
259        for i in 0..n {
260            let column_index = i + 2;
261            match row.get::<usize, rusqlite::types::Value>(column_index) {
262                Ok(v) => self.builders[i].push(v)?,
263                Err(e) => return Err(GpkgError::Sql(e)),
264            }
265        }
266
267        match row.get::<usize, rusqlite::types::Value>(0) {
268            Ok(rusqlite::types::Value::Blob(b)) => {
269                let wkb_bytes = gpkg_geometry_to_wkb_bytes(&b)?;
270                self.geo_builder
271                    .push_wkb(Some(wkb_bytes))
272                    .map_err(|e| GpkgError::Message(format!("{e:?}")))?;
273            }
274            Ok(rusqlite::types::Value::Null) => {
275                self.geo_builder.push_wkb(None).unwrap();
276            }
277            Ok(_) => return Err(GpkgError::Message("Invalid value".to_string())),
278            Err(e) => return Err(GpkgError::Sql(e)),
279        }
280
281        Ok(())
282    }
283
284    fn finish(self) -> crate::error::Result<arrow_array::RecordBatch> {
285        let mut columns: Vec<ArrayRef> = self
286            .builders
287            .into_iter()
288            .map(|b| match b {
289                GpkgArrayBuilder::Boolean(mut builder) => {
290                    arrow_array::builder::ArrayBuilder::finish(&mut builder)
291                }
292                GpkgArrayBuilder::Varchar(mut builder) => {
293                    arrow_array::builder::ArrayBuilder::finish(&mut builder)
294                }
295                GpkgArrayBuilder::Double(mut builder) => {
296                    arrow_array::builder::ArrayBuilder::finish(&mut builder)
297                }
298                GpkgArrayBuilder::Integer(mut builder) => {
299                    arrow_array::builder::ArrayBuilder::finish(&mut builder)
300                }
301                GpkgArrayBuilder::Geometry(builder) => builder.finish().into_array_ref(),
302            })
303            .collect();
304        columns.push(self.geo_builder.finish().into_array_ref());
305
306        Ok(arrow_array::RecordBatch::try_new(self.schema_ref, columns).unwrap())
307    }
308}
309
310fn wkb_geometry_field(field_name: &str, srs_id: String) -> arrow_schema::Field {
311    let geoarrow_metadata =
312        geoarrow_schema::Metadata::new(geoarrow_schema::Crs::from_srid(srs_id.clone()), None);
313    geoarrow_schema::GeoArrowType::Wkb(geoarrow_schema::WkbType::new(geoarrow_metadata.into()))
314        .to_field(field_name, true)
315}
316
317fn wkb_geometry_builder(srs_id: String, batch_size: usize) -> WkbBuilder<i32> {
318    let geoarrow_metadata =
319        geoarrow_schema::Metadata::new(geoarrow_schema::Crs::from_srid(srs_id.clone()), None);
320    WkbBuilder::with_capacity(
321        geoarrow_schema::WkbType::new(geoarrow_metadata.into()),
322        geoarrow_array::capacity::WkbCapacity::new(21 * batch_size, batch_size),
323    )
324}
325
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::ArrowGpkgReader;
    use crate::Result;
    use crate::gpkg::Gpkg;
    use crate::params;
    use crate::types::{ColumnSpec, ColumnType};
    use arrow_array::{BooleanArray, Float64Array, Int64Array, StringArray};
    use arrow_schema::DataType;
    use geo_types::Point;
    use geoarrow_array::GeoArrowArrayAccessor;
    use geoarrow_array::array::WkbArray;
    use wkb::reader::GeometryType;

    /// Shorthand for a property column definition.
    fn spec(name: &str, column_type: ColumnType) -> ColumnSpec {
        ColumnSpec {
            name: name.to_string(),
            column_type,
        }
    }

    /// Borrow column `index` of `batch` as the concrete array type `T`,
    /// panicking with `label` when the column has a different type.
    fn typed_column<'b, T: 'static>(
        batch: &'b arrow_array::RecordBatch,
        index: usize,
        label: &str,
    ) -> &'b T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect(label)
    }

    /// Create a point layer with one property column of each scalar type.
    fn create_test_layer(gpkg: &Gpkg) -> Result<crate::GpkgLayer> {
        let columns = vec![
            spec("active", ColumnType::Boolean),
            spec("name", ColumnType::Varchar),
            spec("score", ColumnType::Double),
            spec("count", ColumnType::Integer),
        ];

        gpkg.create_layer(
            "arrow_points",
            "geom",
            GeometryType::Point,
            wkb::reader::Dimension::Xy,
            4326,
            &columns,
        )
    }

    /// A single batch exposes the expected schema, property values and geometry.
    #[test]
    fn record_batch_has_expected_types_and_values() -> Result<()> {
        let gpkg = Gpkg::open_in_memory()?;
        let layer = create_test_layer(&gpkg)?;

        layer.insert(Point::new(1.0, 2.0), params![true, "alpha", 1.25, 7])?;
        layer.insert(Point::new(3.0, 4.0), params![false, "beta", 2.5, 9])?;

        let mut reader = ArrowGpkgReader::new(&gpkg, &layer.layer_name, 10)?;
        let batch = reader.next().transpose()?.expect("first batch");

        // Schema: four property fields followed by the geometry field.
        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 5);

        let expected_names = ["active", "name", "score", "count", "geom"];
        for (field, expected) in fields.iter().zip(expected_names) {
            assert_eq!(field.name(), expected);
        }

        let expected_types = [
            DataType::Boolean,
            DataType::Utf8,
            DataType::Float64,
            DataType::Int64,
        ];
        for (field, expected) in fields.iter().zip(&expected_types) {
            assert_eq!(field.data_type(), expected);
        }

        // Property values, row by row.
        let active: &BooleanArray = typed_column(&batch, 0, "boolean array");
        let name: &StringArray = typed_column(&batch, 1, "string array");
        let score: &Float64Array = typed_column(&batch, 2, "float array");
        let count: &Int64Array = typed_column(&batch, 3, "int array");

        assert!(active.value(0));
        assert!(!active.value(1));
        assert_eq!(name.value(0), "alpha");
        assert_eq!(name.value(1), "beta");
        assert_eq!(score.value(0), 1.25);
        assert_eq!(score.value(1), 2.5);
        assert_eq!(count.value(0), 7);
        assert_eq!(count.value(1), 9);

        // The first geometry round-trips to the WKB of the inserted point.
        let geom_field = fields[4].as_ref();
        let geom_array = WkbArray::try_from((batch.column(4).as_ref(), geom_field)).unwrap();
        let geom = geom_array.value(0).unwrap();

        let mut expected = Vec::new();
        wkb::writer::write_geometry(&mut expected, &Point::new(1.0, 2.0), &Default::default())?;
        assert_eq!(geom.buf(), expected.as_slice());

        Ok(())
    }

    /// Five rows read two at a time arrive in order as batches of 2, 2 and 1.
    #[test]
    fn record_batch_iterator_respects_offsets_and_limits() -> Result<()> {
        let gpkg = Gpkg::open_in_memory()?;
        let columns = vec![spec("rank", ColumnType::Integer)];
        let layer = gpkg.create_layer(
            "arrow_offsets",
            "geom",
            GeometryType::Point,
            wkb::reader::Dimension::Xy,
            4326,
            &columns,
        )?;

        for i in 0..5 {
            layer.insert(Point::new(i as f64, i as f64), params![i as i64])?;
        }

        let mut values = Vec::new();
        let mut batch_sizes = Vec::new();
        for batch in ArrowGpkgReader::new(&gpkg, &layer.layer_name, 2)? {
            let batch = batch?;
            batch_sizes.push(batch.num_rows());

            let ranks: &Int64Array = typed_column(&batch, 0, "int array");
            values.extend((0..ranks.len()).map(|row| ranks.value(row)));
        }

        assert_eq!(values, vec![0, 1, 2, 3, 4]);
        assert_eq!(batch_sizes, vec![2, 2, 1]);

        Ok(())
    }
}