Commit d3ccc739 authored by Lars Dalby's avatar Lars Dalby
Browse files

Merge branch 'dev' into 'main'

Add et_db_diff_sourceimages() and /sourceimages/diff endpoint

See merge request !6
parents d7385635 e6b5f6d4
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -126,6 +126,10 @@ Script-only: `dplyr`, `jsonlite`, `purrr`, `tidyr`

Dev tools: `devtools`, `roxygen2`

## Database

When working on database-related code (queries, migrations, schema mismatches, or any `R/erda_db.R` functions), always use the `ecos:db-schema` skill to fetch the authoritative schema definitions before writing or modifying code. This ensures code matches the actual database schema.

## ERDA

ERDA is the Electronic Research Data Archive at Aarhus University. It is an implementation of [MiG](https://github.com/ucphhpc/migrid-sync)
+1 −1
Original line number Diff line number Diff line
Package: erdatools
Type: Package
Title: Access 'ERDA' Data Archive via SFTP and HTTP
Version: 0.3.0
Version: 0.4.0
Authors@R: person("Lars", "Dalby", role = c("aut", "cre", "cph"),
    email = "lars@ecos.au.dk")
Description: Utilities for listing folders and reading image EXIF
+7 −0
Original line number Diff line number Diff line
# Generated by roxygen2: do not edit by hand

export(et_build_sessions)
export(et_build_sourceimages)
export(et_build_subsessions)
export(et_db_connect)
export(et_db_diff_sourceimages)
export(et_db_get_deployment_id)
export(et_db_get_partner_folder)
export(et_db_write_sessions)
export(et_db_write_sourceimages)
export(et_db_write_subsessions)
export(et_exif_tags)
export(et_filter_media_files)
export(et_group_motion_images)
export(et_img_url)
export(et_index_dir)
export(et_index_filter)
@@ -17,6 +23,7 @@ export(et_job_list)
export(et_job_set_process)
export(et_job_update)
export(et_parquet_folders_done)
export(et_parse_ami_filename)
export(et_parse_camalien_filename)
export(et_sftp_batch)
export(et_sftp_connect)
+306 −20
Original line number Diff line number Diff line
@@ -13,6 +13,8 @@
#' @param dbname Database name. Defaults to `AMI_DB` env var or `"ami_ias"`.
#' @param user Database user. Defaults to `AMI_USER` env var.
#' @param password Database password. Defaults to `AMI_PASSWORD` env var.
#' @param sslmode SSL mode for the connection. Defaults to `"require"`
#'   (encrypted, no certificate verification).
#'
#' @return A DBI connection object.
#'
@@ -28,27 +30,40 @@ et_db_connect <- function(
    port     = 5432L,
    dbname   = Sys.getenv("AMI_DB", unset = "ami_ias"),
    user     = Sys.getenv("AMI_USER"),
    password = Sys.getenv("AMI_PASSWORD")) {
    password = Sys.getenv("AMI_PASSWORD"),
    sslmode  = "require") {

  if (!requireNamespace("DBI", quietly = TRUE) ||
      !requireNamespace("RPostgres", quietly = TRUE)) {
    cli::cli_abort("Packages {.pkg DBI} and {.pkg RPostgres} are required for database operations.")
  }

  # Override libpq env vars to avoid conflicts with user's ~/.Renviron
  # (e.g. PGSSLMODE=verify-full with a cert that doesn't match this host)
  old_sslmode <- Sys.getenv("PGSSLMODE", unset = NA)
  old_gssenc  <- Sys.getenv("PGGSSENCMODE", unset = NA)
  Sys.setenv(PGSSLMODE = sslmode, PGGSSENCMODE = "disable")
  on.exit({
    if (is.na(old_sslmode)) Sys.unsetenv("PGSSLMODE") else Sys.setenv(PGSSLMODE = old_sslmode)
    if (is.na(old_gssenc))  Sys.unsetenv("PGGSSENCMODE") else Sys.setenv(PGGSSENCMODE = old_gssenc)
  }, add = TRUE)

  DBI::dbConnect(
    RPostgres::Postgres(),
    host     = host,
    port     = as.integer(port),
    dbname   = dbname,
    user     = user,
    password = password
    password = password,
    sslmode  = sslmode,
    gssencmode = "disable"
  )
}

#' Write sourceimages data frame to the database
#'
#' Performs batch INSERT into `data.sourceimages` with ON CONFLICT upsert on
#' the `filename` column. Existing rows are updated with the new values.
#' `(filename, deploymentid)`. Existing rows are updated with the new values.
#' Uses parameterized queries via [DBI::dbExecute()].
#'
#' @param dbcon A DBI connection object (from [et_db_connect()]).
@@ -78,6 +93,11 @@ et_db_write_sourceimages <- function(dbcon, df, batch_size = 250L) {
    cli::cli_abort("Missing required columns: {.val {missing}}")
  }

  batch_size <- as.integer(batch_size)
  if (is.na(batch_size) || batch_size < 1L) {
    cli::cli_abort("{.arg batch_size} must be a positive integer.")
  }

  n <- nrow(df)
  if (n == 0L) {
    cli::cli_alert_info("No rows to write.")
@@ -103,8 +123,7 @@ et_db_write_sourceimages <- function(dbcon, df, batch_size = 250L) {
      "INSERT INTO data.sourceimages ",
      "(filename, deploymentid, url, timestamp, issnapshot, processed, queued) ",
      "VALUES ", paste(placeholders, collapse = ","), " ",
      "ON CONFLICT (filename) DO UPDATE SET ",
      "deploymentid = EXCLUDED.deploymentid, ",
      "ON CONFLICT (filename, deploymentid) DO UPDATE SET ",
      "url = EXCLUDED.url, ",
      "timestamp = EXCLUDED.timestamp, ",
      "issnapshot = EXCLUDED.issnapshot, ",
@@ -117,7 +136,7 @@ et_db_write_sourceimages <- function(dbcon, df, batch_size = 250L) {
      row <- batch[i, , drop = FALSE]
      list(
        as.character(row$filename),
        as.integer(row$deploymentid),
        as.character(row$deploymentid),
        as.character(row$url),
        as.character(row$timestamp),
        as.logical(row$issnapshot),
@@ -138,37 +157,41 @@ et_db_write_sourceimages <- function(dbcon, df, batch_size = 250L) {
#' Look up a deployment ID from the database
#'
#' Queries `data.deployments` for a deployment matching the given project,
#' partner, year, and code. The deployment ID is a character string
#' (e.g. `"IT1#2023"`).
#'
#' @param dbcon A DBI connection object.
#' @param projectid Character. Project identifier (e.g. `"ias"`).
#' @param partnerid Character. Partner ID (e.g. `"8"`).
#' @param year Integer. Deployment year.
#' @param code Character. Deployment code (e.g. `"IT1"`).
#'
#' @return Character deployment ID (e.g. `"IT1#2023"`), or `NA_character_`
#'   if not found.
#'
#' @examples
#' \dontrun{
#' con <- et_db_connect()
#' dep_id <- et_db_get_deployment_id(con, "ias", "8", 2023L, "IT1")
#' }
#'
#' @export
et_db_get_deployment_id <- function(dbcon, projectid, partnerid, year, code) {
  # DBI is an optional dependency; fail early with a clear message.
  if (!requireNamespace("DBI", quietly = TRUE)) {
    cli::cli_abort("Package {.pkg DBI} is required for database operations.")
  }

  # Parameterized query ($1..$4); LIMIT 1 because the four-column key is
  # expected to identify at most one deployment.
  sql <- paste0(
    "SELECT id FROM data.deployments ",
    "WHERE projectid = $1 AND partnerid = $2 AND year = $3 AND code = $4 ",
    "LIMIT 1"
  )
  result <- DBI::dbGetQuery(dbcon, sql, params = list(
    as.character(projectid), as.character(partnerid),
    as.integer(year), as.character(code)
  ))
  if (nrow(result) == 0L) return(NA_character_)
  as.character(result$id[1L])
}

#' Look up a partner's folder name from the database
@@ -192,8 +215,271 @@ et_db_get_partner_folder <- function(dbcon, partnerid) {
    cli::cli_abort("Package {.pkg DBI} is required for database operations.")
  }

  sql <- "SELECT folder FROM data.partners WHERE partnerid = $1 LIMIT 1"
  result <- DBI::dbGetQuery(dbcon, sql, params = list(as.integer(partnerid)))
  sql <- "SELECT folder FROM data.partners WHERE id = $1 LIMIT 1"
  result <- DBI::dbGetQuery(dbcon, sql, params = list(as.character(partnerid)))
  if (nrow(result) == 0L) return(NA_character_)
  as.character(result$folder[1L])
}

#' Diff sourceimages against the database
#'
#' Filters a sourceimages data frame down to the rows whose `filename` does
#' not yet exist in `data.sourceimages`. Filenames are looked up in batches so
#' that each query stays within PostgreSQL's parameter limit.
#'
#' NOTE(review): the diff key here is `filename` alone, while the upsert in
#' `et_db_write_sourceimages()` conflicts on `(filename, deploymentid)` —
#' confirm that filenames are globally unique across deployments.
#'
#' @param dbcon A DBI connection object (from [et_db_connect()]).
#' @param df Data frame with at least a `filename` column (typically the
#'   output of [et_build_sourceimages()]).
#' @param batch_size Integer. Number of filenames per query. Default `5000L`.
#'
#' @return A subset of `df` containing only rows not already in the database.
#'   All original columns are preserved.
#'
#' @examples
#' \dontrun{
#' con <- et_db_connect()
#' on.exit(DBI::dbDisconnect(con))
#' si  <- et_build_sourceimages(idx, sharelink, remote_dir, http_base, 1L)
#' new <- et_db_diff_sourceimages(con, si)
#' et_db_write_sourceimages(con, new)
#' }
#'
#' @export
et_db_diff_sourceimages <- function(dbcon, df, batch_size = 5000L) {
  if (!requireNamespace("DBI", quietly = TRUE)) {
    cli::cli_abort("Package {.pkg DBI} is required for database operations.")
  }

  if (!"filename" %in% names(df)) {
    cli::cli_abort("Column {.val filename} is required in {.arg df}.")
  }

  batch_size <- as.integer(batch_size)
  if (is.na(batch_size) || batch_size < 1L) {
    cli::cli_abort("{.arg batch_size} must be a positive integer.")
  }

  n <- nrow(df)
  if (n == 0L) {
    cli::cli_alert_info("Empty input — nothing to diff.")
    return(df)
  }

  # One SELECT per chunk of at most batch_size filenames; collect the
  # filenames the database already knows about.
  chunks <- split(df$filename, ceiling(seq_len(n) / batch_size))
  existing <- unlist(lapply(chunks, function(fns) {
    slots <- paste0("$", seq_along(fns), collapse = ", ")
    sql <- paste0(
      "SELECT filename FROM data.sourceimages WHERE filename IN (", slots, ")"
    )
    DBI::dbGetQuery(dbcon, sql, params = as.list(fns))$filename
  }), use.names = FALSE)

  # Keep only rows whose filename was not returned by any batch query.
  new_df <- df[!df$filename %in% existing, , drop = FALSE]
  n_new <- nrow(new_df)
  cli::cli_alert_info(
    "Diff: {n} total, {n - n_new} already in DB, {n_new} new."
  )
  new_df
}

#' Write sessions to the database
#'
#' Performs batch INSERT into `data.sessions` with ON CONFLICT upsert on
#' `(date, deploymentid)`. Returns the input data frame augmented with `id`
#' (UUID) from the database. `subsessions_inferred` is always written as
#' `TRUE` for every row.
#'
#' @param dbcon A DBI connection object (from [et_db_connect()]).
#' @param df Data frame with columns: `date`, `deploymentid`, `starttime`,
#'   `endtime`. Typically the output of [et_build_sessions()].
#' @param batch_size Integer. Number of rows per INSERT statement. Default `250L`.
#'
#' @return The input data frame with an `id` column (character UUID) populated
#'   from the database.
#'
#' @examples
#' \dontrun{
#' con <- et_db_connect()
#' on.exit(DBI::dbDisconnect(con))
#' sessions <- et_build_sessions(grouped, "IT1#2023")
#' sessions_with_ids <- et_db_write_sessions(con, sessions)
#' }
#'
#' @export
et_db_write_sessions <- function(dbcon, df, batch_size = 250L) {
  # DBI is an optional dependency; fail early with a clear message.
  if (!requireNamespace("DBI", quietly = TRUE)) {
    cli::cli_abort("Package {.pkg DBI} is required for database operations.")
  }

  required_cols <- c("date", "deploymentid", "starttime", "endtime")
  missing <- setdiff(required_cols, names(df))
  if (length(missing) > 0L) {
    cli::cli_abort("Missing required columns: {.val {missing}}")
  }

  batch_size <- as.integer(batch_size)
  if (is.na(batch_size) || batch_size < 1L) {
    cli::cli_abort("{.arg batch_size} must be a positive integer.")
  }

  n <- nrow(df)
  if (n == 0L) {
    # Preserve the promised shape: add a zero-length id column and return.
    cli::cli_alert_info("No sessions to write.")
    df$id <- character(0L)
    return(df)
  }

  # Split row indices into consecutive chunks of at most batch_size rows.
  batches <- split(seq_len(n), ceiling(seq_len(n) / batch_size))
  # NOTE(review): total_affected is accumulated but never reported or
  # returned — kept for parity with et_db_write_subsessions(); consider
  # surfacing it.
  total_affected <- 0L
  ncols <- 5L  # date, deploymentid, starttime, endtime, subsessions_inferred
  pb <- cli::cli_progress_bar("Writing sessions", total = length(batches))

  for (batch_idx in batches) {
    batch <- df[batch_idx, , drop = FALSE]
    k <- nrow(batch)

    # One "($1,$2,$3,$4,$5)" group per row, numbered consecutively so the
    # whole multi-row VALUES list shares a single $1..$(k*ncols) sequence.
    placeholders <- vapply(seq_len(k), function(i) {
      offset <- (i - 1L) * ncols
      paste0("(", paste0("$", offset + seq_len(ncols), collapse = ","), ")")
    }, character(1L))

    sql <- paste0(
      "INSERT INTO data.sessions ",
      "(date, deploymentid, starttime, endtime, subsessions_inferred) ",
      "VALUES ", paste(placeholders, collapse = ","), " ",
      "ON CONFLICT (date, deploymentid) DO UPDATE SET ",
      "starttime = EXCLUDED.starttime, ",
      "endtime = EXCLUDED.endtime, ",
      "subsessions_inferred = EXCLUDED.subsessions_inferred"
    )

    # Flatten to a row-major parameter list matching the placeholder
    # numbering; recursive = FALSE keeps one list element per placeholder.
    params <- unlist(lapply(seq_len(k), function(i) {
      row <- batch[i, , drop = FALSE]
      list(
        as.character(row$date),
        as.character(row$deploymentid),
        as.character(row$starttime),
        as.character(row$endtime),
        TRUE
      )
    }), recursive = FALSE)

    total_affected <- total_affected + DBI::dbExecute(dbcon, sql, params = params)
    cli::cli_progress_update(id = pb)
  }

  cli::cli_progress_done(id = pb)
  cli::cli_alert_success("Wrote {n} row(s) to data.sessions.")

  # Fetch IDs back from DB
  # Queries every session for the deployments involved (not only the dates
  # just written); the match below picks out the relevant rows.
  unique_deps <- unique(df$deploymentid)
  id_placeholders <- paste0("$", seq_along(unique_deps), collapse = ", ")
  id_sql <- paste0(
    "SELECT id, date, deploymentid FROM data.sessions ",
    "WHERE deploymentid IN (", id_placeholders, ")"
  )
  ids <- DBI::dbGetQuery(dbcon, id_sql, params = as.list(unique_deps))

  # Match IDs back to input rows
  # Normalize the DB date, then join on the "date deploymentid" paste key.
  # NOTE(review): assumes neither date strings nor deployment IDs contain
  # ambiguous space-separated values — confirm against schema.
  ids$date <- as.Date(ids$date)
  df$id <- ids$id[match(
    paste(df$date, df$deploymentid),
    paste(ids$date, ids$deploymentid)
  )]

  df
}

#' Write subsessions to the database
#'
#' Batch upsert into `data.subsessions`: rows are inserted in chunks of
#' `batch_size`, and rows that collide on `(starttime, sessionid)` update the
#' existing record. `sessionid` must already be populated (join against the
#' sessions written by [et_db_write_sessions()] first). Every row is written
#' with status `"pending"`.
#'
#' @param dbcon A DBI connection object (from [et_db_connect()]).
#' @param df Data frame with columns: `sessionid`, `deploymentid`, `starttime`,
#'   `endtime`. Typically the output of [et_build_subsessions()] after
#'   populating `sessionid` from [et_db_write_sessions()].
#' @param batch_size Integer. Number of rows per INSERT statement. Default `250L`.
#'
#' @return The total number of rows affected (invisibly).
#'
#' @examples
#' \dontrun{
#' con <- et_db_connect()
#' on.exit(DBI::dbDisconnect(con))
#' subsessions$sessionid <- sessions_with_ids$id[
#'   match(subsessions$section, sessions_with_ids$section)
#' ]
#' et_db_write_subsessions(con, subsessions)
#' }
#'
#' @export
et_db_write_subsessions <- function(dbcon, df, batch_size = 250L) {
  if (!requireNamespace("DBI", quietly = TRUE)) {
    cli::cli_abort("Package {.pkg DBI} is required for database operations.")
  }

  required_cols <- c("sessionid", "deploymentid", "starttime", "endtime")
  missing <- setdiff(required_cols, names(df))
  if (length(missing) > 0L) {
    cli::cli_abort("Missing required columns: {.val {missing}}")
  }

  batch_size <- as.integer(batch_size)
  if (is.na(batch_size) || batch_size < 1L) {
    cli::cli_abort("{.arg batch_size} must be a positive integer.")
  }

  n <- nrow(df)
  if (n == 0L) {
    cli::cli_alert_info("No subsessions to write.")
    return(invisible(0L))
  }

  ncols <- 5L  # sessionid, deploymentid, starttime, endtime, status
  chunk_rows <- split(seq_len(n), ceiling(seq_len(n) / batch_size))
  total_affected <- 0L
  pb <- cli::cli_progress_bar("Writing subsessions", total = length(chunk_rows))

  for (rows in chunk_rows) {
    chunk <- df[rows, , drop = FALSE]
    k <- nrow(chunk)

    # One "($a,$b,$c,$d,$e)" group per row, numbered consecutively so the
    # whole VALUES list shares a single $1..$(k*ncols) sequence.
    groups <- character(k)
    for (i in seq_len(k)) {
      base <- (i - 1L) * ncols
      groups[i] <- paste0("(", paste0("$", base + seq_len(ncols), collapse = ","), ")")
    }

    sql <- paste0(
      "INSERT INTO data.subsessions ",
      "(sessionid, deploymentid, starttime, endtime, status) ",
      "VALUES ", paste(groups, collapse = ","), " ",
      "ON CONFLICT (starttime, sessionid) DO UPDATE SET ",
      "deploymentid = EXCLUDED.deploymentid, ",
      "endtime = EXCLUDED.endtime, ",
      "status = EXCLUDED.status"
    )

    # Row-major flat parameter list matching the placeholder numbering.
    params <- vector("list", k * ncols)
    for (i in seq_len(k)) {
      base <- (i - 1L) * ncols
      params[[base + 1L]] <- as.character(chunk$sessionid[i])
      params[[base + 2L]] <- as.character(chunk$deploymentid[i])
      params[[base + 3L]] <- as.character(chunk$starttime[i])
      params[[base + 4L]] <- as.character(chunk$endtime[i])
      params[[base + 5L]] <- "pending"
    }

    total_affected <- total_affected + DBI::dbExecute(dbcon, sql, params = params)
    cli::cli_progress_update(id = pb)
  }

  cli::cli_progress_done(id = pb)
  cli::cli_alert_success("Wrote {n} row(s) to data.subsessions.")
  invisible(total_affected)
}
+133 −57
Original line number Diff line number Diff line
@@ -3,25 +3,23 @@
#' Build a sourceimages data frame from an ERDA index
#'
#' Transforms a filtered ERDA index (files only) into the `data.sourceimages`
#' schema used by the AMI database. Constructs public URLs via [et_img_url()]
#' and extracts timestamps from CamAlien filenames via
#' [et_parse_camalien_filename()].
#' schema used by the AMI database. Constructs public URLs from the full
#' `dir/name` path in the index and extracts timestamps from CamAlien
#' filenames via [et_parse_camalien_filename()].
#'
#' The `index_ts` column records the time this function is called (i.e. the
#' build timestamp), not the time the index was originally created.
#'
#' Path parsing assumes the directory structure
#' `remote_dir/<country>/<folder>/...`. Files at unexpected depths will
#' produce a warning and may generate incorrect URLs.
#'
#' @param index Data frame as returned by [et_index_filter()] — must contain
#'   columns `dir`, `name`, `is_dir`, `size`, `mtime`. Should be filtered to
#'   files only (no directories).
#' @param sharelink ERDA sharelink token.
#' @param remote_dir Remote root directory (relative to sharelink root).
#' @param remote_dir Remote root directory (relative to sharelink root). Used
#'   only to strip the prefix when building URLs — the remainder of the path
#'   is taken from the index `dir` column.
#' @param http_base Base URL for the ERDA share-redirect endpoint.
#' @param deploymentid Integer. Deployment ID for all rows.
#' @param type Character. Image type: `"snapshot"` or `"timeseries"`.
#' @param deploymentid Character. Deployment ID for all rows (e.g. `"IT1#2023"`).
#' @param type Character. Image type: `"snapshot"` or `"motion"`.
#'
#' @return Data frame with columns: `index_ts` (character, build timestamp),
#'   `filename`, `deploymentid`, `url`, `timestamp`, `issnapshot`, `processed`,
@@ -33,9 +31,9 @@
#' si <- et_build_sourceimages(
#'   idx,
#'   sharelink    = Sys.getenv("ERDA_SHARELINK"),
#'   remote_dir   = "/storage/onestop/2025/",
#'   remote_dir   = "/ias/",
#'   http_base    = "https://anon.erda.au.dk/share_redirect",
#'   deploymentid = 42L,
#'   deploymentid = "IT1#2023",
#'   type         = "snapshot"
#' )
#' }
@@ -53,7 +51,7 @@ et_build_sourceimages <- function(
    return(data.frame(
      index_ts     = character(0L),
      filename     = character(0L),
      deploymentid = integer(0L),
      deploymentid = character(0L),
      url          = character(0L),
      timestamp    = character(0L),
      issnapshot   = logical(0L),
@@ -63,58 +61,35 @@ et_build_sourceimages <- function(
    ))
  }

  # Strip remote_dir prefix from dir to get relative path components.
  # Expected structure: <country>/<folder>/...
  clean_remote <- gsub("^/+|/+$", "", remote_dir)
  rel_paths <- sub(paste0("^/*", gsub("([.\\\\|(){}^$*+?])", "\\\\\\1", clean_remote), "/*"), "",
                   index$dir)
  # Build full file paths from dir + name, then construct URLs
  clean <- function(x) gsub("^/+|/+$", "", x)
  base <- paste(clean(http_base), clean(sharelink), sep = "/")
  file_paths <- paste(clean(index$dir), index$name, sep = "/")
  urls <- paste(base, file_paths, sep = "/")

  # Split relative path: first component = country, second = folder
  parts <- strsplit(rel_paths, "/")
  country <- vapply(parts, function(p) if (length(p) >= 1L) p[1L] else "", character(1L))
  folder  <- vapply(parts, function(p) if (length(p) >= 2L) p[2L] else "", character(1L))

  # Warn about files with missing path components
  empty_country <- !nzchar(country)
  empty_folder  <- !nzchar(folder)
  if (any(empty_country)) {
    cli::cli_warn(
      "{sum(empty_country)} file(s) have no country component in their path. URLs may be incorrect."
    )
  }
  if (any(empty_folder & !empty_country)) {
    cli::cli_warn(
      "{sum(empty_folder & !empty_country)} file(s) have no folder component in their path. URLs may be incorrect."
    )
  }

  # Construct URLs
  urls <- mapply(
    et_img_url,
    sharelink  = sharelink,
    remote_dir = remote_dir,
    country    = country,
    folder     = folder,
    filename   = index$name,
    http_base  = http_base,
    USE.NAMES  = FALSE
  )

  # Parse timestamps from filenames
  parsed <- tryCatch(
  # Parse timestamps from filenames.
  # Try AMI format first (<id>-<YYYYMMDDHHmmss>-<type>.ext),
  # then fall back to CamAlien format (CT_<ts>_GT_...).
  ami_parsed <- et_parse_ami_filename(index$name)
  if (any(!is.na(ami_parsed$timestamp))) {
    timestamp <- ami_parsed$timestamp
  } else {
    camalien_parsed <- tryCatch(
      et_parse_camalien_filename(index$name),
      error = function(e) NULL
    )
  timestamp <- if (!is.null(parsed) && "CT" %in% names(parsed)) {
    parsed$CT
    timestamp <- if (!is.null(camalien_parsed) && "CT" %in% names(camalien_parsed)) {
      camalien_parsed$CT
    } else {
      rep(NA_character_, nrow(index))
    }
  }

  data.frame(
    index_ts     = format(Sys.time(), "%Y-%m-%dT%H:%M:%S"),
    filename     = index$name,
    deploymentid = as.integer(deploymentid),
    deploymentid = as.character(deploymentid),
    url          = urls,
    timestamp    = timestamp,
    issnapshot   = type == "snapshot",
@@ -123,3 +98,104 @@ et_build_sourceimages <- function(
    stringsAsFactors = FALSE
  )
}

# et_build_sessions -----------------------------------------------------------

#' Build a sessions data frame from grouped motion images
#'
#' Collapses a grouped data frame (from [et_group_motion_images()]) to one
#' row per unique section, in the shape of the `data.sessions` database
#' schema. The session's date is parsed from the section label after
#' replacing `_` with `-`; start/end times are the min/max timestamps within
#' the section.
#'
#' @param grouped Data frame as returned by [et_group_motion_images()] — must
#'   contain columns `section` and `timestamp`.
#' @param deploymentid Character. Deployment ID (e.g. `"IT1#2023"`).
#'
#' @return Data frame with columns: `section` (character, retained for joining),
#'   `date` (Date), `deploymentid` (character), `starttime` (POSIXct),
#'   `endtime` (POSIXct). One row per unique section.
#'
#' @examples
#' \dontrun{
#' grouped <- et_group_motion_images(idx)
#' sessions <- et_build_sessions(grouped, deploymentid = "IT1#2023")
#' }
#'
#' @export
et_build_sessions <- function(grouped, deploymentid) {
  if (nrow(grouped) == 0L) {
    # Zero-row frame with the correct column types so downstream code
    # (rbind, DB writers) sees a stable schema.
    return(data.frame(
      section      = character(0L),
      date         = as.Date(character(0L)),
      deploymentid = character(0L),
      starttime    = as.POSIXct(character(0L)),
      endtime      = as.POSIXct(character(0L)),
      stringsAsFactors = FALSE
    ))
  }

  per_section <- lapply(unique(grouped$section), function(sec) {
    stamps <- grouped$timestamp[grouped$section == sec]
    data.frame(
      section      = sec,
      # Section labels use "_" where the date wants "-".
      date         = as.Date(gsub("_", "-", sec)),
      deploymentid = as.character(deploymentid),
      starttime    = min(stamps),
      endtime      = max(stamps),
      stringsAsFactors = FALSE
    )
  })
  do.call(rbind, per_section)
}

# et_build_subsessions --------------------------------------------------------

#' Build a subsessions data frame from grouped motion images
#'
#' Collapses a grouped data frame (from [et_group_motion_images()]) to one
#' row per unique (section, subsection) pair, in the shape of the
#' `data.subsessions` database schema. Start/end times are the min/max
#' timestamps within each pair.
#'
#' @param grouped Data frame as returned by [et_group_motion_images()] — must
#'   contain columns `section`, `subsection`, and `timestamp`.
#' @param deploymentid Character. Deployment ID (e.g. `"IT1#2023"`).
#'
#' @return Data frame with columns: `section` (character, retained for joining
#'   to sessions), `deploymentid` (character), `starttime` (POSIXct),
#'   `endtime` (POSIXct). One row per unique (section, subsection) pair.
#'
#' @examples
#' \dontrun{
#' grouped <- et_group_motion_images(idx)
#' subsessions <- et_build_subsessions(grouped, deploymentid = "IT1#2023")
#' }
#'
#' @export
et_build_subsessions <- function(grouped, deploymentid) {
  if (nrow(grouped) == 0L) {
    # Zero-row frame with the correct column types for downstream code.
    return(data.frame(
      section      = character(0L),
      deploymentid = character(0L),
      starttime    = as.POSIXct(character(0L)),
      endtime      = as.POSIXct(character(0L)),
      stringsAsFactors = FALSE
    ))
  }

  pairs <- unique(grouped[, c("section", "subsection"), drop = FALSE])
  out <- vector("list", nrow(pairs))
  for (i in seq_len(nrow(pairs))) {
    hit <- grouped$section == pairs$section[i] &
      grouped$subsection == pairs$subsection[i]
    stamps <- grouped$timestamp[hit]
    out[[i]] <- data.frame(
      section      = pairs$section[i],
      deploymentid = as.character(deploymentid),
      starttime    = min(stamps),
      endtime      = max(stamps),
      stringsAsFactors = FALSE
    )
  }
  do.call(rbind, out)
}
Loading