
Process group in erlang: some thoughts about the pg module

September 18, 2012

One of the most common ways to achieve fault tolerance in distributed systems is to organize several identical processes into a group that can be accessed by a common name. The key concept here is that whenever a message is sent to the group, all members of the group receive it. This is a really nice feature: if one process in the group fails, some other process can take over and handle the message, performing all the required operations.

Process groups also provide abstraction: when we send a message to a group, we don’t need to know who the members are or where they are. In fact, process groups are anything but static: any process can join an existing group or leave one at runtime, and a process can belong to several groups at the same time.

Since this blog is about Erlang, I will not go further into the general topic, which can be explored online or by reading the well-known book “Distributed Systems: Principles and Paradigms” by Prof. Andrew Tanenbaum, which is bloody expensive but worth its price.

Instead, I will introduce you to one of the Erlang modules that implement process groups: the pg module. This module, which is still experimental, is very interesting, very useful, and can be easily understood by reading its source code. Even though pg is inspired by the ISIS system, not all of the ISIS features are in place.

As I wrote above, every message is sent to the group, so all members of the group receive it.

The two things I like the most about pg are:

  1. messages are serialized. This means that if one process P1 sends the message M1, and process P2 sends the message M2 at the same time, all the members of the group will receive the two messages in the same order.
  2. whenever one of the members of a group terminates, it is automatically removed from the group, and this is handled entirely by the pg module.

There are a couple of things I don’t like about the pg module as well:

  1. the aforesaid serialization is achieved by using a group master process which takes care of forwarding each message to all the group members. This could potentially explode in your face if the number of messages and members is very large (but we are still talking about Erlang here, so when I say very large I mean VERY VERY LARGE). That’s why, as a common practice, you had better benchmark your code carefully when using such a module and understand how to tune your process group.
  2. causal ordering is not implemented in the pg module. Message ordering is a really complex topic, so in order to understand it well I once again suggest the very good book I mentioned above, which helped me a lot during my Distributed Systems classes at the University of Trento.
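
To make the "group master" idea more concrete, here is a minimal sketch written with plain Erlang processes. It is not the source code of pg: the module name seq_group, its functions and the {group_message, From, Msg} tuple are all made up for illustration. It only shows why a single forwarding process gives every member the same message order, and how a monitor lets the master drop dead members.

```erlang
-module(seq_group).
-export([start/0, join/2, send/2, members/1, demo/0]).

%% Hypothetical group master, NOT the real pg implementation.

%% Start a group master with no members.
start() ->
    spawn(fun() -> loop([]) end).

%% Add Pid to the group (asynchronous).
join(Master, Pid) ->
    Master ! {join, Pid},
    ok.

%% Ask the master to forward Msg to every current member.
send(Master, Msg) ->
    Master ! {send, self(), Msg},
    ok.

%% Return the current member list (synchronous).
members(Master) ->
    Ref = make_ref(),
    Master ! {members, self(), Ref},
    receive {Ref, Members} -> Members after 1000 -> timeout end.

loop(Members) ->
    receive
        {join, Pid} ->
            %% Monitor the member so we are told when it terminates.
            erlang:monitor(process, Pid),
            loop([Pid | Members]);
        {send, From, Msg} ->
            %% One process does all the forwarding, so every member sees
            %% the messages in the order the master received them.
            lists:foreach(fun(M) -> M ! {group_message, From, Msg} end, Members),
            loop(Members);
        {members, From, Ref} ->
            From ! {Ref, Members},
            loop(Members);
        {'DOWN', _MRef, process, Pid, _Reason} ->
            %% Dead members are removed without any help from the caller.
            loop(lists:delete(Pid, Members))
    end.

%% Two members, two concurrent senders: both members must report
%% the two messages in the same order, whatever that order is.
demo() ->
    Master = start(),
    Parent = self(),
    Collector = fun() ->
        A = receive {group_message, _, X1} -> X1 end,
        B = receive {group_message, _, X2} -> X2 end,
        Parent ! {seen, self(), [A, B]}
    end,
    M1 = spawn(Collector),
    M2 = spawn(Collector),
    join(Master, M1),
    join(Master, M2),
    %% The synchronous call makes sure both joins have been processed.
    2 = length(members(Master)),
    spawn(fun() -> send(Master, m1) end),
    spawn(fun() -> send(Master, m2) end),
    Order1 = receive {seen, M1, O1} -> O1 end,
    Order2 = receive {seen, M2, O2} -> O2 end,
    Order1 =:= Order2.
```

Calling seq_group:demo() should return true. The sketch also makes the drawback visible: every message goes through the mailbox of one process, which is exactly where a very busy group would start to queue up.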

OK, I know you want to see some Erlang code here, but the pg functions are actually quite similar to the pg2 ones, which I discussed in one of my previous posts. There are big differences between the two modules, though: for example, with pg each message is sent to all members of the group, while with pg2 each message may be sent to one, some, or all members using a custom function.

One last thing: if you are using the gen_server behaviour, you can easily handle messages coming from a pg group in your handle_info/2 callback. For example, you can have something like:

handle_info({pg_message, _From, _PgName, Message}, State) ->
    io:format("Received message: ~p~n", [Message]),
    {noreply, State};
handle_info(_Other, State) ->
    %% Ignore anything that is not a group message.
    {noreply, State}.

That’s all folks! For more information about pg, take a look at its documentation!

Categories: English, Erlang

A survival guide on pg2 Erlang module

January 12, 2012

Today I will spend part of my holidays introducing you to a very interesting Erlang module: pg2.

This module implements trivial distributed process groups. Now, if you have already looked at this topic in Erlang, you should know that other modules provide “similar” functionality for handling distributed process groups (e.g. pg and pool). I leave the analysis of these two modules to a possible future post, but I would like to emphasize what is, in my humble opinion, the main difference between pg2 and pg.

Given that a process group is a group of processes that can be accessed by a common name:

  • in the pg module, when a message is sent to the named group, all members of the group receive it
  • in the pg2 module, each message may be sent to one, some, or all members

You can imagine a process group as a group populated with one or more processes (e.g. gen_servers) that can be accessed by a common name. The processes can be located on the same node or on different nodes, and messages can be sent to one, some, or all members of the group.

As a rule of thumb, in this blog I never go really deep into theory (I’m a practical guy), so to explain pg2 further I will write and comment a few lines of code. Hopefully the example will help you understand the topic quite well! 🙂

pegaso:~ pdincau$ erl
Erlang (BEAM) emulator version X.Y.Z

Eshell VX.Y.Z (abort with ^G)

1> pg2:create(test).
ok
2> pg2:which_groups().
[test]
3> pg2:join(test, self()).
ok
4> pg2:get_members(test).
[<0.31.0>]
5> a = b.
** exception error: no match of right hand side value b
6> self().
<0.38.0>
7> pg2:get_members(test).
[]
8> pg2:join(test, self()).
ok
9> pg2:get_closest_pid(test).
<0.38.0>
10> pg2:leave(test, self()).
ok
11> pg2:delete(test).
ok

First of all I used pg2:create/1, which creates a new process group given a name; in my case I used the atom ‘test’. OK, now we have a working group, and we can check this by calling pg2:which_groups/0.

For now this group is not useful at all, since no process has joined it yet. Let’s add some processes using the function pg2:join/2, which takes as input the name of the group you want to join and the pid of the process that will join it (in this example I added only the shell process). If the join goes well you should receive an ok, but you can double-check by using pg2:get_members/1, a function that, given the name of a group, returns the processes that joined the group and are still running. You may also want to try pg2:get_local_members/1, which does basically the same thing but only returns the members running on the local node.

What happens if a process exits? Should you manually remove the process from the group? No! The nice thing about pg2 is that dead processes are automatically removed from the group. As you can see at line 5, I crashed the shell process on purpose. After that I checked the members of the group and voilà: no process was in the group! I have to be honest with you: in some versions of the Erlang VM, the pg2 module does not work as expected (remember this module is a beta!). When I did my first experiments with this module I didn’t know about this problem, so I struggled with it until I found this webpage. I should also say that my problem was solved simply by upgrading my Erlang environment. I have digressed a little here! Forgive me, but I think this information is really important!
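
If you prefer to see the automatic cleanup in a module rather than in the shell, here is a small sketch. It assumes an Erlang/OTP release that still ships pg2 (the module was removed in OTP 24); the module name pg2_cleanup and the group name cleanup_demo are just names I made up for the example. Since pg2 learns about the death of a member asynchronously, the code polls for a short while instead of checking only once.

```erlang
-module(pg2_cleanup).
-export([demo/0]).

%% Sketch only: requires an OTP release that still includes pg2.
demo() ->
    Group = cleanup_demo,                 % hypothetical group name
    ok = pg2:create(Group),
    Worker = spawn(fun() -> receive stop -> ok end end),
    ok = pg2:join(Group, Worker),
    [Worker] = pg2:get_members(Group),
    %% Terminate the worker and wait until it is really gone.
    MRef = erlang:monitor(process, Worker),
    Worker ! stop,
    receive {'DOWN', MRef, process, Worker, _Reason} -> ok end,
    %% We never call pg2:leave/2: pg2 is expected to drop the member itself.
    Result = wait_until_empty(Group, 50),
    ok = pg2:delete(Group),
    Result.

%% Poll the member list up to N times, sleeping 10 ms between attempts.
%% Returns ok once the group is empty, or timeout if it never is.
wait_until_empty(_Group, 0) ->
    timeout;
wait_until_empty(Group, N) ->
    case pg2:get_members(Group) of
        [] ->
            ok;
        _StillThere ->
            timer:sleep(10),
            wait_until_empty(Group, N - 1)
    end.
```

On a release where pg2 behaves as described in this post, pg2_cleanup:demo() should return ok.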

What is left to say about pg2? If you want your process to leave a group it joined before, you can use pg2:leave/2, which is the opposite of pg2:join/2. Last but not least, you may want to delete a group: in this case you can use pg2:delete/1, which deletes the group registered under the given name.

Hey!!! Something is missing here! 🙂 How do I interact with a process group? You may have noticed I used pg2:get_closest_pid/1 in the code above. Well, that function:

  • randomly chooses a local process of the group if one exists
  • otherwise randomly chooses a remote process of the group

After the pid is returned, we can contact the process as we do with any other process (e.g. with the ! operator or a gen_server call).
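
As a rough sketch of the "one or all members" idea, the two helpers below wrap pg2:get_closest_pid/1 and pg2:get_members/1. The module name pg2_send and the function names are hypothetical, and the code again assumes an OTP release that still includes pg2 (it was removed in OTP 24). Both pg2 functions return an {error, ...} tuple instead of a pid or a list when something is wrong with the group, so the helpers pass that tuple back to the caller.

```erlang
-module(pg2_send).
-export([send_to_closest/2, broadcast/2]).

%% Sketch only: requires an OTP release that still includes pg2.

%% Send Msg to one member, preferring a process on the local node.
send_to_closest(Group, Msg) ->
    case pg2:get_closest_pid(Group) of
        Pid when is_pid(Pid) ->
            Pid ! Msg,
            {ok, Pid};
        {error, Reason} ->
            %% The group is empty or does not exist.
            {error, Reason}
    end.

%% Send Msg to every member and report how many processes were contacted.
broadcast(Group, Msg) ->
    case pg2:get_members(Group) of
        {error, Reason} ->
            {error, Reason};
        Members ->
            lists:foreach(fun(Pid) -> Pid ! Msg end, Members),
            {ok, length(Members)}
    end.
```

Sending to "some" members is then just a matter of filtering the list returned by pg2:get_members/1 before the lists:foreach/2 call.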

Is there a better way to choose one among all the group members? Well, in this post, which I read some time ago, there is a really nice solution:

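get_best_pid(Group) ->
    %% erlang:process_info/2 only accepts local pids, so look at local members only.
    case pg2:get_local_members(Group) of
        {error, Reason} ->
            {error, Reason};
        Members ->
            %% A member that has just died yields undefined and is skipped.
            Loads = [{Pid, Len} || Pid <- Members,
                                   {message_queue_len, Len} <-
                                       [erlang:process_info(Pid, message_queue_len)]],
            case lists:keysort(2, Loads) of
                [{Pid, _} | _] -> Pid;
                [] -> {error, empty_process_group}
            end
    end.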

As you can see, the best pid is the one with the lowest message_queue_len. Other solutions are feasible, so take a look at erlang:process_info/2 for details.

In one of my previous posts I created a sort of “configuration process” which, once started, used message passing to provide configuration parameters to other processes. That kind of implementation introduced a few problems. One of them was a possible bottleneck: we could have had thousands of processes asking for configuration parameters, and only one process to provide them. In the next post I will show you how I implemented such code.

Note: if you are interested in process pool implementations, take a look at this: http://learnyousomeerlang.com/building-applications-with-otp

Categories: English, Erlang