Friday, August 28, 2015

Making Complex Streams Easier to Understand

Recently the topic of complicated pipe sequences came up on the Elixir Slack channel and there were comments about the inscrutability of a really complicated Stream built of pipes. This is a typical example:


File.stream!("priv/data/stops.csv")
|> Stream.drop(1)
|> Stream.map(&Station.fromCSVRow/1)
|> Enum.each(&Station.Store.put/1)

One thing to keep in mind is that Elixir is an expression-based language, so any expression can be replaced by a call to a function that returns the same value. The other is that Streams are merely a composition of functions. A more readable way to write the above would be:
   
def station_stream(file) do
  File.stream!(file)
  |> Stream.drop(1)
  |> Stream.map(&Station.fromCSVRow/1)
end

station_stream("priv/data/stops.csv")
|> Enum.each(&Station.Store.put/1)

Additionally, you can put that Stream in a variable and use it anywhere in the program. As long as the underlying file does not change, the values of the enumeration will not change.

stops = station_stream("priv/data/stops.csv")
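
As a rough sketch of what reusing a stored stream looks like in practice, the snippet below writes a small throwaway CSV file, builds a stream over it once, and enumerates that stream several times. The file name and its contents are made up for illustration, and String.trim/1 stands in for the real row parser.

```elixir
# Hypothetical temp file standing in for priv/data/stops.csv.
path = Path.join(System.tmp_dir!(), "stops_example.csv")
File.write!(path, "id,name\n1,Central\n2,Harbor\n")

# Building the stream does not read the file yet.
rows =
  File.stream!(path)
  |> Stream.drop(1)
  |> Stream.map(&String.trim/1)

# Each enumeration starts again from the first line of the file.
first_pass = Enum.to_list(rows)
second_pass = Enum.to_list(rows)

# Rewriting the file changes what the same stream value yields.
File.write!(path, "id,name\n3,Airport\n")
third_pass = Enum.to_list(rows)

File.rm!(path)
```

The first two passes produce the same list, and the third reflects the rewritten file, because the stream describes how to read the data rather than holding the data itself.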


File.stream! reads the file from the beginning each time the stream is enumerated.

Pipes are one of the features that draw programmers to Elixir, but like anything good, you can overdo them. Readability and maintainability should be the first goal. I'm not sure where the exact limit is, but I am sure that pipes can be abused to make code very difficult to reason about. My personal limit seems to be about 3-4 in a sequence.

Saturday, August 1, 2015

Thinking about Stream and Enum in Terms of Function Composition

Coming from a Ruby background it took me a while to wrap my head around what Stream and Enum really are. It's far too easy to think about them as a generalized version of a Ruby Array.

At their heart, they are both tools for composing functions.

Let's start with Enum. In most object-oriented languages, enumerators support an "each" method
that allows you to operate on the members of the collection one at a time.

Elixir's Enum looks quite different at first. It requires that the collection implement these basic functions of the Enumerable protocol:



count(collection): retrieves the collection's size
member?(collection, value): checks if a value exists within the collection
reduce(collection, acc, fun): reduces the collection into a value

None of these looks like an "each" method[1]. 

All the Enum functions do is use these 3 basic functions to provide handy shortcuts for 
using the reduce function of the original collection. Unlike a typical "each" method in Ruby,
an Enum function can bail out at any time. For example, look at the implementation for 
Enum.all?

  def all?(collection, fun) do
    Enumerable.reduce(collection, {:cont, true}, fn(entry, _) ->
      if fun.(entry), do: {:cont, true}, else: {:halt, false}
    end) |> elem(1)
  end

Like most of the functions in Enum, it uses the collection's reduce function to implement a specific kind of reduction. In effect it's a composition of functions. It's also "lazy" in the sense that it only iterates as far as it needs to in order to produce the correct result. It's important to note that Enumerable implements a different reduce function than the standard one in Enum: the accumulator is passed in as a tuple of a status and the "real" accumulator, and the reducing function must return a tuple of the same shape.
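
To make the difference between the two reduce functions concrete, here is a small sketch that sums a range both ways and then halts a reduction early. The variable names are only for illustration.

```elixir
# Enum.reduce/3: the function returns the bare accumulator.
sum = Enum.reduce(1..5, 0, fn x, acc -> x + acc end)

# Enumerable.reduce/3: the accumulator goes in tagged with :cont,
# the function returns a tagged accumulator, and the result is tagged too.
{:done, tagged_sum} =
  Enumerable.reduce(1..5, {:cont, 0}, fn x, acc -> {:cont, x + acc} end)

# Returning {:halt, acc} stops the iteration. Here we look for the first
# element greater than 2 and count how many elements were visited.
{:halted, {found, visited}} =
  Enumerable.reduce(1..1_000_000, {:cont, {nil, 0}}, fn x, {_, n} ->
    if x > 2, do: {:halt, {x, n + 1}}, else: {:cont, {nil, n + 1}}
  end)
```

Both sums come out as 15, and the halting version stops after visiting three elements of a million-element range, which is the same mechanism Enum.all? relies on.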

Now let's consider the case of Stream. What it does is very similar to Enum, except that it doesn't
actually execute the reduce function. In effect it creates an anonymous function for transforming the 
collection into another collection. You can use this "dynamic" collection anyplace you can use a regular collection. 

iex(9)> foo =  1..10 |> Stream.map( fn(x) -> x * 2 end )
#Stream<[enum: 1..10, 
funs: [#Function<45.113986093/1 in Stream.map/2>]]>

iex(10)> foo |> Enum.max
20

iex(11)> foo |> Enum.min
2

iex(12)> foo |> Enum.take(1)
[2]

iex(13)> foo |> Enum.take(3)
[2, 4, 6]
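
One way to check that the stream does no work until something enumerates it, and that each use starts over, is to have the mapping function report every call. The sketch below sends a message to the current process for each evaluated element; count_evaluations is a hypothetical helper that drains and counts those messages.

```elixir
# Report each evaluation to the current process so we can count them.
doubled =
  Stream.map(1..10, fn x ->
    send(self(), {:evaluated, x})
    x * 2
  end)

# Hypothetical helper: drain the mailbox and count the reports so far.
count_evaluations = fn ->
  Stream.repeatedly(fn ->
    receive do
      {:evaluated, _} -> :got
    after
      0 -> :empty
    end
  end)
  |> Enum.take_while(&(&1 == :got))
  |> length()
end

# Defining the stream evaluated nothing.
before_use = count_evaluations.()

# Taking three elements evaluates only three.
first_three = Enum.take(doubled, 3)
after_take = count_evaluations.()

# Finding the maximum starts from the beginning and evaluates all ten.
largest = Enum.max(doubled)
after_max = count_evaluations.()
```

The counts are 0 after definition, 3 after Enum.take(doubled, 3), and 10 after Enum.max(doubled), since each consumer runs its own reduce over the original range.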

The important thing to note from this example is that a collection defined by Stream does not imply state. It behaves just like a normal collection and is just as immutable. Every time you use it,
it starts at the beginning. The decision about whether to use Stream or Enum should be based on the tradeoff between storing the collection and creating it at runtime. One difference that is important to note is that Stream only supports reduce; calling count or member? directly through Enumerable returns an {:error, module} tuple instead of a result. Looking at the code for Enum.count, we see how reduce can be used to
emulate these functions.

  def count(collection) do
    case Enumerable.count(collection) do
      {:ok, value} when is_integer(value) ->
        value
      {:error, module} ->
        module.reduce(collection, {:cont, 0}, fn
          _, acc -> {:cont, acc + 1}
        end) |> elem(1)
    end
  end
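
The two branches of that case expression can be seen by calling the protocol functions directly. This is a small sketch comparing a range, which can answer count and member? without iterating, against a stream built on the same range.

```elixir
stream = Stream.map(1..10, &(&1 * 2))

# A range knows its size and membership without iterating.
range_count = Enumerable.count(1..10)
range_member = Enumerable.member?(1..10, 4)

# A stream cannot answer directly; it replies with {:error, module},
# where module is the one whose reduce function should be used instead.
stream_count = Enumerable.count(stream)
stream_member = Enumerable.member?(stream, 4)

# Enum hides the difference by falling back to reduce.
counted = Enum.count(stream)
found = Enum.member?(stream, 4)
```

The range returns {:ok, 10} and {:ok, true}, the stream returns error tuples for both calls, and the Enum versions still give 10 and true by walking the stream.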



[1] It's simple enough to create an "each" from a reduce once you've played around with enough
examples of reduce.
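
As one possible illustration of that footnote, here is a minimal "each" written on top of Enumerable.reduce. EachSketch is a hypothetical module name, and this is a sketch rather than how Enum.each is actually implemented.

```elixir
defmodule EachSketch do
  # Calls fun on every entry for its side effects and ignores the results.
  # The accumulator is unused, so nil is threaded through unchanged.
  def each(collection, fun) do
    Enumerable.reduce(collection, {:cont, nil}, fn entry, _acc ->
      fun.(entry)
      {:cont, nil}
    end)

    :ok
  end
end

# Usage: print each element of a range.
EachSketch.each(1..3, fn x -> IO.puts(x) end)
```

Because it goes through Enumerable.reduce, the same function works on lists, ranges, and streams alike.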