1
2
3
4 package com.microsoft.azure.servicebus.amqp;
5
6 import java.security.NoSuchAlgorithmException;
7 import java.util.HashMap;
8 import java.util.Map;
9
10 import javax.net.ssl.SSLContext;
11
12 import com.microsoft.azure.servicebus.primitives.TransportType;
13 import org.apache.qpid.proton.Proton;
14 import org.apache.qpid.proton.amqp.Symbol;
15 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
16 import org.apache.qpid.proton.engine.BaseHandler;
17 import org.apache.qpid.proton.engine.Connection;
18 import org.apache.qpid.proton.engine.EndpointState;
19 import org.apache.qpid.proton.engine.Event;
20 import org.apache.qpid.proton.engine.Sasl;
21 import org.apache.qpid.proton.engine.SslDomain;
22 import org.apache.qpid.proton.engine.SslPeerDetails;
23 import org.apache.qpid.proton.engine.Transport;
24 import org.apache.qpid.proton.engine.impl.TransportInternal;
25 import org.apache.qpid.proton.reactor.Handshaker;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.microsoft.azure.servicebus.primitives.ClientConstants;
30 import com.microsoft.azure.servicebus.primitives.StringUtil;
31
32
33
34 public class ConnectionHandler extends BaseHandler {
35 private static final SslDomain.VerifyMode VERIFY_MODE;
36 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
37 protected final IAmqpConnection messagingFactory;
38
39 static {
40 String verifyModePropValue = System.getProperty(ClientConstants.SSL_VERIFY_MODE_PROPERTY_NAME);
41 if (ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue)) {
42 VERIFY_MODE = SslDomain.VerifyMode.ANONYMOUS_PEER;
43 } else if (ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue)) {
44 VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER;
45 } else {
46 VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME;
47 }
48 }
49
50 protected ConnectionHandler(final IAmqpConnection messagingFactory) {
51 add(new Handshaker());
52 this.messagingFactory = messagingFactory;
53 }
54
55 public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory) {
56 switch (transportType) {
57 case AMQP_WEB_SOCKETS:
58 if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
59 return new ProxyConnectionHandler(messagingFactory);
60 } else {
61 return new WebSocketConnectionHandler(messagingFactory);
62 }
63 case AMQP:
64 default:
65 return new ConnectionHandler(messagingFactory);
66 }
67 }
68
69 @Override
70 public void onConnectionInit(Event event) {
71 final Connection connection = event.getConnection();
72 final String hostName = new StringBuilder(messagingFactory.getHostName())
73 .append(":")
74 .append(String.valueOf(this.getProtocolPort()))
75 .toString();
76 TRACE_LOGGER.debug("onConnectionInit: hostname:{}", hostName);
77 connection.setHostname(hostName);
78 connection.setContainer(StringUtil.getShortRandomString());
79
80 final Map<Symbol, Object> connectionProperties = new HashMap<Symbol, Object>();
81 connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME);
82 connectionProperties.put(AmqpConstants.VERSION, ClientConstants.CURRENT_JAVACLIENT_VERSION);
83 connectionProperties.put(AmqpConstants.PLATFORM, ClientConstants.PLATFORM_INFO);
84 connection.setProperties(connectionProperties);
85
86 connection.open();
87 }
88
89 protected IAmqpConnection getMessagingFactory() {
90 return this.messagingFactory;
91 }
92
93 public void addTransportLayers(final Event event, final TransportInternal transport) {
94 SslDomain domain = Proton.sslDomain();
95 domain.init(SslDomain.Mode.CLIENT);
96
97 if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME) {
98 try {
99
100 SSLContext defaultContext = SSLContext.getDefault();
101 StrictTLSContextSpitSpi.html#StrictTLSContextSpi">StrictTLSContextSpi strictTlsContextSpi = new StrictTLSContextSpi(defaultContext);
102 SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol());
103 domain.setSslContext(strictTlsContext);
104 domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
105 SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort());
106 transport.ssl(domain, peerDetails);
107 } catch (NoSuchAlgorithmException e) {
108
109 TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
110
111 }
112 } else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) {
113
114 try {
115 SSLContext defaultContext = SSLContext.getDefault();
116 domain.setSslContext(defaultContext);
117 domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
118 transport.ssl(domain);
119 } catch (NoSuchAlgorithmException e) {
120
121 TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
122
123 }
124
125 } else {
126 domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
127 transport.ssl(domain);
128 }
129 }
130
131 protected void notifyTransportErrors(final Event event) {
132
133 }
134
135 public String getOutboundSocketHostName() {
136 return messagingFactory.getHostName();
137 }
138
139 public int getOutboundSocketPort() {
140 return this.getProtocolPort();
141 }
142
143 public int getProtocolPort() {
144 return ClientConstants.AMQPS_PORT;
145 }
146
147 public int getMaxFrameSize() {
148 return AmqpConstants.MAX_FRAME_SIZE;
149 }
150
151 @Override
152 public void onConnectionBound(Event event) {
153 TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
154 Transport transport = event.getTransport();
155
156 this.addTransportLayers(event, (TransportInternal) transport);
157 Sasl sasl = transport.sasl();
158 sasl.setMechanisms("ANONYMOUS");
159 }
160
161 @Override
162 public void onTransportError(Event event) {
163 ErrorCondition condition = event.getTransport().getCondition();
164 if (condition != null) {
165 TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}, error:{}", event.getConnection().getHostname(), condition.getDescription());
166 } else {
167 TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}. error:{}", event.getConnection().getHostname(), "no description returned");
168 }
169
170 this.messagingFactory.onConnectionError(condition);
171 Connection connection = event.getConnection();
172 if (connection != null) {
173 connection.free();
174 }
175
176 this.notifyTransportErrors(event);
177 }
178
179 @Override
180 public void onConnectionRemoteOpen(Event event) {
181 TRACE_LOGGER.debug("Connection.onConnectionRemoteOpen: hostname:{}, remotecontainer:{}", event.getConnection().getHostname(), event.getConnection().getRemoteContainer());
182 this.messagingFactory.onConnectionOpen();
183 }
184
185 @Override
186 public void onConnectionRemoteClose(Event event) {
187 final Connection connection = event.getConnection();
188 final ErrorCondition error = connection.getRemoteCondition();
189
190 TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);
191 boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
192 this.messagingFactory.onConnectionError(error);
193 if (shouldFreeConnection) {
194 connection.free();
195 }
196 }
197
198 @Override
199 public void onConnectionFinal(Event event) {
200 TRACE_LOGGER.debug("onConnectionFinal: hostname:{}", event.getConnection().getHostname());
201 }
202
203 @Override
204 public void onConnectionLocalClose(Event event) {
205 Connection connection = event.getConnection();
206 TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
207 if (connection.getRemoteState() == EndpointState.CLOSED) {
208
209 if (connection.getTransport() != null) {
210 connection.getTransport().unbind();
211 }
212
213 connection.free();
214 }
215 }
216 }