1use std::sync::Arc;
2
3use arrow_array::{ArrayRef, RecordBatchReader};
4use arrow_schema::{FieldRef, SchemaRef};
5use geoarrow_array::{GeoArrowArray, builder::WkbBuilder};
6
7use crate::{
8 ColumnSpec, Gpkg, GpkgError, GpkgLayer, gpkg::gpkg_geometry_to_wkb_bytes,
9 ogc_sql::sql_select_features,
10};
11
12pub struct ArrowGpkgReader<'a> {
19 stmt: rusqlite::Statement<'a>,
20 property_columns: Vec<ColumnSpec>,
21 srs_id: u32,
22 batch_size: usize,
23 offset: u32,
24 end_or_invalid_state: bool,
25 schema_ref: SchemaRef,
26}
27
28impl<'a> ArrowGpkgReader<'a> {
29 pub fn new(gpkg: &'a Gpkg, layer_name: &str, batch_size: u32) -> crate::error::Result<Self> {
37 let layer = gpkg.get_layer(layer_name)?;
38 let columns = layer.property_columns.iter().map(|spec| spec.name.as_str());
39 let sql = sql_select_features(
40 &layer.layer_name,
41 &layer.geometry_column,
42 &layer.primary_key_column,
43 columns,
44 Some(batch_size),
45 );
46
47 let stmt = gpkg.conn.prepare(&sql)?;
48 Ok(Self::new_inner(stmt, &layer, batch_size))
49 }
50
51 pub(crate) fn new_inner(
52 stmt: rusqlite::Statement<'a>,
53 layer: &GpkgLayer,
54 batch_size: u32,
55 ) -> Self {
56 let schema_ref = Self::construct_arrow_schema(
57 &layer.property_columns,
58 &layer.geometry_column,
59 &layer.srs_id.to_string(),
60 );
61
62 Self {
63 stmt,
64 batch_size: batch_size as usize,
65 property_columns: layer.property_columns.clone(),
66 srs_id: layer.srs_id.clone(),
67 offset: 0,
68 end_or_invalid_state: false,
69 schema_ref,
70 }
71 }
72
73 fn construct_arrow_schema(
74 property_columns: &[ColumnSpec],
75 geometry_column: &str,
76 srs_id: &str,
77 ) -> SchemaRef {
78 let mut fields: Vec<FieldRef> = property_columns
79 .iter()
80 .map(|col| {
81 let field = match col.column_type {
82 crate::ColumnType::Boolean => {
83 arrow_schema::Field::new(&col.name, arrow_schema::DataType::Boolean, true)
84 }
85 crate::ColumnType::Varchar => {
86 arrow_schema::Field::new(&col.name, arrow_schema::DataType::Utf8, true)
87 }
88 crate::ColumnType::Double => {
89 arrow_schema::Field::new(&col.name, arrow_schema::DataType::Float64, true)
90 }
91 crate::ColumnType::Integer => {
92 arrow_schema::Field::new(&col.name, arrow_schema::DataType::Int64, true)
93 }
94 crate::ColumnType::Geometry => {
95 wkb_geometry_field(&col.name, srs_id.to_string())
96 }
97 };
98
99 Arc::new(field)
100 })
101 .collect();
102
103 fields.push(Arc::new(wkb_geometry_field(
104 geometry_column,
105 srs_id.to_string(),
106 )));
107
108 Arc::new(arrow_schema::Schema::new(fields))
109 }
110
111 fn create_record_batch_builder(&self) -> GpkgRecordBatchBuilder {
112 let builders: Vec<GpkgArrayBuilder> =
113 self.property_columns
114 .iter()
115 .map(|col| match col.column_type {
116 crate::ColumnType::Boolean => GpkgArrayBuilder::Boolean(
117 arrow_array::builder::BooleanBuilder::with_capacity(self.batch_size),
118 ),
119 crate::ColumnType::Varchar => GpkgArrayBuilder::Varchar(
120 arrow_array::builder::StringBuilder::with_capacity(
121 self.batch_size,
122 8 * self.batch_size,
123 ),
124 ),
125 crate::ColumnType::Double => GpkgArrayBuilder::Double(
126 arrow_array::builder::Float64Builder::with_capacity(self.batch_size),
127 ),
128 crate::ColumnType::Integer => GpkgArrayBuilder::Integer(
129 arrow_array::builder::Int64Builder::with_capacity(self.batch_size),
130 ),
131 crate::ColumnType::Geometry => GpkgArrayBuilder::Geometry(
132 wkb_geometry_builder(self.srs_id.to_string(), self.batch_size),
133 ),
134 })
135 .collect();
136
137 GpkgRecordBatchBuilder {
138 schema_ref: self.schema_ref.clone(),
139 builders,
140 geo_builder: wkb_geometry_builder(self.srs_id.to_string(), self.batch_size),
141 }
142 }
143
144 fn get_record_batch(&mut self) -> crate::error::Result<arrow_array::RecordBatch> {
146 let mut builders = self.create_record_batch_builder();
147 let mut rows = self.stmt.query([self.offset])?;
148 while let Some(row) = rows.next()? {
149 builders.push(row)?;
150 }
151
152 builders.finish()
153 }
154}
155
156impl<'a> Iterator for ArrowGpkgReader<'a> {
157 type Item = Result<arrow_array::RecordBatch, arrow_schema::ArrowError>;
158
159 fn next(&mut self) -> Option<Self::Item> {
160 if self.end_or_invalid_state {
161 return None;
162 }
163
164 let result = self.get_record_batch();
165
166 let features = match result {
167 Ok(features) => features,
168 Err(e) => {
169 self.end_or_invalid_state = true;
171 return Some(Err(e.into()));
172 }
173 };
174
175 let result_size = features.num_rows();
177 if result_size < self.batch_size as usize {
178 self.end_or_invalid_state = true;
179 if result_size == 0 {
180 return None;
181 }
182 }
183
184 self.offset += result_size as u32;
185
186 Some(Ok(features))
187 }
188}
189
190impl<'a> RecordBatchReader for ArrowGpkgReader<'a> {
191 fn schema(&self) -> SchemaRef {
192 self.schema_ref.clone()
193 }
194}
195
196enum GpkgArrayBuilder {
197 Boolean(arrow_array::builder::BooleanBuilder),
198 Varchar(arrow_array::builder::StringBuilder),
199 Double(arrow_array::builder::Float64Builder),
200 Integer(arrow_array::builder::Int64Builder),
201 Geometry(WkbBuilder<i32>),
203}
204
205impl GpkgArrayBuilder {
206 fn push(&mut self, value: rusqlite::types::Value) -> crate::error::Result<()> {
207 match (self, value) {
208 (GpkgArrayBuilder::Boolean(builder), rusqlite::types::Value::Null) => {
210 builder.append_null();
211 }
212 (GpkgArrayBuilder::Varchar(builder), rusqlite::types::Value::Null) => {
213 builder.append_null();
214 }
215 (GpkgArrayBuilder::Double(builder), rusqlite::types::Value::Null) => {
216 builder.append_null();
217 }
218 (GpkgArrayBuilder::Integer(builder), rusqlite::types::Value::Null) => {
219 builder.append_null();
220 }
221 (GpkgArrayBuilder::Geometry(builder), rusqlite::types::Value::Null) => {
222 builder.push_wkb(None).unwrap();
223 }
224 (GpkgArrayBuilder::Boolean(builder), rusqlite::types::Value::Integer(i)) => {
226 builder.append_value(i == 1);
227 }
228 (GpkgArrayBuilder::Varchar(builder), rusqlite::types::Value::Text(t)) => {
229 builder.append_value(t);
230 }
231 (GpkgArrayBuilder::Double(builder), rusqlite::types::Value::Real(f)) => {
232 builder.append_value(f);
233 }
234 (GpkgArrayBuilder::Integer(builder), rusqlite::types::Value::Integer(i)) => {
235 builder.append_value(i);
236 }
237 (GpkgArrayBuilder::Geometry(builder), rusqlite::types::Value::Blob(b)) => {
238 let wkb_bytes = gpkg_geometry_to_wkb_bytes(&b)?;
239 builder
240 .push_wkb(Some(wkb_bytes))
241 .map_err(|e| GpkgError::Message(format!("{e:?}")))?;
242 }
243 _ => unreachable!(),
244 }
245
246 Ok(())
247 }
248}
249
250struct GpkgRecordBatchBuilder {
251 pub(crate) schema_ref: SchemaRef,
252 pub(crate) builders: Vec<GpkgArrayBuilder>,
253 pub(crate) geo_builder: WkbBuilder<i32>,
254}
255
256impl GpkgRecordBatchBuilder {
257 pub(crate) fn push(&mut self, row: &rusqlite::Row<'_>) -> crate::error::Result<()> {
258 let n = self.builders.len();
259 for i in 0..n {
260 let column_index = i + 2;
261 match row.get::<usize, rusqlite::types::Value>(column_index) {
262 Ok(v) => self.builders[i].push(v)?,
263 Err(e) => return Err(GpkgError::Sql(e)),
264 }
265 }
266
267 match row.get::<usize, rusqlite::types::Value>(0) {
268 Ok(rusqlite::types::Value::Blob(b)) => {
269 let wkb_bytes = gpkg_geometry_to_wkb_bytes(&b)?;
270 self.geo_builder
271 .push_wkb(Some(wkb_bytes))
272 .map_err(|e| GpkgError::Message(format!("{e:?}")))?;
273 }
274 Ok(rusqlite::types::Value::Null) => {
275 self.geo_builder.push_wkb(None).unwrap();
276 }
277 Ok(_) => return Err(GpkgError::Message("Invalid value".to_string())),
278 Err(e) => return Err(GpkgError::Sql(e)),
279 }
280
281 Ok(())
282 }
283
284 fn finish(self) -> crate::error::Result<arrow_array::RecordBatch> {
285 let mut columns: Vec<ArrayRef> = self
286 .builders
287 .into_iter()
288 .map(|b| match b {
289 GpkgArrayBuilder::Boolean(mut builder) => {
290 arrow_array::builder::ArrayBuilder::finish(&mut builder)
291 }
292 GpkgArrayBuilder::Varchar(mut builder) => {
293 arrow_array::builder::ArrayBuilder::finish(&mut builder)
294 }
295 GpkgArrayBuilder::Double(mut builder) => {
296 arrow_array::builder::ArrayBuilder::finish(&mut builder)
297 }
298 GpkgArrayBuilder::Integer(mut builder) => {
299 arrow_array::builder::ArrayBuilder::finish(&mut builder)
300 }
301 GpkgArrayBuilder::Geometry(builder) => builder.finish().into_array_ref(),
302 })
303 .collect();
304 columns.push(self.geo_builder.finish().into_array_ref());
305
306 Ok(arrow_array::RecordBatch::try_new(self.schema_ref, columns).unwrap())
307 }
308}
309
310fn wkb_geometry_field(field_name: &str, srs_id: String) -> arrow_schema::Field {
311 let geoarrow_metadata =
312 geoarrow_schema::Metadata::new(geoarrow_schema::Crs::from_srid(srs_id.clone()), None);
313 geoarrow_schema::GeoArrowType::Wkb(geoarrow_schema::WkbType::new(geoarrow_metadata.into()))
314 .to_field(field_name, true)
315}
316
317fn wkb_geometry_builder(srs_id: String, batch_size: usize) -> WkbBuilder<i32> {
318 let geoarrow_metadata =
319 geoarrow_schema::Metadata::new(geoarrow_schema::Crs::from_srid(srs_id.clone()), None);
320 WkbBuilder::with_capacity(
321 geoarrow_schema::WkbType::new(geoarrow_metadata.into()),
322 geoarrow_array::capacity::WkbCapacity::new(21 * batch_size, batch_size),
323 )
324}
325
#[cfg(all(test, feature = "arrow"))]
mod tests {
    use super::ArrowGpkgReader;
    use crate::Result;
    use crate::gpkg::Gpkg;
    use crate::params;
    use crate::types::{ColumnSpec, ColumnType};
    use arrow_array::{BooleanArray, Float64Array, Int64Array, StringArray};
    use arrow_schema::DataType;
    use geo_types::Point;
    use geoarrow_array::GeoArrowArrayAccessor;
    use geoarrow_array::array::WkbArray;
    use wkb::reader::GeometryType;

    /// Creates a point layer with one property column of each scalar type.
    fn create_test_layer(gpkg: &Gpkg) -> Result<crate::GpkgLayer> {
        let columns = vec![
            ColumnSpec {
                name: "active".to_string(),
                column_type: ColumnType::Boolean,
            },
            ColumnSpec {
                name: "name".to_string(),
                column_type: ColumnType::Varchar,
            },
            ColumnSpec {
                name: "score".to_string(),
                column_type: ColumnType::Double,
            },
            ColumnSpec {
                name: "count".to_string(),
                column_type: ColumnType::Integer,
            },
        ];

        gpkg.create_layer(
            "arrow_points",
            "geom",
            GeometryType::Point,
            wkb::reader::Dimension::Xy,
            4326,
            &columns,
        )
    }

    /// A single batch exposes the expected schema, property values and geometries.
    #[test]
    fn record_batch_has_expected_types_and_values() -> Result<()> {
        let gpkg = Gpkg::open_in_memory()?;
        let layer = create_test_layer(&gpkg)?;

        let first_geom = Point::new(1.0, 2.0);
        let second_geom = Point::new(3.0, 4.0);

        layer.insert(first_geom, params![true, "alpha", 1.25, 7])?;
        layer.insert(second_geom, params![false, "beta", 2.5, 9])?;

        let mut iter = ArrowGpkgReader::new(&gpkg, &layer.layer_name, 10)?;
        let batch = iter.next().transpose()?.expect("first batch");
        assert_eq!(batch.num_rows(), 2);

        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 5);
        assert_eq!(fields[0].name(), "active");
        assert_eq!(fields[1].name(), "name");
        assert_eq!(fields[2].name(), "score");
        assert_eq!(fields[3].name(), "count");
        assert_eq!(fields[4].name(), "geom");
        assert_eq!(fields[0].data_type(), &DataType::Boolean);
        assert_eq!(fields[1].data_type(), &DataType::Utf8);
        assert_eq!(fields[2].data_type(), &DataType::Float64);
        assert_eq!(fields[3].data_type(), &DataType::Int64);

        let active = batch
            .column(0)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("boolean array");
        let name = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        let score = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("float array");
        let count = batch
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int array");

        assert!(active.value(0));
        assert!(!active.value(1));
        assert_eq!(name.value(0), "alpha");
        assert_eq!(name.value(1), "beta");
        assert_eq!(score.value(0), 1.25);
        assert_eq!(score.value(1), 2.5);
        assert_eq!(count.value(0), 7);
        assert_eq!(count.value(1), 9);

        // Both geometries must round-trip to the WKB encoding of the inserted points.
        let geom_field = fields[4].as_ref();
        let geom_array = WkbArray::try_from((batch.column(4).as_ref(), geom_field)).unwrap();
        for (row, point) in [first_geom, second_geom].iter().enumerate() {
            let mut expected = Vec::new();
            wkb::writer::write_geometry(&mut expected, point, &Default::default())?;
            let geom = geom_array.value(row).unwrap();
            assert_eq!(geom.buf(), expected.as_slice());
        }

        // Two rows with a batch size of ten: the first batch is also the last.
        assert!(iter.next().is_none());

        Ok(())
    }

    /// Five rows read with a batch size of two arrive in order as batches of 2, 2 and 1.
    #[test]
    fn record_batch_iterator_respects_offsets_and_limits() -> Result<()> {
        let gpkg = Gpkg::open_in_memory()?;
        let columns = vec![ColumnSpec {
            name: "rank".to_string(),
            column_type: ColumnType::Integer,
        }];
        let layer = gpkg.create_layer(
            "arrow_offsets",
            "geom",
            GeometryType::Point,
            wkb::reader::Dimension::Xy,
            4326,
            &columns,
        )?;

        for i in 0..5 {
            layer.insert(Point::new(i as f64, i as f64), params![i as i64])?;
        }

        let mut values = Vec::new();
        let mut batch_sizes = Vec::new();
        let iter = ArrowGpkgReader::new(&gpkg, &layer.layer_name, 2)?;
        for batch in iter {
            let batch = batch?;
            batch_sizes.push(batch.num_rows());
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .expect("int array");
            values.extend((0..array.len()).map(|row| array.value(row)));
        }

        assert_eq!(values, vec![0, 1, 2, 3, 4]);
        assert_eq!(batch_sizes, vec![2, 2, 1]);

        Ok(())
    }
}