java/com.sap.sse.replication/src/com/sap/sse/replication/impl/OperationSerializerBuffer.java
... ...
@@ -0,0 +1,261 @@
1
+package com.sap.sse.replication.impl;
2
+
3
+import java.io.ByteArrayOutputStream;
4
+import java.io.DataOutputStream;
5
+import java.io.IOException;
6
+import java.io.ObjectOutputStream;
7
+import java.util.ArrayList;
8
+import java.util.Deque;
9
+import java.util.List;
10
+import java.util.Timer;
11
+import java.util.TimerTask;
12
+import java.util.concurrent.BlockingQueue;
13
+import java.util.logging.Level;
14
+import java.util.logging.Logger;
15
+
16
+import com.sap.sse.common.Duration;
17
+import com.sap.sse.common.Util.Pair;
18
+import com.sap.sse.operationaltransformation.Operation;
19
+import com.sap.sse.replication.OperationWithResult;
20
+import com.sap.sse.replication.Replicable;
21
+import com.sap.sse.replication.ReplicationService;
22
+
23
+import net.jpountz.lz4.LZ4BlockOutputStream;
24
+
25
+/**
26
+ * Encapsulates the serialization (using an {@link ObjectOutputStream}) and the buffering with delayed sending for a
27
+ * sequence of {@link Operation}s. For sending, a {@link Pair} of {@code byte[]} and {@code List<Class<?>>} is
28
+ * {@link Deque#add(Object) added} to the {@link Deque} passed to the constructor. This assumes that a thread is trying
29
+ * to {@link BlockingQueue#take() take} elements from that queue to send them out one by one to the replicas, logging
30
+ * the metrics about the types of objects being sent this way.
31
+ * <p>
32
+ *
33
+ * Buffering is limited by two factors: maximum message size and maximum buffering duration. Limiting the message size
34
+ * avoids clogging the heap with buffered messages and respects the underlying messaging framework's maximum message
35
+ * size. Limiting the buffering duration even in the face of a continuous stream of incoming operations helps keeping
36
+ * the replication delay below some threshold.
37
+ * <p>
38
+ *
39
+ * Besides the {@link Deque} to which to send complete messages, two other important objects form the core of the state
40
+ * of this serializing buffer:
41
+ * <ul>
42
+ * <li>An {@link ObjectOutputStream} constructed freshly for each binary message assembled; it can serialize multiple
43
+ * operations which has the benefit of reducing the metadata overhead (type information) that needs to be written to the
44
+ * stream and of increasing sharing potential for common strings.</li>
45
+ * <li>A {@link ByteArrayOutputStream} into which the {@link ObjectOutputStream} serializes the {@link Operation}s and
46
+ * whose contents will be sent as the {@code byte[]} to the {@link Deque} passed to the constructor.</li>
47
+ * </ul>
48
+ *
49
+ * Time-limiting the buffering process is supported by a {@link Timer} passed to the constructor, and a
50
+ * {@link TimerTask} constructed by an instance of this class once the first operation has been serialized to a new
51
+ * buffer. The {@link TimerTask} is scheduled to run after the replication buffering delay, passed as a configuration
52
+ * parameter to the constructor, too.
53
+ * <p>
54
+ *
55
+ * This serializing buffer is thread-safe. Threads can write operations to it at any time after object construction has
56
+ * finished. The implementation takes care of synchronizing those writes and the subsequent object serialization to the
57
+ * {@code byte[]} buffer with each other and with sending and clearing the buffer. Objects of this class can be
58
+ * "recycled" through any number of message sends; "sending" another message by adding it to the {@link Deque} passed to
59
+ * the constructor will empty the buffer; the next operation written to this buffer will create a new
60
+ * {@link ObjectOutputStream} and {@code byte[]} buffer.
61
+ * <p>
62
+ *
63
+ * Use multiple instances of this class in conjunction with a thread pool to send out operations that allow for
64
+ * {@link Operation#requiresSynchronousExecution() asynchronous execution} on the replica side. For these operations the
65
+ * rather expensive object serialization may run in parallel, thus increasing replication throughput which in typical
66
+ * configurations is limited by the single-threaded {@link ObjectOutputStream} and compression throughput and not so
67
+ * much by the bandwidth offered by the messaging infrastructure. One instance should be reserved for operations that
68
+ * {@link Operation#requiresSynchronousExecution() require synchronous execution}. Those need to be applied in the order
69
+ * in which they were received by the {@link ReplicationService}. The number of additional such buffers should be
70
+ * determined by the typical thread pool size (e.g., equaling the number of CPUs available to the VM). Operations
71
+ * allowing for asynchronous execution can then be serialized truly in parallel, and their serialization may even happen
72
+ * outside the thread requesting their broadcast. This can accelerate, e.g., the loading process of tracking data on the
73
+ * master because the loading process no longer has to wait for the serialization to finish but may enqueue the
74
+ * operations for serialization.
75
+ * <p>
76
+ *
77
+ * <em>Implementation Quirk:</em> When a buffer is sent because it reached the maximum message size, the
78
+ * {@link TimerTask} scheduled will still fire at some later point; this will lead to a sending delay that can be
79
+ * shorter than the timeout configured.
80
+ * <p>
81
+ *
82
+ * Invariants when outside of a {@code synchronized} block using this object's monitor:
83
+ * <ul>
84
+ * <li>If and only if there is a valid {@link #objectOutputStream} to which the next operation can be written, the
85
+ * {@link #replicableIdAsString} is properly set.</li>
86
+ * <li>The {@link #objectOutputStream}, the {@link #listOfClasses} and the {@link #bos} byte-array output stream hold
87
+ * equivalent content; objects serialized through the output output stream are serialized to the byte-array output
88
+ * stream</li>
89
+ * <li>When there is at least one operation serialized in the streams, a {@link #timerTask} is scheduled with the
90
+ * {@link #timer} which will guarantee sending the buffer after the {@link #timeout} has expired since writing the first
91
+ * operation to the streams.</li>
92
+ * </ul>
93
+ *
94
+ * @author Axel Uhl (d043530)
95
+ *
96
+ */
97
+public class OperationSerializerBuffer {
98
+ private static final Logger logger = Logger.getLogger(OperationSerializerBuffer.class.getName());
99
+
100
+ /**
101
+ * The {@link #objectOutputStream} contains serialized operations originating from a single
102
+ * {@link Replicable} whose {@link Replicable#getId() ID} is written as its string value to the beginning of the
103
+ * {@link #bos} stream using {@link DataOutputStream#writeUTF(String)}. When an operation of a {@link Replicable} with a
104
+ * different ID is to be {@link #write(OperationWithResult) written}, the existing
105
+ * {@link #bos} contents need to be transmitted and a new stream is started for the {@link Replicable} now wanting
106
+ * to replicate an operation.
107
+ */
108
+ private String replicableIdAsString;
109
+
110
+ private final ReplicationMessageSender sender;
111
+
112
+ /**
113
+ * An output stream that is initialized by the constructor and re-used (see {@link ByteArrayOutputStream#reset()}
114
+ * thereafter.<p>
115
+ *
116
+ * To send operations as serialized Java objects using binary RabbitMQ messages comes at an overhead. To reduce the
117
+ * overhead, several operations can be serialized into a single message. The actual serialization of the buffer
118
+ * happens after a short duration has passed since the last sending, managed by a {@link #timer} or when the
119
+ * {@link #maximumBufferSizeInBytes} message size has been exceeded.
120
+ */
121
+ private final ByteArrayOutputStream bos;
122
+
123
+ /**
124
+ * An object output stream that writes to {@link #bos}, through a compressing output stream. Operations are
125
+ * serialized into this stream until the timer acquires the {@link #outboundBufferMonitor}, closes the stream and
126
+ * transmits the contents of {@link #outboundBuffer} as a RabbitMQ message. While still holding the monitor, the
127
+ * timer task creates a new {@link #outboundBuffer} and a new {@link #outboundObjectBuffer} wrapping the
128
+ * {@link #outboundBuffer}.
129
+ */
130
+ private ObjectOutputStream objectOutputStream;
131
+
132
+ /**
133
+ * Remembers the classes of the operations serialized into {@link #objectOutputStream}. The list of classes in this
134
+ * list matches with the sequence of objects written to {@link #objectOutputStream}. Used for logging and metrics.
135
+ * See, e.g., {@link ReplicationInstancesManager#log(List, long)}. Passed to
136
+ * {@link #broadcastOperations(byte[], List)} when sending out the current buffer.
137
+ */
138
+ private List<Class<?>> listOfClasses;
139
+
140
+ private final Duration timeout;
141
+
142
+ private final int maximumBufferSizeInBytes;
143
+
144
+ /**
145
+ * Used to schedule the sending of all operations in {@link #outboundBuffer} using the {@link #sendingTask}.
146
+ */
147
+ private final Timer timer;
148
+
149
+ /**
150
+ * Sends all operations in {@link #objectOutputStream}. While holding {@code this} object's monitor, the following
151
+ * rules apply:
152
+ *
153
+ * <ul>
154
+ * <li>if <code>null</code>, adding an operation to {@link #objectOutputStream} needs to schedules a sending task with {@link #timer}.</li>
155
+ * <li>if not <code>null</code>, an operation added to {@link #outboundBuffer} is guaranteed to be sent by the timer
156
+ * </li>
157
+ * </ul>
158
+ */
159
+ private TimerTask timerTask;
160
+
161
+ public OperationSerializerBuffer(final ReplicationMessageSender sender, final Duration timeout,
162
+ final int maximumBufferSizeInBytes, final Timer timer) throws IOException {
163
+ this.sender = sender;
164
+ this.bos = new ByteArrayOutputStream();
165
+ this.timeout = timeout;
166
+ this.timer = timer;
167
+ this.maximumBufferSizeInBytes = maximumBufferSizeInBytes;
168
+ }
169
+
170
+ /**
171
+ * Creates a new compressing object output stream which writes its compressed contents into {@link #bos} which is
172
+ * {@link ByteArrayOutputStream#reset()reset} before creating the new object output stream. A new empty
173
+ * {@link #listOfClasses} is created, too. As another side effect, cancels any scheduled {@link #timerTask} and
174
+ * nulls it.
175
+ */
176
+ private synchronized void createNewObjectOutputStream() throws IOException {
177
+ this.bos.reset();
178
+ if (timerTask != null) {
179
+ timerTask.cancel();
180
+ timerTask = null;
181
+ }
182
+ listOfClasses = new ArrayList<>();
183
+ LZ4BlockOutputStream zipper = new LZ4BlockOutputStream(this.bos);
184
+ @SuppressWarnings("resource") // left open on purpose; it would otherwise close the underlying ByteArrayOutputStream
185
+ final DataOutputStream dos = new DataOutputStream(zipper);
186
+ dos.writeUTF(replicableIdAsString);
187
+ final ObjectOutputStream compressingObjectOutputStream = new ObjectOutputStream(zipper);
188
+ objectOutputStream = compressingObjectOutputStream;
189
+ }
190
+
191
+ /**
192
+ * Extracts the binary message from {@link #bos} and, together with the {@link #listOfClasses} representing the
193
+ * types of the objects that were written to the buffer since its creation or last sending, enqueues it for
194
+ * sending out to the replicas by appending it to the {@link #queueToWriteTo} queue.
195
+ */
196
+ private synchronized void sendBuffer() throws IOException {
197
+ if (!listOfClasses.isEmpty()) {
198
+ logger.fine("Preparing " + listOfClasses.size() + " operations for sending to RabbitMQ exchange");
199
+ try {
200
+ objectOutputStream.close();
201
+ logger.fine("Sucessfully closed ObjectOutputStream");
202
+ } catch (IOException e) {
203
+ logger.log(Level.SEVERE, "Error trying to replicate " + listOfClasses.size() + " operations", e);
204
+ }
205
+ final byte[] message = bos.toByteArray();
206
+ logger.fine(()->"Successfully produced message array for replicable "+replicableIdAsString+" of length " + message.length);
207
+ final List<Class<?>> listOfClasses = this.listOfClasses;
208
+ createNewObjectOutputStream();
209
+ sender.send(message, listOfClasses);
210
+ }
211
+ }
212
+
213
+ /**
214
+ * Serializes the operation into this buffer synchronously. If the {@link #maximumBufferSizeInBytes} is exceeded by
215
+ * this, the buffer is sent. If this was the first operation to be serialized into a fresh object output stream, a
216
+ * new {@link #timerTask} is scheduled which will send the buffer after {@link #timeout} even if the maximum size has not
217
+ * been reached or exceeded until then.
218
+ */
219
+ public synchronized <S, O extends OperationWithResult<S, ?>> void write(final OperationWithResult<?, ?> operation, Replicable<S, O> replicable) throws IOException {
220
+ if (replicableIdAsString == null) {
221
+ replicableIdAsString = replicable.getId().toString();
222
+ createNewObjectOutputStream();
223
+ } else if (!replicableIdAsString.equals(replicable.getId().toString())) {
224
+ logger.fine(()->"Received operation for replicable "+replicable.getId().toString()+" which is different from "+replicableIdAsString+"; sending buffer first");
225
+ sendBuffer();
226
+ replicableIdAsString = replicable.getId().toString();
227
+ }
228
+ objectOutputStream.writeObject(operation);
229
+ listOfClasses.add(operation.getClassForLogging());
230
+ if (bos.size() > maximumBufferSizeInBytes) {
231
+ logger.info("Triggering replication for replicable ID "+replicableIdAsString+" because buffer holds " + bos.size()
232
+ + " bytes which exceeds trigger size " + maximumBufferSizeInBytes + " bytes");
233
+ sendBuffer();
234
+ } else {
235
+ if (timerTask == null) {
236
+ timerTask = new TimerTask() {
237
+ @Override
238
+ public void run() {
239
+ synchronized (OperationSerializerBuffer.this) {
240
+ try {
241
+ timerTask = null;
242
+ logger.fine("Running timer task for replicable ID "+replicableIdAsString+", flushing buffer");
243
+ sendBuffer();
244
+ } catch (Exception e) {
245
+ logger.log(Level.SEVERE, "Exception while trying to replicate operations", e);
246
+ }
247
+ }
248
+ }
249
+ };
250
+ timer.schedule(timerTask, timeout.asMillis());
251
+ }
252
+ }
253
+ }
254
+
255
+ /**
256
+ * @return the number of operations currently in the buffer
257
+ */
258
+ public int size() {
259
+ return listOfClasses.size();
260
+ }
261
+}
java/com.sap.sse.replication/src/com/sap/sse/replication/impl/ReplicationMessageSender.java
... ...
@@ -0,0 +1,7 @@
1
+package com.sap.sse.replication.impl;
2
+
3
+import java.util.List;
4
+
5
+public interface ReplicationMessageSender {
6
+ void send(byte[] message, List<Class<?>> typesInMessage);
7
+}
java/com.sap.sse.replication/src/com/sap/sse/replication/impl/ReplicationServiceImpl.java
... ...
@@ -1,24 +1,18 @@
1 1
package com.sap.sse.replication.impl;
2 2
3 3
import java.io.BufferedReader;
4
-import java.io.ByteArrayOutputStream;
5
-import java.io.DataOutputStream;
6 4
import java.io.IOException;
7 5
import java.io.InputStream;
8 6
import java.io.InputStreamReader;
9
-import java.io.ObjectOutputStream;
10
-import java.io.OutputStream;
11 7
import java.net.ConnectException;
12 8
import java.net.SocketException;
13 9
import java.net.URL;
14 10
import java.net.URLConnection;
15
-import java.util.ArrayList;
16 11
import java.util.HashMap;
17 12
import java.util.HashSet;
18 13
import java.util.List;
19 14
import java.util.Map;
20 15
import java.util.Optional;
21
-import java.util.Random;
22 16
import java.util.Set;
23 17
import java.util.Timer;
24 18
import java.util.TimerTask;
... ...
@@ -61,7 +55,6 @@ import com.sap.sse.replication.persistence.MongoObjectFactory;
61 55
import com.sap.sse.util.HttpUrlConnectionHelper;
62 56
63 57
import net.jpountz.lz4.LZ4BlockInputStream;
64
-import net.jpountz.lz4.LZ4BlockOutputStream;
65 58
66 59
/**
67 60
* Manages a set of observers of {@link Replicable}, receiving notifications for the operations they perform that
... ...
@@ -90,7 +83,7 @@ import net.jpountz.lz4.LZ4BlockOutputStream;
90 83
* @author Frank Mittag, Axel Uhl (d043530)
91 84
*
92 85
*/
93
-public class ReplicationServiceImpl implements ReplicationService, OperationsToMasterSendingQueue {
86
+public class ReplicationServiceImpl implements ReplicationService, OperationsToMasterSendingQueue, ReplicationMessageSender {
94 87
private static final Logger logger = Logger.getLogger(ReplicationServiceImpl.class.getName());
95 88
96 89
private final ReplicationInstancesManager replicationInstancesManager;
... ...
@@ -147,14 +140,6 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
147 140
private Thread replicatorThread;
148 141
149 142
/**
150
- * Used to synchronize write access and replacements of {@link #outboundBuffer}, {@link #outboundObjectBuffer} and
151
- * {@link #outboundBufferClasses} when the timer scoops up the messages to send. Ensure we get a unique object
152
- * by using a random number appended to an empty string; otherwise, string collation may collate different monitors
153
- * constructed from equal string literals.
154
- */
155
- private final Object outboundBufferMonitor = ""+new Random().nextDouble();
156
-
157
- /**
158 143
* Allow queued outbound replication messages to consume at most 1/4 of the maximum amount of memory available to this VM.
159 144
*/
160 145
private static final long SEND_JOB_QUEUE_SIZE_THRESHOLD_IN_BYTES = Runtime.getRuntime().maxMemory() / 4;
... ...
@@ -162,18 +147,26 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
162 147
/**
163 148
* All outbound sending jobs go through this queue where a thread removing from this queue picks them up and sends
164 149
* them out. The order is first-in, first-out (FIFO). The total size of the {@code byte[]} objects contained in the
165
- * queue is counted in {@link #totalSendJobsSize}. If the {@link #SEND_JOB_QUEUE_SIZE_THRESHOLD_IN_BYTES} is exceeded,
166
- * messages are dropped, which is logged as a {@link Level#SEVERE} error.<p>
150
+ * queue is counted in {@link #totalSendJobsSize}. If the {@link #SEND_JOB_QUEUE_SIZE_THRESHOLD_IN_BYTES} is
151
+ * exceeded, messages are dropped, which is logged as a {@link Level#SEVERE} error.
152
+ * <p>
153
+ *
154
+ * Being a {@link BlockingDeque}, the queue is thread-safe. Multiple threads can {@link BlockingDeque#add(Object)
155
+ * add} to it without problems, and the single thread created in {@link #createSendJob()} will continue to
156
+ * {@link BlockingDeque#take() take} elements from it as they become available. This way, this queue serves as a
157
+ * serialization and synchronization element for sending out replication operations to a message queue.
158
+ * <p>
167 159
*
168 160
* Any significant queue-up may be expected if...
169 161
* <ul>
170
- * <li>...writing to the RabbitMQ queue connected to the fanout exchange is slower than new replication
171
- * content is produced; in this case the network simply is too slow to support the replication scenario at hand.</li>
172
- * <li>...the connection to the RabbitMQ queue connected to the fanout exchange is temporarily or permanently interrupted;
173
- * for temporary interruptions there is hope that re-connecting attempts succeed before the buffer runs over and that after
174
- * successfully re-connecting the buffer can be send out message by message. For connections interrupted permanently a
175
- * buffer overrun has to be expected at some point, and it will be better to at least keep the master healthy instead of
176
- * letting it run into an {@link OutOfMemoryError} because the buffer eats up all available heap space.</li>
162
+ * <li>...writing to the RabbitMQ queue connected to the fanout exchange is slower than new replication content is
163
+ * produced; in this case the network simply is too slow to support the replication scenario at hand.</li>
164
+ * <li>...the connection to the RabbitMQ queue connected to the fanout exchange is temporarily or permanently
165
+ * interrupted; for temporary interruptions there is hope that re-connecting attempts succeed before the buffer runs
166
+ * over and that after successfully re-connecting the buffer can be send out message by message. For connections
167
+ * interrupted permanently a buffer overrun has to be expected at some point, and it will be better to at least keep
168
+ * the master healthy instead of letting it run into an {@link OutOfMemoryError} because the buffer eats up all
169
+ * available heap space.</li>
177 170
* </ul>
178 171
*/
179 172
private final BlockingDeque<Pair<byte[], List<Class<?>>>> sendJobQueue;
... ...
@@ -193,60 +186,18 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
193 186
private static final Duration DURATION_BETWTEEN_SEND_RETRIES = Duration.ONE_SECOND.times(5);
194 187
195 188
/**
196
- * Sending operations as serialized Java objects using binary RabbitMQ messages comes at an overhead. To reduce the
197
- * overhead, several operations can be serialized into a single message. The actual serialization of the buffer
198
- * happens after a short duration has passed since the last sending, managed by a {@link Timer}. Writers need to
199
- * synchronize on {@link #outboundBufferMonitor} which protects all of {@link #outboundBuffer},
200
- * {@link #outboundObjectBuffer} and {@link #outboundBufferClasses} which are replaced or cleared when the timer
201
- * scoops up the currently buffered operations to send them out.
202
- */
203
- private ByteArrayOutputStream outboundBuffer;
204
-
205
- /**
206
- * The {@link #outboundBuffer} contains a message with serialized operations originating from a single
207
- * {@link Replicable} whose {@link Replicable#getId() ID} is written as its string value to the beginning of the
208
- * stream using {@link DataOutputStream#writeUTF(String)}. When an operation of a {@link Replicable} with a
209
- * different ID is to be {@link #broadcastOperation(OperationWithResult, Replicable) broadcast}, the existing
210
- * {@link #outboundBuffer} needs to be transmitted and a new one is started for the {@link Replicable} now wanting
211
- * to replicate an operation. Access is synchronized, as for {@link #outboundBuffer} using the
212
- * {@link #outboundBufferMonitor}.
213
- */
214
- private String outboundBufferReplicableIdAsString;
215
-
216
- /**
217
- * An object output stream that writes to {@link #outboundBuffer}. Operations are serialized into this stream until
218
- * the timer acquires the {@link #outboundBufferMonitor}, closes the stream and transmits the contents of
219
- * {@link #outboundBuffer} as a RabbitMQ message. While still holding the monitor, the timer task creates a new
220
- * {@link #outboundBuffer} and a new {@link #outboundObjectBuffer} wrapping the {@link #outboundBuffer}.
189
+ * A single buffer for all operations that need to be sent out to replicas through a fan-out exchange.
190
+ * Bug 5741 will split this up into one buffer for all synchronous and multiple buffers for asynchronous operations
191
+ * where those for asynchronous operations will be fed from a queue to which all asynchronous operations are
192
+ * added.
221 193
*/
222
- private ObjectOutputStream outboundObjectBuffer;
223
-
224
- /**
225
- * Remembers the classes of the operations serialized into {@link #outboundObjectBuffer}. The list of classes in
226
- * this list matches with the sequence of objects written to {@link #outboundObjectBuffer} as long as the
227
- * {@link #outboundBufferMonitor} is being held.
228
- */
229
- private List<Class<?>> outboundBufferClasses;
194
+ private final OperationSerializerBuffer outboundBufferForAllOperations;
230 195
231 196
/**
232 197
* Used to schedule the sending of all operations in {@link #outboundBuffer} using the {@link #sendingTask}.
233 198
*/
234 199
private final Timer timer;
235 200
236
- /**
237
- * Sends all operations in {@link #outboundBuffer}. When holding the monitor of {@link #outboundBuffer}, the
238
- * following rules hold:
239
- *
240
- * <ul>
241
- * <li>if <code>null</code>, adding an operation to {@link #outboundBuffer} needs to create and assign a new timer
242
- * that schedules a sending task.</li>
243
- * <li>if not <code>null</code>, an operation added to {@link #outboundBuffer} is guaranteed to be sent by the timer
244
- * </li>
245
- * </ul>
246
- *
247
- */
248
- private TimerTask sendingTask;
249
-
250 201
private final UnsentOperationsSenderJob unsentOperationsSenderJob;
251 202
252 203
/**
... ...
@@ -255,13 +206,13 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
255 206
* more operations are likely to be sent per message transmitted, reducing overhead but correspondingly increasing
256 207
* latency.
257 208
*/
258
- private final long TRANSMISSION_DELAY_MILLIS = 100;
209
+ private static final Duration TRANSMISSION_DELAY = Duration.ofMillis(100);
259 210
260 211
/**
261
- * Defines at which message size in bytes the message will be sent regardless the {@link #TRANSMISSION_DELAY_MILLIS}
212
+ * Defines at which message size in bytes the message will be sent regardless the {@link #TRANSMISSION_DELAY}
262 213
* .
263 214
*/
264
- private final int TRIGGER_MESSAGE_SIZE_IN_BYTES = 1024 * 1024;
215
+ private static final int TRIGGER_MESSAGE_SIZE_IN_BYTES = 1024 * 1024;
265 216
266 217
/**
267 218
* Counts the messages sent out by this replicator
... ...
@@ -369,11 +320,12 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
369 320
Optional<MongoObjectFactory> optionalMongoObjectFactory, String exchangeName, String exchangeHost, int exchangePort,
370 321
ReplicationInstancesManager replicationInstancesManager, ReplicablesProvider replicablesProvider) throws Exception {
371 322
this.sendJobQueue = new LinkedBlockingDeque<>();
323
+ timer = new Timer("ReplicationServiceImpl timer for delayed task sending", /* isDaemon */ true);
324
+ this.outboundBufferForAllOperations = new OperationSerializerBuffer(this, TRANSMISSION_DELAY, TRIGGER_MESSAGE_SIZE_IN_BYTES, timer);
372 325
this.totalSendJobsSize = new AtomicLong(0);
373 326
createSendJob().start();
374 327
this.mongoObjectFactory = optionalMongoObjectFactory;
375 328
this.replicationStartingListeners = new HashSet<>();
376
- timer = new Timer("ReplicationServiceImpl timer for delayed task sending", /* isDaemon */ true);
377 329
unsentOperationsSenderJob = new UnsentOperationsSenderJob();
378 330
executionListenersByReplicableIdAsString = new ConcurrentHashMap<>();
379 331
initialLoadChannels = new ConcurrentHashMap<>();
... ...
@@ -551,7 +503,7 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
551 503
552 504
/**
553 505
* Schedules a single operation for broadcast. The operation is added to {@link #outboundBuffer}, and if not already
554
- * scheduled, a {@link #timer} is created and scheduled to send in {@link #TRANSMISSION_DELAY_MILLIS} milliseconds.
506
+ * scheduled, a {@link #timer} is created and scheduled to send in {@link #TRANSMISSION_DELAY} milliseconds.
555 507
*
556 508
* @param replicable
557 509
* the replicable by which the operation was executed that now will be broadcast to all replicas; the
... ...
@@ -561,112 +513,33 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
561 513
Replicable<S, O> replicable) throws IOException {
562 514
// need to write the operations one by one, making sure the ObjectOutputStream always writes
563 515
// identical objects again if required because they may have changed state in between
564
- synchronized (outboundBufferMonitor) {
565
- final String replicaIdAsString = replicable.getId().toString();
566
- if (outboundBuffer != null && !Util.equalsWithNull(outboundBufferReplicableIdAsString, replicaIdAsString)) {
567
- flushBufferToRabbitMQ(); // operation from a replicable different from that for which operations are
568
- // buffered so far --> flush
569
- } // still holding the monitor, so no other broadcast request from a different replicable can step in
570
- // between
571
- if (outboundBuffer == null) {
572
- outboundBuffer = new ByteArrayOutputStream();
573
- outboundBufferReplicableIdAsString = replicaIdAsString;
574
- ObjectOutputStream compressingObjectOutputStream = createCompressingObjectOutputStream(replicaIdAsString, outboundBuffer);
575
- outboundObjectBuffer = compressingObjectOutputStream;
576
- outboundBufferClasses = new ArrayList<>();
577
- }
578
- outboundObjectBuffer.writeObject(operation);
579
- outboundBufferClasses.add(operation.getClassForLogging());
580
- if (outboundBuffer.size() > TRIGGER_MESSAGE_SIZE_IN_BYTES) {
581
- logger.info("Triggering replication because buffer holds " + outboundBuffer.size()
582
- + " bytes which exceeds trigger size " + TRIGGER_MESSAGE_SIZE_IN_BYTES + " bytes");
583
- flushBufferToRabbitMQ();
584
- } else {
585
- if (sendingTask == null) {
586
- sendingTask = new TimerTask() {
587
- @Override
588
- public void run() {
589
- try {
590
- sendingTask = null;
591
- logger.fine("Running timer task, flushing buffer");
592
- flushBufferToRabbitMQ();
593
- } catch (Exception e) {
594
- logger.log(Level.SEVERE, "Exception while trying to replicate operations", e);
595
- }
596
- }
597
- };
598
- timer.schedule(sendingTask, TRANSMISSION_DELAY_MILLIS);
599
- }
600
- }
601
- if (++messageCount % 10000l == 0) {
602
- logger.info("Handled " + messageCount
603
- + " messages for replication. Current outbound replication queue size: "
604
- + outboundBufferClasses.size());
605
- }
516
+ outboundBufferForAllOperations.write(operation, replicable);
517
+ if (++messageCount % 10000l == 0) {
518
+ logger.info("Handled " + messageCount
519
+ + " messages for replication. Current outbound replication queue size: "
520
+ + outboundBufferForAllOperations.size());
606 521
}
607 522
}
608
-
609
- private static ObjectOutputStream createCompressingObjectOutputStream(final String replicaIdAsString, OutputStream streamToWrap) throws IOException {
610
- LZ4BlockOutputStream zipper = new LZ4BlockOutputStream(streamToWrap);
611
- new DataOutputStream(zipper).writeUTF(replicaIdAsString);
612
- ObjectOutputStream compressingObjectOutputStream = new ObjectOutputStream(zipper);
613
- return compressingObjectOutputStream;
614
- }
615 523
524
+ @Override
525
+ public void send(byte[] message, List<Class<?>> typesInMessage) {
526
+ if (totalSendJobsSize.get() < SEND_JOB_QUEUE_SIZE_THRESHOLD_IN_BYTES) {
527
+ sendJobQueue.add(new Pair<>(message, typesInMessage));
528
+ final long newTotalSendJobsSize = totalSendJobsSize.addAndGet(message.length);
529
+ logger.fine("Successfully handed " + typesInMessage.size() +
530
+ " operations to broadcaster; new outbound send queue length "+sendJobQueue.size()+
531
+ " ("+newTotalSendJobsSize+"B)");
532
+ } else {
533
+ logger.severe("Queue for outbound replication operations full ("+totalSendJobsSize.get()+
534
+ "B. Dropping operations buffer with size "+message.length+"B");
535
+ }
536
+ }
537
+
616 538
static InputStream createUncompressingInputStream(InputStream streamToWrap) {
617 539
return new LZ4BlockInputStream(streamToWrap);
618 540
}
619 541
620 542
/**
621
- * Obtains the monitor on {@link #outboundBufferMonitor}, copies the references to the buffers, nulls out the
622
- * buffers, then releases the monitor and broadcasts the buffer.
623
- */
624
- private void flushBufferToRabbitMQ() {
625
- logger.fine("Trying to acquire monitor");
626
- final byte[] bytesToSend;
627
- final List<Class<?>> classesOfOperationsToSend;
628
- final boolean doSend;
629
- synchronized (outboundBufferMonitor) {
630
- if (outboundBuffer != null) {
631
- logger.fine("Preparing " + outboundBufferClasses.size()
632
- + " operations for sending to RabbitMQ exchange");
633
- try {
634
- outboundObjectBuffer.close();
635
- logger.fine("Sucessfully closed ObjectOutputStream");
636
- } catch (IOException e) {
637
- logger.log(Level.SEVERE, "Error trying to replicate " + outboundBufferClasses.size()
638
- + " operations", e);
639
- }
640
- bytesToSend = outboundBuffer.toByteArray();
641
- logger.fine("Successfully produced bytesToSend array of length " + bytesToSend.length);
642
- classesOfOperationsToSend = outboundBufferClasses;
643
- doSend = true;
644
- outboundBuffer = null;
645
- outboundBufferReplicableIdAsString = null;
646
- outboundObjectBuffer = null;
647
- outboundBufferClasses = null;
648
- } else {
649
- logger.fine("No buffer set; probably two timer tasks were scheduled concurrently. No problem, just not sending this time around.");
650
- doSend = false;
651
- bytesToSend = null;
652
- classesOfOperationsToSend = null;
653
- }
654
- if (doSend) {
655
- if (totalSendJobsSize.get() < SEND_JOB_QUEUE_SIZE_THRESHOLD_IN_BYTES) {
656
- sendJobQueue.add(new Pair<>(bytesToSend, classesOfOperationsToSend));
657
- final long newTotalSendJobsSize = totalSendJobsSize.addAndGet(bytesToSend.length);
658
- logger.fine("Successfully handed " + classesOfOperationsToSend.size() +
659
- " operations to broadcaster; new outbound send queue length "+sendJobQueue.size()+
660
- " ("+newTotalSendJobsSize+"B)");
661
- } else {
662
- logger.severe("Queue for outbound replication operations full ("+totalSendJobsSize.get()+
663
- "B. Dropping operations buffer with size "+bytesToSend.length+"B");
664
- }
665
- }
666
- }
667
- }
668
-
669
- /**
670 543
* Constructs the thread assigned to {@link #sendJob}, which is responsible for watching the {@link #sendJobQueue},
671 544
* {@link BlockingDeque#take() taking} element from the queue and trying to send them to the {@link #exchangeHost}/{@link #exchangePort}
672 545
* to the exchange named as specified by the field {@link #exchangeName}, using the {@link #masterChannel}. If this fails,
... ...
@@ -726,7 +599,7 @@ public class ReplicationServiceImpl implements ReplicationService, OperationsToM
726 599
}
727 600
728 601
/**
729
- * Bytes arriving here have gone through Java object serialization and compession, probably also grouping based on
602
+ * Bytes arriving here have gone through Java object serialization and compression, probably also grouping based on
730 603
* time delays, and are supposed to be sent out immediately. During sending, exceptions may occur, e.g., due to an
731 604
* interrupted connection, the message queuing system being temporarily unavailable or some form of re-configuration
732 605
* or scaling that is taking place. In order not to lose such messages, a FIFO queue exists which keeps track of all
wiki/howto/onboarding.md
... ...
@@ -8,17 +8,18 @@ First of all, make sure you've looked at [http://www.amazon.de/Patterns-Elements
8 8
9 9
#### Installations
10 10
11
-1. Eclipse (Eclipse IDE for Eclipse Committers, version 4.15.0 ["2021-03"](https://www.eclipse.org/downloads/packages/release/2021-03/r/eclipse-ide-eclipse-committers)), [http://www.eclipse.org](http://www.eclipse.org)
12
-2. Get the content of the git repository (see
11
+1. JDK >= 11 (it is required by Eclipse)
12
+2. Eclipse (Eclipse IDE for Eclipse Committers, version 4.15.0 ["2021-03"](https://www.eclipse.org/downloads/packages/release/2021-03/r/eclipse-ide-eclipse-committers)), [http://www.eclipse.org](http://www.eclipse.org)
13
+3. Get the content of the git repository (see
13 14
Steps to build and run the Race Analysis Suite below)
14
-3. Install the eclipse plugins (see Automatic Eclipse plugin installation below)
15
-4. Git (e.g. Git for Windows v2.18), [http://git-scm.com](http://git-scm.com) / [https://git-for-windows.github.io](https://git-for-windows.github.io)
16
-5. MongoDB (at least Release 4.4), download: [https://www.mongodb.com/](https://www.mongodb.com/)
17
-6. RabbitMQ, download from [http://www.rabbitmq.com](http://www.rabbitmq.com). Requires Erlang to be installed. RabbitMQ installer will assist in installing Erlang. Some sources report that there may be trouble with latest versions of RabbitMQ. In some cases, McAffee seems to block the installation of the latest version on SAP hardware; in other cases connection problems to newest versions have been reported. We know that version 3.6.8 works well. [https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_8](https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_8) is the link.
18
-7. JDK 1.8 (Java SE 8), [http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) --- Alternatively you can use the SAPJVM 1.8: Go to [http://sapjvm.wdf.sap.corp:1080/downloads](http://sapjvm.wdf.sap.corp:1080/downloads), select JVM 1.8, extract the downloaded .zip into desired location (e.g. C:\Program Files\Java), then Go to Window -> Preferences -> Java -> Installed JREs and add the VM.
19
-8. Maven 3.1.1 (or higher), [http://maven.apache.org](http://maven.apache.org)
20
-9. GWT SDK 2.9.0 ([http://www.gwtproject.org/download.html](http://www.gwtproject.org/download.html))
21
-10. Standalone Android SDK (see section "Additional steps required for Android projects"). OPTIONALLY: You may additionally install Android Studio ([https://developer.android.com/tools/studio/index.html](https://developer.android.com/tools/studio/index.html)) or IntelliJ IDEA ([https://www.jetbrains.com/idea/download/](https://www.jetbrains.com/idea/download/)).
15
+4. Install the eclipse plugins (see Automatic Eclipse plugin installation below)
16
+5. Git (e.g. Git for Windows v2.18), [http://git-scm.com](http://git-scm.com) / [https://git-for-windows.github.io](https://git-for-windows.github.io)
17
+6. MongoDB (at least Release 4.4), download: [https://www.mongodb.com/](https://www.mongodb.com/)
18
+7. RabbitMQ, download from [http://www.rabbitmq.com](http://www.rabbitmq.com). Requires Erlang to be installed. RabbitMQ installer will assist in installing Erlang. Some sources report that there may be trouble with latest versions of RabbitMQ. In some cases, McAffee seems to block the installation of the latest version on SAP hardware; in other cases connection problems to newest versions have been reported. We know that version 3.6.8 works well. [https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_8](https://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_8) is the link.
19
+8. JDK 1.8 (Java SE 8), [http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) --- Alternatively you can use the SAPJVM 1.8: Go to [http://sapjvm.wdf.sap.corp:1080/downloads](http://sapjvm.wdf.sap.corp:1080/downloads), select JVM 1.8, extract the downloaded .zip into desired location (e.g. C:\Program Files\Java), then Go to Window -> Preferences -> Java -> Installed JREs and add the VM.
20
+9. Maven 3.1.1 (or higher), [http://maven.apache.org](http://maven.apache.org)
21
+10. GWT SDK 2.9.0 ([http://www.gwtproject.org/download.html](http://www.gwtproject.org/download.html))
22
+11. Standalone Android SDK (see section "Additional steps required for Android projects"). OPTIONALLY: You may additionally install Android Studio ([https://developer.android.com/tools/studio/index.html](https://developer.android.com/tools/studio/index.html)) or IntelliJ IDEA ([https://www.jetbrains.com/idea/download/](https://www.jetbrains.com/idea/download/)).
22 23
23 24
#### Automatic Eclipse plugin installation
24 25