| Title: | Shared Loader Utilities for ecospg |
|---|---|
| Description: | Extracts duplicated R code from ecospg data loaders into a reusable package. Provides database connection, catalog registration, partitioned table management, staging/ingest, S3 upload, and CLI argument parsing utilities. |
| Authors: | Lars Dalby [aut, cre] |
| Maintainer: | Lars Dalby <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.3.0 |
| Built: | 2026-06-10 11:58:02 UTC |
| Source: | https://gitlab.au.dk/ecos/tools/r-pkgs/ecospgr |
Build base source metadata
epg_build_source_meta(source_url, metadata_url = NULL, extra = list())
source_url |
The source URL for the download. |
metadata_url |
Optional metadata URL. |
extra |
Named list of additional metadata fields to merge. |
A named list of source metadata.
Reads optional JSON from a file path and/or an inline JSON string and merges them (inline takes precedence over file).
epg_collect_source_meta_overrides(json_file_path = NULL, json_inline = NULL)
json_file_path |
Path to a JSON file (or NULL). |
json_inline |
Inline JSON string or |
A named list of override fields (may be empty).
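The precedence rule (inline over file) can be illustrated with base R's `modifyList()`. This is a minimal sketch, not the package's implementation: `merge_overrides_sketch()` and the field names are hypothetical, and both inputs are assumed to be already-parsed named lists.

```r
# Hypothetical helper: merge two already-parsed override lists so that
# inline values win over file values. Not the package's actual code.
merge_overrides_sketch <- function(from_file = NULL, from_inline = NULL) {
  from_file <- if (is.null(from_file)) list() else from_file
  from_inline <- if (is.null(from_inline)) list() else from_inline
  # modifyList() overwrites entries of the first list with same-named
  # entries of the second and keeps everything else.
  utils::modifyList(from_file, from_inline)
}

# Illustrative field names only
file_overrides <- list(license = "CC-BY", publisher = "Agency A")
inline_overrides <- list(publisher = "Agency B")
merged <- merge_overrides_sketch(file_overrides, inline_overrides)
```

Here `merged$publisher` comes from the inline list, while `merged$license` is carried over from the file list. Calling the helper with no inputs yields an empty list.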
Uses glue_sql() for safe SQL interpolation. Identifier components are
validated to contain only safe characters (alphanumeric + underscore).
epg_create_partition(con, dataset, ver_id, geom_type = "MultiPolygon")
con |
A DBI connection. |
dataset |
Dataset name (used as part of table names in |
ver_id |
The dataset_version.id (integer). |
geom_type |
Geometry type constraint for the partition. Defaults to
|
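The identifier check described for epg_create_partition() (alphanumeric characters and underscores only) could look roughly like the sketch below. The helper names are hypothetical and the exact pattern used by the package is an assumption. The point is to reject a name before any SQL is built.

```r
# Hypothetical validator: accept a single string made of letters, digits
# and underscores only.
is_safe_identifier <- function(x) {
  is.character(x) && length(x) == 1L && !is.na(x) &&
    grepl("^[A-Za-z0-9_]+$", x)
}

# Hypothetical guard: stop with an error if the identifier is not safe.
assert_safe_identifier <- function(x, what = "identifier") {
  if (!is_safe_identifier(x)) {
    stop(sprintf("Unsafe %s: %s", what, format(x)), call. = FALSE)
  }
  invisible(x)
}

assert_safe_identifier("land_cover_2024")  # passes silently
ok <- is_safe_identifier("land_cover")
bad <- is_safe_identifier("x; DROP TABLE y")
```

Validating identifier parts this way complements parameterised interpolation: values are quoted by the SQL layer, while table-name components are restricted to a known-safe alphabet.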
Looks for schemaLocation hints in the GML header, then falls back to filename-based matching and sibling XSD files.
epg_detect_xsd_for_gml(gml_path)
gml_path |
Path to a GML file. |
Path to the XSD file, or NA_character_ if none found.
Returns a named list of credential query parameters for Datafordeler URLs.
Prefers API-key auth (apikey=) when DFD_API_KEY is set; falls back to
legacy username/password (username=…&password=…). Errors if neither is
available.
epg_dfd_auth_params(api_key = Sys.getenv("DFD_API_KEY", ""), username = Sys.getenv("DFD_USERNAME", ""), password = Sys.getenv("DFD_PASSWORD", ""))
api_key |
API key (default |
username |
Legacy service-user name (default |
password |
Legacy service-user password (default |
Named list with either apikey OR username + password.
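The credential precedence of epg_dfd_auth_params() can be restated as a small stand-alone function. This is only a sketch of the documented behaviour: `dfd_auth_params_sketch()` is a hypothetical name, the error message is invented, and requiring both username and password for the legacy path is an assumption.

```r
# Hypothetical re-statement of the documented precedence; not the package source.
dfd_auth_params_sketch <- function(api_key = "", username = "", password = "") {
  # Prefer API-key auth whenever a key is supplied
  if (nzchar(api_key)) {
    return(list(apikey = api_key))
  }
  # Fall back to legacy username/password (assumed: both must be non-empty)
  if (nzchar(username) && nzchar(password)) {
    return(list(username = username, password = password))
  }
  stop("No Datafordeler credentials available", call. = FALSE)
}

with_key <- dfd_auth_params_sketch(api_key = "abc123", username = "u", password = "p")
legacy <- dfd_auth_params_sketch(username = "u", password = "p")
```

When a key is present the legacy credentials are ignored, so `with_key` contains only `apikey`.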
Returns a URL-encoded query-string fragment for the credentials returned by
epg_dfd_auth_params(), e.g. "apikey=abc123" or
"username=u&password=p". Does not include a leading ? or &.
epg_dfd_auth_qs(...)
... |
Passed to |
Character scalar.
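One way to turn such a named credential list into a query-string fragment is shown below. It is a sketch using `utils::URLencode()`, not necessarily how epg_dfd_auth_qs() does it. `auth_qs_sketch()` and the example URL are hypothetical.

```r
# Hypothetical helper: URL-encode each value and join as key=value pairs
# separated by "&", without a leading "?" or "&".
auth_qs_sketch <- function(params) {
  encoded <- vapply(
    params,
    function(v) utils::URLencode(as.character(v), reserved = TRUE),
    character(1)
  )
  paste(names(params), encoded, sep = "=", collapse = "&")
}

qs_key <- auth_qs_sketch(list(apikey = "abc123"))
qs_legacy <- auth_qs_sketch(list(username = "u", password = "p&q"))

# The caller decides how to attach the fragment (placeholder URL)
url <- paste0("https://example.invalid/service?", qs_key)
```

With `reserved = TRUE`, characters such as `&` inside a value are percent-encoded, so a password containing `&` cannot break the query string apart.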
Ensure a dataset row exists in meta.dataset
epg_ensure_dataset(con, topic, dataset)
con |
A DBI connection. |
topic |
Topic name (must already exist). |
dataset |
Dataset name. |
Ensure a topic row exists in meta.topic
epg_ensure_topic(con, topic, description = NULL)
con |
A DBI connection. |
topic |
Topic name. |
description |
Optional topic description. |
Finalize a dataset version: mark latest and drop staging
epg_finalize_version(con, topic, dataset, version_tag, stage_tbl, set_latest = TRUE)
con |
A DBI connection. |
topic |
Topic name. |
dataset |
Dataset name. |
version_tag |
The version tag string. |
stage_tbl |
The qualified staging table name to drop. |
set_latest |
Whether to mark this version as latest. Default |
Insert normalized geometries from staging into a vector partition
epg_insert_normalized(con, dataset, ver_id, stage_tbl, geom_transform = "ST_Multi(ST_CollectionExtract(ST_MakeValid(s.geom), 3))::geometry(MultiPolygon,25832)", delete_before_insert = FALSE)
con |
A DBI connection. |
dataset |
Dataset name (parent table is |
ver_id |
The dataset_version.id. |
stage_tbl |
The qualified staging table name (e.g., |
geom_transform |
SQL expression for geometry normalization. Defaults to
the standard MultiPolygon transform. Use
|
delete_before_insert |
If |
The number of rows inserted (invisible).
Parse a command-line flag argument
epg_parse_arg(flag, default = NULL, args = commandArgs(trailingOnly = TRUE))
flag |
The flag string (e.g., "--src-url"). |
default |
Default value if flag is not found. |
args |
Character vector of arguments. Defaults to
|
The value following the flag, or default.
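The lookup performed by epg_parse_arg() can be approximated in a few lines of base R. This sketch assumes the space-separated form `--flag value` only. Whether the package also accepts `--flag=value` is not stated here, and `parse_arg_sketch()` is a hypothetical name.

```r
# Hypothetical lookup: return the element that follows `flag`, else `default`.
parse_arg_sketch <- function(flag, default = NULL,
                             args = commandArgs(trailingOnly = TRUE)) {
  idx <- match(flag, args)
  # Flag absent, or present as the last element with no value after it
  if (is.na(idx) || idx == length(args)) {
    return(default)
  }
  args[[idx + 1L]]
}

# Simulated command line, for illustration only
demo_args <- c("--src-url", "https://example.invalid/data.zip", "--verbose")
src_url <- parse_arg_sketch("--src-url", args = demo_args)
fallback <- parse_arg_sketch("--version-tag", default = "latest", args = demo_args)
```

Passing `args` explicitly, as above, makes the parser easy to exercise in tests without launching a separate Rscript process.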
If value starts with @, the remainder is treated as a file path whose
contents are read and parsed as JSON.
epg_parse_json_arg(value, label)
value |
A JSON string or |
label |
A human-readable label used in error messages. |
Parsed JSON as an R list, or NULL if value is empty/NULL.
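The `@` convention of epg_parse_json_arg() amounts to a small dispatch step before parsing. The sketch below covers only that step and returns the raw JSON text, leaving the actual parsing to a JSON library. `resolve_json_text_sketch()` and its error message are hypothetical.

```r
# Hypothetical helper: resolve the raw JSON text behind a value that is
# either inline JSON or "@path". Returns NULL for empty/NULL input.
resolve_json_text_sketch <- function(value) {
  if (is.null(value) || !nzchar(value)) {
    return(NULL)
  }
  if (startsWith(value, "@")) {
    path <- substring(value, 2L)  # drop the leading "@"
    if (!file.exists(path)) {
      stop(sprintf("JSON file not found: %s", path), call. = FALSE)
    }
    return(paste(readLines(path, warn = FALSE), collapse = "\n"))
  }
  value
}

# Demonstration with a temporary file
tmp <- tempfile(fileext = ".json")
writeLines('{"license": "CC-BY"}', tmp)
from_file <- resolve_json_text_sketch(paste0("@", tmp))
inline <- resolve_json_text_sketch('{"license": "CC0"}')
unlink(tmp)
```

Both `from_file` and `inline` end up as plain JSON strings, so a single parsing routine can handle either source afterwards.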
Parse a JSON string
epg_parse_json_payload(payload, label)
payload |
A JSON string. |
label |
A human-readable label used in error messages. |
Parsed JSON as an R list.
Persist source metadata on a dataset version
epg_persist_source_meta(con, ver_id, source_meta)
con |
A DBI connection. |
ver_id |
The dataset_version.id. |
source_meta |
A named list of metadata to store as JSONB. |
Reads PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD from the
environment with sensible defaults for local Docker development.
epg_pg_connect(application_name = NULL)
application_name |
Optional application name tag for the connection
(visible in |
A PqConnection object.
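Reading connection settings from the environment, as epg_pg_connect() is described as doing, might be sketched as follows. The default values below are placeholders chosen for illustration: the package's actual defaults are not listed in this reference, and `pg_params_sketch()` is a hypothetical name.

```r
# Hypothetical collector for the PG* variables. The fallback values are
# illustrative assumptions, not the package's documented defaults.
pg_params_sketch <- function() {
  list(
    host = Sys.getenv("PGHOST", "localhost"),
    port = as.integer(Sys.getenv("PGPORT", "5432")),
    dbname = Sys.getenv("PGDATABASE", "postgres"),
    user = Sys.getenv("PGUSER", "postgres"),
    password = Sys.getenv("PGPASSWORD", "")
  )
}

# Override two settings for this session, then collect the parameters
Sys.setenv(PGHOST = "db.example.invalid", PGPORT = "6543")
params <- pg_params_sketch()

# A list like this could then be handed to a driver, for example via
# do.call(DBI::dbConnect, c(list(RPostgres::Postgres()), params))
```

The second argument of `Sys.getenv()` is used only when the variable is unset, so an explicitly empty variable is not replaced by the fallback.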
Read and parse a JSON file
epg_read_json_file(path, label)
path |
Path to a JSON file. |
label |
A human-readable label used in error messages. |
Parsed JSON as an R list, or NULL if path is empty/NULL.
Register a new dataset version
epg_register_version(con, topic, dataset, version_tag, on_conflict = c("error", "reuse"))
con |
A DBI connection. |
topic |
Topic name. |
dataset |
Dataset name. |
version_tag |
Version tag string. |
on_conflict |
|
The dataset_version.id (integer).
Reads RAW_* env vars, falling back to SCW_* for backward compatibility.
epg_resolve_s3_config(default_prefix = "")
default_prefix |
Default S3 key prefix if |
A named list with components endpoint, region, bucket, prefix,
access_key, secret_key, and configured (logical).
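The RAW_* to SCW_* fallback used by epg_resolve_s3_config() can be pictured with a tiny helper. This is a sketch under assumptions: `env_with_fallback()` is hypothetical, and the `BUCKET` suffix is an illustrative guess at the variable naming, which this reference does not spell out.

```r
# Hypothetical helper: prefer RAW_<suffix>, fall back to SCW_<suffix>,
# and finally to `default` when neither is set.
env_with_fallback <- function(suffix, default = "") {
  primary <- Sys.getenv(paste0("RAW_", suffix), "")
  if (nzchar(primary)) {
    return(primary)
  }
  Sys.getenv(paste0("SCW_", suffix), default)
}

# Only the legacy variable is set: the legacy value is used
Sys.unsetenv("RAW_BUCKET")
Sys.setenv(SCW_BUCKET = "legacy-bucket")
bucket_legacy <- env_with_fallback("BUCKET")

# Both are set: the RAW_* value takes precedence
Sys.setenv(RAW_BUCKET = "raw-bucket")
bucket_new <- env_with_fallback("BUCKET")
```

A `configured` flag like the one in the returned list could then be derived by checking that the required fields are all non-empty.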
Convenience wrapper that calls epg_ensure_topic(), epg_ensure_dataset(),
epg_register_version(), epg_set_raw_file_url(), epg_persist_source_meta(),
epg_create_partition(), epg_write_staging(), epg_insert_normalized(),
and epg_finalize_version() inside dbWithTransaction().
epg_run_pipeline(con, sf_obj, topic, dataset, version_tag, topic_description = NULL, source_meta = NULL, raw_url = NA_character_, geom_type = "MultiPolygon", geom_transform = "ST_Multi(ST_CollectionExtract(ST_MakeValid(s.geom), 3))::geometry(MultiPolygon,25832)", delete_before_insert = FALSE, set_latest = TRUE, on_conflict_version = "error")
con |
A DBI connection. |
sf_obj |
An sf object with the features to load. |
topic |
Topic name. |
dataset |
Dataset name. |
version_tag |
Version tag string. |
topic_description |
Optional topic description. |
source_meta |
Named list of source metadata (or NULL). |
raw_url |
Optional S3 URI of the raw file. |
geom_type |
Geometry type for the partition. Default |
geom_transform |
SQL expression for geometry normalization. |
delete_before_insert |
If |
set_latest |
Whether to mark this version as latest. |
on_conflict_version |
Passed as |
The dataset_version.id (invisible).
Lowercases, replaces non-alphanumeric characters with underscores, and strips leading/trailing underscores.
epg_sanitize_name(x)
x |
A character string. |
A sanitized character string.
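The three steps described for epg_sanitize_name() map directly onto base R string functions. The sketch below is an approximation with a hypothetical name, `sanitize_name_sketch()`. It replaces each non-alphanumeric character individually. Whether the package collapses runs of such characters into a single underscore is not stated, so that detail is an assumption.

```r
# Hypothetical re-implementation of the documented steps.
sanitize_name_sketch <- function(x) {
  x <- tolower(x)                  # 1. lowercase
  x <- gsub("[^a-z0-9]", "_", x)   # 2. each non-alphanumeric character becomes "_"
  gsub("^_+|_+$", "", x)           # 3. strip leading/trailing underscores
}

clean <- sanitize_name_sketch("Natura 2000-Sites!")
```

A name cleaned this way contains only lowercase letters, digits and underscores, which is the same alphabet the partition helpers accept for identifier components.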
Set the raw file URL on a dataset version
epg_set_raw_file_url(con, ver_id, raw_url)
con |
A DBI connection. |
ver_id |
The dataset_version.id. |
raw_url |
The S3 URI of the raw file. |
Upload raw files to S3-compatible object storage
epg_upload_raw_files(files, s3_config, on_error = c("stop", "warn"))
files |
Character vector of local file paths to upload. |
s3_config |
A list as returned by |
on_error |
|
The S3 URI(s) of uploaded files (character vector), or
NA_character_ for files that failed when on_error = "warn".
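The `on_error` behaviour of epg_upload_raw_files() follows a common pattern: try each file, then either abort or warn and record `NA`. The sketch below shows that pattern with a stub in place of a real S3 call. `upload_all_sketch()`, `fake_upload()`, and the bucket name are all hypothetical.

```r
# Hypothetical driver: `upload_one` stands in for a real S3 upload and must
# return the S3 URI of the uploaded file or signal an error.
upload_all_sketch <- function(files, upload_one, on_error = c("stop", "warn")) {
  on_error <- match.arg(on_error)
  vapply(files, function(f) {
    tryCatch(
      upload_one(f),
      error = function(e) {
        if (on_error == "stop") stop(e)  # re-signal the original error
        warning(sprintf("Upload failed for %s: %s", f, conditionMessage(e)),
                call. = FALSE)
        NA_character_                    # placeholder for the failed file
      }
    )
  }, character(1), USE.NAMES = FALSE)
}

# Stub uploader that fails for one file, for illustration only
fake_upload <- function(f) {
  if (grepl("bad", f)) stop("simulated network error")
  paste0("s3://example-bucket/raw/", basename(f))
}

uris <- suppressWarnings(
  upload_all_sketch(c("a.zip", "bad.zip"), fake_upload, on_error = "warn")
)
```

With `on_error = "warn"` the result keeps one element per input file, so callers can pair URIs with files by position and detect failures with `is.na()`.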
Creates the stage schema if needed, writes the sf object via st_write,
verifies creation, normalizes the geometry column name to geom, and
returns a row count.
epg_write_staging(con, sf_obj, dataset, ver_id)
con |
A DBI connection. |
sf_obj |
An sf object to stage. |
dataset |
Dataset name. |
ver_id |
The dataset_version.id (integer). |
A list with stage_tbl (qualified name like "stage.foo_s_42")
and n_rows (count of rows with non-NULL geometry).