Use standard deviation in Phi calculation to account for GC pauses and network delays

Change-Id: I17cd255ad8d661b531500e50743778a16eeb5fd2
(cherry picked from commit b75aa082b520663c7f2b7e551e2565402e9d1600)
This commit is contained in:
Jordan Halterman 2018-05-14 15:00:42 -07:00 committed by Ray Milkey
parent 8f0021b0d9
commit 4a082ec0de
2 changed files with 69 additions and 31 deletions

View File

@ -94,6 +94,11 @@ public class DistributedClusterStore
label = "the value of Phi threshold to detect accrual failure")
private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
private static final long DEFAULT_MIN_STANDARD_DEVIATION_MILLIS = 50;
@Property(name = "minStandardDeviationMillis", longValue = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS,
label = "The minimum standard deviation to take into account when computing the Phi value")
private long minStandardDeviationMillis = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS;
private static final Serializer SERIALIZER = Serializer.using(
KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@ -168,7 +173,7 @@ public class DistributedClusterStore
messagingService.registerHandler(HEARTBEAT_MESSAGE,
new HeartbeatMessageHandler(), heartBeatMessageHandler);
failureDetector = new PhiAccrualFailureDetector();
failureDetector = new PhiAccrualFailureDetector(minStandardDeviationMillis);
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
heartbeatInterval, TimeUnit.MILLISECONDS);
@ -405,6 +410,21 @@ public class DistributedClusterStore
phiFailureThreshold);
}
}
if ("minStandardDeviationMillis".equals(property.name())) {
String s = property.value();
if (s == null) {
setMinStandardDeviationMillis(DEFAULT_MIN_STANDARD_DEVIATION_MILLIS);
log.info("Minimum standard deviation is not configured, default value is {}",
DEFAULT_MIN_STANDARD_DEVIATION_MILLIS);
} else {
long newMinStandardDeviationMillis = isNullOrEmpty(s)
? DEFAULT_MIN_STANDARD_DEVIATION_MILLIS
: Long.parseLong(s.trim());
setMinStandardDeviationMillis(newMinStandardDeviationMillis);
log.info("Configured. Minimum standard deviation is configured to {}",
newMinStandardDeviationMillis);
}
}
}
}
@ -434,6 +454,22 @@ public class DistributedClusterStore
phiFailureThreshold = threshold;
}
/**
 * Sets the minimum standard deviation, in milliseconds, and replaces the
 * failure detector with a new instance configured with that value.
 * <p>
 * The detector is built before any state is updated, so the field and the
 * detector are only ever changed together. If the detector rejects the value
 * with an {@link IllegalArgumentException}, a warning is logged and the
 * default minimum standard deviation is applied instead.
 *
 * @param minStandardDeviationMillis the updated minimum standard deviation
 */
private void setMinStandardDeviationMillis(long minStandardDeviationMillis) {
    long effectiveMillis = minStandardDeviationMillis;
    PhiAccrualFailureDetector detector;
    try {
        // Let the detector validate the value before any field is touched.
        detector = new PhiAccrualFailureDetector(effectiveMillis);
    } catch (IllegalArgumentException e) {
        // Report the rejected value and the fallback; the trailing throwable
        // keeps the original cause attached to the log record.
        log.warn("Invalid minimum standard deviation {}, falling back to default {}",
                minStandardDeviationMillis, DEFAULT_MIN_STANDARD_DEVIATION_MILLIS, e);
        effectiveMillis = DEFAULT_MIN_STANDARD_DEVIATION_MILLIS;
        detector = new PhiAccrualFailureDetector(effectiveMillis);
    }
    this.minStandardDeviationMillis = effectiveMillis;
    failureDetector = detector;
}
/**
* Restarts heartbeatSender executor.
*/

View File

@ -36,17 +36,31 @@ public class PhiAccrualFailureDetector {
// Default value
private static final int DEFAULT_WINDOW_SIZE = 250;
private static final int DEFAULT_MIN_SAMPLES = 25;
private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);
private static final long DEFAULT_MIN_STANDARD_DEVIATION_MILLIS = 50;
// If a node does not have any heartbeats, this is the phi
// value to report. Indicates the node is inactive (from the
// detector's perspective).
private static final double DEFAULT_BOOTSTRAP_PHI_VALUE = 100.0;
private final int minSamples;
private final long minStandardDeviationMillis;
private final double bootstrapPhiValue = DEFAULT_BOOTSTRAP_PHI_VALUE;
private int minSamples = DEFAULT_MIN_SAMPLES;
private double phiFactor = DEFAULT_PHI_FACTOR;
private double bootstrapPhiValue = DEFAULT_BOOTSTRAP_PHI_VALUE;
/**
 * Creates a failure detector with the default minimum sample count and the
 * default minimum standard deviation.
 */
public PhiAccrualFailureDetector() {
    this(DEFAULT_MIN_STANDARD_DEVIATION_MILLIS);
}

/**
 * Creates a failure detector with the default minimum sample count and the
 * given minimum standard deviation.
 *
 * @param minStandardDeviationMillis lower bound, in milliseconds, applied to
 *        the standard deviation; must be positive
 */
public PhiAccrualFailureDetector(long minStandardDeviationMillis) {
    this(DEFAULT_MIN_SAMPLES, minStandardDeviationMillis);
}

/**
 * Creates a failure detector with the given minimum sample count and
 * minimum standard deviation. Both values are checked with
 * {@code checkArgument} before being stored.
 *
 * @param minSamples minimum number of samples; must be positive
 * @param minStandardDeviationMillis lower bound, in milliseconds, applied to
 *        the standard deviation; must be positive
 */
public PhiAccrualFailureDetector(int minSamples, long minStandardDeviationMillis) {
    // Check order is preserved so the same message is reported first when
    // both arguments are invalid.
    checkArgument(minSamples > 0, "minSamples must be positive");
    checkArgument(minStandardDeviationMillis > 0, "minStandardDeviationMillis must be positive");
    this.minStandardDeviationMillis = minStandardDeviationMillis;
    this.minSamples = minSamples;
}
/**
* Returns the last heartbeat time for the given node.
@ -75,8 +89,7 @@ public class PhiAccrualFailureDetector {
public void report(NodeId nodeId, long arrivalTime) {
checkNotNull(nodeId, "NodeId must not be null");
checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");
History nodeState =
states.computeIfAbsent(nodeId, key -> new History());
History nodeState = states.computeIfAbsent(nodeId, key -> new History());
synchronized (nodeState) {
long latestHeartbeat = nodeState.latestHeartbeatTime();
if (latestHeartbeat != -1) {
@ -117,41 +130,30 @@ public class PhiAccrualFailureDetector {
}
private double computePhi(DescriptiveStatistics samples, long tLast, long tNow) {
long size = samples.getN();
long t = tNow - tLast;
return (size > 0)
? phiFactor * t / samples.getMean()
: bootstrapPhiValue;
long elapsedTime = tNow - tLast;
double meanMillis = samples.getMean();
double y = (elapsedTime - meanMillis) / Math.max(samples.getStandardDeviation(), minStandardDeviationMillis);
double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
if (elapsedTime > meanMillis) {
return -Math.log10(e / (1.0 + e));
} else {
return -Math.log10(1.0 - 1.0 / (1.0 + e));
}
}
private void setMinSamples(int samples) {
minSamples = samples;
}
private void setPhiFactor(double factor) {
phiFactor = factor;
}
private void setBootstrapPhiValue(double phiValue) {
bootstrapPhiValue = phiValue;
}
private static class History {
DescriptiveStatistics samples =
new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
DescriptiveStatistics samples = new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
long lastHeartbeatTime = -1;
public DescriptiveStatistics samples() {
DescriptiveStatistics samples() {
return samples;
}
public long latestHeartbeatTime() {
long latestHeartbeatTime() {
return lastHeartbeatTime;
}
public void setLatestHeartbeatTime(long value) {
void setLatestHeartbeatTime(long value) {
lastHeartbeatTime = value;
}
}