mirror of
https://gitlab.alpinelinux.org/alpine/aports.git
synced 2025-08-06 05:47:13 +02:00
903 lines
29 KiB
Diff
903 lines
29 KiB
Diff
commit 4a51f83efc89412f1954b3cb32a3ee83ff354095
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Sun Nov 19 22:06:15 2017 +0000
|
||
|
||
fix detection of unknown action in netdisco-do
|
||
|
||
diff --git a/bin/netdisco-do b/bin/netdisco-do
|
||
index 1c5b0b43..ab60f98e 100755
|
||
--- a/bin/netdisco-do
|
||
+++ b/bin/netdisco-do
|
||
@@ -139,7 +139,7 @@ foreach my $host (@hostlist) {
|
||
$job->log("error running job: $_");
|
||
};
|
||
|
||
- if ($job->log eq 'no worker succeeded during main phase') {
|
||
+ if ($job->log eq 'failed to report from any worker!') {
|
||
pod2usage(
|
||
-msg => (sprintf 'error: %s is not a valid action', $action),
|
||
-verbose => 2,
|
||
|
||
commit c576a755af61186ec46a0dc2576b43aff0ed903a
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Tue Nov 21 10:00:53 2017 +0000
|
||
|
||
tweak log message
|
||
|
||
diff --git a/lib/App/Netdisco/Worker/Runner.pm b/lib/App/Netdisco/Worker/Runner.pm
|
||
index 2a81ab0c..334e9e97 100644
|
||
--- a/lib/App/Netdisco/Worker/Runner.pm
|
||
+++ b/lib/App/Netdisco/Worker/Runner.pm
|
||
@@ -76,7 +76,7 @@ sub run_workers {
|
||
foreach my $worker (@{ $self->$set }) {
|
||
try { $job->add_status( $worker->($job) ) }
|
||
catch {
|
||
- debug "=> $_" if $_;
|
||
+ debug "-> $_" if $_;
|
||
$job->add_status( Status->error($_) );
|
||
};
|
||
}
|
||
|
||
commit b694258a6580382c5a66864468e4e1b136709f79
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Tue Nov 21 10:01:09 2017 +0000
|
||
|
||
leave community rows in place
|
||
|
||
diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm
|
||
index 6352e444..3d3e0a74 100644
|
||
--- a/lib/App/Netdisco/DB/Result/Device.pm
|
||
+++ b/lib/App/Netdisco/DB/Result/Device.pm
|
||
@@ -249,7 +249,6 @@ sub renumber {
|
||
DevicePortSsid
|
||
DevicePortVlan
|
||
DevicePortWireless
|
||
- Community
|
||
/) {
|
||
$schema->resultset($set)
|
||
->search({ip => $old_ip})
|
||
|
||
commit 1bbe8c916481b9e5dc0c2ec26c4d922a6dabd5f0
|
||
Merge: b694258a 4a51f83e
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Tue Nov 21 10:02:57 2017 +0000
|
||
|
||
Merge branch 'master' of github.com:netdisco/netdisco
|
||
|
||
commit 0bb15f36b9e8f374f995b5cfeaf493b74fb458e6
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Thu Nov 23 19:23:55 2017 +0000
|
||
|
||
fixes for race conditions and dupes in job queue
|
||
|
||
we had situations where the manager would start workers on the same job,
|
||
either because of race conditions or because at the time of queueing it wasn't
|
||
known that the jobs were targeting the same device (due to device aliases).
|
||
|
||
this commit removes duplicate jobs, reduces the need for locking on the job
|
||
queue, and makes use of lldpRemChassisId to try to deduplicate jobs before
|
||
they are started. in effect we have several goes to prevent duplicate jobs:
|
||
|
||
1. at neighbor discovery time we try to skip queueing same lldpRemChassisId
|
||
2. at job selection we 'error out' jobs with same profile as job selected
|
||
3. at job selection we check for running job with same profile as selected
|
||
4. the job manager process also checks for duplicate job profiles
|
||
5. at job lock we abort if the job was 'errored out'
|
||
|
||
all together this seems to work well. a test on a large university network of
|
||
303 devices (four core routers and the rest edge routers, running VRF with many
|
||
duplicate identities), ~1200 subnets, ~50k hosts, resulted in no DB deadlock
|
||
or contention and a complete discover+arpnip+macsuck (909 jobs) in ~3 minutes
|
||
(with ~150 duplicate jobs identified and skipped).
|
||
|
||
diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
|
||
index 7811b53f..72850e8f 100644
|
||
--- a/lib/App/Netdisco/Backend/Job.pm
|
||
+++ b/lib/App/Netdisco/Backend/Job.pm
|
||
@@ -19,6 +19,7 @@ foreach my $slot (qw/
|
||
username
|
||
userip
|
||
log
|
||
+ device_key
|
||
|
||
_current_phase
|
||
_last_namespace
|
||
diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
index 25d13ba7..d594df4d 100644
|
||
--- a/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
+++ b/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
@@ -34,6 +34,16 @@ sub worker_begin {
|
||
}
|
||
}
|
||
|
||
+# creates a 'signature' for each job so that we can check for duplicates ...
|
||
+# it happens from time to time due to the distributed nature of the job queue
|
||
+# and manager(s) - also kinder to the DB to skip here rather than jq_lock()
|
||
+my $memoize = sub {
|
||
+ no warnings 'uninitialized';
|
||
+ my $job = shift;
|
||
+ return join chr(28), map {$job->{$_}}
|
||
+ (qw/action port subaction/, ($job->{device_key} ? 'device_key' : 'device'));
|
||
+};
|
||
+
|
||
sub worker_body {
|
||
my $self = shift;
|
||
my $wid = $self->wid;
|
||
@@ -46,6 +56,7 @@ sub worker_body {
|
||
while (1) {
|
||
prctl sprintf 'nd2: #%s mgr: gathering', $wid;
|
||
my $num_slots = 0;
|
||
+ my %seen_job = ();
|
||
|
||
$num_slots = parse_max_workers( setting('workers')->{tasks} )
|
||
- $self->{queue}->pending();
|
||
@@ -54,6 +65,7 @@ sub worker_body {
|
||
# get some high priority jobs
|
||
# TODO also check for stale jobs in Netdisco DB
|
||
foreach my $job ( jq_getsomep($num_slots) ) {
|
||
+ next if $seen_job{ $memoize->($job) }++;
|
||
|
||
# mark job as running
|
||
next unless jq_lock($job);
|
||
@@ -71,6 +83,7 @@ sub worker_body {
|
||
# get some normal priority jobs
|
||
# TODO also check for stale jobs in Netdisco DB
|
||
foreach my $job ( jq_getsome($num_slots) ) {
|
||
+ next if $seen_job{ $memoize->($job) }++;
|
||
|
||
# mark job as running
|
||
next unless jq_lock($job);
|
||
@@ -81,6 +94,11 @@ sub worker_body {
|
||
$self->{queue}->enqueue($job);
|
||
}
|
||
|
||
+ #if (scalar grep {$_ > 1} values %seen_job) {
|
||
+ # debug 'WARNING: saw duplicate jobs after getsome()';
|
||
+ # use DDP; debug p %seen_job;
|
||
+ #}
|
||
+
|
||
debug "mgr ($wid): sleeping now...";
|
||
prctl sprintf 'nd2: #%s mgr: idle', $wid;
|
||
sleep( setting('workers')->{sleep_time} || 1 );
|
||
diff --git a/lib/App/Netdisco/DB.pm b/lib/App/Netdisco/DB.pm
|
||
index 808b45b6..7e864c36 100644
|
||
--- a/lib/App/Netdisco/DB.pm
|
||
+++ b/lib/App/Netdisco/DB.pm
|
||
@@ -11,7 +11,7 @@ __PACKAGE__->load_namespaces(
|
||
);
|
||
|
||
our # try to hide from kwalitee
|
||
- $VERSION = 44; # schema version used for upgrades, keep as integer
|
||
+ $VERSION = 45; # schema version used for upgrades, keep as integer
|
||
|
||
use Path::Class;
|
||
use File::ShareDir 'dist_dir';
|
||
diff --git a/lib/App/Netdisco/DB/Result/Admin.pm b/lib/App/Netdisco/DB/Result/Admin.pm
|
||
index 27e0cf9f..c3075c08 100644
|
||
--- a/lib/App/Netdisco/DB/Result/Admin.pm
|
||
+++ b/lib/App/Netdisco/DB/Result/Admin.pm
|
||
@@ -46,6 +46,8 @@ __PACKAGE__->add_columns(
|
||
{ data_type => "text", is_nullable => 1 },
|
||
"debug",
|
||
{ data_type => "boolean", is_nullable => 1 },
|
||
+ "device_key",
|
||
+ { data_type => "text", is_nullable => 1 },
|
||
);
|
||
|
||
|
||
diff --git a/lib/App/Netdisco/DB/Result/Device.pm b/lib/App/Netdisco/DB/Result/Device.pm
|
||
index 3d3e0a74..e3a794d8 100644
|
||
--- a/lib/App/Netdisco/DB/Result/Device.pm
|
||
+++ b/lib/App/Netdisco/DB/Result/Device.pm
|
||
@@ -238,6 +238,7 @@ sub renumber {
|
||
if $new_ip eq '0.0.0.0'
|
||
or $new_ip eq '127.0.0.1';
|
||
|
||
+ # Community is not included as SNMP::test_connection will take care of it
|
||
foreach my $set (qw/
|
||
DeviceIp
|
||
DeviceModule
|
||
@@ -259,10 +260,6 @@ sub renumber {
|
||
->search({remote_ip => $old_ip})
|
||
->update({remote_ip => $new_ip});
|
||
|
||
- $schema->resultset('Admin')
|
||
- ->search({device => $old_ip})
|
||
- ->update({device => $new_ip});
|
||
-
|
||
$schema->resultset('Node')
|
||
->search({switch => $old_ip})
|
||
->update({switch => $new_ip});
|
||
diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
index d7b73d53..859e9380 100644
|
||
--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
+++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
@@ -13,11 +13,11 @@ use Try::Tiny;
|
||
use base 'Exporter';
|
||
our @EXPORT = ();
|
||
our @EXPORT_OK = qw/
|
||
+ jq_warm_thrusters
|
||
jq_getsome
|
||
jq_getsomep
|
||
jq_locked
|
||
jq_queued
|
||
- jq_warm_thrusters
|
||
jq_lock
|
||
jq_defer
|
||
jq_complete
|
||
@@ -28,6 +28,47 @@ our @EXPORT_OK = qw/
|
||
/;
|
||
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
|
||
|
||
+# given a device, tests if any of the primary acls applies
|
||
+# returns a list of job actions to be denied/skipped on this host.
|
||
+sub _get_denied_actions {
|
||
+ my $device = shift;
|
||
+ my @badactions = ();
|
||
+ return @badactions unless $device;
|
||
+
|
||
+ push @badactions, ('discover', @{ setting('job_prio')->{high} })
|
||
+ if not is_discoverable($device);
|
||
+
|
||
+ push @badactions, (qw/macsuck nbtstat/)
|
||
+ if not is_macsuckable($device);
|
||
+
|
||
+ push @badactions, 'arpnip'
|
||
+ if not is_arpnipable($device);
|
||
+
|
||
+ return @badactions;
|
||
+}
|
||
+
|
||
+sub jq_warm_thrusters {
|
||
+ my @devices = schema('netdisco')->resultset('Device')->all;
|
||
+ my $rs = schema('netdisco')->resultset('DeviceSkip');
|
||
+ my %actionset = ();
|
||
+
|
||
+ foreach my $d (@devices) {
|
||
+ my @badactions = _get_denied_actions($d);
|
||
+ $actionset{$d->ip} = \@badactions if scalar @badactions;
|
||
+ }
|
||
+
|
||
+ schema('netdisco')->txn_do(sub {
|
||
+ $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete;
|
||
+ $rs->populate([
|
||
+ map {{
|
||
+ backend => setting('workers')->{'BACKEND'},
|
||
+ device => $_,
|
||
+ actionset => $actionset{$_},
|
||
+ }} keys %actionset
|
||
+ ]);
|
||
+ });
|
||
+}
|
||
+
|
||
sub _getsome {
|
||
my ($num_slots, $where) = @_;
|
||
return () if ((!defined $num_slots) or ($num_slots < 1));
|
||
@@ -46,8 +87,56 @@ sub _getsome {
|
||
|
||
my @returned = ();
|
||
while (my $job = $rs->next) {
|
||
- push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
|
||
+ if ($job->device) {
|
||
+ # need to handle device discovered since backend daemon started
|
||
+ # and the skiplist was primed. these should be checked against
|
||
+ # the various acls and have device_skip entry added if needed,
|
||
+ # and return false if it should have been skipped.
|
||
+ my @badactions = _get_denied_actions($job->device);
|
||
+ if (scalar @badactions) {
|
||
+ schema('netdisco')->resultset('DeviceSkip')->find_or_create({
|
||
+ backend => setting('workers')->{'BACKEND'}, device => $job->device,
|
||
+ },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
|
||
+
|
||
+ # will now not be selected in a future _getsome()
|
||
+ next if scalar grep {$_ eq $job->action} @badactions;
|
||
+ }
|
||
+ }
|
||
+
|
||
+ # remove any duplicate jobs, including possibly this job if there
|
||
+ # is already an equivalent job running
|
||
+
|
||
+ my %job_properties = (
|
||
+ action => $job->action,
|
||
+ port => $job->port,
|
||
+ subaction => $job->subaction,
|
||
+ -or => [
|
||
+ { device => $job->device },
|
||
+ ($job->device_key ? ({ device_key => $job->device_key }) : ()),
|
||
+ ],
|
||
+ );
|
||
+
|
||
+ my $gone = $jobs->search({
|
||
+ status => 'queued',
|
||
+ -and => [
|
||
+ %job_properties,
|
||
+ -or => [{
|
||
+ job => { '!=' => $job->id },
|
||
+ },{
|
||
+ job => $job->id,
|
||
+ -exists => $jobs->search({
|
||
+ status => { -like => 'queued-%' },
|
||
+ %job_properties,
|
||
+ })->as_query,
|
||
+ }],
|
||
+ ],
|
||
+ }, {for => 'update'})
|
||
+ ->update({ status => 'error', log => (sprintf 'duplicate of %s', $job->id) });
|
||
+
|
||
+ debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
|
||
+ push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
|
||
}
|
||
+
|
||
return @returned;
|
||
}
|
||
|
||
@@ -89,90 +178,17 @@ sub jq_queued {
|
||
})->get_column('device')->all;
|
||
}
|
||
|
||
-# given a device, tests if any of the primary acls applies
|
||
-# returns a list of job actions to be denied/skipped on this host.
|
||
-sub _get_denied_actions {
|
||
- my $device = shift;
|
||
- my @badactions = ();
|
||
- return @badactions unless $device;
|
||
-
|
||
- push @badactions, ('discover', @{ setting('job_prio')->{high} })
|
||
- if not is_discoverable($device);
|
||
-
|
||
- push @badactions, (qw/macsuck nbtstat/)
|
||
- if not is_macsuckable($device);
|
||
-
|
||
- push @badactions, 'arpnip'
|
||
- if not is_arpnipable($device);
|
||
-
|
||
- return @badactions;
|
||
-}
|
||
-
|
||
-sub jq_warm_thrusters {
|
||
- my @devices = schema('netdisco')->resultset('Device')->all;
|
||
- my $rs = schema('netdisco')->resultset('DeviceSkip');
|
||
- my %actionset = ();
|
||
-
|
||
- foreach my $d (@devices) {
|
||
- my @badactions = _get_denied_actions($d);
|
||
- $actionset{$d->ip} = \@badactions if scalar @badactions;
|
||
- }
|
||
-
|
||
- schema('netdisco')->txn_do(sub {
|
||
- $rs->search({ backend => setting('workers')->{'BACKEND'} })->delete;
|
||
- $rs->populate([
|
||
- map {{
|
||
- backend => setting('workers')->{'BACKEND'},
|
||
- device => $_,
|
||
- actionset => $actionset{$_},
|
||
- }} keys %actionset
|
||
- ]);
|
||
- });
|
||
-}
|
||
-
|
||
sub jq_lock {
|
||
my $job = shift;
|
||
my $happy = false;
|
||
|
||
- if ($job->device) {
|
||
- # need to handle device discovered since backend daemon started
|
||
- # and the skiplist was primed. these should be checked against
|
||
- # the various acls and have device_skip entry added if needed,
|
||
- # and return false if it should have been skipped.
|
||
- my @badactions = _get_denied_actions($job->device);
|
||
- if (scalar @badactions) {
|
||
- schema('netdisco')->resultset('DeviceSkip')->find_or_create({
|
||
- backend => setting('workers')->{'BACKEND'}, device => $job->device,
|
||
- },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
|
||
-
|
||
- return false if scalar grep {$_ eq $job->action} @badactions;
|
||
- }
|
||
- }
|
||
-
|
||
# lock db row and update to show job has been picked
|
||
try {
|
||
- schema('netdisco')->txn_do(sub {
|
||
- schema('netdisco')->resultset('Admin')
|
||
- ->search({ job => $job->id }, { for => 'update' })
|
||
- ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) });
|
||
-
|
||
- return unless
|
||
- schema('netdisco')->resultset('Admin')
|
||
- ->count({ job => $job->id,
|
||
- status => ('queued-'. setting('workers')->{'BACKEND'}) });
|
||
-
|
||
- # remove any duplicate jobs, needed because we have race conditions
|
||
- # when queueing jobs of a type for all devices
|
||
- schema('netdisco')->resultset('Admin')->search({
|
||
- status => 'queued',
|
||
- device => $job->device,
|
||
- port => $job->port,
|
||
- action => $job->action,
|
||
- subaction => $job->subaction,
|
||
- }, {for => 'update'})->delete();
|
||
-
|
||
- $happy = true;
|
||
- });
|
||
+ my $updated = schema('netdisco')->resultset('Admin')
|
||
+ ->search({ job => $job->id, status => 'queued' }, { for => 'update' })
|
||
+ ->update({ status => ('queued-'. setting('workers')->{'BACKEND'}) });
|
||
+
|
||
+ $happy = true if $updated > 0;
|
||
}
|
||
catch {
|
||
error $_;
|
||
@@ -243,6 +259,7 @@ sub jq_complete {
|
||
$happy = true;
|
||
}
|
||
catch {
|
||
+ # use DDP; p $job;
|
||
error $_;
|
||
};
|
||
|
||
@@ -274,13 +291,14 @@ sub jq_insert {
|
||
schema('netdisco')->txn_do(sub {
|
||
schema('netdisco')->resultset('Admin')->populate([
|
||
map {{
|
||
- device => $_->{device},
|
||
- port => $_->{port},
|
||
- action => $_->{action},
|
||
- subaction => ($_->{extra} || $_->{subaction}),
|
||
- username => $_->{username},
|
||
- userip => $_->{userip},
|
||
- status => 'queued',
|
||
+ device => $_->{device},
|
||
+ device_key => $_->{device_key},
|
||
+ port => $_->{port},
|
||
+ action => $_->{action},
|
||
+ subaction => ($_->{extra} || $_->{subaction}),
|
||
+ username => $_->{username},
|
||
+ userip => $_->{userip},
|
||
+ status => 'queued',
|
||
}} @$jobs
|
||
]);
|
||
});
|
||
diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
|
||
index 105f4185..c6335187 100644
|
||
--- a/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
|
||
+++ b/lib/App/Netdisco/Worker/Plugin/Discover/CanonicalIP.pm
|
||
@@ -72,6 +72,10 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
|
||
$device->renumber($new_ip)
|
||
or die "cannot renumber to: $new_ip"; # rollback
|
||
|
||
+ # is not done in renumber but required otherwise confusing at job end!
|
||
+ schema('netdisco')->resultset('Admin')
|
||
+ ->find({job => $job->id})->update({device => $new_ip});
|
||
+
|
||
return Status->noop(sprintf ' [%s] device - changed IP to %s (%s)',
|
||
$old_ip, $device->ip, ($device->dns || ''));
|
||
});
|
||
diff --git a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
|
||
index aecec0e9..a3b6b20a 100644
|
||
--- a/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
|
||
+++ b/lib/App/Netdisco/Worker/Plugin/Discover/Neighbors.pm
|
||
@@ -39,11 +39,13 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
|
||
or return Status->defer("discover failed: could not SNMP connect to $device");
|
||
|
||
my @to_discover = store_neighbors($device);
|
||
+ my %seen_id = ();
|
||
|
||
# only enqueue if device is not already discovered,
|
||
# discover_* config permits the discovery
|
||
foreach my $neighbor (@to_discover) {
|
||
- my ($ip, $remote_type) = @$neighbor;
|
||
+ my ($ip, $remote_type, $remote_id) = @$neighbor;
|
||
+ next if $remote_id and $seen_id{ $remote_id }++;
|
||
|
||
my $device = get_device($ip);
|
||
next if $device->in_storage;
|
||
@@ -55,10 +57,14 @@ register_worker({ phase => 'main', driver => 'snmp' }, sub {
|
||
next;
|
||
}
|
||
|
||
+ # risk of things going wrong...?
|
||
+ # https://quickview.cloudapps.cisco.com/quickview/bug/CSCur12254
|
||
+
|
||
jq_insert({
|
||
device => $ip,
|
||
action => 'discover',
|
||
subaction => 'with-nodes',
|
||
+ ($remote_id ? (device_key => $remote_id) : ()),
|
||
});
|
||
}
|
||
});
|
||
@@ -171,7 +177,7 @@ sub store_neighbors {
|
||
# useable remote IP...
|
||
|
||
if ($remote_ip eq '0.0.0.0' or
|
||
- check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) {
|
||
+ check_acl_no($remote_ip, 'group:__LOCAL_ADDRESSES__')) {
|
||
|
||
if ($remote_id) {
|
||
my $devices = schema('netdisco')->resultset('Device');
|
||
@@ -228,7 +234,7 @@ sub store_neighbors {
|
||
debug sprintf
|
||
' [%s] neigh - adding neighbor %s, type [%s], on %s to discovery queue',
|
||
$device->ip, $remote_ip, ($remote_type || ''), $port;
|
||
- push @to_discover, [$remote_ip, $remote_type];
|
||
+ push @to_discover, [$remote_ip, $remote_type, $remote_id];
|
||
|
||
$remote_port = $c_port->{$entry};
|
||
if (defined $remote_port) {
|
||
diff --git a/lib/App/Netdisco/Worker/Plugin/Expire.pm b/lib/App/Netdisco/Worker/Plugin/Expire.pm
|
||
index 28c93cc4..0625338d 100644
|
||
--- a/lib/App/Netdisco/Worker/Plugin/Expire.pm
|
||
+++ b/lib/App/Netdisco/Worker/Plugin/Expire.pm
|
||
@@ -6,6 +6,7 @@ use aliased 'App::Netdisco::Worker::Status';
|
||
|
||
use Dancer::Plugin::DBIC 'schema';
|
||
use App::Netdisco::Util::Statistics 'update_stats';
|
||
+use App::Netdisco::DB::ExplicitLocking ':modes';
|
||
|
||
register_worker({ phase => 'main' }, sub {
|
||
my ($job, $workerconf) = @_;
|
||
@@ -40,7 +41,7 @@ register_worker({ phase => 'main' }, sub {
|
||
}
|
||
|
||
if (setting('expire_jobs') and setting('expire_jobs') > 0) {
|
||
- schema('netdisco')->txn_do(sub {
|
||
+ schema('netdisco')->txn_do_locked('admin', 'EXCLUSIVE', sub {
|
||
schema('netdisco')->resultset('Admin')->search({
|
||
entered => \[q/< (now() - ?::interval)/,
|
||
(setting('expire_jobs') * 86400)],
|
||
diff --git a/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql
|
||
new file mode 100644
|
||
index 00000000..31e258e8
|
||
--- /dev/null
|
||
+++ b/share/schema_versions/App-Netdisco-DB-44-45-PostgreSQL.sql
|
||
@@ -0,0 +1,5 @@
|
||
+BEGIN;
|
||
+
|
||
+ALTER TABLE "admin" ADD "device_key" text;
|
||
+
|
||
+COMMIT;
|
||
|
||
commit 3db242cbe868e0672cba3b8ba1756e55cf46980c
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Thu Nov 23 22:16:50 2017 +0000
|
||
|
||
support action::namespace for netdisco-do
|
||
|
||
diff --git a/bin/netdisco-do b/bin/netdisco-do
|
||
index ab60f98e..859de29d 100755
|
||
--- a/bin/netdisco-do
|
||
+++ b/bin/netdisco-do
|
||
@@ -109,7 +109,7 @@ my $exitstatus = 0;
|
||
|
||
foreach my $host (@hostlist) {
|
||
my $dev = $host ? get_device($host->addr) : undef;
|
||
- if ($dev and not (blessed $dev and $dev->in_storage) and $action ne 'discover') {
|
||
+ if ($dev and not (blessed $dev and $dev->in_storage) and $action !~ m/^discover/) {
|
||
info sprintf "%s: error - Don't know device: %s", $action, $host->addr;
|
||
next;
|
||
}
|
||
@@ -139,7 +139,7 @@ foreach my $host (@hostlist) {
|
||
$job->log("error running job: $_");
|
||
};
|
||
|
||
- if ($job->log eq 'failed to report from any worker!') {
|
||
+ if ($job->log eq 'failed to report from any worker!' and not $job->only_namespace) {
|
||
pod2usage(
|
||
-msg => (sprintf 'error: %s is not a valid action', $action),
|
||
-verbose => 2,
|
||
diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
|
||
index 72850e8f..ddbfd034 100644
|
||
--- a/lib/App/Netdisco/Backend/Job.pm
|
||
+++ b/lib/App/Netdisco/Backend/Job.pm
|
||
@@ -14,6 +14,7 @@ foreach my $slot (qw/
|
||
device
|
||
port
|
||
action
|
||
+ only_namespace
|
||
subaction
|
||
status
|
||
username
|
||
@@ -36,6 +37,15 @@ has '_statuslist' => (
|
||
default => sub { [] },
|
||
);
|
||
|
||
+sub BUILD {
|
||
+ my ($job, $args) = @_;
|
||
+
|
||
+ if ($job->action =~ m/^(\w+)::(\w+)$/i) {
|
||
+ $job->action($1);
|
||
+ $job->only_namespace($2);
|
||
+ }
|
||
+}
|
||
+
|
||
=head1 METHODS
|
||
|
||
=head2 summary
|
||
diff --git a/lib/App/Netdisco/Worker/Plugin.pm b/lib/App/Netdisco/Worker/Plugin.pm
|
||
index 68440f52..746d8de0 100644
|
||
--- a/lib/App/Netdisco/Worker/Plugin.pm
|
||
+++ b/lib/App/Netdisco/Worker/Plugin.pm
|
||
@@ -36,6 +36,13 @@ register 'register_worker' => sub {
|
||
# check to see if this namespace has already passed at higher priority
|
||
return if $job->namespace_passed($workerconf);
|
||
|
||
+ # support part-actions via action::namespace
|
||
+ if ($job->only_namespace and $workerconf->{phase} ne 'check') {
|
||
+ return unless $workerconf->{namespace} eq lc( $job->only_namespace )
|
||
+ or (($workerconf->{phase} eq 'early')
|
||
+ and ($job->device and not $job->device->in_storage));
|
||
+ }
|
||
+
|
||
my @newuserconf = ();
|
||
my @userconf = @{ setting('device_auth') || [] };
|
||
|
||
|
||
commit de594c647ff3e8d43afa69a1ce1bdfc54442e5c0 (HEAD -> master, origin/master, origin/HEAD)
|
||
Author: Oliver Gorwits <oliver@cpan.org>
|
||
Date: Fri Nov 24 06:31:34 2017 +0000
|
||
|
||
single DB poll for new jobs both high and normal priority
|
||
|
||
diff --git a/lib/App/Netdisco/Backend/Job.pm b/lib/App/Netdisco/Backend/Job.pm
|
||
index ddbfd034..9eef998e 100644
|
||
--- a/lib/App/Netdisco/Backend/Job.pm
|
||
+++ b/lib/App/Netdisco/Backend/Job.pm
|
||
@@ -21,6 +21,7 @@ foreach my $slot (qw/
|
||
userip
|
||
log
|
||
device_key
|
||
+ job_priority
|
||
|
||
_current_phase
|
||
_last_namespace
|
||
diff --git a/lib/App/Netdisco/Backend/Role/Manager.pm b/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
index d594df4d..3c6e7e00 100644
|
||
--- a/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
+++ b/lib/App/Netdisco/Backend/Role/Manager.pm
|
||
@@ -6,7 +6,7 @@ use List::Util 'sum';
|
||
use App::Netdisco::Util::MCE;
|
||
|
||
use App::Netdisco::JobQueue
|
||
- qw/jq_locked jq_getsome jq_getsomep jq_lock jq_warm_thrusters/;
|
||
+ qw/jq_locked jq_getsome jq_lock jq_warm_thrusters/;
|
||
|
||
use Role::Tiny;
|
||
use namespace::clean;
|
||
@@ -60,28 +60,8 @@ sub worker_body {
|
||
|
||
$num_slots = parse_max_workers( setting('workers')->{tasks} )
|
||
- $self->{queue}->pending();
|
||
- debug "mgr ($wid): getting potential jobs for $num_slots workers (HP)";
|
||
+ debug "mgr ($wid): getting potential jobs for $num_slots workers";
|
||
|
||
- # get some high priority jobs
|
||
- # TODO also check for stale jobs in Netdisco DB
|
||
- foreach my $job ( jq_getsomep($num_slots) ) {
|
||
- next if $seen_job{ $memoize->($job) }++;
|
||
-
|
||
- # mark job as running
|
||
- next unless jq_lock($job);
|
||
- info sprintf "mgr (%s): job %s booked out for this processing node",
|
||
- $wid, $job->id;
|
||
-
|
||
- # copy job to local queue
|
||
- $self->{queue}->enqueuep(100, $job);
|
||
- }
|
||
-
|
||
- $num_slots = parse_max_workers( setting('workers')->{tasks} )
|
||
- - $self->{queue}->pending();
|
||
- debug "mgr ($wid): getting potential jobs for $num_slots workers (NP)";
|
||
-
|
||
- # get some normal priority jobs
|
||
- # TODO also check for stale jobs in Netdisco DB
|
||
foreach my $job ( jq_getsome($num_slots) ) {
|
||
next if $seen_job{ $memoize->($job) }++;
|
||
|
||
@@ -91,7 +71,7 @@ sub worker_body {
|
||
$wid, $job->id;
|
||
|
||
# copy job to local queue
|
||
- $self->{queue}->enqueue($job);
|
||
+ $self->{queue}->enqueuep($job->job_priority, $job);
|
||
}
|
||
|
||
#if (scalar grep {$_ > 1} values %seen_job) {
|
||
diff --git a/lib/App/Netdisco/DB/ResultSet.pm b/lib/App/Netdisco/DB/ResultSet.pm
|
||
index 953c8e80..22c25cf8 100644
|
||
--- a/lib/App/Netdisco/DB/ResultSet.pm
|
||
+++ b/lib/App/Netdisco/DB/ResultSet.pm
|
||
@@ -6,7 +6,7 @@ use warnings;
|
||
use base 'DBIx::Class::ResultSet';
|
||
|
||
__PACKAGE__->load_components(qw/
|
||
- Helper::ResultSet::SetOperations
|
||
+ +App::Netdisco::DB::SetOperations
|
||
Helper::ResultSet::Shortcut
|
||
Helper::ResultSet::CorrelateRelationship
|
||
/);
|
||
diff --git a/lib/App/Netdisco/DB/SetOperations.pm b/lib/App/Netdisco/DB/SetOperations.pm
|
||
new file mode 100644
|
||
index 00000000..fef5efb4
|
||
--- /dev/null
|
||
+++ b/lib/App/Netdisco/DB/SetOperations.pm
|
||
@@ -0,0 +1,50 @@
|
||
+package App::Netdisco::DB::SetOperations;
|
||
+
|
||
+use strict;
|
||
+use warnings;
|
||
+
|
||
+use parent 'DBIx::Class::Helper::ResultSet::SetOperations';
|
||
+
|
||
+sub _set_operation {
|
||
+ my ( $self, $operation, $other ) = @_;
|
||
+
|
||
+ my @sql;
|
||
+ my @params;
|
||
+
|
||
+ my $as = $self->_resolved_attrs->{as};
|
||
+
|
||
+ my @operands = ( $self, ref $other eq 'ARRAY' ? @$other : $other );
|
||
+
|
||
+ for (@operands) {
|
||
+ $self->throw_exception("ResultClass of ResultSets do not match!")
|
||
+ unless $self->result_class eq $_->result_class;
|
||
+
|
||
+ my $attrs = $_->_resolved_attrs;
|
||
+
|
||
+ $self->throw_exception('ResultSets do not all have the same selected columns!')
|
||
+ unless $self->_compare_arrays($as, $attrs->{as});
|
||
+
|
||
+ my ($sql, @bind) = @{${$_->as_query}};
|
||
+ # $sql =~ s/^\s*\((.*)\)\s*$/$1/;
|
||
+ $sql = q<(> . $sql . q<)>;
|
||
+
|
||
+ push @sql, $sql;
|
||
+ push @params, @bind;
|
||
+ }
|
||
+
|
||
+ my $query = q<(> . join(" $operation ", @sql). q<)>;
|
||
+
|
||
+ my $attrs = $self->_resolved_attrs;
|
||
+ return $self->result_source->resultset->search(undef, {
|
||
+ alias => $self->current_source_alias,
|
||
+ from => [{
|
||
+ $self->current_source_alias => \[ $query, @params ],
|
||
+ -alias => $self->current_source_alias,
|
||
+ -source_handle => $self->result_source->handle,
|
||
+ }],
|
||
+ columns => $attrs->{as},
|
||
+ result_class => $self->result_class,
|
||
+ });
|
||
+}
|
||
+
|
||
+1;
|
||
diff --git a/lib/App/Netdisco/JobQueue.pm b/lib/App/Netdisco/JobQueue.pm
|
||
index 875ec468..733cf4bb 100644
|
||
--- a/lib/App/Netdisco/JobQueue.pm
|
||
+++ b/lib/App/Netdisco/JobQueue.pm
|
||
@@ -9,16 +9,15 @@ Module::Load::load
|
||
use base 'Exporter';
|
||
our @EXPORT = ();
|
||
our @EXPORT_OK = qw/
|
||
+ jq_warm_thrusters
|
||
jq_getsome
|
||
- jq_getsomep
|
||
jq_locked
|
||
jq_queued
|
||
- jq_warm_thrusters
|
||
- jq_log
|
||
- jq_userlog
|
||
jq_lock
|
||
jq_defer
|
||
jq_complete
|
||
+ jq_log
|
||
+ jq_userlog
|
||
jq_insert
|
||
jq_delete
|
||
/;
|
||
@@ -43,10 +42,6 @@ Returns a list of randomly selected queued jobs. Default is to return one job,
|
||
unless C<$num> is provided. Jobs are returned as objects which implement the
|
||
Netdisco job instance interface (see below).
|
||
|
||
-=head2 jq_getsomep( $num? )
|
||
-
|
||
-Same as C<jq_getsome> but for high priority jobs.
|
||
-
|
||
=head2 jq_locked()
|
||
|
||
Returns the list of jobs currently booked out to this processing node (denoted
|
||
diff --git a/lib/App/Netdisco/JobQueue/PostgreSQL.pm b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
index 859e9380..80405701 100644
|
||
--- a/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
+++ b/lib/App/Netdisco/JobQueue/PostgreSQL.pm
|
||
@@ -15,7 +15,6 @@ our @EXPORT = ();
|
||
our @EXPORT_OK = qw/
|
||
jq_warm_thrusters
|
||
jq_getsome
|
||
- jq_getsomep
|
||
jq_locked
|
||
jq_queued
|
||
jq_lock
|
||
@@ -69,23 +68,49 @@ sub jq_warm_thrusters {
|
||
});
|
||
}
|
||
|
||
-sub _getsome {
|
||
- my ($num_slots, $where) = @_;
|
||
- return () if ((!defined $num_slots) or ($num_slots < 1));
|
||
- return () if ((!defined $where) or (ref {} ne ref $where));
|
||
+sub jq_getsome {
|
||
+ my $num_slots = shift;
|
||
+ return () unless $num_slots and $num_slots > 0;
|
||
|
||
my $jobs = schema('netdisco')->resultset('Admin');
|
||
- my $rs = $jobs->search({
|
||
+ my @returned = ();
|
||
+
|
||
+ my %jobsearch = (
|
||
status => 'queued',
|
||
device => { '-not_in' =>
|
||
$jobs->skipped(setting('workers')->{'BACKEND'},
|
||
setting('workers')->{'max_deferrals'},
|
||
setting('workers')->{'retry_after'})
|
||
->columns('device')->as_query },
|
||
- %$where,
|
||
- }, { order_by => 'random()', rows => $num_slots });
|
||
+ );
|
||
+ my %randoms = (order_by => 'random()', rows => $num_slots );
|
||
+
|
||
+ my $hiprio = $jobs->search({
|
||
+ %jobsearch,
|
||
+ -or => [{
|
||
+ username => { '!=' => undef },
|
||
+ action => { -in => setting('job_prio')->{'normal'} },
|
||
+ },{
|
||
+ action => { -in => setting('job_prio')->{'high'} },
|
||
+ }],
|
||
+ }, {
|
||
+ %randoms,
|
||
+ '+select' => [\'100 as job_priority'], '+as' => ['me.job_priority'],
|
||
+ });
|
||
+
|
||
+ my $loprio = $jobs->search({
|
||
+ %jobsearch,
|
||
+ action => { -in => setting('job_prio')->{'normal'} },
|
||
+ }, {
|
||
+ %randoms,
|
||
+ '+select' => [\'0 as job_priority'], '+as' => ['me.job_priority'],
|
||
+ });
|
||
+
|
||
+ my $rs = $hiprio->union($loprio)->search(undef, {
|
||
+ order_by => { '-desc' => 'job_priority' },
|
||
+ rows => $num_slots,
|
||
+ });
|
||
|
||
- my @returned = ();
|
||
while (my $job = $rs->next) {
|
||
if ($job->device) {
|
||
# need to handle device discovered since backend daemon started
|
||
@@ -140,23 +165,6 @@ sub _getsome {
|
||
return @returned;
|
||
}
|
||
|
||
-sub jq_getsome {
|
||
- return _getsome(shift,
|
||
- { action => { -in => setting('job_prio')->{'normal'} } }
|
||
- );
|
||
-}
|
||
-
|
||
-sub jq_getsomep {
|
||
- return _getsome(shift, {
|
||
- -or => [{
|
||
- username => { '!=' => undef },
|
||
- action => { -in => setting('job_prio')->{'normal'} },
|
||
- },{
|
||
- action => { -in => setting('job_prio')->{'high'} },
|
||
- }],
|
||
- });
|
||
-}
|
||
-
|
||
sub jq_locked {
|
||
my @returned = ();
|
||
my $rs = schema('netdisco')->resultset('Admin')
|