Combine all streams named in streams into a single dataset, filling down columns matched by fill_match and creating windowed features using windows as offsets for columns matched by window_match. The full dataset is rebuilt if rebuild is TRUE (default: FALSE), and data is appended to the existing cache if incremental is TRUE (default: !rebuild, which is TRUE unless rebuild is set).
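As a rough illustration of what "filling down" means here, the following base R sketch carries the last non-missing value forward within each customer. The helper fill_down() and the data frame stream_data are hypothetical names made up for this example, and the approach is an assumption about the behavior, not the package's implementation.

```r
# Hypothetical helper: carry the last non-NA value forward.
# Leading NAs (before any observed value) stay NA.
fill_down <- function(x) {
  # Index of the most recent non-NA position at or before each element
  idx <- cummax(ifelse(is.na(x), 0L, seq_along(x)))
  idx[idx == 0L] <- NA
  x[idx]
}

# Made-up data, already sorted by timestamp within each customer
stream_data <- data.frame(
  group_customer_no = c(1, 1, 1, 2, 2, 2),
  timestamp = as.POSIXct(
    rep(c("2024-01-01", "2024-01-02", "2024-01-03"), 2), tz = "UTC"
  ),
  ticket_amt = c(10, NA, NA, NA, 5, NA)
)

# Fill down within each group, so values never leak across customers
stream_data$ticket_amt <- ave(
  stream_data$ticket_amt,
  stream_data$group_customer_no,
  FUN = fill_down
)
```

Grouping matters: customer 2's first row stays NA instead of inheriting customer 1's last value.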
Usage
stream(
  streams = c("email_stream", "ticket_stream", "contribution_stream",
    "membership_stream", "ticket_future_stream", "address_stream"),
  fill_match =
    "^(email|ticket|contribution|membership|ticket|address).+(amt|level|count|max|min|last)",
  window_match = "^(email|ticket|contribution|membership|ticket|address).+(count|amt)$",
  rebuild = FALSE,
  since = now() - dyears(),
  until = now() + dyears(10),
  incremental = !rebuild,
  windows = lapply(c(1, 7, 30, 90, 365), lubridate::period, units = "day"),
  ...
)

stream_chunk_write(
  stream,
  fill_cols = setdiff(colnames(stream), c(by, "timestamp")),
  window_cols = fill_cols,
  since = as_datetime(min(stream$timestamp)),
  by = "group_customer_no",
  incremental = TRUE,
  ...
)

stream_window_features(
  stream,
  window_cols = setdiff(colnames(stream), c(by, "timestamp")),
  since = min(stream$timestamp),
  windows = NULL,
  by = "group_customer_no",
  ...
)
Arguments
- streams
character vector of streams to combine
- fill_match
character regular expression to use when matching columns to fill down
- window_match
character regular expression to use when matching columns to window
- rebuild
logical whether or not to rebuild the whole dataset (TRUE) or just append to the end of it (FALSE)
- since
POSIXct only the data with timestamps greater than or equal to since will be written
- until
POSIXct date until which to build the stream
- incremental
logical whether or not to update the cache incrementally. Can require huge amounts of memory (approximately double the total dataset size to be appended).
- windows
lubridate::period vector that determines the offsets used when constructing the windowed features.
- ...
not used
- stream
data.table data to process and write
- fill_cols
character columns to fill down
- window_cols
character columns to window
- by
character column name to group by for filling down and windowing
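To see which columns the default fill_match and window_match patterns would select, the sketch below applies them to a handful of column names with grep(). The column names are invented for illustration (the real stream columns are not listed here), and whether stream() uses grep() internally is an assumption.

```r
# Hypothetical column names; not taken from the actual streams
cols <- c(
  "group_customer_no", "timestamp", "email_send_count", "ticket_amt",
  "contribution_timestamp_max", "membership_level", "address_city"
)

# Default patterns copied from the Usage section
fill_match <-
  "^(email|ticket|contribution|membership|ticket|address).+(amt|level|count|max|min|last)"
window_match <-
  "^(email|ticket|contribution|membership|ticket|address).+(count|amt)$"

# Columns that would be filled down
fill_cols <- grep(fill_match, cols, value = TRUE)

# Columns that would get windowed features
window_cols <- grep(window_match, cols, value = TRUE)
```

Note that window_match is anchored with $ while fill_match is not, so fill_match also accepts names where the keyword appears mid-name, and window_match only accepts names ending in count or amt.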
Functions
stream_chunk_write(): Fill down cols in stream_cols and add windowed features to stream for timestamps after since
stream_window_features(): Construct windowed features for columns in stream_cols, using windows, a list of lubridate::periods as offsets, and grouped by by
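The descriptions above do not spell out how the offsets are applied. One plausible reading is a trailing aggregate over each window, which the base R sketch below illustrates for a single customer. The helper trailing_sum(), the events data, and the choice of a sum are all assumptions made for this example; it also uses fixed 86400-second days instead of calendar-aware lubridate periods.

```r
# Made-up events for one customer, sorted by timestamp
events <- data.frame(
  timestamp = as.POSIXct(
    c("2024-01-01", "2024-01-05", "2024-01-20", "2024-03-01"), tz = "UTC"
  ),
  ticket_count = c(1, 2, 1, 3)
)

# Hypothetical helper: for each row, sum `value` over the half-open
# interval (time - days, time]. Not the package's actual code.
trailing_sum <- function(time, value, days) {
  vapply(seq_along(time), function(i) {
    in_window <- time > (time[i] - days * 86400) & time <= time[i]
    sum(value[in_window], na.rm = TRUE)
  }, numeric(1))
}

# One new feature column per window length (in days)
for (days in c(7, 30)) {
  events[[paste0("ticket_count_", days, "d")]] <-
    trailing_sum(events$timestamp, events$ticket_count, days)
}
```

With several customers, the same helper would be applied separately to each by group so that windows never span customers.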