Erlang Central

Difference between revisions of "Process Ring Across Nodes"

From ErlangCentral Wiki

(One intermediate revision by one user not shown)
Line 6:Line 6:
  
 
==Why?==
 
==Why?==
There has been a lot of talk about process rings in Erlang. This was triggered by Joe Armstrong's call to create process rings in different languages and then compare it with a process ring in Erlang. Hearing Joe's call several people have implemented message passing (be it from thread to thread or process to process). Many of these people (with much credit) have proved that they can send a message between two objects/entities really fast... faster then Erlang. Well... very good... but that is not all that is to it, people forget/miss the power of sending a message in Erlang from one process to another (Other from the fact that it is dead simple).
+
There has been a lot of talk about process rings in Erlang. This was triggered by Joe Armstrong's call to create process rings in different languages and then compare it with a process ring in Erlang. Hearing Joe's call several people have implemented message passing (be it from thread to thread or process to process). Many of these people (with much credit) have proved that they can send a message between two objects/entities really fast... faster then Erlang. Well... very good... but that is not all there is to it, people forget/miss the power of sending a message in Erlang from one process to another (Other from the fact that it is dead simple).
  
People forget what the message passing in Erlang actually is able to do and this article is to highlight the strengths in Erlang message passing and distribution.
+
People forget what the message passing in Erlang actually is able to do and this article is to highlight some of the strengths in Erlang message passing and distribution.
  
 
==Process Ring==
 
==Process Ring==
Line 93:Line 93:
 
    ?MODULE:head_proc(nil,Next);
 
    ?MODULE:head_proc(nil,Next);
 
{cmd, stop} ->
 
{cmd, stop} ->
    Next ! {cmd, stop, node()},
+
    Next ! {cmd, stop},
 
    io:format("head: Stop sent, Exiting...!~n");
 
    io:format("head: Stop sent, Exiting...!~n");
 
{cmd, spawn_new} ->
 
{cmd, spawn_new} ->
Line 110:Line 110:
 
      [self(), Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg]),
 
      [self(), Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg]),
 
    timer:sleep(50),
 
    timer:sleep(50),
    Receiver = next(Next, lists:sort([node()|nodes()])),
+
    Receiver = next(Next),
 
    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},
 
    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},
 
    ?MODULE:link_proc(Prev, Next);
 
    ?MODULE:link_proc(Prev, Next);
{cmd, stop, Origin} ->
+
{cmd, stop} ->
 
    io:format("~p: Exiting...~n",[self()]),
 
    io:format("~p: Exiting...~n",[self()]),
    next(Next,Origin) ! {cmd, stop}
+
    next(Next) ! {cmd, stop}
 
     end.
 
     end.
  
next(nil, Nodes) -> {head, nextnode(hd(Nodes), node(), Nodes)};
+
next(nil) ->
next(Pid, _)     -> Pid.
+
    Nodes = lists:sort([node()|nodes()]),
 +
    {head, nextnode(hd(Nodes), node(), Nodes)};
 +
next(Pid) -> Pid.
  
 
nextnode(First, _Me, [])                -> First;
 
nextnode(First, _Me, [])                -> First;
Line 158:Line 160:
 
{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
 
{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
 
    ...
 
    ...
    Receiver = next(Next, lists:sort([node()|nodes()])),    
+
    Receiver = next(Next),
 
    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},
 
    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},
 
</code>
 
</code>
 
First we need to find out who the next receiver is. To do this we have to find out if we are going to another node or if the next receiver is a local process. Well this is pretty easy, first we call the function next/2 and give it the next process that the sender process has in its state (the Next variable) and then a list of all the nodes in the system (including ourself) '''note that this list has to be sorted'''. The code for the next/2 function:
 
First we need to find out who the next receiver is. To do this we have to find out if we are going to another node or if the next receiver is a local process. Well this is pretty easy, first we call the function next/2 and give it the next process that the sender process has in its state (the Next variable) and then a list of all the nodes in the system (including ourself) '''note that this list has to be sorted'''. The code for the next/2 function:
 
<code>
 
<code>
next(nil, Nodes) -> {head, nextnode(hd(Nodes), node(), Nodes)};
+
next(nil) ->
next(Pid, _)     -> Pid.
+
    Nodes = lists:sort([node()|nodes()]),
 +
    {head, nextnode(hd(Nodes), node(), Nodes)};
 +
next(Pid) -> Pid.
 
</code>  
 
</code>  
 
Simple; either the next receiver is undefined which means that we need to send the message to the next node. Otherwise we just send it to the next process. To know which node that is the next node we need to find ourself in the list of the nodes and then see who is next. This is what the nextnode/3 function does:
 
Simple; either the next receiver is undefined which means that we need to send the message to the next node. Otherwise we just send it to the next process. To know which node that is the next node we need to find ourself in the list of the nodes and then see who is next. This is what the nextnode/3 function does:
Line 257:Line 261:
  
 
==Conclusion==
 
==Conclusion==
There is more to it then just sending a single stupid message from one "Actor" to another. Have fun experimenting!
+
There is more to it then just sending a single message from one "Actor" to another. Have fun experimenting!

Revision as of 09:44, 19 January 2010

Contents

Author

Mazen Harake [1] 10:02, 18 May 2009 (BST)

Why?

There has been a lot of talk about process rings in Erlang. This was triggered by Joe Armstrong's call to create process rings in different languages and then compare it with a process ring in Erlang. Hearing Joe's call several people have implemented message passing (be it from thread to thread or process to process). Many of these people (with much credit) have proved that they can send a message between two objects/entities really fast... faster then Erlang. Well... very good... but that is not all there is to it, people forget/miss the power of sending a message in Erlang from one process to another (Other from the fact that it is dead simple).

People forget what the message passing in Erlang actually is able to do and this article is to highlight some of the strengths in Erlang message passing and distribution.

Process Ring

So lets define a process ring (I don't pretend that this is in any way a formal definition, its ad-hoc):

1. Create N number of processes (where N > 1) which are single linked (one-directional).

P1 -> P2 -> P3 -> Pn

2. If any process receives a message then this message should be forwarded to then next process in the chain.

Pn -> Pn+1

3. If there is no process defined as the next one, then send it to the first process

Pn -> (undefined) -> P1

4. When the first process receives a message that has circulated the chain M number of times, it should stop.

Ok.... there we go I think I got everything. So this is pretty easy to implement. Check out this forum post.

Node Ring

Ok so lets assume that we have several machines on a network running Erlang? Well that is really cool because now we can send messages across nodes and create a node ring. So lets extend and change the previous definition a little:

3. If there is no process defined as the next one, then send it to the first process on the next available node after yourself. If there is no node after yourself, then send the message to the first process on your own node.

[n1@host] Pn -> [n1@host] undefined -> [n2@host] P1

and

[n1@host] Pn -> [n1@host] undefined -> [undefined] P1 -> [n1@host] P1

4. When the original node receives a message that has circulated the node chain M number of times, it should stop.

5. One successful loop is defined as a message arriving back to the first process on the first node that sent the message, no matter how many nodes it went through to get there.

Point 5 gives us the ability to dynamically add and remove nodes as the process loop is running which is a cool feature to demonstrate.

The Code

Here is the code I wrote for achieving this. There are 3 limitations that I can think of at the time of writing.

  • First, there is 1 single point of failure; if the first node (that sent a message) goes down then the loop will be endless but I didn't care about that right now (maybe version 2.0 :))
  • Second, the loop stops if the node that currently "has" the message goes down.
  • Third, the loop doesn't discriminate between nodes that have started the main program or not. This means that if you connect one node before the program has started then the loop will simply stop

Here is the code, I will explain it below the code field. After you get the code you can read the next section to see how to use it in a demo.

%% Written by Mazen Harake
%% GPLv3 License -> http://www.gnu.org/licenses/gpl-3.0.txt
-module(nodeloop).
-compile(export_all).

start(N) when N > 0 ->
    erlang:register(head, proc_lib:spawn_link(?MODULE, head_proc, [nil, nil])),
    create_links(N).

create_links(0) -> 
    ok;
create_links(N) when N > 0 ->
    head ! {cmd, spawn_new},
    create_links(N - 1).

stop() -> 
    head ! {cmd, stop}.

send(Msg, NodeSkips) when NodeSkips > 0 -> 
    head ! {msg, node(), NodeSkips, 0, 1, Msg}. 

head_proc(nil, Next) ->
    receive
	{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
	    case {node(), MaxNodeSkips == NodeSkips} of
		{Origin, true} ->
		    io:format("head: Finished!~n");
		{Origin, false} ->
		    Next ! {msg, Origin, MaxNodeSkips, NodeSkips+1, ProcSkips, Msg};
		{_, _} ->
		    Next ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg}
	    end,
	    ?MODULE:head_proc(nil,Next);
	{cmd, stop} ->
	    Next ! {cmd, stop},
	    io:format("head: Stop sent, Exiting...!~n");
	{cmd, spawn_new} ->
	    Pid = proc_lib:spawn_link(?MODULE, link_proc, [nil, nil]),
	    Pid ! {cmd, linkin, [head, Next]},
	    io:format("head: Added ~p...~n",[Pid]),
	    ?MODULE:head_proc(nil, Pid)
    end.

link_proc(Prev, Next) ->
    receive
	{cmd, linkin, [NewPrev, NewNext]} ->
	    ?MODULE:link_proc(NewPrev, NewNext);
	{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
	    io:format("~p: ~p | ~p | ~p | ~p | ~1000p~n",
		      [self(), Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg]),
	    timer:sleep(50),
	    Receiver = next(Next),
	    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},
	    ?MODULE:link_proc(Prev, Next);
	{cmd, stop} ->
	    io:format("~p: Exiting...~n",[self()]),
	    next(Next) ! {cmd, stop}
    end.

next(nil) ->
    Nodes = lists:sort([node()|nodes()]),
    {head, nextnode(hd(Nodes), node(), Nodes)};
next(Pid) -> Pid.

nextnode(First, _Me, [])                 -> First;
nextnode(_First, Me, [Me, Next | _Rest]) -> Next;
nextnode(First, Me, [_|Rest])            -> nextnode(First, Me, Rest).

So... first we start with creating a registered process called 'head'

start(N) when N > 0 ->
    erlang:register(head, proc_lib:spawn_link(?MODULE, head_proc, [nil, nil])),
    create_links(N).

this process is the "first" process which shall exist on all nodes that are started. We then proceed by creating N number of processes to use.

create_links(0) -> 
    ok;
create_links(N) when N > 0 ->
    head ! {cmd, spawn_new},
    create_links(N - 1).

The 'head' process (running in the head_proc function) must always be prepared to stop the looping since it can be the one that started the whole thing.

	{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
	    case {node(), MaxNodeSkips == NodeSkips} of
		{Origin, true} ->
		    io:format("head: Finished!~n");
		{Origin, false} ->
		    Next ! {msg, Origin, MaxNodeSkips, NodeSkips+1, ProcSkips, Msg};
		{_, _} ->
		    Next ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg}
	    end,

Basically we check for each message that is circulating: Am I the origin (the one who started) of this msg? If I am then check if we have reached our goal for how many "NodeSkips" to perform, if we've reached that goal then stop. Otherwise add 1 to the number of skips or just pass the message forward. The other two receive clauses in head_proc/2 are pretty simple.

For each "link" in the chain there is a process running in link_proc/2. A link proc has a next and previous process (the previous is not used). The important lines in that function are these two:

	{msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips, Msg} ->
	    ...
	    Receiver = next(Next),
	    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},

First we need to find out who the next receiver is. To do this we have to find out if we are going to another node or if the next receiver is a local process. Well this is pretty easy, first we call the function next/2 and give it the next process that the sender process has in its state (the Next variable) and then a list of all the nodes in the system (including ourself) note that this list has to be sorted. The code for the next/2 function:

next(nil) ->
    Nodes = lists:sort([node()|nodes()]),
    {head, nextnode(hd(Nodes), node(), Nodes)};
next(Pid) -> Pid.

Simple; either the next receiver is undefined which means that we need to send the message to the next node. Otherwise we just send it to the next process. To know which node that is the next node we need to find ourself in the list of the nodes and then see who is next. This is what the nextnode/3 function does:

nextnode(First, _Me, [])                 -> First;
nextnode(_First, Me, [Me, Next | _Rest]) -> Next;
nextnode(First, Me, [_|Rest])            -> nextnode(First, Me, Rest).

This function has 2 cases; either we don't have any nodes in the system after ourself, which means we send it to the first node or we have a next node and that is the next target.

Now going back to the link_proc/2 function we know who the receiver is. so all we have to do is to send the message forward and sit and wait for the next message

	    Receiver ! {msg, Origin, MaxNodeSkips, NodeSkips, ProcSkips + 1, Msg},

That's it!

Demo

To test the program do the follow:

Start a terminal/shell in which you can start erl. Start erl like this:

erl -sname nX -setcookie nodeloop

where X is a number of the node say 1 for the first and 2 for the second... well you get the idea ;)

Start the head process and the links in it by calling the start/1 function. In this case my computer's hostname is prestine.

(n1@prestine)1> nodeloop:start(100).
...
(n1@prestine)2> head: Added <0.382.0>...
(n1@prestine)2> head: Added <0.383.0>...
(n1@prestine)2>

Try the first case (with only 1 node) by sending a message using the send/2 function. The first parameter is the message to be passed around and the second one is the number of node skips (or node loops or what ever we call it :))

(n1@prestine)2> nodeloop:send("Looping!",2).
<0.138.0>: n1@prestine | 2 | 1 | 1 | "Looping!"
{msg,n1@prestine,2,0,1,"Looping!"}
(n1@prestine)3> <0.137.0>: n1@prestine | 2 | 1 | 2 | "Looping!"
(n1@prestine)3> <0.136.0>: n1@prestine | 2 | 1 | 3 | "Looping!"
...
(n1@prestine)3> <0.39.0>: n1@prestine | 2 | 2 | 199 | "Looping!"
(n1@prestine)3> <0.38.0>: n1@prestine | 2 | 2 | 200 | "Looping!"
(n1@prestine)3> head: Finished!
(n1@prestine)3>

Now all you have to do to make this across nodes is to start any number of nodes and connect them. In other words; if we start a node called 'n2' then this is the output of it after we have started it and re-run the send/2 function on 'n1':

F:\Desktop\nodeloop>erl -sname n2 -setcookie nodeloop
Eshell V5.6.4  (abort with ^G)
(n2@prestine)1> nodeloop:start(10).
head: Added <0.38.0>...
ok
head: Added <0.39.0>...
(n2@prestine)2> head: Added <0.41.0>...
(n2@prestine)2> head: Added <0.42.0>...
(n2@prestine)2> head: Added <0.43.0>...
(n2@prestine)2> head: Added <0.44.0>...
(n2@prestine)2> head: Added <0.45.0>...
(n2@prestine)2> head: Added <0.46.0>...
(n2@prestine)2> head: Added <0.47.0>...
(n2@prestine)2> head: Added <0.48.0>...
(n2@prestine)2> net_kernel:connect('n1@prestine').
true
(n2@prestine)3> <0.48.0>: n1@prestine | 2 | 1 | 101 | "Looping!"
(n2@prestine)3> <0.47.0>: n1@prestine | 2 | 1 | 102 | "Looping!"
(n2@prestine)3> <0.46.0>: n1@prestine | 2 | 1 | 103 | "Looping!"
(n2@prestine)3> <0.45.0>: n1@prestine | 2 | 1 | 104 | "Looping!"
(n2@prestine)3> <0.44.0>: n1@prestine | 2 | 1 | 105 | "Looping!"
(n2@prestine)3> <0.43.0>: n1@prestine | 2 | 1 | 106 | "Looping!"
(n2@prestine)3> <0.42.0>: n1@prestine | 2 | 1 | 107 | "Looping!"
(n2@prestine)3> <0.41.0>: n1@prestine | 2 | 1 | 108 | "Looping!"
(n2@prestine)3> <0.39.0>: n1@prestine | 2 | 1 | 109 | "Looping!"
(n2@prestine)3> <0.38.0>: n1@prestine | 2 | 1 | 110 | "Looping!"
(n2@prestine)3> <0.48.0>: n1@prestine | 2 | 2 | 211 | "Looping!"
(n2@prestine)3> <0.47.0>: n1@prestine | 2 | 2 | 212 | "Looping!"
(n2@prestine)3> <0.46.0>: n1@prestine | 2 | 2 | 213 | "Looping!"
(n2@prestine)3> <0.45.0>: n1@prestine | 2 | 2 | 214 | "Looping!"
(n2@prestine)3> <0.44.0>: n1@prestine | 2 | 2 | 215 | "Looping!"
(n2@prestine)3> <0.43.0>: n1@prestine | 2 | 2 | 216 | "Looping!"
(n2@prestine)3> <0.42.0>: n1@prestine | 2 | 2 | 217 | "Looping!"
(n2@prestine)3> <0.41.0>: n1@prestine | 2 | 2 | 218 | "Looping!"
(n2@prestine)3> <0.39.0>: n1@prestine | 2 | 2 | 219 | "Looping!"
(n2@prestine)3> <0.38.0>: n1@prestine | 2 | 2 | 220 | "Looping!"
(n2@prestine)3>

Now if you want you can dynamically connect/disconnect nodes on the fly as you are looping just beware of the limitations as described before. Cool huh?

Conclusion

There is more to it then just sending a single message from one "Actor" to another. Have fun experimenting!