b996e416006b2106b8e00fb621b63303c2623026
java/com.sap.sailing.www/release_notes_admin.html
| ... | ... | @@ -27,6 +27,13 @@ |
| 27 | 27 | <ul class="bulletList"> |
| 28 | 28 | <li>Extended RabbitMQ channel heartbeat interval to 1h. This will suffice even for |
| 29 | 29 | longer garbage collection cycles that may happen during replica set start-up.</li> |
| 30 | + <li>Using explicit ("manual") RabbitMQ message delivery acknowledgements for initial |
|
| 31 | + load when starting up a replica, combined with a "prefetch" (basicQos) value of 1. |
|
| 32 | + This way, when the replica is processing data |
|
| 33 | + and doesn't read fast enough, the RabbitMQ server will wait for the next message |
|
| 34 | + confirmation before sending more messages. This avoids socket buffer overflows |
|
| 35 | + which previously could lead to the RabbitMQ server timing out trying to send |
|
| 36 | + messages to the replica's channel.</li> |
|
| 30 | 37 | </ul> |
| 31 | 38 | <h2 class="articleSubheadline">December 2025</h2> |
| 32 | 39 | <ul class="bulletList"> |
java/com.sap.sse.replication/src/com/sap/sse/replication/impl/RabbitInputStreamProvider.java
| ... | ... | @@ -39,14 +39,16 @@ public class RabbitInputStreamProvider extends NamedImpl { |
| 39 | 39 | messagesAreWrittenToThis = new PipedOutputStream(); |
| 40 | 40 | clientReadsFromThis = new PipedInputStream(messagesAreWrittenToThis); |
| 41 | 41 | final QueueingConsumer messageConsumer = new QueueingConsumer(channel); |
| 42 | - channel.basicConsume(queueName, /* auto-ack */ true, messageConsumer); |
|
| 42 | + channel.basicQos(1); // with "manual" acknowledgement, process one message at a time, avoiding overloading inbound socket |
|
| 43 | + channel.basicConsume(queueName, /* auto-ack */ false, messageConsumer); |
|
| 43 | 44 | new Thread(getClass().getSimpleName()) { |
| 44 | 45 | @Override |
| 45 | 46 | public void run() { |
| 46 | 47 | while (true) { |
| 47 | 48 | try { |
| 48 | - Delivery delivery = messageConsumer.nextDelivery(); |
|
| 49 | + final Delivery delivery = messageConsumer.nextDelivery(); |
|
| 49 | 50 | byte[] bytesFromMessage = delivery.getBody(); |
| 51 | + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), /* multiple */ false); |
|
| 50 | 52 | if (RabbitOutputStream.startsWithTerminationCommand(bytesFromMessage, bytesFromMessage.length)) { |
| 51 | 53 | if (bytesFromMessage.length == RabbitOutputStream.TERMINATION_COMMAND.length) { |
| 52 | 54 | // received exactly TERMINATION_COMMAND |