/**
 * Implements `ProcessMonitor`.
 *
 * Re-exports `ProcessException` from `std.process` (via a module-level alias),
 * so callers can catch it without importing `std.process` themselves.
 *
 * Examples:
 * ---
 * auto monitor = new shared ProcessMonitor;
 * monitor.stdoutCallback = (string line) {
 *     writeln(line);
 * };
 * monitor.terminateCallback = () {
 *     writeln("terminated");
 * };
 *
 * monitor.start(["cat"]);
 *
 * monitor.send("foo");
 * monitor.send("bar");
 * monitor.closeStdin();
 *
 * assert(monitor.wait() == 0);
 * ---
 */
module supervised.monitor;

import core.time;
import core.thread;
import core.sync.barrier;

import core.sys.posix.signal : SIGKILL, SIGTERM;

import std.stdio;
import std.process;
import std.typecons;
import std.algorithm;
import std.exception;
static import std.process;

import vibe.core.core;
import vibe.core.sync;
import vibe.core.task;
import vibe.core.concurrency;

import supervised.logging;

/// Exception thrown when `ProcessMonitor.start` fails to launch the process
/// (raised by `std.process.pipeProcess`).
alias ProcessException = std.process.ProcessException;

/// Exception thrown when a method is called on a `ProcessMonitor` during invalid state.
class InvalidStateException : Exception {
    /// Standard exception constructor. `line` is `size_t` to match
    /// `Exception`'s constructor on every target, including 32-bit ones.
    this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable nextInChain = null) pure nothrow @nogc @safe {
        super(msg, file, line, nextInChain);
    }
}

/**
 * Sanitizes an output line read with `byLineCopy`.
 *
 * `byLineCopy` strips the trailing `'\n'` but leaves the `'\r'` of CRLF
 * output behind, so a single trailing carriage return is removed here.
 *
 * Params:
 *   line = a line of process output; may be empty (blank output lines).
 *
 * Returns: `line` without a single trailing `'\r'`; empty input is returned
 * unchanged (indexing `line[$ - 1]` on it would be out of bounds).
 */
private string sanitizeLine(string line) {
    if (line.length > 0 && line[$ - 1] == '\r') {
        return line[0 .. $ - 1];
    }
    return line;
}

/**
 * Spawns and monitors sub-processes.
 *
 * Monitoring processes in a safe way is expensive. This implementation requires
 * spawning 3 threads per process (stdout reader, stderr reader and exit waiter),
 * in addition to a vibe.d worker task acting as the watcher. Use this sparingly.
 *
 * Methods that block only block on the fibre (not the thread), and the shared
 * state of the monitor is guarded by a `TaskMutex`.
74 */ 75 @safe shared final class ProcessMonitor { 76 /// Callback for stdout and stderr events 77 alias FileCallback = void delegate(string); 78 79 /// Callack for termination 80 alias EventCallback = void delegate(); 81 82 // Events 83 private enum CloseStdin { init }; 84 private enum ProcessTerminated { init }; 85 86 private { 87 // Monitor mutex 88 TaskMutex _mutex; 89 90 Task _watcher; 91 bool _running = false; 92 // Mutex that is locked while the process is running. Used for wait() 93 TaskMutex _runningMutex; 94 int _exitStatus; 95 96 Duration _killTimeout = 20.seconds; 97 98 Pid _pid; 99 FileCallback _stdoutCallback; 100 FileCallback _stderrCallback; 101 EventCallback _terminateCallback; 102 } 103 104 /** 105 * Creates a new `ProcessMonitor`. 106 * 107 * Can also create a new process monitor and immediately start it. 108 * Behaviour is the same as `start`. 109 */ 110 this() shared @trusted { 111 this._mutex = cast(shared)new TaskMutex(); 112 } 113 114 /// Ditto 115 this(immutable string[] args, 116 immutable Tuple!(string, string)[] env = null, 117 immutable string workingDir = null) shared { 118 this(); 119 start(args, env, workingDir); 120 } 121 122 private struct Sync { 123 ProcessMonitor instance; 124 125 @property bool running() { 126 return instance._running; 127 } 128 129 package @property void running(bool value) { 130 instance._running = value; 131 } 132 133 package @property Task watcher() { 134 return instance._watcher; 135 } 136 137 package @property void watcher(Task task) { 138 instance._watcher = task; 139 } 140 141 package @property TaskMutex runningMutex() @trusted { 142 return cast(TaskMutex)instance._runningMutex; 143 } 144 145 package @property void runningMutex(TaskMutex mutex) @trusted { 146 instance._runningMutex = cast(shared)mutex; 147 } 148 149 @property int exitStatus() { 150 return instance._exitStatus; 151 } 152 153 package @property void exitStatus(int status) { 154 instance._exitStatus = status; 155 } 156 157 @property Duration 
killTimeout() { 158 return instance._killTimeout; 159 } 160 161 @property void killTimeout(Duration timeout) { 162 instance._killTimeout = timeout; 163 } 164 165 @property Pid pid() @trusted { 166 return cast(Pid)instance._pid; 167 } 168 169 package @property void pid(Pid pid) @trusted { 170 instance._pid = cast(shared)pid; 171 } 172 173 @property void stdoutCallback(FileCallback fn) { 174 instance._stdoutCallback = fn; 175 } 176 177 @property void stderrCallback(FileCallback fn) { 178 instance._stderrCallback = fn; 179 } 180 181 @property void terminateCallback(EventCallback fn) { 182 instance._terminateCallback = fn; 183 } 184 185 void send(string message) @trusted { 186 enforce!InvalidStateException(running, "Process is not running, cannot send message."); 187 188 logger.tracef("Sending stdin message: %s", message); 189 watcher.send(message); 190 } 191 } 192 193 private auto withLock(T)(T delegate(Sync) fn) shared @trusted { 194 logger.trace("Locking mutex"); 195 scope(exit) logger.trace("Unlocked mutex"); 196 197 synchronized (_mutex) { 198 logger.trace("Aquired mutex lock"); 199 200 return fn(Sync(cast(ProcessMonitor)this)); 201 } 202 } 203 204 /** 205 * Returns whether or not the monitored process is currently running. 206 */ 207 @property bool running() shared { 208 return withLock(self => self.running); 209 } 210 211 /** 212 * The time to wait before the process is force-killed by the monitor after 213 * being asked to stop. Defaults to 20 seconds. 214 * 215 * When a process is killed using `kill` it may not stop immediately (or at 216 * all). With this you can define an amount of time the monitor should wait 217 * before forcing the process to stop. 
218 */ 219 @property Duration killTimeout() shared { 220 return withLock(self => self.killTimeout); 221 } 222 223 /// Ditto 224 @property void killTimeout(in Duration duration) shared { 225 withLock(self => self.killTimeout = duration); 226 } 227 228 /// Returns the pid of the running process, or the last process run. 229 /// Otherwise returns `Pid.init`. 230 @property Pid pid() shared { 231 return withLock(self => self.pid); 232 } 233 234 /** 235 * Sets the callback for when a line from the monitored process's stdout is 236 * read. 237 * 238 * Callbacks are called from native threads, not vibed fibres. 239 */ 240 @property void stdoutCallback(in FileCallback fn) { 241 withLock(self => self.stdoutCallback = fn); 242 } 243 244 /** 245 * Sets the callback for when a line from the monitored process's stderr is 246 * read. 247 * 248 * Callbacks are called from native threads, not vibed fibres. 249 */ 250 @property void stderrCallback(in FileCallback fn) { 251 withLock(self => self.stderrCallback = fn); 252 } 253 254 /** 255 * Sets the callback for when the monitored process exits. 256 * 257 * Callbacks are called from native threads, not vibed fibres. 
258 */ 259 @property void terminateCallback(in EventCallback fn) { 260 withLock(self => self.terminateCallback = fn); 261 } 262 263 private void callStdoutCallback(in string message) shared @trusted { 264 auto callback = _stdoutCallback; 265 266 if (callback !is null) { 267 try { 268 callback(message); 269 } catch (Throwable e) { 270 logger.errorf("Stdout Callback Task(%s) Error: %s\nDue to line: %s", Task.getThis(), e, message); 271 } 272 } 273 } 274 275 private void callStderrCallback(in string message) shared @trusted { 276 auto callback = _stderrCallback; 277 278 if (callback !is null) { 279 try { 280 callback(message); 281 } catch (Throwable e) { 282 logger.errorf("Stderr Callback Task(%s) Error: %s\nDue to line: %s", Task.getThis(), e, message); 283 } 284 } 285 } 286 287 private void callTerminateCallback() shared @trusted { 288 auto callback = _terminateCallback; 289 290 if (callback !is null) { 291 try { 292 callback(); 293 } catch (Throwable e) { 294 logger.errorf("Terminate Callback Task(%s) Error: %s", Task.getThis(), e); 295 } 296 } 297 } 298 299 /** 300 * Sends a line of input to the monitored process's stdin. 301 * 302 * Throws: `InvalidStateException` when the process is not running. 303 */ 304 void send(string message) shared { 305 withLock(self => self.send(message)); 306 } 307 308 /** 309 * Closes the monitor's end of stdin for the process. 310 * 311 * Throws: `InvalidStateException` when the process is not running. 312 */ 313 void closeStdin() shared { 314 withLock((self) @trusted { 315 enforce!InvalidStateException(self.running, "Process it not running, cannot close stdin."); 316 317 logger.tracef("Sending closeStdin message"); 318 self.watcher.send(CloseStdin.init); 319 }); 320 } 321 322 /** 323 * Start a monitored process given the arguments (`args`), environment 324 * (`env`) and working directory (`workingDir`). 325 * 326 * Blocks on the fibre until the process starts. 
327 * 328 * Throws: 329 * `InvalidStateException` when a process is already running. 330 * 331 * `std.process.ProcessException` if an exception is encountered when starting the process. 332 */ 333 void start(immutable string[] args, immutable Tuple!(string, string)[] env = null, immutable string workingDir = null) shared { 334 withLock((self) @trusted { 335 enforce!InvalidStateException(!self.running, "Process is already running, cannot start it."); 336 337 static void fn(shared ProcessMonitor monitor, Tid sender, immutable string[] args, immutable Tuple!(string, string)[] _env, immutable string workingDir) { 338 // Create the env associative array 339 string[string] env; 340 foreach (item; _env) { 341 env[item[0]] = item[1]; 342 } 343 344 try { 345 monitor.runWatcher(sender, args, env, workingDir); 346 } catch (Throwable e) { 347 logger.criticalf("Critical Error in watcher: %s", e); 348 } 349 } 350 351 runWorkerTask(&fn, this, thisTid, args.idup, env.idup, workingDir.idup); 352 353 // Wait for either an exception or a successful start 354 receive( 355 (Task watcher, shared Pid pid) { 356 self.watcher = watcher; 357 self.pid = cast(Pid)pid; 358 }, 359 (shared Throwable error) { 360 throw cast(Throwable)error; 361 }, 362 ); 363 364 logger.tracef("Process started, monitor: %s", self.watcher); 365 self.running = true; 366 }); 367 } 368 369 /** 370 * Sends a signal to the process. 371 * 372 * If the process does not die within the `killTimeout`, `SIGKILL` is sent. 373 * 374 * Throws: InvalidStateException if the process is not running. 375 */ 376 void kill(int signal = SIGTERM) shared { 377 withLock((self) @trusted { 378 enforce!InvalidStateException(self.running, "Process is not running, cannot kill it"); 379 380 logger.tracef("Killing process with signal %s, monitor: %s", signal, self.watcher); 381 self.watcher.send(signal); 382 }); 383 } 384 385 /** 386 * Block on the current fibre until the process has exited. 
387 * 388 * Returns the exit code of the process, regardless of whether it had to 389 * wait or not. 390 * 391 * When called before any process starts, it returns `int.init`. 392 */ 393 int wait() shared { 394 auto pair = withLock(self => tuple(self.runningMutex, self.exitStatus)); 395 auto runningMutex = pair[0]; 396 397 if (runningMutex is null) { 398 return pair[1]; 399 } 400 401 // Wait on the running mutex 402 logger.trace("Waiting on process to finish"); 403 synchronized(runningMutex) {} 404 405 return withLock(self => self.exitStatus); 406 } 407 408 private void runWatcher(Tid starter, in string[] args, in string[string] env, in string workingDir) shared @trusted { 409 auto thisTask = Task.getThis(); 410 logger.infof("Watcher(%s): started with %s", thisTask, args); 411 412 // Start the process 413 immutable config = Config.newEnv; 414 415 ProcessPipes processPipes; 416 try { 417 processPipes = pipeProcess(args, Redirect.all, env, config, workingDir); 418 } catch (ProcessException exception) { 419 logger.tracef("Watcher(%s): Exception starting process: %s", thisTask, exception); 420 starter.send(cast(shared Throwable)exception); 421 return; 422 } 423 424 auto process = processPipes.pid; 425 auto stdin = processPipes.stdin; 426 auto stdout = processPipes.stdout; 427 auto stderr = processPipes.stderr; 428 logger.tracef("Watcher(%s): [pid: %s, stdin: %s, stdout: %s, stderr: %s]", thisTask, process, stdin.fileno, stdout.fileno, stderr.fileno); 429 430 // The monitor is locked by our starter, lets sneek in some state 431 auto runningMutex = new TaskMutex; 432 runningMutex.lock(); // We have to lock in this task, or we get an error! 
433 _runningMutex = cast(shared)runningMutex; 434 435 // Fetch some attributes, remember we're still locked 436 immutable killTimeout = _killTimeout; 437 438 // Notify the task that started the watcher that the watcher has started 439 logger.tracef("Watcher(%s): Notifying starter that process has started", thisTask); 440 starter.send(thisTask, cast(shared)process); 441 442 // Make sure we clean up after the watcher exits 443 scope(exit) { 444 int exitStatus = process.wait(); 445 onProcessTermination(exitStatus); 446 } 447 448 // Create a barrier to wait for the read threads to start. 449 // This is to ensure that stdout and stderr have started before we try to handle any messages 450 auto readBarrier = new Barrier(3); // stdout + stderr + current threads. 451 452 // Start threads for reading stdout and stderr 453 auto stdoutThread = new Thread({ 454 readBarrier.wait(); 455 foreach (line; stdout.byLineCopy()) { 456 line = line.sanitizeLine; 457 logger.tracef("Watcher(%s) STDOUT >> %s", thisTask, line); 458 callStdoutCallback(line); 459 } 460 logger.tracef("Watcher(%s) STDOUT: Thread completed", thisTask); 461 }).start(); 462 scope(exit) stdoutThread.join(false); 463 464 auto stderrThread = new Thread({ 465 readBarrier.wait(); 466 foreach (line; stderr.byLineCopy()) { 467 line = line.sanitizeLine; 468 logger.tracef("Watcher(%s) STDERR >> %s", thisTask, line); 469 callStderrCallback(line); 470 } 471 logger.tracef("Watcher(%s) STDERR: Thread completed", thisTask); 472 }).start(); 473 scope(exit) stderrThread.join(false); 474 475 // Wait for all read threads to start 476 readBarrier.wait(); 477 478 // Start thread for waiting on the process to finish 479 auto waitThread = new Thread({ 480 process.wait(); 481 Thread.sleep(50.msecs); // Wait for stderr and stdout to be fully processed. 
482 logger.tracef("Watcher(%s) WAIT: Process terminated, notifying watcher", thisTask); 483 thisTask.send(ProcessTerminated.init); 484 }).start(); 485 scope(exit) waitThread.join(); 486 waitThread.isDaemon = true; 487 488 // Keep a timer for catching zombies 489 Timer killTimer; 490 491 bool running = true; 492 while (running) { 493 // Handle messages 494 try { 495 logger.tracef("Watcher(%s): polling...", thisTask); 496 receive( 497 // Send 498 (string message) { 499 logger.tracef("Watcher(%s): Sending message: %s", thisTask, message); 500 stdin.writeln(message); 501 stdin.flush(); 502 }, 503 // Kill 504 (int signal) { 505 //if (process.tryWait().terminated) return; 506 507 logger.tracef("Watcher(%s): Sending kill signal %s", thisTask, signal); 508 process.kill(signal); 509 logger.tracef("Watcher(%s): Kill signal sent", thisTask, signal); 510 511 // Start kill timer 512 if (killTimeout > 0.msecs && !killTimer) { 513 logger.tracef("Watcher(%s): Starting kill timeout", thisTask); 514 killTimer = setTimer(killTimeout, { 515 logger.warningf("Watcher(%s): Killing after kill timeout", thisTask); 516 thisTask.send(SIGKILL); 517 }); 518 } 519 }, 520 (CloseStdin _) { 521 logger.tracef("Watcher(%s): Closing stdin", thisTask); 522 stdin.close(); 523 }, 524 // Process terminated 525 (ProcessTerminated _) { 526 logger.tracef("Watcher(%s): Process terminated", thisTask); 527 running = false; 528 } 529 ); 530 } catch (OwnerTerminated e) { 531 // Terminate the process if the owner is terminated 532 logger.errorf("Watcher(%s): Owner terminated, killing child.", thisTask); 533 process.kill(SIGKILL); 534 } 535 } 536 537 // Stop the timer if its started. 
538 if (killTimer) killTimer.stop(); 539 540 // Close all open pipes 541 logger.tracef("Watcher(%s): Closing all pipes", thisTask); 542 foreach (file; [stdout, stderr, stdin]) { 543 if (file.isOpen) file.close(); 544 } 545 546 // For sanity 547 logger.tracef("Watcher(%s): Sending SIGKILL and waiting", thisTask); 548 try { 549 process.kill(SIGKILL); 550 } catch (Throwable e) {} 551 process.wait(); 552 553 logger.infof("Watcher(%s): stopped", thisTask); 554 } 555 556 /// Called by watcher thread when the process has terminated 557 private void onProcessTermination(int status) shared { 558 // the watcher pauses itself 559 logger.trace("Process terminated"); 560 561 withLock((self) { 562 self.running = false; 563 self.exitStatus = status; 564 self.runningMutex.unlock(); 565 self.runningMutex = null; 566 }); 567 568 logger.trace("Calling process termination callback"); 569 callTerminateCallback(); 570 } 571 }