mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-10 08:17:00 +02:00
Use the ABNF machinery in the syslog message receiver and parser.
This commit is contained in:
parent
a48706d638
commit
d95bef644a
195
syslog.lisp
195
syslog.lisp
@ -6,194 +6,27 @@
|
||||
|
||||
(in-package #:pgloader.syslog)
|
||||
|
||||
(defvar *buffer* "" "Buffer we are currently parsing")
|
||||
(defvar *position* 0 "Reading position in the message")
|
||||
(defvar *message* nil "Current message parsed from the *buffer*")
|
||||
(defun process-message (data)
|
||||
"Process the data: send it in the queue for PostgreSQL COPY handling."
|
||||
(format t "data: ~{~a~^, ~}~%" data))
|
||||
|
||||
;;
|
||||
;; Those "constants" straight from the RFC 3164
|
||||
;;
|
||||
(defvar *facility-bytespec* (byte 3 0) "Read facility from priority")
|
||||
(defvar *severity-bytespec* (byte 5 3) "Read severity from priority")
|
||||
|
||||
(defvar *space-code-set* #. (quote (coerce " " 'list)))
|
||||
(defvar *decimal-code-set* #. (quote (coerce "0123456789" 'list)))
|
||||
(defvar *header-code-set* #. (quote (loop
|
||||
:for i :from 32 :to 126
|
||||
:collect (code-char i))))
|
||||
(defvar *alphabetical-code-set*
|
||||
#. (quote (concatenate 'list
|
||||
"abcdefghijklmnopqrstuvwxyz"
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
|
||||
|
||||
(defvar *month-abbrevs*
|
||||
'("Jan" "Feb" "Mar" "Apr" "May" "Jun" "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
|
||||
|
||||
(defun parse-priority (pri &aux (priority (parse-integer pri)))
|
||||
"Return facility and severity from PRI, as multiple values"
|
||||
(values (ldb *facility-bytespec* priority)
|
||||
(ldb *severity-bytespec* priority)))
|
||||
|
||||
(defstruct message facility severity timestamp hostname tag content)
|
||||
|
||||
;;
|
||||
;; Some low level utils
|
||||
;;
|
||||
(defun skip-one (character-bag)
|
||||
"Skip a character in the stream"
|
||||
;; NOOP as of now TODO: signal a condition if current *buffer* *position*
|
||||
;; is not containing a character of the bag
|
||||
(declare (ignore character-bag))
|
||||
(incf *position*))
|
||||
|
||||
(defun skip-many (character-bag)
|
||||
"Skip all characters from CHARACTER-BAG current *buffer*"
|
||||
(loop
|
||||
:while (member (aref *buffer* *position*) character-bag)
|
||||
:do (incf *position*)))
|
||||
|
||||
(defun collect-many (character-bag)
|
||||
"Return a string of the chars found in *buffer* at *position* that are in
|
||||
the given CHARACTER-BAG."
|
||||
(coerce
|
||||
(loop
|
||||
:until (= *position* (length *buffer*))
|
||||
:for char = (aref *buffer* *position*)
|
||||
:while (member char character-bag)
|
||||
:collect char
|
||||
:do (incf *position*))
|
||||
'string))
|
||||
|
||||
(defun collect-max (character-bag limit)
|
||||
"Return a string of the chars found in *buffer* at *position* that are in
|
||||
the given CHARACTER-BAG."
|
||||
(coerce
|
||||
(loop
|
||||
:repeat limit
|
||||
:for char = (aref *buffer* *position*)
|
||||
:while (member char character-bag)
|
||||
:collect char
|
||||
:do (incf *position*))
|
||||
'string))
|
||||
|
||||
(defun collect-until (character-bag)
|
||||
"Collect characters and increment *position* until we reach one of
|
||||
CHARACTER-BAG."
|
||||
(coerce
|
||||
(loop
|
||||
:for char = (aref *buffer* *position*)
|
||||
:until (member char character-bag)
|
||||
:collect char
|
||||
:do (incf *position*))
|
||||
'string))
|
||||
|
||||
(defun read-one-of (list-of-strings &optional length)
|
||||
"Return one of the strings given in LIST-OF-STRINGS and increment *position*."
|
||||
(loop
|
||||
:for match :in list-of-strings
|
||||
:for len = (or length (length match))
|
||||
:until (string= match
|
||||
(coerce
|
||||
(subseq *buffer* *position* (+ *position* len)) 'string))
|
||||
:finally (progn (incf *position* len)
|
||||
(return match))))
|
||||
|
||||
;;
|
||||
;; Read parts of the syslog message
|
||||
;;
|
||||
(defun read-priority ()
|
||||
"Reads the priority from the BUFFER position POS and return the priority
|
||||
and the next item position as multiple values"
|
||||
(skip-one "<")
|
||||
(multiple-value-bind (facility severity)
|
||||
(parse-priority (collect-many *decimal-code-set*))
|
||||
(setf (message-facility *message*) facility
|
||||
(message-severity *message*) severity))
|
||||
(skip-one ">"))
|
||||
|
||||
(defun read-spaces-and-number ()
|
||||
(skip-many *space-code-set*)
|
||||
(let ((digits (collect-many *decimal-code-set*)))
|
||||
(or (parse-integer digits :junk-allowed t) 0)))
|
||||
|
||||
(defun maybe-read-year ()
|
||||
"See if there's a year and/or a TZ at current *position* in *buffer*."
|
||||
(let* ((start-position *position*)
|
||||
(ignore-space (skip-one " "))
|
||||
(digits-at-point (collect-many *decimal-code-set*)))
|
||||
(declare (ignore ignore-space))
|
||||
(if (and digits-at-point
|
||||
(char= (aref *buffer* *position*) #\Space)
|
||||
(or (= 2 (length digits-at-point))
|
||||
(= 4 (length digits-at-point))))
|
||||
(parse-integer digits-at-point)
|
||||
;; that was not a year after all
|
||||
(prog2 (setf *position* start-position)
|
||||
;; get current year
|
||||
(nth 5 (multiple-value-list (get-decoded-time)))))))
|
||||
|
||||
(defun read-timestamp ()
|
||||
"The TIMESTAMP field is the local time and is in the format of \"Mmm dd
|
||||
hh:mm:ss\" (without the quote marks)"
|
||||
(multiple-value-bind (month day hours mins secs year)
|
||||
(values (+ 1 (position (read-one-of *month-abbrevs*)
|
||||
*month-abbrevs*
|
||||
:test #'string=)) ; month
|
||||
(read-spaces-and-number) ; day
|
||||
(read-spaces-and-number) ; hours
|
||||
(prog2 (skip-one ":") ;
|
||||
(read-spaces-and-number)) ; mins
|
||||
(prog2 (skip-one ":") ;
|
||||
(read-spaces-and-number)) ; secs
|
||||
(maybe-read-year)) ; year
|
||||
(setf (message-timestamp *message*)
|
||||
(local-time:encode-timestamp 0 secs mins hours day month year))))
|
||||
|
||||
(defun read-hostname ()
|
||||
"Any non whitespace is constituent of the HOSTNAME field."
|
||||
(setf (message-hostname *message*) (collect-until *space-code-set*)))
|
||||
|
||||
(defun read-header ()
|
||||
"Read the HEADER part of a syslog UDP buffer"
|
||||
(read-timestamp)
|
||||
(skip-one " ")
|
||||
(read-hostname)
|
||||
(skip-one " "))
|
||||
|
||||
(defun read-tag ()
|
||||
(setf (message-tag *message*) (collect-max *alphabetical-code-set* 32)))
|
||||
|
||||
(defun read-content ()
|
||||
(setf (message-content *message*) (subseq *buffer* *position*)))
|
||||
|
||||
(defun read-msg ()
|
||||
(read-tag)
|
||||
(read-content))
|
||||
|
||||
(defun parse-message (&key
|
||||
((:buffer *buffer*) *buffer*)
|
||||
((:position *position*) *position*))
|
||||
"Parse a whole syslog message and return a message structure."
|
||||
;; (declare (type (simple-array (unsigned-byte 8) *) *buffer*))
|
||||
(let ((*message* (make-message)))
|
||||
(read-priority)
|
||||
(read-header)
|
||||
(read-msg)
|
||||
*message*))
|
||||
|
||||
(defun syslog-udp-handler (buffer)
|
||||
(defun syslog-udp-handler (buffer scanners)
|
||||
"Handler to register to usocket:socket-server call."
|
||||
(declare (type (simple-array (unsigned-byte 8) *) buffer))
|
||||
(let* ((*buffer* (map 'string #'code-char buffer))
|
||||
(*position* 0)
|
||||
(mesg (parse-message)))
|
||||
(format t "recv: ~a~%" mesg)))
|
||||
(let* ((mesg (map 'string #'code-char buffer)))
|
||||
(loop
|
||||
for scanner in scanners
|
||||
for (match parts) = (multiple-value-list
|
||||
(cl-ppcre:scan-to-strings scanner mesg))
|
||||
until match
|
||||
finally (process-message (coerce parts 'list)))))
|
||||
|
||||
(defun start-syslog-server (&key
|
||||
scanners
|
||||
(host nil)
|
||||
(port 10514))
|
||||
"Start our syslog socket and read messages."
|
||||
(usocket:socket-server host port #'syslog-udp-handler nil
|
||||
(usocket:socket-server host port #'syslog-udp-handler (list scanners)
|
||||
:protocol :datagram
|
||||
:reuse-address t
|
||||
:element-type '(unsigned-byte 8)))
|
||||
@ -210,7 +43,7 @@
|
||||
|
||||
#|
|
||||
|
||||
(send-message "<165>Aug 29 00:30:12 Dimitris-MacBook-Air.local coreaudiod[215]: Enabled automatic stack shots because audio IO is inactive")
|
||||
(send-message "<165>1 2013-08-29T00:30:12Z Dimitris-MacBook-Air.local coreaudiod[215]: - - Enabled automatic stack shots because audio IO is inactive")
|
||||
|
||||
(send-message "<0>Aug 31 21:38:28 Dimitris-MacBook-Air.local Google Chrome[236]: notify name \"com.apple.coregraphics.GUIConsoleSessionChanged\" has been registered 20 times - this may be a leak")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user