;;; ;;; Implement a very simple syslog UDP server in order to be able to stream ;;; syslog messages directly into a PostgreSQL database, using the COPY ;;; protocol. ;;; (in-package #:pgloader.syslog) (defun process-message (data queue) "Process the data: send it in the queue for PostgreSQL COPY handling." (lq:push-queue data queue)) (defun syslog-udp-handler (buffer scanners) "Handler to register to usocket:socket-server call." (declare (type (simple-array (unsigned-byte 8) *) buffer)) (let* ((mesg (map 'string #'code-char buffer))) (loop for scanner in scanners for (parser queue) = (destructuring-bind (&key parser queue &allow-other-keys) scanner (list parser queue)) for (match parts) = (multiple-value-list (cl-ppcre:scan-to-strings parser mesg)) until match finally (when match (process-message (coerce parts 'list) queue))))) (defun start-syslog-server (&key scanners (host nil) (port 10514)) "Start our syslog socket and read messages." (usocket:socket-server host port #'syslog-udp-handler (list scanners) :protocol :datagram :reuse-address t :element-type '(unsigned-byte 8))) (defun copy-from-queue (dbname table-name queue &key truncate transforms ((:host *pgconn-host*) *pgconn-host*) ((:port *pgconn-port*) *pgconn-port*) ((:user *pgconn-user*) *pgconn-user*) ((:pass *pgconn-pass*) *pgconn-pass*) ((:gucs *pg-settings*) *pg-settings*)) "Simple wrapper around pgloader.pgsql.copy-from-queue. We need to set the special parameters from within the thread." (pgloader.pgsql:copy-from-queue dbname table-name queue :truncate truncate :transforms transforms)) (defun stream-messages (&key scanners host port truncate transforms) "Listen for syslog messages on given UDP PORT, and stream them to PostgreSQL" (let* ((nb-tasks (+ 1 (length scanners))) (lp:*kernel* (lparallel:make-kernel nb-tasks)) (channel (lp:make-channel)) ;; add a data queue to each scanner (scanners (loop for scanner in scanners for dataq = (lq:make-queue :fixed-capacity 4096) collect `(,@scanner :queue ,dataq)))) ;; listen to syslog messages and match them against the scanners (lp:submit-task channel #'start-syslog-server :scanners scanners :host host :port port) ;; and start another task per scanner to pull that data from the queue ;; to PostgreSQL, each will have a specific connection and queue. (loop for scanner in scanners do (destructuring-bind (&key target gucs queue &allow-other-keys) scanner (destructuring-bind (&key host port user password dbname table-name &allow-other-keys) target (lp:submit-task channel (lambda () ;; beware of thread boundaries for binding ;; special variables: we use an helper here (copy-from-queue dbname table-name queue :host host :port port :user user :pass password :gucs gucs :truncate truncate :transforms transforms)))))) ;; now wait until both the tasks are over (loop for tasks below nb-tasks do (lp:receive-result channel)))) ;;; ;;; A test client, with some not-so random messages ;;; (defun send-message (message &key (host "localhost") (port 10514)) (let ((socket (usocket:socket-connect host port :protocol :datagram))) (usocket:socket-send socket message (length message)))) #| (send-message "<165>1 2013-08-29T00:30:12Z Dimitris-MacBook-Air.local coreaudiod[215]: - - Enabled automatic stack shots because audio IO is inactive") (send-message "<0>Aug 31 21:38:28 Dimitris-MacBook-Air.local Google Chrome[236]: notify name \"com.apple.coregraphics.GUIConsoleSessionChanged\" has been registered 20 times - this may be a leak") SYSLOG> (start-syslog-server :scanners (list (abnf:parse-abnf-grammar abnf:*abnf-rsyslog* :rsyslog-msg :registering-rules '(:timestamp :app-name :data)))) data: STYLE-WARNING: redefining PGLOADER.SYSLOG::SYSLOG-UDP-HANDLER in DEFUN |#