FuncFrog is a library for performing efficient, parallel, lazy map, reduce, filter and many other operations on slices and other data sequences in a pipeline. The sequence can be produced by a variety of generating functions. Every operation can be executed in parallel with minimal overhead for copying and locking. Error handling is built in through the Yeet/Snag methods.
The library is easy to use and has a clean, intuitive API.
You can measure its performance against a vanilla for loop on your machine by running cd perf/; make (spoiler: FuncFrog does better when multithreading).
- Getting Started
- Basic information
  - Supported functions list
  - Using prefix Pipe to transform Pipe type
  - Using ff package to write shortened pipes
  - Look for useful functions in Pipies package
- Examples
  - Basic example
  - Example using Func and Take
  - Example using Func and Gen
  - Example difference between Take and Gen
  - Example using Filter and Map
  - Example using Map and Reduce
  - Example of Map and Reduce with the underlying array type change
  - Example using Sort
  - Example of infinite sequence generation
  - Example using Range and Map
  - Example using Repeat and Map
  - Example using Cycle and Filter
  - Example using Erase and Collect
  - Example of simple error handling
  - Example of multiple error handling
- Is this package stable?
- Contributions
- What's next?
To use FuncFrog in your project, run the following command:
    go get github.com/koss-null/funcfrog
Then import the library into your Go code (in most cases you only need the pipe package):
    import "github.com/koss-null/funcfrog/pkg/pipe"
You can then use the pipe package to create a pipeline of operations on a slice:
    res := pipe.Slice(a).
        Map(func(x int) int { return x * x }).
        Filter(func(x *int) bool { return *x > 100 }).
        Parallel(12).
        Do()
All operations are carefully fenced with interfaces, so feel free to use whatever your editor's autocompletion suggests.
If you want it fast and short, you can use ff:
    import "github.com/koss-null/funcfrog/pkg/ff"
    res := ff.Map(strArr, strings.ToUpper).Do()
To see some code snippets, check out the Examples.
The Piper (or PiperNoLen for pipes of undetermined length) is an interface that represents a lazily evaluated sequence of data. The Piper interface provides a set of methods to transform, filter, collect and analyze the data in the sequence.
Any pipe can be copied at any moment simply by assigning it to a variable. Some methods (such as Take or Gen) lead from the PiperNoLen interface to the Piper interface, making a wider range of methods available.
The following functions can be used to create a new Pipe (this is what I call the inner representation of a sequence of elements together with the sequence of operations on them):
- 🐸 Slice([]T) Piper: creates a Pipe of the given type T from a slice. The length is known.
- 🐸 Func(func(i int) (T, bool)) PiperNL: creates a Pipe of type T from a function. The function returns the element considered to be at the i-th position in the Pipe, as well as a boolean indicating whether the element should be included (true) or skipped (false). The length is unknown.
- 🐸 Fn(func(i int) (T)) PiperNL: creates a Pipe of type T from a function. The function should return the value of the element at the i-th position in the Pipe; to be able to skip values, use Func.
- 🐸 FuncP(func(i int) (*T, bool)) PiperNL: creates a Pipe of type T from a function. The function returns a pointer to the element considered to be at the i-th position in the Pipe, as well as a boolean indicating whether the element should be included (true) or skipped (false). The length is unknown.
- 🐸 Cycle(data []T) PiperNL: creates a new Pipe that cycles through the elements of the provided slice indefinitely. The length is unknown.
- 🐸 Range(start, end, step T) Piper: creates a new Pipe that generates a sequence of values of type T from start to end (exclusive) with a fixed step between elements. T can be any numeric type, such as int, float32, or float64. The length is known.
- 🐸 Repeat(x T, n int) Piper: creates a new Pipe that generates a sequence of n values of type T, each equal to x. The length is known.
- 🐸 Take(n int) Piper: if it's a Func-made Pipe, expects n values to be eventually returned. Transforms unknown length to known.
- 🐸 Gen(n int) Piper: if it's a Func-made Pipe, generates a sequence over the indexes [0, n) and applies the function to it. Transforms unknown length to known.
- 🐸 Parallel(n int) Pipe: sets the number of goroutines the pipeline is executed on (1 by default). This function can be used to specify the level of parallelism in the pipeline. Available for unknown length.
- 🐸 Map(fn func(x T) T) Pipe: applies the function fn to every element of the Pipe and returns a new Pipe with the transformed data. Available for unknown length.
- 🐸 Filter(fn func(x *T) bool) Pipe: applies the predicate function fn to every element of the Pipe and returns a new Pipe with only the elements that satisfy the predicate. Available for unknown length.
- 🐸 MapFilter(fn func(T) (T, bool)) Piper[T]: applies the given function to each element of the underlying slice. If the second return value of fn is false, the element is skipped (may be useful for error handling).
- 🐸 Reduce(fn func(x, y *T) T) *T: applies the binary function fn to the elements of the Pipe and returns a single value that is the result of the reduction. Returns nil if the Pipe was empty before the reduction.
- 🐸 Sum(plus func(x, y *T) T) T: performs a parallel reduce with the associative function plus.
- 🐸 Sort(less func(x, y *T) bool) Pipe: sorts the elements of the Pipe using the provided less function as the comparison function.
- 🐸 Any() T: returns a random element of the Pipe. Available for unknown length.
- 🐸 First() T: returns the first element of the Pipe, or nil if the Pipe is empty. Available for unknown length.
- 🐸 Count() int: returns the number of elements in the Pipe. It does not allocate memory for the elements; it only counts them.
- 🐸 Do() []T: executes the pipeline and returns the resulting slice of data. This function should be called at the end of the pipeline to retrieve the final result.
- 🐸 Erase() Pipe[any]: returns a pipe whose objects are the objects of the initial Pipe with their type erased. Basically, for each x it returns any(&x). Use pipe.Collect[T](Piper[any]) Piper[T] to collect it back into some type (or pipe.CollectNL for pipes whose length is not set yet).
- 🐸 pipe.Collect[T](Piper[any]) PiperNoLen[T]
- 🐸 pipe.CollectNL[T](PiperNoLen[any]) PiperNoLen[T]
These functions take a Pipe of the erased interface{} type (which is pretty useful if you have a lot of type conversions along your pipeline, and can be obtained by calling Erase() on a Pipe). Basically, for each element x in the sequence, Collect returns the element *(x.(*T)).
- 🐸 Yeti(yeti) Pipe[T]: sets a yeti, an object that collects the errors thrown with yeti.Yeet(error) and is used to handle them.
- 🐸 Snag(func(error)) Pipe[T]: sets a function that handles all errors sent with yeti.Yeet(error) to the last yeti object that was set through the Pipe[T].Yeti(yeti) Pipe[T] method.
Error handling may look rather unusual at first glance. To get a better intuition for it, check out the examples section.
- 🌱 TBD: Until(fn func(*T) bool): if it's a Func-made Pipe, it evaluates elements one by one until fn returns false. This feature may require some new Pipe interfaces, since it is applicable only in the context of a single thread.
- 🌱 TBD: IsAny() bool: returns true if the Pipe contains any elements, and false otherwise. Available for unknown length.
- 🌱 TBD: MoreThan(n int) bool: returns true if the Pipe contains more than n elements, and false otherwise. Available for unknown length.
- 🌱 TBD: Reverse() *Pipe: reverses the underlying slice.
In addition to the functions described above, the pipe package also provides several utility functions that can be used to create common types of Pipes, such as Range, Repeat, and Cycle. These functions are useful for creating Pipes of data that follow a certain pattern or sequence.
It is also highly recommended to familiarize yourself with the pipies package, which contains some useful predicates, comparators and accumulators.
You may find that using Erase() is not so convenient, as it makes you do some pointer conversions. Fortunately, there is another way to convert a pipe type: use the functions from pipe/prefixpipe.go. These functions take a Piper or PiperNoLen as the first parameter and the function to apply as the second, and return the resulting pipe (or the result itself) of the destination type.
- 🐸 pipe.Map(Piper[SrcT], func(x SrcT) DstT) Piper[DstT] - applies a map from one type to another for a Pipe with known length.
- 🐸 pipe.MapNL(PiperNoLen[SrcT], func(x SrcT) DstT) PiperNoLen[DstT] - applies a map from one type to another for a Pipe with unknown length.
- 🐸 Reduce(Piper[SrcT], func(*DstT, *SrcT) DstT, initVal ...DstT) - applies a reduce operation to a Pipe of type SrcT and returns a result of type DstT. initVal is an optional parameter that initializes the value used on the first steps of the reduce.
Sometimes you just need to apply a function. Creating a pipe with pipe.Slice and then calling Map looks a little verbose, especially when you need to call Map or Reduce from one type to another. The solution is the funcfrog/pkg/ff package. It contains shortened Map and Reduce functions which can be called directly with a slice as the first parameter.
- 🐸 Map([]SrcT, func(SrcT) DstT) pipe.Piper[DstT] - applies the given function to a slice and returns a Pipe of the resulting type.
- 🐸 Reduce([]SrcT, func(*DstT, *SrcT) DstT, initVal ...DstT) DstT - applies a reduce operation to a slice and returns a result of type DstT. initVal is an optional parameter that initializes the value used on the first steps of the reduce.
Some of the functions that are passed to Map, Filter or Reduce (or other Pipe methods) are pretty common. There is also a common comparator for any integers and floats, intended for the Sort method.
    res := pipe.Slice(a).
        Map(func(x int) int { return x * x }).
        Map(func(x int) int { return x + 1 }).
        Filter(func(x *int) bool { return *x > 100 }).
        Filter(func(x *int) bool { return *x < 1000 }).
        Parallel(12).
        Do()
    p := pipe.Func(func(i int) (v int, b bool) {
        if i < 10 {
            return i * i, true
        }
        return
    }).Take(5).Do()
    // p will be [0, 1, 4, 9, 16]
    p := pipe.Func(func(i int) (v int, b bool) {
        if i < 10 {
            return i * i, true
        }
        return
    }).Gen(5).Do()
    // p will be [0, 1, 4, 9, 16]
Gen(n) generates a sequence of n elements and then applies the whole pipeline to it.
    p := pipe.Func(func(i int) (v int, b bool) {
        return i, true
    }).
        Filter(func(x *int) bool { return (*x)%2 == 0 }).
        Gen(10).
        Do()
    // p will be [0, 2, 4, 6, 8]
Take(n) expects the result to be n elements long.
    p := pipe.Func(func(i int) (v int, b bool) {
        return i, true
    }).
        Filter(func(x *int) bool { return (*x)%2 == 0 }).
        Take(10).
        Do()
    // p will be [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Watch out: if the Take value is set carelessly, it may jam the whole pipeline.
    // DO NOT DO THIS, IT WILL JAM
    p := pipe.Func(func(i int) (v int, b bool) {
        return i, i < 10 // only the first 10 values are not skipped
    }).
        Take(11).    // we can't ever get an 11th value
        Parallel(4). // why not
        Do()
    // Do() will try to evaluate the 11th value in 4 goroutines until it reaches the maximum int value
    p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}).
        Filter(func(x *int) bool { return *x%2 == 0 }).
        Map(func(x int) int { return len(strconv.Itoa(x)) }).
        Do()
    // p will be [1, 1, 1, 1, 2]
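In this example Reduce is used in its prefix form to be able to convert ints to a string.
    p := pipe.Reduce(
        pipe.Slice([]int{1, 2, 3, 4, 5}).
            Map(func(x int) int { return x * x }),
        // the prefix form takes func(*DstT, *SrcT) DstT
        func(acc *string, y *int) string {
            if *acc == "" { // assumption: the accumulator starts empty when initVal is omitted
                return strconv.Itoa(*y)
            }
            return *acc + "-" + strconv.Itoa(*y)
        },
    )
    // p will be "1-4-9-16-25"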
In this example Reduce is used as usual, in its postfix form.
    p := pipe.Slice([]string{"Hello", "darkness", "my", "old", "friend"}).
        Map(strings.Title).
        Reduce(func(x, y *string) string {
            return *x + " " + *y
        })
    // *p will be "Hello Darkness My Old Friend" (the postfix Reduce returns a pointer)
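    p := pipe.Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
    strP := pipe.Map(p, func(x int) string { return strconv.Itoa(x) })
    // the prefix Reduce takes func(*DstT, *SrcT) DstT and returns the DstT value itself
    result := pipe.Reduce(strP, func(acc *int, s *string) int { return *acc + len(*s) })
    // result will be 9: the total length of the nine one-digit strings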
    p := pipe.Func(func(i int) (float32, bool) {
        return 100500 - float32(i)*0.9, true
    }).
        Map(func(x float32) float32 { return x * x * 0.1 }).
        Gen(100500). // Sort is only available on pipes with known length
        Sort(pipies.Less[float32]). // pipies.Less(x, y *T) bool is available to all comparables
        // check out the pipies package to find more useful things
        Parallel(12).
        Do()
    // p will contain the elements sorted in ascending order
Here is an example of generating an infinite sequence of Fibonacci numbers:
    var fib []chan int
    p := pipe.Func(func(i int) (int, bool) {
        if i < 2 {
            fib[i] <- i
            return i, true
        }
        // read each previous value and put it back for other goroutines
        p1 := <-fib[i-1]
        fib[i-1] <- p1
        p2 := <-fib[i-2]
        fib[i-2] <- p2
        fib[i] <- p1 + p2
        return p1 + p2, true
    }).Parallel(20)
To generate a specific number of values, you can use the Take or Gen method:
    // fill the array first
    fib = make([]chan int, 60)
    for i := range fib {
        fib[i] = make(chan int, 1)
    }
    // do the Take
    p = p.Take(60)
To accumulate the elements of the Pipe, you can use the Reduce or Sum method:
    sum := p.Sum(pipe.Sum[int])
    // alternatively: sum := p.Reduce(func(x, y *int) int { return *x + *y }), which returns a pointer
    // sum will be the sum of the first 60 Fibonacci numbers
    p := pipe.Range(10, 20, 2).Map(func(x int) int { return x * x }).Do()
    // p will be [100, 144, 196, 256, 324]
    p := pipe.Repeat("hello", 5).Map(strings.ToUpper).Do()
    // p will be ["HELLO", "HELLO", "HELLO", "HELLO", "HELLO"]
Here is an example of how you can call a function that returns an error several times and then handle every returned error:
    func foo() error {
        // <...>
        return nil
    }
    errs := pipe.Map(
        pipe.Repeat(foo, 50),
        func(f func() error) error { return f() },
    ).Do()
    for _, e := range errs {
        if e != nil {
            log.Err(e)
        }
    }
    p := pipe.Cycle([]int{1, 2, 3}).Filter(func(x *int) bool { return *x%2 == 0 }).Take(4).Do()
    // p will be [2, 2, 2, 2]
    p := pipe.Slice([]int{1, 2, 3}).
        Erase().
        Map(func(x any) any {
            i := *(x.(*int))
            return &MyStruct{Weight: i}
        }).Filter(func(x *any) bool {
            return (*x).(*MyStruct).Weight > 10
        })
    ms := pipe.Collect[MyStruct](p).Parallel(10).Do()
    y := pipe.NewYeti()
    p := pipe.Range[int](-10, 10, 1).
        Yeti(y). // it's important to set the yeti before yeeting, or the handler will not be called
        MapFilter(func(x int) (int, bool) {
            if x == 0 {
                y.Yeet(errors.New("zero division")) // yeet the error
                return 0, false                     // use MapFilter to filter out this value
            }
            return int(256.0 / float64(x)), true
        }).Snag(func(err error) {
            fmt.Println("oopsie-doopsie: ", err)
        }).Do()
    fmt.Println("p is: ", p)
    /////////// output is:
    // oopsie-doopsie: zero division
    // p is: [-25 -28 -32 -36 -42 -51 -64 -85 -128 -256 256 128 85 64 51 42 36 32 28]
This example demonstrates generating a set of values 256/i, where i ranges from -10 to 9 (10 is excluded) with a step of 1. To handle division by zero, the library provides an error handling mechanism.
To begin, create an error handler using the pipe.NewYeti() function. Then register the error handler by calling the Yeti(yeti) method on your pipe object. This registered yeti will be the last error handler used in the pipe chain.
To yeet an error, call y.Yeet(error) on the registered yeti object.
To handle the yeeted error, use the Snag(func(error)) method, which sets up an error handling function. You can set up multiple Snag functions, but all of them will consider the last yeti object set with the Yeti(yeti) method.
This is a simple example of how to handle basic errors. Below, you will find a more realistic example of error handling in a real-life scenario.
    y1, y2 := pipe.NewYeti(), pipe.NewYeti()
    users := pipe.Func(func(i int) (*domain.DomObj, bool) {
        domObj, err := uc.GetUser(i)
        if err != nil {
            y1.Yeet(err)
            return nil, false
        }
        return domObj, true
    }).
        Yeti(y1).Snag(handleGetUserErr). // suppose we have some pre-defined handler
        MapFilter(func(do *domain.DomObj) (*domain.DomObj, bool) {
            enriched, err := uc.EnrichUser(do)
            if err != nil {
                y2.Yeet(err) // without this yeet, handleEnrichUserErr would never receive the error
                return nil, false
            }
            return enriched, true
        }).Yeti(y2).Snag(handleEnrichUserErr).
        Do()
The full working code with samples of handlers and implementations of the usecase functions can be found at: https://go.dev/play/p/YGtM-OeMWqu.
This example demonstrates how multiple error handling functions can be set up at different stages of the data processing pipeline to handle errors specific to each stage.
Let's break down what is happening here.
In this code fragment, two instances of pipe.Yeti are created: y1 and y2. These Yeti instances are used to handle errors at different stages of the data processing pipeline.
Within the pipe.Func operation, there are error-handling statements. If the call to uc.GetUser(i) returns an error, it is yeeted using y1.Yeet(err), and the function returns nil and false to indicate the failure.
The Yeti(y1).Snag(handleGetUserErr) statement sets up the error handling function handleGetUserErr to handle the error thrown by uc.GetUser(i). This function is defined elsewhere and specifies how to handle the error.
After that, the MapFilter operation is performed on the resulting *domain.DomObj. If the uc.EnrichUser(do) operation encounters an error, the function returns nil and false to filter out the value.
The Yeti(y2).Snag(handleEnrichUserErr) statement sets up another error handling function, handleEnrichUserErr, to handle the error thrown by uc.EnrichUser(do).
Finally, the Do() method executes the entire pipeline and assigns the result to the users variable.
Yes, it has finally been stable since v1.0.0! All listed functionality is fully covered by unit tests. Functionality marked as TBD will be implemented as described in the README and covered by unit tests before it is delivered as stable.
If any method signature changes, the major version will be incremented.
I will accept any PRs implementing the functionality marked as TBD.
I will also accept any sane unit tests.
Bugfixes are welcome too.
You are welcome to create issues and to contact me via email.
I hope to provide a roadmap of the project soon.
Feel free to fork, get inspired and use!