1 /**
2  * Implements `ProcessMonitor`.
3  *
 * Re-exports `ProcessException` from `std.process` (as an alias), so callers can catch it without importing `std.process`.
5  *
6  * Examples:
7  * ---
8  * auto monitor = new shared ProcessMonitor;
 * monitor.stdoutCallback = (string line) {
 *     writeln(line);
 * };
 * monitor.terminateCallback = () {
 *     writeln("terminated");
 * };
15  *
16  * monitor.start(["cat"]);
17  *
18  * monitor.send("foo");
19  * monitor.send("bar");
20  * monitor.closeStdin();
21  *
22  * assert(monitor.wait() == 0);
23  * ---
24  */
25 module supervised.monitor;
26 
27 import core.time;
28 import core.thread;
29 import core.sync.barrier;
30 
31 import core.sys.posix.signal : SIGKILL, SIGTERM;
32 
33 import std.stdio;
34 import std.process;
35 import std.typecons;
36 import std.algorithm;
37 import std.exception;
38 static import std.process;
39 
40 import vibe.core.core;
41 import vibe.core.sync;
42 import vibe.core.task;
43 import vibe.core.concurrency;
44 
45 import supervised.logging;
46 
/// Re-export of `std.process.ProcessException`, so users of this module can
/// catch it without importing `std.process` themselves.
/// `ProcessMonitor.start` rethrows it (from the watcher task) when the
/// process cannot be spawned.
alias ProcessException = std.process.ProcessException;
49 
/// Exception thrown when a method is called on a `ProcessMonitor` while it is
/// in a state that does not allow it (e.g. `send` when no process is running).
class InvalidStateException : Exception {
    /**
     * Params:
     *   msg = description of the invalid state
     *   file = source file where the exception was raised
     *   line = source line where the exception was raised
     *   nextInChain = exception chained to this one, if any
     */
    this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable nextInChain = null) pure nothrow @nogc @safe {
        // `Exception` takes the line number as `size_t`; declaring it as `ulong`
        // does not compile on 32-bit targets, where `size_t` is 32 bits wide.
        super(msg, file, line, nextInChain);
    }
}
57 
/**
 * Sanitizes an output line read from a child process.
 *
 * Strips a single trailing carriage return, so output using `\r\n` line
 * endings is delivered the same way as `\n`-terminated output.
 *
 * Params:
 *   line = one line as produced by `File.byLineCopy` (newline already removed)
 *
 * Returns: `line` without its trailing `'\r'`, if it had one. Empty lines
 * (blank lines in the process output) are returned unchanged.
 */
private string sanitizeLine(string line) @safe pure nothrow @nogc {
    // Blank output lines arrive as "", which has no last character to inspect.
    if (line.length > 0 && line[$ - 1] == '\r') {
        return line[0 .. $ - 1];
    }
    return line;
}
65 
/**
 * Spawns and monitors sub-processes.
 *
 * Monitoring processes in a safe way is expensive. This implementation requires
 * spawning 3 threads per process (a stdout reader, a stderr reader and a thread
 * waiting for the process to exit) in addition to a vibe.d worker task that
 * acts as the watcher. Use this sparingly.
 *
 * This class is entirely thread and fibre safe. Methods that block only block
 * on the fibre (not the thread), and locks are used to prevent data-races.
 */
@safe shared final class ProcessMonitor {
    /// Callback for stdout and stderr events. Receives one output line with its
    /// trailing newline (and carriage return) removed.
    alias FileCallback = void delegate(string);

    /// Callback for termination
    alias EventCallback = void delegate();

    // Messages sent to the watcher task. Each enum is a distinct type so that
    // `receive` can dispatch on it.
    private enum CloseStdin { init };
    private enum ProcessTerminated { init };

    private {
        // Monitor mutex, held by `withLock` while the fields below are accessed.
        TaskMutex _mutex;

        // Task executing `runWatcher` for the current (or last) process.
        Task _watcher;
        bool _running = false;
        // Mutex that is locked while the process is running. Used for wait()
        TaskMutex _runningMutex;
        // Exit status of the last process, set by `onProcessTermination`.
        int _exitStatus;

        // Delay before a `kill` is escalated to SIGKILL.
        Duration _killTimeout = 20.seconds;

        Pid _pid;
        FileCallback _stdoutCallback;
        FileCallback _stderrCallback;
        EventCallback _terminateCallback;
    }

    /**
     * Creates a new `ProcessMonitor`.
     *
     * Can also create a new process monitor and immediately start it.
     * Behaviour is the same as `start`.
     */
    this() shared @trusted {
        this._mutex = cast(shared)new TaskMutex();
    }

    /// Ditto
    this(immutable string[] args,
         immutable Tuple!(string, string)[] env = null,
         immutable string workingDir = null) shared {
        this();
        start(args, env, workingDir);
    }

    /**
     * Unsynchronised accessor over the monitor's fields.
     *
     * Instances are only created by `withLock`, so every access through a
     * `Sync` happens while `_mutex` is held.
     */
    private struct Sync {
        ProcessMonitor instance;

        @property bool running() {
            return instance._running;
        }

        package @property void running(bool value) {
            instance._running = value;
        }

        package @property Task watcher() {
            return instance._watcher;
        }

        package @property void watcher(Task task) {
            instance._watcher = task;
        }

        package @property TaskMutex runningMutex() @trusted {
            return cast(TaskMutex)instance._runningMutex;
        }

        package @property void runningMutex(TaskMutex mutex) @trusted {
            instance._runningMutex = cast(shared)mutex;
        }

        @property int exitStatus() {
            return instance._exitStatus;
        }

        package @property void exitStatus(int status) {
            instance._exitStatus = status;
        }

        @property Duration killTimeout() {
            return instance._killTimeout;
        }

        @property void killTimeout(Duration timeout) {
            instance._killTimeout = timeout;
        }

        @property Pid pid() @trusted {
            return cast(Pid)instance._pid;
        }

        package @property void pid(Pid pid) @trusted {
            instance._pid = cast(shared)pid;
        }

        @property void stdoutCallback(FileCallback fn) {
            instance._stdoutCallback = fn;
        }

        @property void stderrCallback(FileCallback fn) {
            instance._stderrCallback = fn;
        }

        @property void terminateCallback(EventCallback fn) {
            instance._terminateCallback = fn;
        }

        /// Forwards `message` to the watcher task, which writes it to stdin.
        /// Throws: `InvalidStateException` when no process is running.
        void send(string message) @trusted {
            enforce!InvalidStateException(running, "Process is not running, cannot send message.");

            logger.tracef("Sending stdin message: %s", message);
            watcher.send(message);
        }
    }

    /**
     * Runs `fn` while holding the monitor mutex and returns its result.
     *
     * `fn` receives a `Sync` view through which the monitor state is read and
     * written.
     */
    private auto withLock(T)(T delegate(Sync) fn) shared @trusted {
        logger.trace("Locking mutex");
        scope(exit) logger.trace("Unlocked mutex");

        synchronized (_mutex) {
            logger.trace("Aquired mutex lock");

            return fn(Sync(cast(ProcessMonitor)this));
        }
    }

    /**
     * Returns whether or not the monitored process is currently running.
     */
    @property bool running() shared {
        return withLock(self => self.running);
    }

    /**
     * The time to wait before the process is force-killed by the monitor after
     * being asked to stop. Defaults to 20 seconds.
     *
     * When a process is killed using `kill` it may not stop immediately (or at
     * all). With this you can define an amount of time the monitor should wait
     * before forcing the process to stop. A timeout of zero (or less) disables
     * the forced kill.
     *
     * The value is read when a process is started, so changing it only affects
     * processes started afterwards.
     */
    @property Duration killTimeout() shared {
        return withLock(self => self.killTimeout);
    }

    /// Ditto
    @property void killTimeout(in Duration duration) shared {
        withLock(self => self.killTimeout = duration);
    }

    /// Returns the pid of the running process, or the last process run.
    /// Otherwise returns `Pid.init`.
    @property Pid pid() shared {
        return withLock(self => self.pid);
    }

    /**
     * Sets the callback for when a line from the monitored process's stdout is
     * read.
     *
     * Callbacks are called from native threads, not vibed fibres. Exceptions
     * thrown by the callback are caught and logged.
     */
    @property void stdoutCallback(in FileCallback fn) {
        withLock(self => self.stdoutCallback = fn);
    }

    /**
     * Sets the callback for when a line from the monitored process's stderr is
     * read.
     *
     * Callbacks are called from native threads, not vibed fibres. Exceptions
     * thrown by the callback are caught and logged.
     */
    @property void stderrCallback(in FileCallback fn) {
        withLock(self => self.stderrCallback = fn);
    }

    /**
     * Sets the callback for when the monitored process exits.
     *
     * The callback is invoked by the watcher (a vibe.d worker task) after the
     * process has been reaped and the monitor state updated, so `running` is
     * already `false` and `wait` returns the exit status. Exceptions thrown by
     * the callback are caught and logged.
     */
    @property void terminateCallback(in EventCallback fn) {
        withLock(self => self.terminateCallback = fn);
    }

    // Invokes the stdout callback, logging instead of propagating any throwable.
    // NOTE(review): the callback field is read without holding `_mutex`.
    private void callStdoutCallback(in string message) shared @trusted {
        auto callback = _stdoutCallback;

        if (callback !is null) {
            try {
                callback(message);
            } catch (Throwable e) {
                logger.errorf("Stdout Callback Task(%s) Error: %s\nDue to line: %s", Task.getThis(), e, message);
            }
        }
    }

    // Invokes the stderr callback, logging instead of propagating any throwable.
    // NOTE(review): the callback field is read without holding `_mutex`.
    private void callStderrCallback(in string message) shared @trusted {
        auto callback = _stderrCallback;

        if (callback !is null) {
            try {
                callback(message);
            } catch (Throwable e) {
                logger.errorf("Stderr Callback Task(%s) Error: %s\nDue to line: %s", Task.getThis(), e, message);
            }
        }
    }

    // Invokes the termination callback, logging instead of propagating any throwable.
    // NOTE(review): the callback field is read without holding `_mutex`.
    private void callTerminateCallback() shared @trusted {
        auto callback = _terminateCallback;

        if (callback !is null) {
            try {
                callback();
            } catch (Throwable e) {
                logger.errorf("Terminate Callback Task(%s) Error: %s", Task.getThis(), e);
            }
        }
    }

    /**
     * Sends a line of input to the monitored process's stdin.
     *
     * Throws: `InvalidStateException` when the process is not running.
     */
    void send(string message) shared {
        withLock(self => self.send(message));
    }

    /**
     * Closes the monitor's end of stdin for the process.
     *
     * Throws: `InvalidStateException` when the process is not running.
     */
    void closeStdin() shared {
        withLock((self) @trusted {
            enforce!InvalidStateException(self.running, "Process is not running, cannot close stdin.");

            logger.tracef("Sending closeStdin message");
            self.watcher.send(CloseStdin.init);
        });
    }

    /**
     * Start a monitored process given the arguments (`args`), environment
     * (`env`) and working directory (`workingDir`).
     *
     * The process is started with `Config.newEnv`, so its environment consists
     * of `env` only.
     *
     * Blocks on the fibre until the process starts.
     *
     * Throws:
     *   `InvalidStateException` when a process is already running.
     *
     *   `std.process.ProcessException` if an exception is encountered when starting the process.
     *
     *   `std.stdio.StdioException` if the pipes to the process cannot be created.
     */
    void start(immutable string[] args, immutable Tuple!(string, string)[] env = null, immutable string workingDir = null) shared {
        withLock((self) @trusted {
            enforce!InvalidStateException(!self.running, "Process is already running, cannot start it.");

            // Entry point of the watcher worker task.
            static void fn(shared ProcessMonitor monitor, Tid sender, immutable string[] args, immutable Tuple!(string, string)[] _env, immutable string workingDir) {
                // Create the env associative array
                string[string] env;
                foreach (item; _env) {
                    env[item[0]] = item[1];
                }

                try {
                    monitor.runWatcher(sender, args, env, workingDir);
                } catch (Throwable e) {
                    logger.criticalf("Critical Error in watcher: %s", e);
                }
            }

            runWorkerTask(&fn, this, thisTid, args.idup, env.idup, workingDir.idup);

            // Wait for either an exception or a successful start
            receive(
                (Task watcher, shared Pid pid) {
                    self.watcher = watcher;
                    self.pid = cast(Pid)pid;
                },
                (shared Throwable error) {
                    throw cast(Throwable)error;
                },
            );

            logger.tracef("Process started, monitor: %s", self.watcher);
            self.running = true;
        });
    }

    /**
     * Sends a signal to the process.
     *
     * If the process does not die within the `killTimeout`, `SIGKILL` is sent.
     * The kill timer is started at most once per process.
     *
     * Throws: InvalidStateException if the process is not running.
     */
    void kill(int signal = SIGTERM) shared {
        withLock((self) @trusted {
            enforce!InvalidStateException(self.running, "Process is not running, cannot kill it");

            logger.tracef("Killing process with signal %s, monitor: %s", signal, self.watcher);
            self.watcher.send(signal);
        });
    }

    /**
     * Block on the current fibre until the process has exited.
     *
     * Returns the exit code of the process, regardless of whether it had to
     * wait or not.
     *
     * When called before any process starts, it returns `int.init`.
     */
    int wait() shared {
        auto pair = withLock(self => tuple(self.runningMutex, self.exitStatus));
        auto runningMutex = pair[0];

        // No process running: report the last known exit status.
        if (runningMutex is null) {
            return pair[1];
        }

        // The watcher holds the running mutex until the process has terminated.
        logger.trace("Waiting on process to finish");
        synchronized(runningMutex) {}

        return withLock(self => self.exitStatus);
    }

    /**
     * Body of the watcher task started by `start`.
     *
     * Spawns the process and reports the outcome to `starter`: either
     * `(Task, shared Pid)` on success or a `shared Throwable` on failure. It
     * then relays `send`, `kill` and `closeStdin` messages to the process until
     * the process terminates. Once the process has been spawned, it is always
     * reaped and `onProcessTermination` is always called on exit.
     */
    private void runWatcher(Tid starter, in string[] args, in string[string] env, in string workingDir) shared @trusted {
        auto thisTask = Task.getThis();
        logger.infof("Watcher(%s): started with %s", thisTask, args);

        // Start the process; Config.newEnv means the child does not inherit our environment.
        immutable config = Config.newEnv;

        ProcessPipes processPipes;
        try {
            processPipes = pipeProcess(args, Redirect.all, env, config, workingDir);
        } catch (Exception exception) {
            // pipeProcess throws ProcessException when the process cannot be
            // started and StdioException when the pipes cannot be created.
            // Every failure must be reported, otherwise `start` waits forever
            // in `receive` while holding the monitor lock.
            logger.tracef("Watcher(%s): Exception starting process: %s", thisTask, exception);
            starter.send(cast(shared Throwable)exception);
            return;
        }

        auto process = processPipes.pid;
        auto stdin = processPipes.stdin;
        auto stdout = processPipes.stdout;
        auto stderr = processPipes.stderr;
        logger.tracef("Watcher(%s): [pid: %s, stdin: %s, stdout: %s, stderr: %s]", thisTask, process, stdin.fileno, stdout.fileno, stderr.fileno);

        // The monitor is locked by our starter, lets sneak in some state
        auto runningMutex = new TaskMutex;
        runningMutex.lock(); // We have to lock in this task, or we get an error!
        _runningMutex = cast(shared)runningMutex;

        // Fetch some attributes, remember we're still locked
        immutable killTimeout = _killTimeout;

        // Notify the task that started the watcher that the watcher has started
        logger.tracef("Watcher(%s): Notifying starter that process has started", thisTask);
        starter.send(thisTask, cast(shared)process);

        // Make sure we clean up after the watcher exits
        scope(exit) {
            int exitStatus = process.wait();
            onProcessTermination(exitStatus);
        }

        // Sends `sig` to the process and returns whether that succeeded.
        // std.process.kill throws ProcessException once the Pid is terminated,
        // which happens when the wait thread below has already reaped the
        // process but its ProcessTerminated message has not been handled yet.
        // That race must not abort the message loop.
        bool trySignal(int sig) {
            try {
                process.kill(sig);
                return true;
            } catch (ProcessException e) {
                logger.warningf("Watcher(%s): Failed to send signal %s: %s", thisTask, sig, e.msg);
                return false;
            }
        }

        // Create a barrier to wait for the read threads to start.
        // This is to ensure that stdout and stderr have started before we try to handle any messages
        auto readBarrier = new Barrier(3); // stdout + stderr + current threads.

        // Start threads for reading stdout and stderr
        auto stdoutThread = new Thread({
            readBarrier.wait();
            foreach (line; stdout.byLineCopy()) {
                line = line.sanitizeLine;
                logger.tracef("Watcher(%s) STDOUT >> %s", thisTask, line);
                callStdoutCallback(line);
            }
            logger.tracef("Watcher(%s) STDOUT: Thread completed", thisTask);
        }).start();
        scope(exit) stdoutThread.join(false);

        auto stderrThread = new Thread({
            readBarrier.wait();
            foreach (line; stderr.byLineCopy()) {
                line = line.sanitizeLine;
                logger.tracef("Watcher(%s) STDERR >> %s", thisTask, line);
                callStderrCallback(line);
            }
            logger.tracef("Watcher(%s) STDERR: Thread completed", thisTask);
        }).start();
        scope(exit) stderrThread.join(false);

        // Wait for all read threads to start
        readBarrier.wait();

        // Start thread for waiting on the process to finish
        auto waitThread = new Thread({
            process.wait();
            Thread.sleep(50.msecs); // Wait for stderr and stdout to be fully processed.
            logger.tracef("Watcher(%s) WAIT: Process terminated, notifying watcher", thisTask);
            thisTask.send(ProcessTerminated.init);
        }).start();
        scope(exit) waitThread.join();
        waitThread.isDaemon = true;

        // Keep a timer for catching zombies
        Timer killTimer;

        bool running = true;
        while (running) {
            // Handle messages
            try {
                logger.tracef("Watcher(%s): polling...", thisTask);
                receive(
                    // Send
                    (string message) {
                        logger.tracef("Watcher(%s): Sending message: %s", thisTask, message);
                        try {
                            stdin.writeln(message);
                            stdin.flush();
                        } catch (Exception e) {
                            // stdin may already be closed (closeStdin) or the
                            // pipe broken; that must not end the watcher loop.
                            logger.errorf("Watcher(%s): Failed to send message: %s", thisTask, e.msg);
                        }
                    },
                    // Kill
                    (int signal) {
                        logger.tracef("Watcher(%s): Sending kill signal %s", thisTask, signal);
                        if (!trySignal(signal)) return;
                        logger.tracef("Watcher(%s): Kill signal %s sent", thisTask, signal);

                        // Start kill timer (at most once; a non-positive timeout disables it)
                        if (killTimeout > 0.msecs && !killTimer) {
                            logger.tracef("Watcher(%s): Starting kill timeout", thisTask);
                            killTimer = setTimer(killTimeout, {
                                logger.warningf("Watcher(%s): Killing after kill timeout", thisTask);
                                thisTask.send(SIGKILL);
                            });
                        }
                    },
                    (CloseStdin _) {
                        logger.tracef("Watcher(%s): Closing stdin", thisTask);
                        stdin.close();
                    },
                    // Process terminated
                    (ProcessTerminated _) {
                        logger.tracef("Watcher(%s): Process terminated", thisTask);
                        running = false;
                    }
                );
            } catch (OwnerTerminated e) {
                // Terminate the process if the owner is terminated
                logger.errorf("Watcher(%s): Owner terminated, killing child.", thisTask);
                trySignal(SIGKILL);
            }
        }

        // Stop the timer if its started.
        if (killTimer) killTimer.stop();

        // Close all open pipes
        logger.tracef("Watcher(%s): Closing all pipes", thisTask);
        foreach (file; [stdout, stderr, stdin]) {
            if (file.isOpen) file.close();
        }

        // For sanity (best effort: the process is normally already terminated)
        logger.tracef("Watcher(%s): Sending SIGKILL and waiting", thisTask);
        try {
            process.kill(SIGKILL);
        } catch (Throwable e) {}
        process.wait();

        logger.infof("Watcher(%s): stopped", thisTask);
    }

    /// Called by the watcher task when the process has terminated: records the
    /// exit status, releases `wait()`ers, then runs the termination callback.
    private void onProcessTermination(int status) shared {
        // the watcher pauses itself
        logger.trace("Process terminated");

        withLock((self) {
            self.running = false;
            self.exitStatus = status;
            self.runningMutex.unlock();
            self.runningMutex = null;
        });

        logger.trace("Calling process termination callback");
        callTerminateCallback();
    }
}