We are using erlport to communicate with pools of Python processes in a production system. Each Python OS process has a one-to-one gen_server that uses erlport for bi-directional communication between the Erlang VM and Python. The problem is that the pooler library we use applies brutal_kill to the gen_server when a pool is shut down, which results in an untrappable exit of the gen_server. As a result, our gen_server never gets an opportunity to close the port, and we end up leaking Python processes. One way to resolve this problem is to enhance erlport so that it monitors its callers and calls port_close once the last caller associated with the port has shut down. Here is a solution that I have tested in our environment.
$ git diff
diff --git a/src/erlport.erl b/src/erlport.erl
index ffa0670..afdf85c 100644
--- a/src/erlport.erl
+++ b/src/erlport.erl
@@ -122,9 +122,10 @@ init(Fun) when is_function(Fun, 0) ->
%% @doc Synchronous event handler
%% @hidden
%%
-handle_call(Call={call, _M, _F, _A, Options}, From, State=#state{})
+handle_call(Call={call, _M, _F, _A, Options}, {Pid, _Ref} = From, State)
when is_list(Options) ->
- send_request(Call, From, Options, State);
+ NewState = monitor_caller(Pid, State),
+ send_request(Call, From, Options, NewState);
handle_call(Request, From, State) ->
Error = {unknown_call, ?MODULE, Request, From},
{reply, Error, State}.
@@ -160,6 +161,21 @@ handle_info({'EXIT', Pid, {Id, Result}}, State=#state{calls=Calls}) ->
error ->
{noreply, State}
end;
+handle_info({'DOWN', Ref, process, Pid, _Reason}, State=#state{port=Port, callers=Callers}) ->
+ case orddict:find(Pid, Callers) of
+ error ->
+ {noreply, State};
+ _ ->
+ erlang:demonitor(Ref),
+ Callers2 = orddict:erase(Pid, Callers),
+ case orddict:size(Callers2) of
+ 0 ->
+ port_close(Port),
+ {stop, shutdown, State#state{callers=Callers2}};
+ _ ->
+ {noreply, State#state{callers=Callers2}}
+ end
+ end;
handle_info({erlport_timeout, {in, Id}}, State=#state{calls=Calls}) ->
case orddict:find(Id, Calls) of
{ok, {Pid, _Timer}} ->
@@ -410,3 +426,15 @@ incoming_call(Id, Module, Function, Args, _Context, State=#state{
Calls2 = orddict:store(Id, Info, Calls),
{noreply, State#state{calls=Calls2}}
end.
+
+%%
+%% @doc Add monitor for caller
+%%
+monitor_caller(Pid, State=#state{callers = Callers}) ->
+ case orddict:find(Pid, Callers) of
+ error ->
+ Ref = erlang:monitor(process, Pid),
+ State#state{callers=orddict:store(Pid, Ref, Callers)};
+ _ ->
+ State
+ end.
diff --git a/src/erlport.hrl b/src/erlport.hrl
index d3eb01b..0d4bad7 100644
--- a/src/erlport.hrl
+++ b/src/erlport.hrl
@@ -40,7 +40,9 @@
% orddict(): CallId -> {From::term(), Timer::reference() | undefined}
sent = orddict:new() :: list(),
% orddict(): CallId -> {Pid::pid(), Timer::reference()}
- calls = orddict:new() :: list()
+ calls = orddict:new() :: list(),
+ % orddict(): CallerId -> Monitor::reference()
+ callers = orddict:new() :: list()
}).
-endif. % ERLPORT_HRL