java/com.sap.sailing.landscape/src/com/sap/sailing/landscape/SailingAnalyticsProcess.java
... ...
@@ -2,6 +2,7 @@ package com.sap.sailing.landscape;
2 2
3 3
import java.io.IOException;
4 4
import java.util.Optional;
5
+import java.util.concurrent.TimeoutException;
5 6
import java.util.logging.Logger;
6 7
7 8
import com.jcraft.jsch.JSchException;
... ...
@@ -36,4 +37,10 @@ public interface SailingAnalyticsProcess<ShardingKey> extends AwsApplicationProc
36 37
*/
37 38
void refreshToRelease(Release release, Optional<String> optionalKeyName, byte[] privateKeyEncryptionPassphrase)
38 39
throws IOException, InterruptedException, JSchException, Exception;
40
+
41
+ double getLastMinuteSystemLoadAverage(Optional<Duration> optionalTimeout) throws TimeoutException, Exception;
42
+
43
+ int getDefaultBackgroundThreadPoolExecutorQueueSize(Optional<Duration> optionalTimeout) throws TimeoutException, Exception;
44
+
45
+ int getDefaultForegroundThreadPoolExecutorQueueSize(Optional<Duration> optionalTimeout) throws TimeoutException, Exception;
39 46
}
java/com.sap.sailing.landscape/src/com/sap/sailing/landscape/impl/ArchiveCandidateMonitoringBackgroundTask.java
... ...
@@ -0,0 +1,151 @@
1
+package com.sap.sailing.landscape.impl;
2
+
3
+import java.io.IOException;
4
+import java.util.Arrays;
5
+import java.util.Iterator;
6
+import java.util.LinkedList;
7
+import java.util.List;
8
+import java.util.Optional;
9
+import java.util.concurrent.ScheduledExecutorService;
10
+import java.util.concurrent.TimeUnit;
11
+import java.util.concurrent.TimeoutException;
12
+import java.util.logging.Logger;
13
+
14
+import org.apache.shiro.subject.Subject;
15
+
16
+import com.sap.sailing.landscape.SailingAnalyticsMetrics;
17
+import com.sap.sailing.landscape.SailingAnalyticsProcess;
18
+import com.sap.sse.common.Duration;
19
+import com.sap.sse.common.TimePoint;
20
+import com.sap.sse.landscape.Landscape;
21
+import com.sap.sse.landscape.RotatingFileBasedLog;
22
+import com.sap.sse.landscape.aws.AwsApplicationReplicaSet;
23
+import com.sap.sse.landscape.aws.AwsLandscape;
24
+import com.sap.sse.landscape.aws.ReverseProxy;
25
+
26
+/**
27
+ * A stateful monitoring task that can be {@link #run run} to observe an {@code ARCHIVE} candidate process and wait for
28
+ * it to be ready for comparing its contents with a production {@code ARCHIVE} instance. When run for the first time,
29
+ * the task check the {@code /gwt/status} end point on {@code candidateHostname} to see whether it is already serving
30
+ * requests. If not, it will re-schedule itself after some delay to check again until either the candidate becomes
31
+ * healthy or a timeout is reached.
32
+ * <p>
33
+ *
34
+ * If the candidate was seen serving a {@code /gwt/status} request, this task changes state and now looks at the
35
+ * contents of the status response. Four conditions must be fulfilled for the candidate to be considered ready for
36
+ * comparison:
37
+ *
38
+ * <ol>
39
+ * <li>the overall status must be {@code available: true}.</li>
40
+ * <li>the one-minute system load average must be below 2 (per cent)</li>
41
+ * <li>the default foreground thread pool queue must contain less than 10 tasks</li>
42
+ * <li>the default background thread pool queue must contain less than 10 tasks</li>
43
+ * </ol>
44
+ *
45
+ * When any of these conditions is not fulfilled, the task will re-schedule itself after some delay to check again until
46
+ * either the candidate fulfills all conditions or a timeout is reached.
47
+ * <p>
48
+ *
49
+ * @author Axel Uhl (d043530)
50
+ *
51
+ */
52
+public class ArchiveCandidateMonitoringBackgroundTask implements Runnable {
53
+ @FunctionalInterface
54
+ private static interface BooleanSupplierWithException {
55
+ boolean getAsBoolean() throws Exception;
56
+ }
57
+
58
+ private static final Logger logger = Logger.getLogger(ArchiveCandidateMonitoringBackgroundTask.class.getName());
59
+
60
+ private final static Duration DELAY_BETWEEN_CHECKS = Duration.ONE_MINUTE.times(5);
61
+ private final static double MAXIMUM_ONE_MINUTE_SYSTEM_LOAD_AVERAGE = 2.0;
62
+ private final static int MAXIMUM_THREAD_POOL_QUEUE_SIZE = 10;
63
+ private final static Optional<Duration> TIMEOUT_FIRST_CONTACT = Optional.of(Landscape.WAIT_FOR_PROCESS_TIMEOUT.get().plus(Landscape.WAIT_FOR_HOST_TIMEOUT.get()));
64
+ private final Subject subject;
65
+ private final AwsLandscape<String> landscape;
66
+ private final AwsApplicationReplicaSet<String, SailingAnalyticsMetrics, SailingAnalyticsProcess<String>> replicaSet;
67
+ private final ReverseProxy<String, SailingAnalyticsMetrics, SailingAnalyticsProcess<String>, RotatingFileBasedLog> reverseProxyCluster;
68
+ private final String optionalKeyName;
69
+ private final byte[] privateKeyEncryptionPassphrase;
70
+ private final ScheduledExecutorService executor;
71
+ private final TimePoint firstRun;
72
+ private final List<String> messagesToSendToProcessOwner;
73
+ private Iterable<BooleanSupplierWithException> checks;
74
+ private Iterator<BooleanSupplierWithException> checksIterator;
75
+ private BooleanSupplierWithException currentCheck;
76
+ private boolean candidateSeenServingStatusRequest;
77
+
78
+ public ArchiveCandidateMonitoringBackgroundTask(Subject subject, AwsLandscape<String> landscape,
79
+ AwsApplicationReplicaSet<String, SailingAnalyticsMetrics, SailingAnalyticsProcess<String>> replicaSet,
80
+ String candidateHostname,
81
+ ReverseProxy<String, SailingAnalyticsMetrics, SailingAnalyticsProcess<String>, RotatingFileBasedLog> reverseProxyCluster,
82
+ String optionalKeyName, byte[] privateKeyEncryptionPassphrase, ScheduledExecutorService executor) {
83
+ this.subject = subject;
84
+ this.landscape = landscape;
85
+ this.replicaSet = replicaSet;
86
+ this.reverseProxyCluster = reverseProxyCluster;
87
+ this.optionalKeyName = optionalKeyName;
88
+ this.privateKeyEncryptionPassphrase = privateKeyEncryptionPassphrase;
89
+ this.executor = executor;
90
+ this.firstRun = TimePoint.now();
91
+ this.messagesToSendToProcessOwner = new LinkedList<>();
92
+ this.candidateSeenServingStatusRequest = false;
93
+ this.checks = Arrays.asList(
94
+ this::isReady,
95
+ this::hasLowEnoughSystemLoad,
96
+ this::hasShortEnoughDefaultBackgroundThreadPoolExecutorQueue,
97
+ this::hasShortEnoughDefaultForegroundThreadPoolExecutorQueue,
98
+ this::compareServersWithRestAPI,
99
+ this::compareServersByLeaderboardGroups);
100
+ this.checksIterator = this.checks.iterator();
101
+ this.currentCheck = checksIterator.next();
102
+ }
103
+
104
+ @Override
105
+ public void run() {
106
+ try {
107
+ if (currentCheck.getAsBoolean()) {
108
+ logger.info("Another check passed.");
109
+ // the check passed; proceed to next check, if any
110
+ currentCheck = checksIterator.hasNext() ? checksIterator.next() : null;
111
+ }
112
+ if (currentCheck != null) {
113
+ logger.info("More checks to do; re-scheduling.");
114
+ // re-schedule this task to run next check in a while
115
+ executor.schedule(this, DELAY_BETWEEN_CHECKS.asMillis(), TimeUnit.MILLISECONDS);
116
+ } else {
117
+ logger.info("Done with all checks; candidate is ready for comparison.");
118
+ // all checks passed; candidate is ready for comparison; nothing more to do here
119
+ }
120
+ } catch (Exception e) {
121
+ logger.warning("Exception while running check " + currentCheck + " for candidate " + replicaSet.getMaster().getHost().getHostname() + ": " + e.getMessage());
122
+ }
123
+
124
+ }
125
+
126
+ private Boolean isReady() throws IOException {
127
+ return replicaSet.getMaster().isReady(Landscape.WAIT_FOR_PROCESS_TIMEOUT);
128
+ }
129
+
130
+ private boolean hasLowEnoughSystemLoad() throws TimeoutException, Exception {
131
+ return replicaSet.getMaster().getLastMinuteSystemLoadAverage(Landscape.WAIT_FOR_PROCESS_TIMEOUT) < MAXIMUM_ONE_MINUTE_SYSTEM_LOAD_AVERAGE;
132
+ }
133
+
134
+ private boolean hasShortEnoughDefaultBackgroundThreadPoolExecutorQueue() throws TimeoutException, Exception {
135
+ return replicaSet.getMaster().getDefaultBackgroundThreadPoolExecutorQueueSize(Landscape.WAIT_FOR_PROCESS_TIMEOUT) < MAXIMUM_THREAD_POOL_QUEUE_SIZE;
136
+ }
137
+
138
+ private boolean hasShortEnoughDefaultForegroundThreadPoolExecutorQueue() throws TimeoutException, Exception {
139
+ return replicaSet.getMaster().getDefaultForegroundThreadPoolExecutorQueueSize(Landscape.WAIT_FOR_PROCESS_TIMEOUT) < MAXIMUM_THREAD_POOL_QUEUE_SIZE;
140
+ }
141
+
142
+ private boolean compareServersWithRestAPI() throws Exception {
143
+ // TODO
144
+ return false;
145
+ }
146
+
147
+ private boolean compareServersByLeaderboardGroups() throws Exception {
148
+ // TODO
149
+ return false;
150
+ }
151
+}
java/com.sap.sailing.landscape/src/com/sap/sailing/landscape/impl/LandscapeServiceImpl.java
... ...
@@ -18,12 +18,14 @@ import java.util.Set;
18 18
import java.util.UUID;
19 19
import java.util.concurrent.CompletableFuture;
20 20
import java.util.concurrent.ExecutionException;
21
+import java.util.concurrent.ScheduledExecutorService;
21 22
import java.util.concurrent.TimeoutException;
22 23
import java.util.function.Function;
23 24
import java.util.logging.Level;
24 25
import java.util.logging.Logger;
25 26
26 27
import org.apache.http.client.ClientProtocolException;
28
+import org.apache.shiro.SecurityUtils;
27 29
import org.apache.shiro.authz.AuthorizationException;
28 30
import org.json.simple.parser.ParseException;
29 31
import org.osgi.framework.BundleContext;
... ...
@@ -282,6 +284,11 @@ public class LandscapeServiceImpl implements LandscapeService {
282 284
logger.info("Adding reverse proxy rule for archive candidate with hostname "+ hostname + " and private ip address " + privateIpAdress);
283 285
reverseProxyCluster.setPlainRedirect(hostname, master, Optional.ofNullable(optionalKeyName), privateKeyEncryptionPassphrase);
284 286
sendMailAboutNewArchiveCandidate(replicaSet);
287
+ final ScheduledExecutorService monitorTaskExecutor = ThreadPoolUtil.INSTANCE.getDefaultBackgroundTaskThreadPoolExecutor();
288
+ final ArchiveCandidateMonitoringBackgroundTask monitoringTask = new ArchiveCandidateMonitoringBackgroundTask(
289
+ SecurityUtils.getSubject(), getLandscape(), replicaSet, hostname, reverseProxyCluster, optionalKeyName,
290
+ privateKeyEncryptionPassphrase, monitorTaskExecutor);
291
+ monitorTaskExecutor.execute(monitoringTask);
285 292
return replicaSet;
286 293
}
287 294
java/com.sap.sailing.landscape/src/com/sap/sailing/landscape/impl/SailingAnalyticsProcessImpl.java
... ...
@@ -51,6 +51,9 @@ implements SailingAnalyticsProcess<ShardingKey> {
51 51
private static final String STATUS_SERVERDIRECTORY_PROPERTY_NAME = "serverdirectory";
52 52
private static final String STATUS_RELEASE_PROPERTY_NAME = "release";
53 53
private static final String MONGODB_CONFIGURATION_PROPERTY_NAME = "mongoDbConfiguration";
54
+ private static final String SYSTEM_LOAD_AVERAGE_LAST_MINUTE_NAME = "systemloadaveragelastminute";
55
+ private static final String DEFAULT_BACKGROUND_THREAD_POOL_EXECUTOR_QUEUE_LENGTH_NAME = "defaultbackgroundthreadpoolexecutorqueuelength";
56
+ private static final String DEFAULT_FOREGROUND_THREAD_POOL_EXECUTOR_QUEUE_LENGTH_NAME = "defaultforegroundthreadpoolexecutorqueuelength";
54 57
private Integer expeditionUdpPort;
55 58
private Integer igtimiRiotPort;
56 59
private Release release;
... ...
@@ -127,6 +130,24 @@ implements SailingAnalyticsProcess<ShardingKey> {
127 130
}
128 131
129 132
@Override
133
+ public double getLastMinuteSystemLoadAverage(Optional<Duration> optionalTimeout) throws TimeoutException, Exception {
134
+ final JSONObject status = getStatus(optionalTimeout);
135
+ return ((Number) status.get(SYSTEM_LOAD_AVERAGE_LAST_MINUTE_NAME)).doubleValue();
136
+ }
137
+
138
+ @Override
139
+ public int getDefaultBackgroundThreadPoolExecutorQueueSize(Optional<Duration> optionalTimeout) throws TimeoutException, Exception {
140
+ final JSONObject status = getStatus(optionalTimeout);
141
+ return ((Number) status.get(DEFAULT_BACKGROUND_THREAD_POOL_EXECUTOR_QUEUE_LENGTH_NAME)).intValue();
142
+ }
143
+
144
+ @Override
145
+ public int getDefaultForegroundThreadPoolExecutorQueueSize(Optional<Duration> optionalTimeout) throws TimeoutException, Exception {
146
+ final JSONObject status = getStatus(optionalTimeout);
147
+ return ((Number) status.get(DEFAULT_FOREGROUND_THREAD_POOL_EXECUTOR_QUEUE_LENGTH_NAME)).intValue();
148
+ }
149
+
150
+ @Override
130 151
public Database getDatabaseConfiguration(Region region, Optional<Duration> optionalTimeout,
131 152
Optional<String> optionalKeyName, byte[] privateKeyEncryptionPassphrase) throws Exception {
132 153
final JSONObject mongoDBConfiguration = (JSONObject) getStatus(optionalTimeout).get(MONGODB_CONFIGURATION_PROPERTY_NAME);
java/com.sap.sse.landscape/src/com/sap/sse/landscape/Landscape.java
... ...
@@ -13,10 +13,12 @@ public interface Landscape<ShardingKey> {
13 13
* constant ("image-type"). The tag value then must match what the subclass wants.
14 14
*/
15 15
String IMAGE_TYPE_TAG_NAME = "image-type";
16
+
16 17
/**
17 18
* The timeout for a host to come up
18 19
*/
19 20
Optional<Duration> WAIT_FOR_HOST_TIMEOUT = Optional.of(Duration.ONE_HOUR.times(2));
21
+
20 22
/**
21 23
* The timeout for a running process to respond
22 24
*/