Combine all streams named in streams into a single dataset, filling down columns matched by fill_match and creating windowed features, using windows as offsets, for columns matched by window_match. The full dataset is rebuilt if rebuild is TRUE (default: FALSE), and data is appended to the existing cache if incremental is TRUE (default: !rebuild, which is TRUE unless rebuild is set).

Usage

stream(
  streams = c("email_stream", "ticket_stream", "contribution_stream",
    "membership_stream", "ticket_future_stream", "address_stream"),
  fill_match =
    "^(email|ticket|contribution|membership|ticket|address).+(amt|level|count|max|min|last)",
  window_match = "^(email|ticket|contribution|membership|ticket|address).+(count|amt)$",
  rebuild = FALSE,
  since = now() - dyears(),
  until = now() + dyears(10),
  incremental = !rebuild,
  windows = lapply(c(1, 7, 30, 90, 365), lubridate::period, units = "day"),
  ...
)

stream_chunk_write(
  stream,
  fill_cols = setdiff(colnames(stream), c(by, "timestamp")),
  window_cols = fill_cols,
  since = as_datetime(min(stream$timestamp)),
  by = "group_customer_no",
  incremental = TRUE,
  ...
)

stream_window_features(
  stream,
  window_cols = setdiff(colnames(stream), c(by, "timestamp")),
  since = min(stream$timestamp),
  windows = NULL,
  by = "group_customer_no",
  ...
)

Arguments

streams

character vector of streams to combine

fill_match

character regular expression to use when matching columns to fill down

window_match

character regular expression to use when matching columns to window

rebuild

logical whether to rebuild the whole dataset (TRUE) or just append to the end of it (FALSE)

since

POSIXct only the data with timestamps greater than or equal to since will be written

until

POSIXct date until which to build the stream

incremental

logical whether or not to update the cache incrementally. Can require huge amounts of memory (approximately double the total dataset size to be appended).

windows

list of lubridate::period objects that determine the offsets used when constructing the windowed features.

...

not used

stream

data.table data to process and write

fill_cols

character columns to fill down

window_cols

character columns to window

by

character column name to group by for filling down and windowing
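
How fill_match and window_match pick columns can be sketched in base R. This is only an illustration: the column names below are hypothetical, and the use of grep() is an assumption about how the matching is done, not a statement about the package internals. The two patterns are the defaults shown under Usage.

```r
# Hypothetical column names, for illustration only
cols <- c("group_customer_no", "timestamp",
          "email_open_count", "ticket_amt", "contribution_amt_max",
          "membership_level", "address_postal_code")

# Default patterns copied from the Usage section
fill_match <-
  "^(email|ticket|contribution|membership|ticket|address).+(amt|level|count|max|min|last)"
window_match <-
  "^(email|ticket|contribution|membership|ticket|address).+(count|amt)$"

# Columns that would be filled down (pattern is not anchored at the end)
fill_cols <- grep(fill_match, cols, value = TRUE)

# Columns that would be windowed (must end in "count" or "amt")
window_cols <- grep(window_match, cols, value = TRUE)

print(fill_cols)
print(window_cols)
```

Because window_match is anchored with $, a column such as contribution_amt_max is matched by fill_match but not by window_match.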

Value

stream dataset as an arrow::Table

Functions

  • stream_chunk_write(): fill down the columns in fill_cols and add windowed features for the columns in window_cols to stream, for timestamps at or after since

  • stream_window_features(): construct windowed features for the columns in window_cols, using windows (a list of lubridate::period objects) as offsets, grouped by by
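
The two steps described above, filling down and windowing, can be sketched on a toy table. This is a minimal sketch under stated assumptions, not the package's implementation: the data, the helper trailing_sum(), and the output column names are hypothetical, and the text does not say which aggregation the windowed features use, so a trailing sum over each window is assumed here.

```r
library(data.table)
library(lubridate)

# Toy stream with two customers and sparse observations (hypothetical data)
stream <- data.table(
  group_customer_no = c(1L, 1L, 1L, 2L, 2L),
  timestamp = as.POSIXct(c("2024-01-01", "2024-01-05", "2024-01-20",
                           "2024-01-02", "2024-01-03"), tz = "UTC"),
  ticket_count = c(1, NA, 2, NA, 3)
)
setorder(stream, group_customer_no, timestamp)

# Fill down: carry the last observed value forward within each customer
stream[, ticket_count_filled := nafill(ticket_count, type = "locf"),
       by = group_customer_no]

# Windows built the same way as the default in Usage, with fewer offsets
window_days <- c(1, 7)
windows <- lapply(window_days, period, units = "day")

# Hypothetical helper: for each row, sum x over (ts - window, ts]
trailing_sum <- function(x, ts, window) {
  vapply(seq_along(x), function(i) {
    sum(x[ts > ts[i] - window & ts <= ts[i]], na.rm = TRUE)
  }, numeric(1))
}

# One new column per window; the naming scheme is an assumption
for (k in seq_along(windows)) {
  new_col <- paste0("ticket_count_", window_days[k], "d")
  stream[, (new_col) := trailing_sum(ticket_count, timestamp, windows[[k]]),
         by = group_customer_no]
}

print(stream)
```

Grouping by group_customer_no mirrors the default by argument, so values are never carried or summed across customers. The row-by-row helper is quadratic in the group size and is meant for reading, not for large streams.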