# pidfile
#   td-agent: /var/run/td-agent/td-agent.pid
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L18
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/deb/td-agent#L24
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/rpm/td-agent#L24
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/td-agent/logrotate.d/td-agent.logrotate#L10
#   fluentd: nothing (or --daemon PIDFILE)
#
# logfile
#   td-agent: /var/log/td-agent/td-agent.log
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L21
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/deb/td-agent#L23
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/init.d/rpm/td-agent#L23
#   fluentd: stdout (or --log LOGFILE)
#
# config file
#   td-agent: /etc/td-agent/td-agent.conf
#     - https://github.com/treasure-data/omnibus-td-agent/blob/master/templates/etc/systemd/td-agent.service.erb#L14
#   fluentd: /etc/fluent/fluent.conf (created by fluentd -s)

require "strscan"

class Fluentd
  class Agent
    module Common
      attr_reader :extra_options

      def self.included(base)
        base.include(Fluentd::Agent::ProcessOperation)
      end

      # define these methods on each Agent class
      %w(start stop restart version).each do |method|
        define_method(method) do
          raise NotImplementedError, "'#{method}' method is required to be defined"
        end
      end

      def initialize(options = {})
        @extra_options = options
      end
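
      # pid_file / log_file / config_file below resolve paths in two steps: a value
      # passed in the options hash at construction time wins, otherwise the including
      # agent class supplies its default_options. Hypothetical sketch (SomeAgent stands
      # for any class that includes Common):
      #
      #   agent = SomeAgent.new(config_file: "/tmp/test.conf")
      #   agent.config_file  #=> "/tmp/test.conf"
      #   agent.pid_file     #=> SomeAgent.default_options[:pid_file]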
      def pid_file
        extra_options[:pid_file] || self.class.default_options[:pid_file]
      end

      def log_file
        extra_options[:log_file] || self.class.default_options[:log_file]
      end

      def log
        @log ||= FluentdLog.new(log_file)
      end

      def config_file
        extra_options[:config_file] || self.class.default_options[:config_file]
      end

      # -- config
      def config
        File.read(config_file)
      end

      def config_write(content)
        backup_config
        File.open(config_file, "w") do |f|
          f.write content
        end
      end

      def config_append(content)
        backup_config
        File.open(config_file, "a") do |f|
          f.write "\n"
          f.write content
        end
      end
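
      # config_merge appends new sections to the config file, but when the incoming
      # snippet is a <label> block whose name already exists, its <filter>/<match>
      # sections are folded into the existing <label> block and the whole file is
      # rewritten via dump_parsed_config. Illustrative sketch (config names made up):
      #
      #   # existing file contains:  <label @ERROR> <match **> ... </match> </label>
      #   agent.config_merge(<<~CONF)
      #     <label @ERROR>
      #       <filter **>
      #         @type grep
      #       </filter>
      #     </label>
      #   CONF
      #   # => the grep filter is added inside the existing <label @ERROR> block;
      #   #    any other snippet is simply appended to the end of the file.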
      def config_merge(content)
        if content.start_with?("<label ")
          label = content.slice(/<label\s+(.+?)>/, 1)
          key = "label:#{label}"
          parsed_config = parse_config(config)
          if parsed_config.key?(key)
            offset = parsed_config[key][0][:pos] + parsed_config[key][0][:size]
            label, sections = parse_label_section(content, offset)
            parsed_config[key][0][:sections]["filter"].concat(sections["filter"])
            parsed_config[key][0][:sections]["match"].concat(sections["match"])
            config_write(dump_parsed_config(parsed_config))
          else
            config_append(content)
          end
        else
          config_append(content)
        end
      end

      def configuration
        if File.exist? config_file
          ::Fluentd::Agent::Configuration.new(config_file)
        end
      end

      # -- backup methods
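      #
      # Every config_write/config_append call snapshots the current config file into
      # config_backup_dir as a timestamped copy; backup_running_config additionally
      # keeps a "running.conf" copy of the config that was last started successfully.
      # Old timestamped backups are rotated away by remove_over_backup_files.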

      def config_backup_dir
        # the "confg" spelling is preserved: it is the directory name used on disk
        dir = File.join(FluentdUI.data_dir, "#{Rails.env}_confg_backups")
        FileUtils.mkdir_p(dir)
        dir
      end

      def backup_files
        Dir.glob(File.join(config_backup_dir, "*.conf"))
      end

      def backup_files_in_old_order
        backup_files.sort
      end

      def backup_files_in_new_order
        backup_files_in_old_order.reverse
      end

      def running_config_backup_dir
        dir = File.join(FluentdUI.data_dir, "#{Rails.env}_running_confg_backup")
        FileUtils.mkdir_p(dir)
        dir
      end

      def running_config_backup_file
        File.join(running_config_backup_dir, "running.conf")
      end

      # -------------- private --------------
      private
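
      # backup_running_config expects a block that performs the actual start; the
      # "running" config snapshot is taken only when that block returns a truthy value
      # and the config file exists. Hypothetical call site:
      #
      #   backup_running_config { start_fluentd_process }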
      def backup_running_config
        # back up the config file only when the start succeeds
        return unless yield

        return unless File.exist? config_file

        FileUtils.cp config_file, running_config_backup_file

        true
      end

      def backup_config
        return unless File.exist? config_file

        FileUtils.cp config_file, File.join(config_backup_dir, "#{Time.zone.now.strftime('%Y%m%d_%H%M%S')}.conf")

        remove_over_backup_files
      end
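
      # remove_over_backup_files trims the backup directory down to
      # Settings.max_backup_files_count entries, oldest first. Each backup may have a
      # companion note file (same basename, SettingArchive::Note::FILE_EXTENSION);
      # the note is deleted together with its backup so the two stay in sync.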
      def remove_over_backup_files
        over_file_count = backup_files.size - ::Settings.max_backup_files_count

        return if over_file_count <= 0

        backup_files_in_old_order.first(over_file_count).each do |file|
          note_file_attached_backup = file.sub(/#{Regexp.escape(File.extname(file))}\z/, ::Fluentd::SettingArchive::Note::FILE_EXTENSION)
          FileUtils.rm(note_file_attached_backup) if File.exist? note_file_attached_backup
          FileUtils.rm(file) if File.exist? file
        end
      end
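
      # parse_config splits a Fluentd configuration into its top-level sections.
      # It returns a Hash whose keys are "source", "filter", "match" and
      # "label:<name>", and whose values are arrays of entries that remember the
      # offset (:pos) the section was scanned from, so the file can later be
      # re-assembled in its original order. Roughly (illustrative):
      #
      #   parse_config("<source>\n  @type forward\n</source>\n")
      #   #=> { "source" => [{ pos: 0, content: "<source>\n  @type forward\n</source>" }] }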
      def parse_config(content)
        scanner = StringScanner.new(content)
        contents = Hash.new {|h, k| h[k] = [] }
        until scanner.eos? do
          started = scanner.pos
          header = scanner.scan_until(/^<(source|filter|match|label)/)
          section_type = scanner[1]
          break unless header
          case section_type
          when "source", "filter", "match"
            current_source = header + scanner.scan_until(%r{^</(?:source|filter|match)>})
            contents[section_type] << { pos: started, content: current_source.strip }
          when "label"
            label_content = header + scanner.scan_until(%r{^</label>})
            label, sections = parse_label_section(label_content, started)
            contents["label:#{label}"] << { label: label, pos: started, sections: sections, size: label_content.size }
          else
            raise TypeError, "Unknown section: #{started}: #{section_type}"
          end
        end
        contents
      end
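
      # parse_label_section extracts the label name and the <filter>/<match> sections
      # nested inside a single <label> block. `offset` is the position of the block in
      # the surrounding file, so the returned sub-section positions are absolute and
      # can be merged with the positions produced by parse_config.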
      def parse_label_section(content, offset)
        scanner = StringScanner.new(content)
        scanner.scan_until(/^<label\s+?([^\s]+?)>/)
        label = scanner[1]
        sections = Hash.new {|h, k| h[k] = [] }
        loop do
          break if scanner.match?(%r{\s+?</label>})
          pos = scanner.pos
          header = scanner.scan_until(/^\s*<(filter|match)/)
          type = scanner[1]
          source = header + scanner.scan_until(%r{^\s*</(?:filter|match)>})
          sections[type] << { label: label, pos: pos + offset, content: source.sub(/\n+/, "") }
        end
        return label, sections
      end
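
      # dump_parsed_config is the inverse of parse_config: it re-serializes the parsed
      # sections back into configuration text, ordered by their original :pos values.
      # Sections merged into a <label> block by config_merge end up inside that block,
      # filters first, then matches, each group sorted by position.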
      def dump_parsed_config(parsed_config)
        content = "".dup
        sources = parsed_config["source"] || []
        filters = parsed_config["filter"] || []
        matches = parsed_config["match"] || []
        labels = parsed_config.select do |key, sections|
          key.start_with?("label:")
        end
        labels = labels.values.flatten
        sorted_sections = (sources + filters + matches + labels).sort_by do |section|
          section[:pos]
        end
        sorted_sections.each do |section|
          if section.key?(:label)
            label = section[:label]
            sub_filters = section.dig(:sections, "filter") || []
            sub_matches = section.dig(:sections, "match") || []
            sorted_sub_filters = sub_filters.sort_by do |sub_section|
              sub_section[:pos]
            end
            sorted_sub_matches = sub_matches.sort_by do |sub_section|
              sub_section[:pos]
            end
            sub_sections = sorted_sub_filters + sorted_sub_matches
            content << "<label #{label}>\n"
            sub_sections.each do |sub_section|
              content << sub_section[:content] << "\n\n"
            end
            content.chomp!
            content << "</label>\n\n"
          else
            content << section[:content] << "\n\n"
          end
        end
        content.chomp
      end
    end
  end
end