48b5c2c8b04db642eb031814da5afa4590adc592
java/com.sap.sse.replication/src/com/sap/sse/replication/impl/RabbitInputStreamProvider.java
| ... | ... | @@ -39,14 +39,16 @@ public class RabbitInputStreamProvider extends NamedImpl { |
| 39 | 39 | messagesAreWrittenToThis = new PipedOutputStream(); |
| 40 | 40 | clientReadsFromThis = new PipedInputStream(messagesAreWrittenToThis); |
| 41 | 41 | final QueueingConsumer messageConsumer = new QueueingConsumer(channel); |
| 42 | - channel.basicConsume(queueName, /* auto-ack */ true, messageConsumer); |
|
| 42 | + channel.basicQos(1); // with "manual" acknowledgement, process one message at a time, avoiding overloading inbound socket |
|
| 43 | + channel.basicConsume(queueName, /* auto-ack */ false, messageConsumer); |
|
| 43 | 44 | new Thread(getClass().getSimpleName()) { |
| 44 | 45 | @Override |
| 45 | 46 | public void run() { |
| 46 | 47 | while (true) { |
| 47 | 48 | try { |
| 48 | - Delivery delivery = messageConsumer.nextDelivery(); |
|
| 49 | + final Delivery delivery = messageConsumer.nextDelivery(); |
|
| 49 | 50 | byte[] bytesFromMessage = delivery.getBody(); |
| 51 | + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), /* multiple */ false); |
|
| 50 | 52 | if (RabbitOutputStream.startsWithTerminationCommand(bytesFromMessage, bytesFromMessage.length)) { |
| 51 | 53 | if (bytesFromMessage.length == RabbitOutputStream.TERMINATION_COMMAND.length) { |
| 52 | 54 | // received exactly TERMINATION_COMMAND |