Title: | Enabling stream processing of large files |
---|---|
Description: | Large data files can be difficult to work with in R, where data generally resides in memory. This package encourages a style of programming where data is 'streamed' from disk into R via a 'producer' and through a series of 'consumers' that, typically, reduce the original data to a manageable size. The package provides useful Producer and Consumer stream components for operations such as data input, sampling, indexing, and transformation; see package?Streamer for details. |
Authors: | Martin Morgan, Nishant Gopalakrishnan |
Maintainer: | Martin Morgan <[email protected]> |
License: | Artistic-2.0 |
Version: | 1.53.0 |
Built: | 2024-12-19 04:07:46 UTC |
Source: | https://github.com/bioc/Streamer |
Large data files can be difficult to work with in R, where data generally resides in memory. This package encourages a style of programming where data is 'streamed' from disk into R through a series of components that, typically, reduce the original data to a manageable size. The package provides useful `Producer` and `Consumer` components for operations such as data input, sampling, indexing, and transformation.
The central paradigm in this package is a `Stream` composed of a `Producer` and zero or more `Consumer` components. The `Producer` is responsible for input of data, e.g., from the file system. A `Consumer` accepts data from a `Producer` and performs transformations on it. The `Stream` function is used to assemble a `Producer` and zero or more `Consumer` components into a single stream.
The `yield` function can be applied to a stream to generate one ‘chunk’ of data. The definition of chunk depends on the stream and its components. A common paradigm repeatedly invokes `yield` on a stream, retrieving chunks of the stream for further processing.
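The Producer, Consumer, and `yield` paradigm can be mimicked in plain R with closures, which may help when reading the rest of this manual. The sketch below is not Streamer's implementation: `seq_producer` and `stream` are hypothetical helpers, and a zero-length chunk is assumed to signal the end of the stream, as described for `yield`.

```r
## Hypothetical base-R sketch of the Producer -> Consumer -> yield paradigm;
## these helpers are illustrative and are not part of Streamer.

## A producer: each call returns the next chunk of at most `yield_size`
## integers from 1..to, or integer(0) once the sequence is exhausted.
seq_producer <- function(to, yield_size) {
    pos <- 0L
    function() {
        if (pos >= to)
            return(integer(0))
        chunk <- seq.int(pos + 1L, min(pos + yield_size, to))
        pos <<- pos + length(chunk)
        chunk
    }
}

## A stream: pull one chunk from the producer and pass it through each
## consumer function in turn.
stream <- function(producer, ...) {
    consumers <- list(...)
    function() {
        chunk <- producer()
        for (consume in consumers)
            chunk <- consume(chunk)
        chunk
    }
}

s <- stream(seq_producer(to = 10L, yield_size = 4L),
            function(x) x * 2L,   # first consumer: double each record
            function(x) rev(x))   # second consumer: reverse the chunk

## Repeatedly 'yield' until a zero-length chunk signals completion.
chunks <- list()
while (length(chunk <- s()) > 0L)
    chunks[[length(chunks) + 1L]] <- chunk
str(chunks)
```

Here the three chunks are `c(8, 6, 4, 2)`, `c(16, 14, 12, 10)`, and `c(20, 18)`; only one chunk is held in memory per call.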
Martin Morgan [email protected]
`Producer` and `Consumer` are the main types of stream components. Use `Stream` to connect components, and `yield` to iterate a stream.
```r
library(Streamer)

## About this package
packageDescription("Streamer")

## Existing stream components
getClass("Producer") # Producer classes
getClass("Consumer") # Consumer classes

## An example
fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
b <- RawInput(fl, 100L, reader=rawReaderFactory(1e4))
s <- Stream(RawToChar(), Rev(), b)
s
head(yield(s)) # First chunk
close(b)

b <- RawInput(fl, 5000L, verbose=TRUE)
d <- Downsample(sampledSize=50)
s <- Stream(RawToChar(), d, b)
s
s[[2]]

## Processing the first ten chunks of the file
i <- 1
while (10 >= i && 0L != length(chunk <- yield(s))) {
    cat("chunk", i, "length", length(chunk), "\n")
    i <- i + 1
}
close(b)
```
`ConnectionProducer` classes include `ScanProducer`, `ReadLinesProducer`, and `ReadTableProducer`, providing Streamer interfaces to `scan`, `readLines`, and `read.table`.
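As a rough illustration of what `ReadLinesProducer` wraps, the base-R sketch below reads a connection 1000 lines at a time until `readLines` returns a zero-length result. It uses a temporary file and only base functions; it is not Streamer code. The connection is opened once up front because an unopened connection is reopened at the start of the file on every `readLines` call.

```r
## Hypothetical base-R sketch of ReadLinesProducer-style chunked input:
## read a connection `n` lines at a time until it is exhausted.
tmp <- tempfile(fileext = ".txt")
writeLines(sprintf("record %d", 1:2500), tmp)

con <- file(tmp, open = "r")   # open once so successive reads continue
sizes <- integer(0)
while (length(lines <- readLines(con, n = 1000L)) > 0L)
    sizes <- c(sizes, length(lines))
close(con)
unlink(tmp)
sizes   # 1000 1000 500
```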
```r
ScanProducer(file, ..., fileArgs=list(), scanArgs=list(...))
ReadLinesProducer(con, ..., conArgs=list(), readLinesArgs=list(...))
ReadTableProducer(file, ..., fileArgs=list(), readTableArgs=list(...))
## S3 method for class 'ConnectionProducer'
close(con, ...)
```
Argument | Description |
---|---|
`file`, `con` | The file or connection to be used for input. See |
`...` | Additional arguments, e.g., |
`fileArgs`, `conArgs` | Arguments, e.g., |
`scanArgs`, `readLinesArgs`, `readTableArgs` | Arguments to |
See `Producer` methods.
Internal fields of this class are described with, e.g., `getRefClass("ReadLinesProducer")$fields()`. Internal methods of this class are described with `getRefClass("ReadLinesProducer")$methods()` and `getRefClass("ReadLinesProducer")$help()`.
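The introspection calls above come from R's reference-class machinery in the methods package. The sketch below defines a hypothetical `Counter` class, standing in for `ReadLinesProducer`, to show what `$fields()`, `$methods()`, and `$help()` return on a class generator.

```r
## Reference-class introspection with the methods package; 'Counter' is a
## hypothetical class used only for illustration.
library(methods)
Counter <- setRefClass("Counter",
    fields = list(count = "numeric"),
    methods = list(
        increment = function() {
            "Add one to the counter."   # documentation string shown by $help()
            count <<- count + 1
            invisible(.self)
        }))

gen <- getRefClass("Counter")
gen$fields()                     # field names and their classes
"increment" %in% gen$methods()   # method names, including inherited ones
gen$help("increment")            # prints the method's documentation
```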
Martin Morgan [email protected]
See also: `Streamer-package`, `Producer-class`, `Streamer-class`.
```r
fl <- system.file(package="Rsamtools", "extdata", "ex1.sam")
p <- ReadLinesProducer(fl, n = 1000) # read 1000 lines at a time
while (length(y <- yield(p)))
    print(length(y))
close(p)

p <- ReadTableProducer(fl, quote="", fill=TRUE, nrows=1000)
while (length(y <- yield(p)))
    print(dim(y))
reset(p)
dim(yield(p))
## connections opened 'under the hood' are closed, with warnings
rm(p); gc()

## avoid warnings by managing connections
p <- ScanProducer(file(fl, "r"), verbose=TRUE,
                  scanArgs=list(what=character()))
length(yield(p))
close(p)
rm(p); gc()
```
A virtual base class representing components that can consume data from a `Producer`, and yield data to the user or other `Consumer` instances. A `Consumer` typically transforms records from one form to another. `Producer` and `Consumer` instances are associated with each other through the `Stream` function.
Methods defined on this class include:

- `Stream`: Construct a stream from one `Producer` and one or more `Consumer` components. See `?Stream`.
Internal fields of this class are described with, e.g., `getRefClass("Consumer")$fields()`. Internal methods of this class are described with `getRefClass("Consumer")$methods()` and `getRefClass("Consumer")$help()`.
Martin Morgan [email protected]
See also: `Streamer-package`, `Streamer-class`, `Producer-class`, `Stream-class`.
```r
showClass("Consumer")
```
A `Consumer` to route incoming tasks through nodes connected as a directed acyclic graph.
```r
DAGParam(x, ...)
DAGTeam(..., dagParam = DAGParam(), teamParam = MulticoreParam(1L))
## S3 method for class 'DAGTeam'
plot(x, y, ...)
```
Argument | Description |
---|---|
`x` | A matrix or data.frame with columns ‘From’, ‘To’, or a |
`...` | For For |
`dagParam` | A |
`teamParam` | A |
`y` | Unused. |
Use `DAGParam` and `DAGTeam` to construct instances of these classes, with `ParallelParam` instances created by, e.g., `MulticoreParam`.
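To make the DAG idea concrete, the following base-R sketch evaluates the four functions from the example on this page in topological order (Kahn's algorithm). It assumes, as that example suggests, that each function's argument names are its parent nodes and that the root node receives the upstream chunk as `y`. `topo_order` and `eval_dag` are hypothetical helpers and do no parallel work, unlike `DAGTeam`.

```r
## Hypothetical base-R sketch of evaluating functions arranged as a DAG.
dag <- data.frame(From = c("A", "A", "B", "C"),
                  To   = c("B", "C", "D", "D"),
                  stringsAsFactors = FALSE)
nodes <- list(A = function(y) y,
              B = function(A) -A,
              C = function(A) 1 / A,
              D = function(B, C) B + C)

## Topological order (Kahn's algorithm): repeatedly take every node that has
## no incoming edge from a node not yet evaluated.
topo_order <- function(edges, all_nodes) {
    done <- character(0)
    while (length(done) < length(all_nodes)) {
        remaining <- edges[!edges$From %in% done, , drop = FALSE]
        ready <- setdiff(all_nodes, c(done, remaining$To))
        if (length(ready) == 0L) stop("graph has a cycle")
        done <- c(done, ready)
    }
    done
}

## Evaluate each node with its parents' results, matched by argument name;
## return the result of the last (terminal) node.
eval_dag <- function(y, edges, funs) {
    ord <- topo_order(edges, names(funs))
    results <- list(y = y)
    for (node in ord) {
        args <- results[names(formals(funs[[node]]))]
        results[[node]] <- do.call(funs[[node]], args)
    }
    results[[ord[length(ord)]]]
}

out <- sapply(1:4, eval_dag, edges = dag, funs = nodes)
out   # -y + 1/y for y = 1..4
```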
See `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("MulticoreTeam")$fields()`. Internal methods of this class are described with `getRefClass("MulticoreTeam")$methods()` and `getRefClass("MulticoreTeam")$help()`.
Martin Morgan [email protected]
See also: `Team` applies a single function across multiple processes.
```r
df <- data.frame(From = c("A", "A", "B", "C"),
                 To   = c("B", "C", "D", "D"),
                 stringsAsFactors=FALSE)
dagParam <- DAGParam(df)
dteam <- DAGTeam(A=FunctionConsumer(function(y) y),
                 B=FunctionConsumer(function(A) -A),
                 C=FunctionConsumer(function(A) 1 / A),
                 D=FunctionConsumer(function(B, C) B + C),
                 dagParam=dagParam)
plot(dteam)
strm <- Stream(Seq(to=10), dteam)
sapply(strm, c)
reset(strm)
```
A `Consumer`-class to select records with fixed probability, returning a yield of fixed size. Successive calls to `yield` result in sampling of subsequent records in the stream, until the stream is exhausted.
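One way to picture this behaviour is the base-R sketch below: each upstream record is kept with probability `probability`, and kept records are handed out `sampled_size` at a time, so the final chunk may be shorter. `make_chunks` and `downsample` are hypothetical stand-ins for an upstream `Producer` and for `Downsample`; how Streamer itself handles surplus records is not shown here and may differ.

```r
## Hypothetical upstream producer: successive chunks of `size` elements of x,
## then a zero-length vector once x is exhausted.
make_chunks <- function(x, size) {
    i <- 0L
    function() {
        if (i >= length(x)) return(x[0])
        idx <- seq.int(i + 1L, min(i + size, length(x)))
        i <<- i + length(idx)
        x[idx]
    }
}

## Hypothetical Downsample-like consumer (not Streamer's code).
downsample <- function(upstream, probability = 0.1, sampled_size = 1e6) {
    selected <- NULL   # records kept but not yet returned
    function() {
        ## pull upstream chunks until enough records are selected
        while (length(selected) < sampled_size) {
            chunk <- upstream()
            if (length(chunk) == 0L) break
            keep <- runif(length(chunk)) < probability
            selected <<- c(selected, chunk[keep])
        }
        n <- min(sampled_size, length(selected))
        out <- selected[seq_len(n)]
        if (n > 0L) selected <<- selected[-seq_len(n)]
        out
    }
}

set.seed(1)
ds <- downsample(make_chunks(1:10000, 500L), probability = 0.1,
                 sampled_size = 50)
sizes <- integer(0)
while (length(y <- ds()) > 0L)
    sizes <- c(sizes, length(y))
sizes   # chunks of 50, roughly 1000 records in total
```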
```r
Downsample(probability=0.1, sampledSize=1e6, ...)
```
Argument | Description |
---|---|
`probability` | A |
`...` | Additional arguments, passed to the |
`sampledSize` | A |
See `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("Downsample")$fields()`. Internal methods of this class are described with `getRefClass("Downsample")$methods()` and `getRefClass("Downsample")$help()`.
Martin Morgan [email protected]
```r
showClass("Downsample")
```
The `FunctionProducer` and `FunctionConsumer` classes provide an easy way to quickly create `Producer` and `Consumer` instances from user-provided functions.
```r
FunctionProducer(FUN, RESET, ..., state=NULL)
FunctionConsumer(FUN, RESET, ..., state=NULL)
```
Argument | Description |
---|---|
`FUN` | User-defined function to yield successive records in the stream. The |
`RESET` | An optional function of one argument (‘state’) to reset the stream to its original state. If missing, the stream cannot be reset. |
`...` | Arguments passed to the |
`state` | Any information, made available to |
Use `FunctionProducer` or `FunctionConsumer` to construct instances of this class.
See `Producer` and `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("FunctionProducer")$fields()`. Internal methods of this class are described with `getRefClass("FunctionProducer")$methods()` and `getRefClass("FunctionProducer")$help()`.
Nishant Gopalakrishnan [email protected]
```r
## A FunctionProducer
producerFun <- function()
    ## produce the mean of 10 random uniform numbers
    ## stop when the mean is greater than 0.8
{
    x <- mean(runif(10))
    if (x > .8) numeric(0) else x
}
randomSampleMeans <- FunctionProducer(producerFun)
result <- sapply(randomSampleMeans, c)
length(result)
head(result)

## A FunctionConsumer
consumerFun <- function(y)
    ## transform input by -10 log10
{
    -10 * log10(y)
}
neg10log10 <- FunctionConsumer(consumerFun)
strm <- Stream(randomSampleMeans, neg10log10)
result <- sapply(strm, c)
length(result)
head(result)
```
Configure and register parallel calculations, e.g., for `Team` evaluation.
```r
MulticoreParam(size = getOption("mc.cores", 2L), mc.set.seed = TRUE, ...)
register(param)
```
Argument | Description |
---|---|
`size` | The number of members in the parallel cluster. |
`mc.set.seed` | |
`param` | A |
`...` | Additional arguments, e.g., |
Use `MulticoreParam` to construct instances of this class.
`register`, when invoked with an argument `param`, stores `param` for use in subsequent parallel computation. Use `NULL` to clear the registry. The function returns, invisibly, the previously registered parameter instance, if any.
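The registry semantics described for `register` can be sketched in base R with an environment holding one slot. `register_param` and `.registry` are hypothetical names chosen to avoid clashing with Streamer's own `register`; calling the sketch with no argument simply returns the current value, mirroring `oparam <- register()` in the example on this page.

```r
## Hypothetical base-R sketch of a one-slot registry (not Streamer's code).
.registry <- new.env()
.registry$param <- NULL

register_param <- function(param) {
    previous <- .registry$param
    if (!missing(param))
        .registry$param <- param   # assigning NULL clears the slot
    invisible(previous)            # previous value, returned invisibly
}

old <- register_param(list(size = 2L))  # hypothetical parameter object
current <- register_param()             # query without changing
register_param(NULL)                    # clear the registry
cleared <- register_param()
```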
Internal fields of this class are described with, e.g., `getRefClass("MulticoreParam")$fields()`. Internal methods of this class are described with `getRefClass("MulticoreParam")$methods()` and `getRefClass("MulticoreParam")$help()`.
Martin Morgan [email protected]
See also: `Team` to apply one function in parallel; `DAGTeam` to evaluate functions whose dependencies are represented as directed acyclic graphs.
```r
if (.Platform$OS.type != "windows") {
    oparam <- register()      ## previous setting
    param <- MulticoreParam() ## default multicore settings
    register(param)           ## register for future use, e.g., Team
    register(oparam)          ## reset original
}
```
A virtual class representing components that can read data from connections, and yield records to the user or a `Consumer` instance. A `Producer` represents a source of data, responsible for parsing a file or other data source into records to be passed to `Consumer` classes. `Producer` and `Consumer` instances are associated with each other through the `Stream` function.
```r
## S4 method for signature 'Producer'
lapply(X, FUN, ...)
## S4 method for signature 'Producer'
sapply(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
```
Argument | Description |
---|---|
`X` | An instance of class |
`FUN` | A function to be applied to each successful |
`...` | Additional arguments to |
`simplify` | See |
`USE.NAMES` | See |
Methods defined on this class include:

- `Stream`: Construct a stream from one `Producer` and one or more `Consumer` components. See `?Stream`.
- `yield`: Yield a single result (e.g., `data.frame`) from the `Producer`.
- `reset`: Reset, if possible, the `Producer`.
- `lapply`, `sapply`: Apply `FUN` to each result returned by `yield()`, simplifying (using `simplify2array`) if possible for `sapply`. Partial results on error can be recovered using `tryCatch`, as illustrated below. Infinite producers will, of course, exhaust memory.
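The partial-result recovery mentioned above can be sketched in base R: a hypothetical `sapply_producer` collects results and, on error, re-signals the condition with the results so far attached as `partialResult`, the field name used in this page's example. This illustrates the idea only and is not Streamer's `sapply` method; it reports the failure with `message()` rather than `warning()`.

```r
## Hypothetical base-R sketch: sapply() over a producer that preserves
## partial results when FUN fails part-way through.
sapply_producer <- function(producer, FUN, ...) {
    results <- list()
    tryCatch({
        while (length(chunk <- producer()) > 0L)
            results[[length(results) + 1L]] <- FUN(chunk, ...)
    }, error = function(err) {
        err$partialResult <- results   # attach what was computed so far
        stop(err)                      # re-signal the same condition
    })
    simplify2array(results)
}

## Hypothetical producer yielding 1, 2, ..., to one value at a time.
counter <- function(to) {
    i <- 0L
    function() {
        if (i >= to) return(integer(0))
        i <<- i + 1L
        i
    }
}

fun <- function(i) if (i == 5) stop("oops, i == 5") else i
res <- tryCatch(sapply_producer(counter(10L), fun), error = function(err) {
    message(conditionMessage(err), "\n  only partial results available")
    simplify2array(err$partialResult)
})
res   # 1 2 3 4
```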
Internal fields of this class are described with, e.g., `getRefClass("Producer")$fields()`. Internal methods of this class are described with `getRefClass("Producer")$methods()` and `getRefClass("Producer")$help()`.
Martin Morgan [email protected]
See also: `Streamer-package`, `Consumer-class`, `Streamer-class`.
```r
showClass("Producer")
showMethods(class="Producer", where="package:Streamer")

sapply(Seq(to=47, yieldSize=7), function(elt) {
    c(n = length(elt), xbar = mean(elt))
})

## recover partial results
fun <- function(i) if (i == 5) stop("oops, i == 5") else i
res <- tryCatch(sapply(Seq(to=10), fun), error=function(err) {
    warning(conditionMessage(err),
            "\n  only partial results available")
    simplify2array(err$partialResult)
})
res
```
A `Producer`-class to interpret files as raw (binary) data. Users interact with this class through the constructor `RawInput` and methods `yield`, `reset`, and `Stream`.
This class requires two helper functions; the ‘factory’ methods defined on this page can be used to supply them. `rawReaderFactory` creates a ‘reader’, whose responsibility is to accept a connection and return a vector of predefined type, e.g., `raw`. `rawParserFactory` creates a ‘parser’, responsible for parsing a buffer and a vector of the same type as produced by the reader into records. The final record may be incomplete (e.g., because `reader` does not return complete records); regardless of whether it is complete, it becomes the content of `buf` on the subsequent invocation of `parser`. `length(buf)` or `length(bin)` may be 0, as when the first or final record is parsed.
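The reader/parser contract can be illustrated with a hypothetical `parse_raw` function in base R: it splits the carried-over buffer plus newly read bytes on a single-byte separator, returns the complete records, and hands back the trailing partial record as the next `buf`. It is a sketch, not `rawParserFactory`; in particular, the `trim` argument and the final flush at end of file are not modelled.

```r
## Hypothetical base-R sketch of a raw parser that carries partial records.
parse_raw <- function(buf, bin, separator = charToRaw("\n")) {
    bytes <- c(buf, bin)
    ends <- which(bytes == separator)            # separator positions
    starts <- c(1L, ends + 1L)[seq_along(ends)]
    ## each record runs from its start up to, not including, the separator
    records <- Map(function(s, e) bytes[seq.int(s, length.out = e - s)],
                   starts, ends)
    last_end <- if (length(ends)) max(ends) else 0L
    ## bytes after the last separator form the (possibly empty) next buffer
    rest <- bytes[seq.int(last_end + 1L, length.out = length(bytes) - last_end)]
    list(records = unname(records), buf = rest)
}

## Two blocks from a hypothetical reader; "cd" + "e" straddle the boundary.
p1 <- parse_raw(raw(0), charToRaw("ab\ncd"))
p2 <- parse_raw(p1$buf, charToRaw("e\nf\n"))
recs <- vapply(c(p1$records, p2$records), rawToChar, character(1))
recs   # "ab" "cde" "f"
```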
```r
RawInput(con, yieldSize = 1e+06, reader = rawReaderFactory(),
         parser = rawParserFactory(), ...)
rawReaderFactory(blockSize = 1e+06, what)
rawParserFactory(separator = charToRaw("\n"), trim = separator)
```
Argument | Description |
---|---|
`con` | A character string or connection (opened as |
`yieldSize` | The number of records the input parser is to yield. |
`reader` | A function of one argument ( |
`parser` | A function of two arguments ( |
`...` | Additional arguments, passed to the |
`blockSize` | The number of bytes to read at one time. |
`what` | The type of data to read, as the argument to |
`separator` | A |
`trim` | A |
Fields:

- `con`: Object of class `connection`. An R connection opened in “rb” mode from which data will be read.
- `blockSize`: Object of class `integer`. Size (e.g., number of raw bytes) input during each `yield`.
- `reader`: Object of class `function`. A function used to input `blockSize` elements. See `rawReaderFactory`.
- `parser`: Object of class `function`. A function used to parse raw input into records, e.g., breaking a `raw` vector on new lines ‘\n’. See `rawParserFactory`.
- `.buffer`: Object of class `raw`. Contains read but not yet parsed raw stream data.
- `.records`: Object of class `list`. Parsed but not yet yielded records.
- `.parsedRecords`: Object of class `integer`. Total number of records parsed by the `Producer`.

Methods:

- `reset()`: Remove buffer and current records, reset record counter, re-open `con`.
Martin Morgan [email protected]
```r
fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
b <- RawInput(fl, 100L, reader=rawReaderFactory(1e4))
length(value <- yield(b))
head(value)
close(b)
```
A `Consumer`-class to reduce N successive records into a single yield.
```r
Reducer(FUN, init, ..., yieldNth = NA_integer_)
```
Argument | Description |
---|---|
`FUN` | A function of two arguments, where the first argument is the result of the previous reduction (or |
`init` | An optional initial value to initiate the reduction. When present, |
`...` | Additional arguments, passed to the |
`yieldNth` | A positive integer indicating how many upstream yields are combined before the Reducer yields. A value of |
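The `yieldNth` and `init` behaviour can be sketched with a hypothetical base-R `reducer` closure that pulls single records from a counter, reproducing the sums in the examples on this page. It is not Streamer's `Reducer`; returning `NULL` once the upstream is exhausted is an assumption of the sketch.

```r
## Hypothetical base-R sketch of Reducer-like behaviour: combine every
## `yield_nth` upstream chunks with FUN (starting from `init` if given),
## or the whole stream when yield_nth is NA.
reducer <- function(upstream, FUN, init, yield_nth = NA_integer_) {
    FUN <- match.fun(FUN)
    has_init <- !missing(init)
    function() {
        acc <- if (has_init) init else NULL
        started <- has_init
        n <- 0L
        while (is.na(yield_nth) || n < yield_nth) {
            chunk <- upstream()
            if (length(chunk) == 0L) break
            acc <- if (started) FUN(acc, chunk) else chunk
            started <- TRUE
            n <- n + 1L
        }
        if (n == 0L) return(NULL)   # upstream exhausted: nothing to reduce
        acc
    }
}

## Hypothetical producer yielding 1, 2, ..., to one value at a time.
counter <- function(to) {
    i <- 0L
    function() {
        if (i >= to) return(integer(0))
        i <<- i + 1L
        i
    }
}

total <- reducer(counter(10L), "+")()              # sum(1:10)
r2 <- reducer(counter(10L), "+", init = 10, yield_nth = 5L)
groups <- c(r2(), r2())                            # 10 + c(sum(1:5), sum(6:10))
done <- r2()                                       # NULL: stream exhausted
```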
See `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("Reducer")$fields()`. Internal methods of this class are described with `getRefClass("Reducer")$methods()` and `getRefClass("Reducer")$help()`.
Martin Morgan [email protected]
```r
s <- Stream(Seq(to=10), Reducer("+"))
yield(s)  ## sum(1:10), i.e., Reduce over the entire stream

s <- Stream(Seq(to=10), Reducer("+", yieldNth=5))
yield(s)  ## sum(1:5)
yield(s)  ## sum(6:10)

s <- Stream(Seq(to=10), Reducer("+", init=10, yieldNth=5))
sapply(s, c)  ## 10 + c(sum(1:5), sum(6:10))

if (.Platform$OS.type != "windows") {
    s <- Stream(Seq(to=10),
                Team(function(i) { Sys.sleep(1); i },
                     param=MulticoreParam(10L)),
                Reducer("+"))
    system.time(y <- yield(s))
    y
}
```
`reset` on a stream invokes the `reset` method of all components of the stream; on a component, it invokes the `reset` method of the component and all inputs to the component.
```r
reset(x, ...)
```
Argument | Description |
---|---|
`x` | A |
`...` | Additional arguments, currently unused. |
A reference to `x`, the stream or component on which `reset` was invoked.
Martin Morgan [email protected]
```r
## see example(Stream)
```
A `Producer`-class to generate a (possibly long) sequence of numbers.
```r
Seq(from = 1L, to=.Machine$integer.max, by = 1L, yieldSize=1L, ...)
```
Argument | Description |
---|---|
`from` | A starting value of any type (e.g., |
`to` | An ending value, typically of the same type as |
`by` | A value, typically of the same class as |
`yieldSize` | A |
`...` | Additional arguments passed to |
Use `Seq` to construct instances of this class.
See `Producer` methods.
Internal fields of this class are are described with
getRefClass("Seq")$fields
.
Internal methods of this class are described with
getRefClass("Seq")$methods()
and
getRefClass("Seq")$help()
.
Martin Morgan [email protected]
```r
s <- Seq(1, 10, yieldSize=5)
while (length(y <- yield(s)))
    print(y)
```
`status` invoked on a stream yields the current status of the stream, as reported by the `status` methods of each component.
```r
status(x, ...)
## S4 method for signature 'Streamer'
status(x, ...)
```
Argument | Description |
---|---|
`x` | A |
`...` | Additional arguments, currently unused. |
A component-specific summary of the current status.
Martin Morgan [email protected]
```r
## see example(Stream)
```
An ordered collection of `Consumer` and `Producer` components combined into a single entity. Applying a method such as `yield` to a `Stream` invokes `yield` on the terminal `Consumer` component of the stream, yielding one batch from the stream. The result of `yield` is defined by the `Producer` and `Consumer` components of the stream.
```r
Stream(x, ..., verbose=FALSE)
## S4 method for signature 'Stream'
length(x)
## S4 method for signature 'Stream,numeric'
x[[i, j, ...]]
## S4 method for signature 'Stream'
lapply(X, FUN, ...)
## S4 method for signature 'Stream'
sapply(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
```
Argument | Description |
---|---|
`x`, `X` | For |
`FUN` | A function to be applied to each successful |
`i`, `j` | Numeric index of the ith stream element ( |
`...` | For |
`simplify` | See |
`USE.NAMES` | See |
`verbose` | A |
Arguments to `Stream` must consist of a single `Producer` and zero or more `Consumer` components.

When invoked with the `Producer` as the first argument, `Stream(P, C1, C2)` produces a stream in which the data is read by `P`, then processed by `C1`, then processed by `C2`.

When invoked with a `Consumer` as the first argument, the `...` must include a `Producer` as the last argument. `Stream(C1, C2, P)` produces a stream in which the data is read by `P`, then processed by `C2`, then processed by `C1`.
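The ordering rule can be checked with a small base-R sketch in which the producer is marked by a hypothetical class `"producer"` and `make_stream` picks the consumer order from the producer's position. None of these names are Streamer API.

```r
## Hypothetical base-R sketch of the Stream argument-order rule.
as_producer <- function(f) structure(f, class = c("producer", "function"))

make_stream <- function(...) {
    parts <- list(...)
    is_prod <- vapply(parts, inherits, logical(1), what = "producer")
    if (sum(is_prod) != 1L)
        stop("a stream needs exactly one producer")
    if (is_prod[1L]) {
        producer <- parts[[1L]]
        consumers <- parts[-1L]                   # P, C1, C2: C1 then C2
    } else if (is_prod[length(parts)]) {
        producer <- parts[[length(parts)]]
        consumers <- rev(parts[-length(parts)])   # C1, C2, P: C2 then C1
    } else {
        stop("the producer must be the first or last argument")
    }
    ## one 'yield': read from the producer, then apply consumers in order
    function() Reduce(function(chunk, consume) consume(chunk),
                      consumers, producer())
}

P  <- as_producer(function() c(1, 2, 3))
C1 <- function(x) x + 1    # add one
C2 <- function(x) x * 10   # multiply by ten
a <- make_stream(P, C1, C2)()   # (x + 1) * 10 -> 20 30 40
b <- make_stream(C1, C2, P)()   # x * 10 + 1   -> 11 21 31
```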
Methods defined on this class include:

- `length`: The number of components in this stream.
- `[[`: The `i`th component (including inputs) of this stream.
- `yield`: Yield a single result (e.g., `data.frame`) from the stream.
- `reset`: Reset, if possible, each component of the stream.
- `lapply`, `sapply`: Apply `FUN` to each result returned by `yield()`, simplifying (using `simplify2array`) if possible for `sapply`. Partial results on error can be recovered using `tryCatch`, as illustrated on the `Producer` help page.
Internal fields of this class are described with, e.g., `getRefClass("FunctionProducer")$fields()`. Internal methods of this class are described with `getRefClass("FunctionProducer")$methods()` and `getRefClass("FunctionProducer")$help()`.
Martin Morgan [email protected]
See also: `Streamer-package`, `Consumer-class`, `Producer-class`.
```r
fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
b <- RawInput(fl, 100L, reader=rawReaderFactory(1e4))
s <- Stream(b, Rev(), RawToChar())
s
yield(s)
reset(s)
while (length(yield(s)))
    cat("tick\n")
close(b)

strm <- Stream(Seq(to=10), FunctionConsumer(function(y) 1/y))
sapply(strm, c)
```
A `Consumer` to divide incoming tasks amongst processes for parallel evaluation; not supported on Windows.
```r
Team(FUN, ..., param)
```
Argument | Description |
---|---|
`FUN` | A |
`...` | Additional arguments (e.g., |
`param` | If provided, a |
Use `Team` to construct instances of this class.
When `param` is missing, `Team` consults the registry (see `register`) for a parallel parameter class. If none is found and `.Platform$OS.type == "unix"`, a default `MulticoreParam` instance is used. An error is signaled on other operating systems (i.e., Windows).
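For intuition about dividing tasks among processes, `mclapply` from base R's `parallel` package can hand chunks to forked workers, much as the example on this page spreads ten five-element chunks over several workers. This sketch does not reproduce `Team`'s scheduling or its interaction with `yield`; it falls back to one core on Windows, where forking is unavailable.

```r
## Hypothetical base-R sketch of Team-like parallel evaluation of chunks.
library(parallel)
chunks <- split(1:50, rep(1:10, each = 5))   # ten chunks of five values
cores <- if (.Platform$OS.type == "windows") 1L else 2L
means <- unlist(mclapply(chunks, mean, mc.cores = cores), use.names = FALSE)
means   # 3 8 13 ... 48
```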
See `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("MulticoreTeam")$fields()`. Internal methods of this class are described with `getRefClass("MulticoreTeam")$methods()` and `getRefClass("MulticoreTeam")$help()`.
Martin Morgan [email protected]
See also: `ParallelParam` for configuring parallel environments; `DAGTeam` to apply functions organized as a directed acyclic graph.
```r
if (.Platform$OS.type != "windows") {
    param <- MulticoreParam(size=5)
    team <- Team(function(x) { Sys.sleep(1); mean(x) }, param=param)
    s <- Stream(Seq(to=50, yieldSize=5), team)
    system.time({
        while (length(y <- yield(s)))
            print(y)
    }) ## about 2 seconds
}
```
`Utility` is a virtual class containing components to create lightweight `Consumer` classes. `RawToChar` is a class to convert raw (binary) records to character, applying `rawToChar` to each record. `Rev` reverses the order of the current task.
```r
RawToChar(...)
Rev(...)
```
Argument | Description |
---|---|
`...` | Arguments passed to the |
Use constructors `RawToChar` and `Rev`.
See `Consumer` methods.
Internal fields of this class are described with, e.g., `getRefClass("Utility")$fields()`. Internal methods of this class are described with `getRefClass("Utility")$methods()` and `getRefClass("Utility")$help()`.
Martin Morgan [email protected]
See also: `Streamer-package`, `Consumer-class`, `Streamer-class`.
```r
showClass("Utility")
```
`yield` invoked on a stream yields one chunk of data or, if the stream is complete, a length-zero element of the data. Successive invocations of `yield` produce successive chunks of data.
```r
yield(x, ...)
```
Argument | Description |
---|---|
`x` | A |
`...` | Additional arguments, currently unused. |
A chunk of data, with the specific notion of chunk defined by the final component of the stream.
Martin Morgan [email protected]
```r
## see example(Stream)
```