java/com.sap.sse.replication/src/com/sap/sse/replication/impl/RabbitOutputStream.java
... ...
@@ -131,28 +131,35 @@ public class RabbitOutputStream extends OutputStream {
131 131
* The method is synchronized as it needs exclusive and atomic access to {@link #count} and {@link #streamBuffer}.
132 132
*/
133 133
private synchronized void sendBuffer() throws IOException {
134
- if (count > 0) {
135
- if (this.channel != null && this.channel.isOpen()) {
136
- final byte[] message;
137
- // check for the unlikely case that a client has coincidentally submitted the TERMINATION_COMMAND; we escape by
138
- // appending a random byte which makes the length differ; the input stream provider will skip one byte after receiving
139
- // the TERMINATION_COMMAND at the beginning of a message whose length is greater than that of the TERMINATION_COMMAND.
140
- if (startsWithTerminationCommand(streamBuffer, count)) {
141
- message = new byte[count+1];
134
+ try {
135
+ if (count > 0) {
136
+ if (this.channel != null && this.channel.isOpen()) {
137
+ final byte[] message;
138
+ // check for the unlikely case that a client has coincidentally submitted the TERMINATION_COMMAND; we escape by
139
+ // appending a random byte which makes the length differ; the input stream provider will skip one byte after receiving
140
+ // the TERMINATION_COMMAND at the beginning of a message whose length is greater than that of the TERMINATION_COMMAND.
141
+ if (startsWithTerminationCommand(streamBuffer, count)) {
142
+ message = new byte[count+1];
143
+ } else {
144
+ message = new byte[count];
145
+ }
146
+ System.arraycopy(streamBuffer, 0, message, 0, count);
147
+ this.channel.basicPublish(/* empty exchange name means the default exchange with direct routing;
148
+ all queues by default bind to this exchange, using the queue name
149
+ as the routing key.
150
+ See also https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default
151
+ */ "", /* routingKey */ queueName, /* properties */ null, message);
152
+ count = 0;
142 153
} else {
143
- message = new byte[count];
154
+ this.closed = true;
155
+ throw new IOException("AMPQ Channel seems to be closed!");
144 156
}
145
- System.arraycopy(streamBuffer, 0, message, 0, count);
146
- this.channel.basicPublish(/* empty exchange name means the default exchange with direct routing;
147
- all queues by default bind to this exchange, using the queue name
148
- as the routing key.
149
- See also https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default
150
- */ "", /* routingKey */ queueName, /* properties */ null, message);
151
- count = 0;
152
- } else {
153
- this.closed = true;
154
- throw new IOException("AMPQ Channel seems to be closed!");
155 157
}
158
+ } catch (IOException e) {
159
+ logger.log(Level.SEVERE, "IOException while sending data to RabbitMQ queue "+queueName, e);
160
+ throw e; // re-throw; callers will have to know that their write failed
161
+ // The re-throw will most likely lead to a writeFatalException(...) on the ObjectOutputStream
162
+ // which doesn't help really because it breaks the protocol.
156 163
}
157 164
}
158 165