Commit cb0c0f13 authored by Lars Dalby

Merge branch 'dev' into 'main'

Add DB trigger mode for session grouping (SESSION_TRIGGER)

See merge request !10
parents 69ba3557 dbd62f75
+31 −157
@@ -29,8 +29,8 @@ When adding new files, place them in the appropriate directory. Analysis scripts

```bash
# Development (from package root — pkgload::load_all() is used automatically)
Rscript pipelines/download_imgs.R      # Stage 1a: SFTP download + EXIF → parquet
Rscript pipelines/read_exif_http.R     # Stage 1b: HTTP EXIF (no download) → parquet
Rscript pipelines/download_imgs.R      # Stage 1a: SFTP download + EXIF -> parquet
Rscript pipelines/read_exif_http.R     # Stage 1b: HTTP EXIF (no download) -> parquet
Rscript pipelines/read_exif_parquet.R  # Stage 2:  Combine parquets, parse detections
Rscript pipelines/index_erda.R         # ERDA BFS indexer (or update mode)
DEPLOYMENT_ID="IT1#2023" Rscript pipelines/run_deployment.R  # Full deployment pipeline (standalone + IoT)
@@ -64,109 +64,29 @@ Set in `~/.Renviron` or system environment before running:
| `AMI_USER` | PostgreSQL user | -- |
| `AMI_PASSWORD` | PostgreSQL password | -- |
| `DEPLOYMENT_ID` | Deployment ID for run_deployment.R (e.g. `"IT1#2023"`) | -- |
| `SESSION_TRIGGER` | Set to `"true"` to use DB trigger for session grouping in `run_deployment.R` | -- |
| `API_KEY` | API key for REST API auth; omit for dev mode (no auth) | -- |

**Environment variable convention:** Boolean/flag env vars must always be checked against an explicit value (e.g. `identical(tolower(Sys.getenv("EXECUTE")), "true")`), never just tested for being non-empty. Relying on "is it set to something" is fragile and error-prone.

## Architecture

### Package: `erdatools` (`R/`)
Read the roxygen docs in the R/ source files for function-level details. The summary below maps files to responsibilities.

**`R/erda_sftp.R`** -- SSH/SFTP utilities:
### Package (`R/`)

| Function | Description |
| File | Responsibility |
|---|---|
| `et_sftp_connect(sharelink, host, port, tmp_root)` | Establish SSH ControlMaster; returns connection object with `$disconnect()`, `$add_child_pid()`, `$remove_child_pids()` |
| `et_sftp_batch(conn, commands, stdout, stderr)` | Run one SFTP batch session; returns stdout lines |
| `et_sftp_list_folders(conn, remote_path)` | List entries at a remote path (strips SFTP prompts) |
| `et_sftp_list_dated_subfolders(conn, country_path)` | Filter to `YYYY_MM_DD-HH_MM` entries only |
| `et_parquet_folders_done(folders, parquet_dir)` | Named logical vector: which folders already have a `.parquet` file |

Internal helpers (not exported): `parse_sftp_ls()`, `filter_dated_folders()`.

**`R/erda_media.R`** -- Media file utilities:

| Function | Description |
|---|---|
| `et_filter_media_files(filenames)` | Filter to supported media extensions (jpg/jpeg/tif/tiff/mp4/mov/avi) |
| `et_exif_tags()` | Returns tag vector from `EXIF_TAGS` env var or built-in 10-tag defaults |

**`R/erda_index.R`** -- ERDA directory indexer:

| Function | Description |
|---|---|
| `et_index_dir(conn, remote_path, index_path, checkpoint_every)` | BFS traversal of remote tree -> parquet index; auto-resumes if `index_path` exists |
| `et_index_update(conn, remote_path, index_path, update)` | Re-scan all known dirs, diff against index, return `list(added, removed)`; optionally rewrite index |
| `et_index_filter(index, extensions)` | Filter index to files matching extensions; returns filtered data frame |

Internal helper (not exported): `parse_sftp_ls_la()`.

**`R/erda_http.R`** -- HTTP URL utility:

| Function | Description |
|---|---|
| `et_img_url(sharelink, remote_dir, country, folder, filename, http_base)` | Pure function: construct public anonymous URL for one image |

**`R/parse_exif.R`** -- EXIF parsing and filename utilities:

| Function | Description |
|---|---|
| `et_parse_camalien_filename(filenames)` | Parse key-value pairs from CamAlien filenames (underscore-delimited); returns tibble |
| `et_parse_ami_filename(filenames)` | Parse AMI image filenames (standalone and IoT); extracts trigger counter, timestamp, type (snapshot/motion), sequence |
| `et_group_motion_images(index)` | Group motion images into noon-to-noon sessions; adds `session_key`, `timestamp`, and `trigger` columns |
| `et_group_snapshot_images(index)` | Group snapshot images into noon-to-noon sessions; adds `session_key` and `timestamp` columns |

| `parse_detections(user_comment)` | Parse UserComment JSON into a list of per-image detection data frames |

Internal helper (not exported): `et_print_run_summary()`.

**`R/erda_sourceimages.R`** -- Transform index to database schema:

| Function | Description |
|---|---|
| `et_build_sourceimages(index, sharelink, remote_dir, http_base, deploymentid, type)` | Transform filtered ERDA index to `data.sourceimages` schema; constructs URLs, parses timestamps |
| `et_build_sessions(grouped, deploymentid, type)` | Aggregate grouped images into `data.sessions` rows (one per `session_key`/date); `type` is `"motion"` or `"snapshot"` |

**`R/erda_db.R`** -- PostgreSQL database operations:

| Function | Description |
|---|---|
| `et_db_connect(host, port, dbname, user, password, sslmode, sslrootcert)` | Connect to AMI PostgreSQL database with SSL support |
| `et_db_write_sourceimages(dbcon, df, batch_size)` | Batch INSERT into `data.sourceimages` with ON CONFLICT upsert |
| `et_db_diff_sourceimages(dbcon, df, batch_size)` | Diff sourceimages against DB; return only new rows |
| `et_db_get_deployment_id(dbcon, projectid, partnerid, year, code)` | Look up deployment ID by project, partner, year, and code |
| `et_db_get_deployment(dbcon, deployment_id)` | Query `data.deployments` by primary key; returns year, code, partnerid, projectid, traptype |
| `et_db_deployment_path(dbcon, deployment_id)` | Construct canonical ERDA remote path from deployment metadata (branches on traptype) |
| `et_db_get_partner_folder(dbcon, partnerid)` | Query `data.partners` for folder name |
| `et_db_get_project_folder(dbcon, projectid)` | Query `data.projects` for folder name |
| `et_db_write_sessions(dbcon, df, batch_size)` | Batch INSERT into `data.sessions` with ON CONFLICT upsert on `(date, deploymentid, type)` |
| `et_db_assign_sessions(dbcon, deploymentid)` | UPDATE sourceimages to assign `sessionid` based on temporal overlap with sessions (type-aware) |
| `et_db_write_tracks(dbcon, df, batch_size)` | Batch INSERT into `data.tracks` with ON CONFLICT upsert on `(sessionid, edge_id)` |

**`R/erda_edge.R`** -- Edge processing parsers (IoT traps):

| Function | Description |
|---|---|
| `et_parse_edge_tracks(path)` | Parse track summary CSV from IoT edge processing; returns tibble with timestamps and metrics |
| `et_build_tracks(parsed_tracks, sessionid, source)` | Transform parsed tracks to `data.tracks` DB schema |
| `et_parse_edge_detections(path)` | Parse detection CSV from IoT edge processing; returns tibble |
| `et_process_edge_tracks(conn, dbcon, idx_raw, sessions)` | Download, parse, and write edge tracks from ERDA index to database; returns count |

**`R/erda_jobs.R`** -- In-memory async job tracking (for REST API):

| Function | Description |
|---|---|
| `et_job_create(type, params)` | Create a new job record with status "queued" |
| `et_job_set_process(id, process)` | Attach a `callr::r_bg()` process handle; set status to "running" |
| `et_job_update(id, status, result, error)` | Update job status/result/error |
| `et_job_get(id)` | Get job by ID with status synced from background process |
| `et_job_list()` | List all jobs as a data frame |

**`R/erda_api.R`** -- API runner:

| Function | Description |
|---|---|
| `et_api_run(port, host, ...)` | Start the bundled plumber REST API with Swagger UI |
| `erda_sftp.R` | SSH ControlMaster connection, SFTP batch commands, folder listing |
| `erda_media.R` | Media file filtering, EXIF tag resolution |
| `erda_index.R` | BFS directory indexer (full + update mode), index filtering |
| `erda_http.R` | Construct anonymous HTTP URLs for images |
| `parse_exif.R` | CamAlien/AMI filename parsing, motion/snapshot session grouping, detection parsing |
| `erda_sourceimages.R` | Transform index to DB schema (sourceimages, sessions), stray image filtering |
| `erda_db.R` | PostgreSQL operations (connect, upsert, diff, deployment lookups, session assignment) |
| `erda_edge.R` | IoT edge processing: parse track/detection CSVs, write to DB |
| `erda_jobs.R` | In-memory async job tracking for the REST API |
| `erda_api.R` | Start the bundled plumber REST API |

### REST API (`inst/api/plumber.R`)

@@ -177,37 +97,13 @@ Bundled with the package; launched via `et_api_run()` or standalone. Async job-b

### Pipelines (`pipelines/`)

**`pipelines/download_imgs.R`** -- SFTP download stage:
- Uses `et_sftp_connect()` + `et_sftp_list_*()` for discovery
- Uses `et_parquet_folders_done()` for idempotency
- For each folder: downloads JPGs in rolling batches -> reads EXIF -> writes `<folder_name>.parquet` -> deletes local JPGs
- Parallel workers (4), stall detection (60 s), per-worker progress bar
- EXIF tags resolved via `et_exif_tags()` (configurable via `EXIF_TAGS` env var)

**`pipelines/read_exif_http.R`** -- HTTP EXIF stage (no local storage):
- Uses SFTP listing to discover folders/files (no downloads)
- Constructs HTTP URLs with `et_img_url()`
- Calls `exifr::read_exif(urls, ...)` in batches -- ExifTool 12.22+ accepts HTTP URLs natively
- Same parquet output schema as `download_imgs.R`

**`pipelines/index_erda.R`** -- ERDA directory indexer:
- Full-index mode (default): BFS from `ERDA_SFTP_REMOTE_DIR`; resumes automatically from `INDEX_PATH`
- Update mode (`INDEX_UPDATE=1`): re-scans all known dirs, reports additions/removals, recursively indexes new subdirs
- Index schema: `dir`, `name`, `is_dir`, `size` (integer, bytes; `NA` for dirs), `mtime` (character, e.g. `"Jul 10 11:10"`) -- one row per entry

**`pipelines/read_exif_parquet.R`** -- Parsing stage:
- Reads all `*.parquet` files from `PARQUET_DIR` (excluding `exif_combined.parquet`)
- Combines into a single `exif` tibble (one row per image)
- Parses `UserComment` JSON into a `detections` tibble (one row per detection)
- `UserComment` JSON structure: `{"gain": "1.4", "detections": ["<id> <species> <probability> <class_index>", ...]}`
- Detection strings: first token is `plantnet_id` (integer), last two tokens are `probability`/`class_index` (double), middle tokens joined as `species` name
- In script mode (non-interactive): writes `exif_combined.parquet` and `detections.parquet` to `PARQUET_DIR`

**`pipelines/run_deployment.R`** -- Deployment pipeline:
- Full from-scratch pipeline for a single deployment; works for both standalone and IoT trap types
- DB connect -> deployment path lookup -> SFTP index -> filter -> group motion+snapshot images -> build sourceimages/sessions -> diff -> write to DB
- For IoT deployments: also parses edge track CSVs, maps to sessions, and writes to `data.tracks`
- Requires `DEPLOYMENT_ID`, `ERDA_SHARELINK`, `AMI_HOST`, `AMI_USER`, `AMI_PASSWORD`
| Script | Purpose |
|---|---|
| `download_imgs.R` | SFTP download + EXIF extraction -> per-folder parquet files |
| `read_exif_http.R` | HTTP-based EXIF extraction (no local storage) -> per-folder parquet files |
| `read_exif_parquet.R` | Combine parquets, parse UserComment JSON into detections |
| `index_erda.R` | BFS directory indexer (full or update mode) -> parquet index |
| `run_deployment.R` | Full deployment pipeline: index -> filter -> group -> write to DB (legacy + trigger modes) |

### Admin scripts (`admin/`)

@@ -220,36 +116,14 @@ Bundled with the package; launched via `et_api_run()` or standalone. Async job-b

**Every new exported function must have tests.** When adding a function to `R/`, add corresponding tests in `tests/testthat/test-<filename>.R`. Run `devtools::test()` to confirm they pass before finishing.

```r
# Generate documentation (run after editing roxygen2 comments)
roxygen2::roxygenise()

# Run tests
devtools::test()
For standard R package development commands (document, test, check, load), use the `r-lib:r-package-development` skill. Package dependencies are listed in `DESCRIPTION`.

# Full package check
devtools::check()
## Domain knowledge skills

# Load for interactive dev
pkgload::load_all()
```
Use these skills when working in the corresponding areas:

## R package dependencies

Package imports: `cli`, `fs`, `jsonlite`, `nanoparquet`, `purrr`, `tibble`

Package suggests: `testthat (>= 3.0.0)`, `withr`, `pkgload`, `callr`, `httr2`, `httpuv`, `plumber`, `DBI`, `RPostgres`

Script-only (not declared as package deps): `dplyr`, `tidyr`, `exifr`

Dev tools: `devtools`, `roxygen2`

## Database

When working on database-related code (queries, migrations, schema mismatches, or any `R/erda_db.R` functions), always use the `ecos:db-schema` skill to fetch the authoritative schema definitions before writing or modifying code. This ensures code matches the actual database schema.

## ERDA

When working on code that accesses ERDA via SFTP or HTTP, or debugging ERDA connections, use the `ecos:erda` skill for guidance on access patterns, authentication, behavioral quirks, and best practices.

ERDA is the Electronic Research Data Archive at Aarhus University. It is an implementation of [MiG](https://github.com/ucphhpc/migrid-sync).
| Skill | When to use |
|---|---|
| `ecos:db-schema` | Database-related code: queries, migrations, schema mismatches, `R/erda_db.R` functions |
| `ecos:erda` | ERDA access via SFTP or HTTP: access patterns, authentication, behavioral quirks |
| `ecos:ami-trap` | AMI trap hardware, data formats, deployment workflows, IoT vs standalone differences |
+1 −1
Package: erdatools
Type: Package
Title: Access 'ERDA' Data Archive via SFTP and HTTP
Version: 0.9.0
Version: 0.10.0
Authors@R: person("Lars", "Dalby", role = c("aut", "cre", "cph"),
    email = "lars@ecos.au.dk")
Description: Utilities for listing folders and reading image EXIF
+2 −0
@@ -14,6 +14,7 @@ export(et_db_get_deployment)
export(et_db_get_deployment_id)
export(et_db_get_partner_folder)
export(et_db_get_project_folder)
export(et_db_get_sessions)
export(et_db_list_deployments)
export(et_db_write_deployments)
export(et_db_write_sessions)
@@ -22,6 +23,7 @@ export(et_db_write_tracks)
export(et_exif_tags)
export(et_filter_media_files)
export(et_filter_stray_images)
export(et_filter_stray_sourceimages)
export(et_group_motion_images)
export(et_group_snapshot_images)
export(et_img_url)
+227 −242

File changed.

Preview size limit exceeded, changes collapsed.

+28 −33
@@ -70,38 +70,34 @@ et_parse_edge_tracks <- function(path) {
#' Build a tracks data frame for database insertion
#'
#' Transforms the output of [et_parse_edge_tracks()] into the `data.tracks`
#' database schema by mapping `id` to `edge_id` and adding `sessionid` and
#' `source` columns.
#' database schema by mapping `id` to `edge_id` and adding `sessionid`,
#' `algorithm`, and `algorithmversion` columns.
#'
#' @param parsed_tracks Tibble as returned by [et_parse_edge_tracks()].
#' @param sessionid Character. Session UUID to associate with all tracks.
#' @param source Character. Source identifier. Default `"edge"`.
#' @param algorithm Character or `NULL`. Algorithm name (e.g. `"yolov8"`).
#'   Default `NULL`.
#' @param algorithmversion Character or `NULL`. Algorithm version string.
#'   Default `NULL`.
#'
#' @return A data frame with columns matching the `data.tracks` schema:
#'   `sessionid`, `edge_id`, `source`, `starttime`, `endtime`, `duration`,
#'   `class`, `counts`, `percentage`, `size`, `distance`.
#'   `sessionid`, `edge_id`, `algorithm`, `algorithmversion`.
#'
#' @examples
#' \dontrun{
#' parsed <- et_parse_edge_tracks("results/tracks/20250508TR.csv")
#' tracks <- et_build_tracks(parsed, sessionid = "abc-123", source = "edge")
#' tracks <- et_build_tracks(parsed, sessionid = "abc-123")
#' }
#'
#' @export
et_build_tracks <- function(parsed_tracks, sessionid, source = "edge") {
et_build_tracks <- function(parsed_tracks, sessionid, algorithm = NULL,
                            algorithmversion = NULL) {
  if (nrow(parsed_tracks) == 0L) {
    return(data.frame(
      sessionid        = character(0L),
      edge_id          = integer(0L),
      source     = character(0L),
      starttime  = as.POSIXct(character(0L)),
      endtime    = as.POSIXct(character(0L)),
      duration   = integer(0L),
      class      = character(0L),
      counts     = integer(0L),
      percentage = double(0L),
      size       = double(0L),
      distance   = double(0L),
      algorithm        = character(0L),
      algorithmversion = character(0L),
      stringsAsFactors = FALSE
    ))
  }
@@ -109,15 +105,8 @@ et_build_tracks <- function(parsed_tracks, sessionid, source = "edge") {
  data.frame(
    sessionid        = as.character(sessionid),
    edge_id          = as.integer(parsed_tracks$id),
    source     = as.character(source),
    starttime  = parsed_tracks$starttime,
    endtime    = parsed_tracks$endtime,
    duration   = as.integer(parsed_tracks$duration),
    class      = as.character(parsed_tracks$class),
    counts     = as.integer(parsed_tracks$counts),
    percentage = as.double(parsed_tracks$percentage),
    size       = as.double(parsed_tracks$size),
    distance   = as.double(parsed_tracks$distance),
    algorithm        = if (is.null(algorithm)) NA_character_ else as.character(algorithm),
    algorithmversion = if (is.null(algorithmversion)) NA_character_ else as.character(algorithmversion),
    stringsAsFactors = FALSE
  )
}
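
The `NULL` handling in the new `et_build_tracks()` matters because `data.frame()` silently drops arguments that are `NULL`, so an unset `algorithm` would otherwise remove the column from the result. A minimal sketch of that behaviour, using a stand-in for the parsed tracks and a hypothetical helper `null_to_na()` (not part of the package):

```r
# Stand-in for the output of et_parse_edge_tracks(); only `id` is needed here.
parsed <- data.frame(id = c(1, 2, 3))

# Hypothetical helper mirroring the inline NULL check in et_build_tracks().
null_to_na <- function(x) if (is.null(x)) NA_character_ else as.character(x)

# Length-1 values are recycled to the number of rows in `edge_id`.
tracks <- data.frame(
  sessionid        = "abc-123",
  edge_id          = as.integer(parsed$id),
  algorithm        = null_to_na("yolov8"),
  algorithmversion = null_to_na(NULL),
  stringsAsFactors = FALSE
)

str(tracks)
```

Because `NA_character_` is typed, `algorithmversion` stays a character column even when every value is missing, which keeps the result consistent with the empty-input branch.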
@@ -225,7 +214,7 @@ et_process_edge_tracks <- function(conn, dbcon, idx_raw, sessions) {

  if (nrow(track_entries) == 0L) return(0L)

  n_total <- 0L
  all_tracks <- list()

  for (i in seq_len(nrow(track_entries))) {
    remote_file <- paste0(track_entries$dir[i], "/", track_entries$name[i])
@@ -254,10 +243,16 @@ et_process_edge_tracks <- function(conn, dbcon, idx_raw, sessions) {
        parsed[mask, , drop = FALSE],
        sessionid = sess_match$id[1L]
      )
      et_db_write_tracks(dbcon, tracks_df)
      n_total <- n_total + nrow(tracks_df)
      all_tracks[[length(all_tracks) + 1L]] <- tracks_df
    }
  }

  n_total
  combined <- if (length(all_tracks) > 0L) do.call(rbind, all_tracks)
              else data.frame(sessionid = character(0L), edge_id = integer(0L),
                              algorithm = character(0L),
                              algorithmversion = character(0L),
                              stringsAsFactors = FALSE)

  if (nrow(combined) > 0L) et_db_write_tracks(dbcon, combined)
  nrow(combined)
}
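
The change to `et_process_edge_tracks()` replaces one database write per track file with a single write of the combined rows. The pattern can be sketched in isolation as follows; `write_tracks_stub()` and the `per_file` data are hypothetical stand-ins for `et_db_write_tracks()` and the per-file output of `et_build_tracks()`, used only for illustration:

```r
# Hypothetical stand-in for et_db_write_tracks(): counts calls instead of
# touching a database.
n_writes <- 0L
write_tracks_stub <- function(df) {
  n_writes <<- n_writes + 1L
  invisible(nrow(df))
}

# Illustrative per-file results.
per_file <- list(
  data.frame(sessionid = "s1", edge_id = 1:2, stringsAsFactors = FALSE),
  data.frame(sessionid = "s2", edge_id = 3L, stringsAsFactors = FALSE)
)

# Accumulate in a list inside the loop, then bind and write once at the end.
all_tracks <- list()
for (df in per_file) {
  all_tracks[[length(all_tracks) + 1L]] <- df
}

combined <- do.call(rbind, all_tracks)
if (nrow(combined) > 0L) write_tracks_stub(combined)

nrow(combined)  # 3 rows collected from two files
n_writes        # 1 write in total
```

Collecting into a list and calling `rbind` once also avoids growing a data frame row by row inside the loop. The function itself additionally guards the empty case with a zero-row data frame so that `nrow(combined)` is always defined.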