INTRO
The n2o_pi module creates and tracks supervised processes across all applications, using any ETS table. Every supervised process in N2O is created through n2o_pi: ring vnodes, timers, authorization, web page processes, test processes and other services. The info/2 protocol handlers run inside the loop process on the critical path and should return as soon as possible, so time-consuming operations should be handed off to new asynchronous proc/2 processes.
CALLBACK
proc(term(),#pi{}) -> #ok{} | #reply{}.
proc/2 is the callback invoked from every gen_server callback: handle_call, handle_cast, handle_info, init and terminate. It returns either #ok{} carrying the initial state of the process (which is the #pi{} record itself), or #reply{} carrying the response to gen_server:call/2 together with the new state.
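As an illustration of the callback shape, here is a minimal sketch of a proc/2 implementation. The module name counter_pi and the {add,N} message are hypothetical, and the #pi{} record is redeclared locally (without types) only so the sketch compiles on its own; a real module would use the N2O record definition. The {ok,...} and {reply,...} tuples are the tuple forms of #ok{} and #reply{}, and the {noreply,...} return follows the timer listing in the EXAMPLE section.

```erlang
-module(counter_pi).
-export([proc/2]).

%% Local, untyped copy of #pi{} so this sketch is self-contained (assumption:
%% a real project would take the record from the N2O headers instead).
-record(pi, {name, table, sup, module, state}).

%% init: set the initial state; {ok,Pi} is the tuple form of #ok{code=Pi}.
proc(init, #pi{} = Pi) ->
    {ok, Pi#pi{state = 0}};
%% Hypothetical request: add N to the counter and reply with the new total.
%% {reply,Data,Pi} is the tuple form of #reply{data=Data,code=Pi}.
proc({add, N}, #pi{state = Count} = Pi) when is_integer(N) ->
    {reply, Count + N, Pi#pi{state = Count + N}};
%% Any other message leaves the state unchanged.
proc(_Other, #pi{} = Pi) ->
    {noreply, Pi}.
```

Because proc/2 is an ordinary function, it can be exercised directly, without starting a process: counter_pi:proc(init, Pi) followed by counter_pi:proc({add,5}, NewPi).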
EXAMPLE
Here is the literal implementation of the N2O timer, which invalidates the caching table used for session variables.
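%% Start: arm the first timer and keep its reference as the process state.
proc(init,#pi{}=Async) ->
    {ok,Async#pi{state=timer(ping())}};
%% Tick: cancel the old timer, invalidate the cache, arm the next timer.
proc({timer,ping},#pi{state=Timer}=Async) ->
    erlang:cancel_timer(Timer),
    io:format("n2o Timer: ~p\r~n",[ping]),
    n2o:invalidate_cache(caching),
    {noreply,Async#pi{state=timer(ping())}}.

%% Convert {Hours,Minutes,Seconds} to milliseconds and schedule the next tick.
timer(Diff) ->
    {X,Y,Z} = Diff,
    erlang:send_after(1000*(Z+60*Y+60*60*X),self(),{timer,ping}).

%% Interval from the n2o application environment; the default is one minute.
ping() ->
    application:get_env(n2o,timer,{0,1,0}).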
> n2o_pi:start(#pi{ module = n2o,
                   table  = caching,
                   sup    = n2o,
                   state  = [],
                   name   = "timer"}).
The main purpose of n2o_pi is to create such processes from a single proc/2 function and to track their pids in the ETS table specified in the #pi{} record at process initialization.
1> supervisor:which_children(n2o).
[{{ring,4},<0.1661.0>,worker,[n2o_mqtt]},
{{ring,3},<0.1655.0>,worker,[n2o_mqtt]},
{{ring,2},<0.1653.0>,worker,[n2o_mqtt]},
{{ring,1},<0.1651.0>,worker,[n2o_mqtt]},
{{caching,"timer"},<0.1604.0>,worker,[n2o]}]
2> ets:tab2list(ring).
[{{ring,4},infinity,<0.1661.0>},
{{ring,1},infinity,<0.1651.0>},
{{ring,2},infinity,<0.1653.0>},
{{ring,3},infinity,<0.1655.0>}]
3> ets:tab2list(caching).
[{{caching,"timer"},infinity,<0.1604.0>}]
4> n2o_pi:cast(caching,"timer",{timer,ping}).
n2o Timer: ping
ok
5> n2o_pi:pid(caching,"timer").
<0.1604.0>
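The table contents shown above suggest how pids are tracked: each entry is keyed by {Table,Name} and carries the pid as its last element. The following standalone sketch mimics that layout with plain ETS calls. The module name pid_table_sketch, the functions track/3 and lookup/2, and the table sketch_tab are hypothetical and are not taken from n2o_pi; the infinity field is copied from the listing without interpreting it.

```erlang
-module(pid_table_sketch).
-export([demo/0, track/3, lookup/2]).

%% Store Pid under the {Table,Name} key, using the same three-element
%% tuple layout as the ets:tab2list/1 output in this section.
track(Table, Name, Pid) ->
    ets:insert(Table, {{Table, Name}, infinity, Pid}).

%% Find the pid stored under {Table,Name}; undefined when there is no entry.
lookup(Table, Name) ->
    case ets:lookup(Table, {Table, Name}) of
        [{_Key, _Till, Pid}] -> Pid;
        [] -> undefined
    end.

%% Create a throwaway named table, track the calling process, look it up.
demo() ->
    sketch_tab = ets:new(sketch_tab, [named_table, public, set]),
    Self = self(),
    true = track(sketch_tab, "timer", Self),
    Self = lookup(sketch_tab, "timer"),
    %% The name must match exactly: a different string is a different key.
    undefined = lookup(sketch_tab, "missing"),
    true = ets:delete(sketch_tab),
    ok.
```

Calling pid_table_sketch:demo() in an Erlang shell returns ok. The point of the sketch is that the lookup key is the exact {Table,Name} pair, so the name passed to a lookup has to be the same term that was used when the process was started.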
RECORDS
Each process is driven by its protocol, which is in fact a sum of protocol messages. Being generic, n2o_pi does not limit the protocol messages, but it does define the type of the process state: the #pi{} record.
#ok    { code = [] :: [] | #pi{} }.
#error { code = [] :: [] | term() }.
#reply { data = [] :: [] | term(),
         code = [] :: [] | #pi{} }.
According to the N2O convention, each protocol message field should include [] in its type as the default nil value.
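Since Erlang records are tagged tuples, the declarations above correspond to plain tuples, with [] filling any field that is not set. The sketch below shows this correspondence. The module name pi_records_sketch is hypothetical, and the records are redeclared locally without type annotations, on the assumption that field names and order match the declarations above.

```erlang
-module(pi_records_sketch).
-export([demo/0]).

%% Untyped local copies of the protocol records (assumed field order:
%% as declared in this section). Every field defaults to [].
-record(ok,    {code = []}).
-record(error, {code = []}).
-record(reply, {data = [], code = []}).

demo() ->
    %% Unset fields take the [] default.
    {ok, []}    = #ok{},
    {error, []} = #error{},
    {reply, [], []} = #reply{},
    %% Set fields appear in declaration order after the record tag.
    {ok, state}        = #ok{code = state},
    {error, timeout}   = #error{code = timeout},
    {reply, 42, state} = #reply{data = 42, code = state},
    ok.
```

This is why a proc/2 clause can return {ok,State} or {reply,Data,State} directly where the specification mentions #ok{} and #reply{}.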
-record(pi, { name   :: atom(),
              table  :: ets:tid(),
              sup    :: atom(),
              module :: atom(),
              state  :: term() }).
- name — process name, key in supervised chain.
- module — the module name where proc/2 is placed.
- table — ETS table name where cached pids are stored.
- sup — the application, where supervised processes will be created.
- state — the state of the running supervised process.
API
start(#pi{}) -> {pid(),term()} | #error{}.
Spawns a supervised process that runs the proc/2 function.
stop(Class,Name) -> #pi{} | #error{}.
Kills the process and removes it from supervision.
restart(Class,Name) -> {pid(),term()} | #error{} | #pi{}.
Tries to stop the process. On success it starts a new one; otherwise it returns the error.
send(Class,Name,term()) -> term().
Sends a gen_call message to the process found in the Class table under the Name key. Returns the response from gen_server:call.
cast(Class,Name,term()) -> term().
Sends a gen_cast message to the process found in the Class table under the Name key.
pid(Class,Name) -> pid().
Returns the pid that was stored in the Class table under the Name key during process initialization.
1> n2o_pi:pid(caching,"timer") ! {timer,ping}.
n2o Timer: ping
{timer,ping}
2> gen_server:cast(
       n2o_pi:pid(caching,"timer"),
       {timer,ping}).
n2o Timer: ping
ok