View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.servicebus.amqp;
5   
6   import java.security.NoSuchAlgorithmException;
7   import java.util.HashMap;
8   import java.util.Map;
9   
10  import javax.net.ssl.SSLContext;
11  
12  import com.microsoft.azure.servicebus.primitives.TransportType;
13  import org.apache.qpid.proton.Proton;
14  import org.apache.qpid.proton.amqp.Symbol;
15  import org.apache.qpid.proton.amqp.transport.ErrorCondition;
16  import org.apache.qpid.proton.engine.BaseHandler;
17  import org.apache.qpid.proton.engine.Connection;
18  import org.apache.qpid.proton.engine.EndpointState;
19  import org.apache.qpid.proton.engine.Event;
20  import org.apache.qpid.proton.engine.Sasl;
21  import org.apache.qpid.proton.engine.SslDomain;
22  import org.apache.qpid.proton.engine.SslPeerDetails;
23  import org.apache.qpid.proton.engine.Transport;
24  import org.apache.qpid.proton.engine.impl.TransportInternal;
25  import org.apache.qpid.proton.reactor.Handshaker;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import com.microsoft.azure.servicebus.primitives.ClientConstants;
30  import com.microsoft.azure.servicebus.primitives.StringUtil;
31  
32  // ServiceBus <-> ProtonReactor interaction handles all
33  // amqp_connection/transport related events from reactor
34  public class ConnectionHandler extends BaseHandler {
35      private static final SslDomain.VerifyMode VERIFY_MODE;
36      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
37      protected final IAmqpConnection messagingFactory;
38  
39      static {
40          String verifyModePropValue = System.getProperty(ClientConstants.SSL_VERIFY_MODE_PROPERTY_NAME);
41          if (ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue)) {
42              VERIFY_MODE = SslDomain.VerifyMode.ANONYMOUS_PEER;
43          } else if (ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue)) {
44              VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER;
45          } else {
46              VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME;
47          }
48      }
49  
50      protected ConnectionHandler(final IAmqpConnection messagingFactory) {
51          add(new Handshaker());
52          this.messagingFactory = messagingFactory;
53      }
54  
55      public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory) {
56          switch (transportType) {
57              case AMQP_WEB_SOCKETS:
58                  if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
59                      return new ProxyConnectionHandler(messagingFactory);
60                  } else {
61                      return new WebSocketConnectionHandler(messagingFactory);
62                  }
63              case AMQP:
64              default:
65                  return new ConnectionHandler(messagingFactory);
66          }
67      }
68  
69      @Override
70      public void onConnectionInit(Event event) {
71          final Connection connection = event.getConnection();
72          final String hostName = new StringBuilder(messagingFactory.getHostName())
73                                      .append(":")
74                                      .append(String.valueOf(this.getProtocolPort()))
75                                      .toString();
76          TRACE_LOGGER.debug("onConnectionInit: hostname:{}", hostName);
77          connection.setHostname(hostName);
78          connection.setContainer(StringUtil.getShortRandomString());
79  
80          final Map<Symbol, Object> connectionProperties = new HashMap<Symbol, Object>();
81          connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME);
82          connectionProperties.put(AmqpConstants.VERSION, ClientConstants.CURRENT_JAVACLIENT_VERSION);
83          connectionProperties.put(AmqpConstants.PLATFORM, ClientConstants.PLATFORM_INFO);
84          connection.setProperties(connectionProperties);
85          
86          connection.open();
87      }
88  
89      protected IAmqpConnection getMessagingFactory() {
90          return this.messagingFactory;
91      }
92  
93      public void addTransportLayers(final Event event, final TransportInternal transport) {
94          SslDomain domain = Proton.sslDomain();
95          domain.init(SslDomain.Mode.CLIENT);
96  
97          if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME) {
98              try {
99                  // Default SSL context will have the root certificate from azure in truststore anyway
100                 SSLContext defaultContext = SSLContext.getDefault();
101                 StrictTLSContextSpitSpi.html#StrictTLSContextSpi">StrictTLSContextSpi strictTlsContextSpi = new StrictTLSContextSpi(defaultContext);
102                 SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol());
103                 domain.setSslContext(strictTlsContext);
104                 domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
105                 SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort());
106                 transport.ssl(domain, peerDetails);
107             } catch (NoSuchAlgorithmException e) {
108                 // Should never happen
109                 TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
110 //                this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage()));
111             }
112         } else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) {
113             // Default SSL context will have the root certificate from azure in truststore anyway
114             try {
115                 SSLContext defaultContext = SSLContext.getDefault();
116                 domain.setSslContext(defaultContext);
117                 domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
118                 transport.ssl(domain);
119             } catch (NoSuchAlgorithmException e) {
120                 // Should never happen
121                 TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
122 //                this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage()));
123             }
124 
125         } else {
126             domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
127             transport.ssl(domain);
128         }
129     }
130 
131     protected void notifyTransportErrors(final Event event) {
132         /* no-op */
133     }
134 
135     public String getOutboundSocketHostName() {
136         return messagingFactory.getHostName();
137     }
138 
139     public int getOutboundSocketPort() {
140         return this.getProtocolPort();
141     }
142 
143     public int getProtocolPort() {
144         return ClientConstants.AMQPS_PORT;
145     }
146 
147     public int getMaxFrameSize() {
148         return AmqpConstants.MAX_FRAME_SIZE;
149     }
150 
151     @Override
152     public void onConnectionBound(Event event) {
153         TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
154         Transport transport = event.getTransport();
155 
156         this.addTransportLayers(event, (TransportInternal) transport);
157         Sasl sasl = transport.sasl();
158         sasl.setMechanisms("ANONYMOUS");
159     }
160 
161     @Override
162     public void onTransportError(Event event) {
163         ErrorCondition condition = event.getTransport().getCondition();
164         if (condition != null) {
165             TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}, error:{}", event.getConnection().getHostname(), condition.getDescription());
166         } else {
167             TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}. error:{}", event.getConnection().getHostname(), "no description returned");
168         }
169 
170         this.messagingFactory.onConnectionError(condition);
171         Connection connection = event.getConnection();
172         if (connection != null) {
173             connection.free();
174         }
175 
176         this.notifyTransportErrors(event);
177     }
178 
179     @Override
180     public void onConnectionRemoteOpen(Event event) {
181         TRACE_LOGGER.debug("Connection.onConnectionRemoteOpen: hostname:{}, remotecontainer:{}", event.getConnection().getHostname(), event.getConnection().getRemoteContainer());
182         this.messagingFactory.onConnectionOpen();
183     }
184 
185     @Override
186     public void onConnectionRemoteClose(Event event) {
187         final Connection connection = event.getConnection();
188         final ErrorCondition error = connection.getRemoteCondition();
189 
190         TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);
191         boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
192         this.messagingFactory.onConnectionError(error);
193         if (shouldFreeConnection) {
194             connection.free();
195         }
196     }
197 
198     @Override
199     public void onConnectionFinal(Event event) {
200         TRACE_LOGGER.debug("onConnectionFinal: hostname:{}", event.getConnection().getHostname());
201     }
202 
203     @Override
204     public void onConnectionLocalClose(Event event) {
205         Connection connection = event.getConnection();
206         TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
207         if (connection.getRemoteState() == EndpointState.CLOSED) {
208             // Service closed it first. In some such cases transport is not unbound and causing a leak.
209             if (connection.getTransport() != null) {
210                 connection.getTransport().unbind();
211             }
212 
213             connection.free();
214         }
215     }
216 }