mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-09 07:47:00 +02:00
218 lines
7.0 KiB
Common Lisp
218 lines
7.0 KiB
Common Lisp
;;;
|
|
;;; Implement a very simple syslog UDP server in order to be able to stream
|
|
;;; syslog messages directly into a PostgreSQL database, using the COPY
|
|
;;; protocol.
|
|
;;;
|
|
|
|
(in-package #:pgloader.syslog)
|
|
|
|
(defvar *buffer* "" "Buffer we are currently parsing")
|
|
(defvar *position* 0 "Reading position in the message")
|
|
(defvar *message* nil "Current message parsed from the *buffer*")
|
|
|
|
;;
|
|
;; Those "constants" straight from the RFC 3164
|
|
;;
|
|
(defvar *facility-bytespec* (byte 3 0) "Read facility from priority")
|
|
(defvar *severity-bytespec* (byte 5 3) "Read severity from priority")
|
|
|
|
(defvar *space-code-set* #. (quote (coerce " " 'list)))
|
|
(defvar *decimal-code-set* #. (quote (coerce "0123456789" 'list)))
|
|
(defvar *header-code-set* #. (quote (loop
|
|
:for i :from 32 :to 126
|
|
:collect (code-char i))))
|
|
(defvar *alphabetical-code-set*
|
|
#. (quote (concatenate 'list
|
|
"abcdefghijklmnopqrstuvwxyz"
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
|
|
|
|
(defvar *month-abbrevs*
|
|
'("Jan" "Feb" "Mar" "Apr" "May" "Jun" "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
|
|
|
|
(defun parse-priority (pri &aux (priority (parse-integer pri)))
|
|
"Return facility and severity from PRI, as multiple values"
|
|
(values (ldb *facility-bytespec* priority)
|
|
(ldb *severity-bytespec* priority)))
|
|
|
|
(defstruct message facility severity timestamp hostname tag content)
|
|
|
|
;;
|
|
;; Some low level utils
|
|
;;
|
|
(defun skip-one (character-bag)
|
|
"Skip a character in the stream"
|
|
;; NOOP as of now TODO: signal a condition if current *buffer* *position*
|
|
;; is not containing a character of the bag
|
|
(declare (ignore character-bag))
|
|
(incf *position*))
|
|
|
|
(defun skip-many (character-bag)
|
|
"Skip all characters from CHARACTER-BAG current *buffer*"
|
|
(loop
|
|
:while (member (aref *buffer* *position*) character-bag)
|
|
:do (incf *position*)))
|
|
|
|
(defun collect-many (character-bag)
|
|
"Return a string of the chars found in *buffer* at *position* that are in
|
|
the given CHARACTER-BAG."
|
|
(coerce
|
|
(loop
|
|
:until (= *position* (length *buffer*))
|
|
:for char = (aref *buffer* *position*)
|
|
:while (member char character-bag)
|
|
:collect char
|
|
:do (incf *position*))
|
|
'string))
|
|
|
|
(defun collect-max (character-bag limit)
|
|
"Return a string of the chars found in *buffer* at *position* that are in
|
|
the given CHARACTER-BAG."
|
|
(coerce
|
|
(loop
|
|
:repeat limit
|
|
:for char = (aref *buffer* *position*)
|
|
:while (member char character-bag)
|
|
:collect char
|
|
:do (incf *position*))
|
|
'string))
|
|
|
|
(defun collect-until (character-bag)
|
|
"Collect characters and increment *position* until we reach one of
|
|
CHARACTER-BAG."
|
|
(coerce
|
|
(loop
|
|
:for char = (aref *buffer* *position*)
|
|
:until (member char character-bag)
|
|
:collect char
|
|
:do (incf *position*))
|
|
'string))
|
|
|
|
(defun read-one-of (list-of-strings &optional length)
|
|
"Return one of the strings given in LIST-OF-STRINGS and increment *position*."
|
|
(loop
|
|
:for match :in list-of-strings
|
|
:for len = (or length (length match))
|
|
:until (string= match
|
|
(coerce
|
|
(subseq *buffer* *position* (+ *position* len)) 'string))
|
|
:finally (progn (incf *position* len)
|
|
(return match))))
|
|
|
|
;;
|
|
;; Read parts of the syslog message
|
|
;;
|
|
(defun read-priority ()
|
|
"Reads the priority from the BUFFER position POS and return the priority
|
|
and the next item position as multiple values"
|
|
(skip-one "<")
|
|
(multiple-value-bind (facility severity)
|
|
(parse-priority (collect-many *decimal-code-set*))
|
|
(setf (message-facility *message*) facility
|
|
(message-severity *message*) severity))
|
|
(skip-one ">"))
|
|
|
|
(defun read-spaces-and-number ()
|
|
(skip-many *space-code-set*)
|
|
(let ((digits (collect-many *decimal-code-set*)))
|
|
(or (parse-integer digits :junk-allowed t) 0)))
|
|
|
|
(defun maybe-read-year ()
|
|
"See if there's a year and/or a TZ at current *position* in *buffer*."
|
|
(let* ((start-position *position*)
|
|
(ignore-space (skip-one " "))
|
|
(digits-at-point (collect-many *decimal-code-set*)))
|
|
(declare (ignore ignore-space))
|
|
(if (and digits-at-point
|
|
(char= (aref *buffer* *position*) #\Space)
|
|
(or (= 2 (length digits-at-point))
|
|
(= 4 (length digits-at-point))))
|
|
(parse-integer digits-at-point)
|
|
;; that was not a year after all
|
|
(prog2 (setf *position* start-position)
|
|
;; get current year
|
|
(nth 5 (multiple-value-list (get-decoded-time)))))))
|
|
|
|
(defun read-timestamp ()
|
|
"The TIMESTAMP field is the local time and is in the format of \"Mmm dd
|
|
hh:mm:ss\" (without the quote marks)"
|
|
(multiple-value-bind (month day hours mins secs year)
|
|
(values (+ 1 (position (read-one-of *month-abbrevs*)
|
|
*month-abbrevs*
|
|
:test #'string=)) ; month
|
|
(read-spaces-and-number) ; day
|
|
(read-spaces-and-number) ; hours
|
|
(prog2 (skip-one ":") ;
|
|
(read-spaces-and-number)) ; mins
|
|
(prog2 (skip-one ":") ;
|
|
(read-spaces-and-number)) ; secs
|
|
(maybe-read-year)) ; year
|
|
(setf (message-timestamp *message*)
|
|
(local-time:encode-timestamp 0 secs mins hours day month year))))
|
|
|
|
(defun read-hostname ()
|
|
"Any non whitespace is constituent of the HOSTNAME field."
|
|
(setf (message-hostname *message*) (collect-until *space-code-set*)))
|
|
|
|
(defun read-header ()
|
|
"Read the HEADER part of a syslog UDP buffer"
|
|
(read-timestamp)
|
|
(skip-one " ")
|
|
(read-hostname)
|
|
(skip-one " "))
|
|
|
|
(defun read-tag ()
|
|
(setf (message-tag *message*) (collect-max *alphabetical-code-set* 32)))
|
|
|
|
(defun read-content ()
|
|
(setf (message-content *message*) (subseq *buffer* *position*)))
|
|
|
|
(defun read-msg ()
|
|
(read-tag)
|
|
(read-content))
|
|
|
|
(defun parse-message (&key
|
|
((:buffer *buffer*) *buffer*)
|
|
((:position *position*) *position*))
|
|
"Parse a whole syslog message and return a message structure."
|
|
;; (declare (type (simple-array (unsigned-byte 8) *) *buffer*))
|
|
(let ((*message* (make-message)))
|
|
(read-priority)
|
|
(read-header)
|
|
(read-msg)
|
|
*message*))
|
|
|
|
(defun syslog-udp-handler (buffer)
|
|
"Handler to register to usocket:socket-server call."
|
|
(declare (type (simple-array (unsigned-byte 8) *) buffer))
|
|
(let* ((*buffer* (map 'string #'code-char buffer))
|
|
(*position* 0)
|
|
(mesg (parse-message)))
|
|
(format t "recv: ~a~%" mesg)))
|
|
|
|
(defun start-syslog-server (&key
|
|
(host nil)
|
|
(port 10514))
|
|
"Start our syslog socket and read messages."
|
|
(usocket:socket-server host port #'syslog-udp-handler nil
|
|
:protocol :datagram
|
|
:reuse-address t
|
|
:element-type '(unsigned-byte 8)))
|
|
|
|
;;;
|
|
;;; A test client, with some not-so random messages
|
|
;;;
|
|
(defun send-message (message
|
|
&key
|
|
(host "localhost")
|
|
(port 10514))
|
|
(let ((socket (usocket:socket-connect host port :protocol :datagram)))
|
|
(usocket:socket-send socket message (length message))))
|
|
|
|
#|
|
|
|
|
(send-message "<165>Aug 29 00:30:12 Dimitris-MacBook-Air.local coreaudiod[215]: Enabled automatic stack shots because audio IO is inactive")
|
|
|
|
(send-message "<0>Aug 31 21:38:28 Dimitris-MacBook-Air.local Google Chrome[236]: notify name \"com.apple.coregraphics.GUIConsoleSessionChanged\" has been registered 20 times - this may be a leak")
|
|
|
|
|#
|