Best Venom code snippet using ssh.New
tunnelServer.go
Source:tunnelServer.go
...88 tunnelProtocol string89 port int90 BPFProgramName string91}92// NewTunnelServer initializes a new tunnel server.93func NewTunnelServer(94 support *SupportServices,95 shutdownBroadcast <-chan struct{}) (*TunnelServer, error) {96 sshServer, err := newSSHServer(support, shutdownBroadcast)97 if err != nil {98 return nil, errors.Trace(err)99 }100 return &TunnelServer{101 runWaitGroup: new(sync.WaitGroup),102 listenerError: make(chan error),103 shutdownBroadcast: shutdownBroadcast,104 sshServer: sshServer,105 }, nil106}107// Run runs the tunnel server; this function blocks while running a selection of108// listeners that handle connection using various obfuscation protocols.109//110// Run listens on each designated tunnel port and spawns new goroutines to handle111// each client connection. It halts when shutdownBroadcast is signaled. A list of active112// clients is maintained, and when halting all clients are cleanly shutdown.113//114// Each client goroutine handles its own obfuscation (optional), SSH handshake, SSH115// authentication, and then looping on client new channel requests. "direct-tcpip"116// channels, dynamic port fowards, are supported. When the UDPInterceptUdpgwServerAddress117// config parameter is configured, UDP port forwards over a TCP stream, following118// the udpgw protocol, are handled.119//120// A new goroutine is spawned to handle each port forward for each client. Each port121// forward tracks its bytes transferred. Overall per-client stats for connection duration,122// GeoIP, number of port forwards, and bytes transferred are tracked and logged when the123// client shuts down.124//125// Note: client handler goroutines may still be shutting down after Run() returns. See126// comment in sshClient.stop(). 
TODO: fully synchronized shutdown.127func (server *TunnelServer) Run() error {128 // TODO: should TunnelServer hold its own support pointer?129 support := server.sshServer.support130 // First bind all listeners; once all are successful,131 // start accepting connections on each.132 var listeners []*sshListener133 for tunnelProtocol, listenPort := range support.Config.TunnelProtocolPorts {134 localAddress := net.JoinHostPort(135 support.Config.ServerIPAddress, strconv.Itoa(listenPort))136 var listener net.Listener137 var BPFProgramName string138 var err error139 if protocol.TunnelProtocolUsesFrontedMeekQUIC(tunnelProtocol) {140 // For FRONTED-MEEK-QUIC-OSSH, no listener implemented. The edge-to-server141 // hop uses HTTPS and the client tunnel protocol is distinguished using142 // protocol.MeekCookieData.ClientTunnelProtocol.143 continue144 } else if protocol.TunnelProtocolUsesQUIC(tunnelProtocol) {145 logTunnelProtocol := tunnelProtocol146 listener, err = quic.Listen(147 CommonLogger(log),148 func(clientAddress string, err error, logFields common.LogFields) {149 logIrregularTunnel(150 support, logTunnelProtocol, listenPort, clientAddress,151 errors.Trace(err), LogFields(logFields))152 },153 localAddress,154 support.Config.ObfuscatedSSHKey,155 support.Config.EnableGQUIC)156 } else if protocol.TunnelProtocolUsesRefractionNetworking(tunnelProtocol) {157 listener, err = refraction.Listen(localAddress)158 } else if protocol.TunnelProtocolUsesFrontedMeek(tunnelProtocol) {159 listener, err = net.Listen("tcp", localAddress)160 } else {161 // Only direct, unfronted protocol listeners use TCP BPF circumvention162 // programs.163 listener, BPFProgramName, err = newTCPListenerWithBPF(support, localAddress)164 }165 if err != nil {166 for _, existingListener := range listeners {167 existingListener.Listener.Close()168 }169 return errors.Trace(err)170 }171 tacticsListener := NewTacticsListener(172 support,173 listener,174 tunnelProtocol,175 func(IP string) GeoIPData { return 
support.GeoIPService.Lookup(IP) })176 log.WithTraceFields(177 LogFields{178 "localAddress": localAddress,179 "tunnelProtocol": tunnelProtocol,180 "BPFProgramName": BPFProgramName,181 }).Info("listening")182 listeners = append(183 listeners,184 &sshListener{185 Listener: tacticsListener,186 localAddress: localAddress,187 port: listenPort,188 tunnelProtocol: tunnelProtocol,189 BPFProgramName: BPFProgramName,190 })191 }192 for _, listener := range listeners {193 server.runWaitGroup.Add(1)194 go func(listener *sshListener) {195 defer server.runWaitGroup.Done()196 log.WithTraceFields(197 LogFields{198 "localAddress": listener.localAddress,199 "tunnelProtocol": listener.tunnelProtocol,200 }).Info("running")201 server.sshServer.runListener(202 listener,203 server.listenerError)204 log.WithTraceFields(205 LogFields{206 "localAddress": listener.localAddress,207 "tunnelProtocol": listener.tunnelProtocol,208 }).Info("stopped")209 }(listener)210 }211 var err error212 select {213 case <-server.shutdownBroadcast:214 case err = <-server.listenerError:215 }216 for _, listener := range listeners {217 listener.Close()218 }219 server.sshServer.stopClients()220 server.runWaitGroup.Wait()221 log.WithTrace().Info("stopped")222 return err223}224// GetLoadStats returns load stats for the tunnel server. The stats are225// broken down by protocol ("SSH", "OSSH", etc.) and type. Types of stats226// include current connected client count, total number of current port227// forwards.228func (server *TunnelServer) GetLoadStats() (229 UpstreamStats, ProtocolStats, RegionStats) {230 return server.sshServer.getLoadStats()231}232// GetEstablishedClientCount returns the number of currently established233// clients.234func (server *TunnelServer) GetEstablishedClientCount() int {235 return server.sshServer.getEstablishedClientCount()236}237// ResetAllClientTrafficRules resets all established client traffic rules238// to use the latest config and client properties. 
Any existing traffic239// rule state is lost, including throttling state.240func (server *TunnelServer) ResetAllClientTrafficRules() {241 server.sshServer.resetAllClientTrafficRules()242}243// ResetAllClientOSLConfigs resets all established client OSL state to use244// the latest OSL config. Any existing OSL state is lost, including partial245// progress towards SLOKs.246func (server *TunnelServer) ResetAllClientOSLConfigs() {247 server.sshServer.resetAllClientOSLConfigs()248}249// SetClientHandshakeState sets the handshake state -- that it completed and250// what parameters were passed -- in sshClient. This state is used for allowing251// port forwards and for future traffic rule selection. SetClientHandshakeState252// also triggers an immediate traffic rule re-selection, as the rules selected253// upon tunnel establishment may no longer apply now that handshake values are254// set.255//256// The authorizations received from the client handshake are verified and the257// resulting list of authorized access types are applied to the client's tunnel258// and traffic rules.259//260// A list of active authorization IDs, authorized access types, and traffic261// rate limits are returned for responding to the client and logging.262func (server *TunnelServer) SetClientHandshakeState(263 sessionID string,264 state handshakeState,265 authorizations []string) (*handshakeStateInfo, error) {266 return server.sshServer.setClientHandshakeState(sessionID, state, authorizations)267}268// GetClientHandshaked indicates whether the client has completed a handshake269// and whether its traffic rules are immediately exhausted.270func (server *TunnelServer) GetClientHandshaked(271 sessionID string) (bool, bool, error) {272 return server.sshServer.getClientHandshaked(sessionID)273}274// GetClientDisableDiscovery indicates whether discovery is disabled for the275// client corresponding to sessionID.276func (server *TunnelServer) GetClientDisableDiscovery(277 sessionID string) (bool, 
error) {278 return server.sshServer.getClientDisableDiscovery(sessionID)279}280// UpdateClientAPIParameters updates the recorded handshake API parameters for281// the client corresponding to sessionID.282func (server *TunnelServer) UpdateClientAPIParameters(283 sessionID string,284 apiParams common.APIParameters) error {285 return server.sshServer.updateClientAPIParameters(sessionID, apiParams)286}287// AcceptClientDomainBytes indicates whether to accept domain bytes reported288// by the client.289func (server *TunnelServer) AcceptClientDomainBytes(290 sessionID string) (bool, error) {291 return server.sshServer.acceptClientDomainBytes(sessionID)292}293// SetEstablishTunnels sets whether new tunnels may be established or not.294// When not establishing, incoming connections are immediately closed.295func (server *TunnelServer) SetEstablishTunnels(establish bool) {296 server.sshServer.setEstablishTunnels(establish)297}298// CheckEstablishTunnels returns whether new tunnels may be established or299// not, and increments a metrics counter when establishment is disallowed.300func (server *TunnelServer) CheckEstablishTunnels() bool {301 return server.sshServer.checkEstablishTunnels()302}303// GetEstablishTunnelsMetrics returns whether tunnel establishment is304// currently allowed and the number of tunnels rejected since due to not305// establishing since the last GetEstablishTunnelsMetrics call.306func (server *TunnelServer) GetEstablishTunnelsMetrics() (bool, int64) {307 return server.sshServer.getEstablishTunnelsMetrics()308}309type sshServer struct {310 // Note: 64-bit ints used with atomic operations are placed311 // at the start of struct to ensure 64-bit alignment.312 // (https://golang.org/pkg/sync/atomic/#pkg-note-BUG)313 lastAuthLog int64314 authFailedCount int64315 establishLimitedCount int64316 support *SupportServices317 establishTunnels int32318 concurrentSSHHandshakes semaphore.Semaphore319 shutdownBroadcast <-chan struct{}320 sshHostKey ssh.Signer321 
clientsMutex sync.Mutex322 stoppingClients bool323 acceptedClientCounts map[string]map[string]int64324 clients map[string]*sshClient325 oslSessionCacheMutex sync.Mutex326 oslSessionCache *cache.Cache327 authorizationSessionIDsMutex sync.Mutex328 authorizationSessionIDs map[string]string329 obfuscatorSeedHistory *obfuscator.SeedHistory330}331func newSSHServer(332 support *SupportServices,333 shutdownBroadcast <-chan struct{}) (*sshServer, error) {334 privateKey, err := ssh.ParseRawPrivateKey([]byte(support.Config.SSHPrivateKey))335 if err != nil {336 return nil, errors.Trace(err)337 }338 // TODO: use cert (ssh.NewCertSigner) for anti-fingerprint?339 signer, err := ssh.NewSignerFromKey(privateKey)340 if err != nil {341 return nil, errors.Trace(err)342 }343 var concurrentSSHHandshakes semaphore.Semaphore344 if support.Config.MaxConcurrentSSHHandshakes > 0 {345 concurrentSSHHandshakes = semaphore.New(support.Config.MaxConcurrentSSHHandshakes)346 }347 // The OSL session cache temporarily retains OSL seed state348 // progress for disconnected clients. This enables clients349 // that disconnect and immediately reconnect to the same350 // server to resume their OSL progress. Cached progress351 // is referenced by session ID and is retained for352 // OSL_SESSION_CACHE_TTL after disconnect.353 //354 // Note: session IDs are assumed to be unpredictable. 
If a355 // rogue client could guess the session ID of another client,356 // it could resume its OSL progress and, if the OSL config357 // were known, infer some activity.358 oslSessionCache := cache.New(OSL_SESSION_CACHE_TTL, 1*time.Minute)359 return &sshServer{360 support: support,361 establishTunnels: 1,362 concurrentSSHHandshakes: concurrentSSHHandshakes,363 shutdownBroadcast: shutdownBroadcast,364 sshHostKey: signer,365 acceptedClientCounts: make(map[string]map[string]int64),366 clients: make(map[string]*sshClient),367 oslSessionCache: oslSessionCache,368 authorizationSessionIDs: make(map[string]string),369 obfuscatorSeedHistory: obfuscator.NewSeedHistory(nil),370 }, nil371}372func (sshServer *sshServer) setEstablishTunnels(establish bool) {373 // Do nothing when the setting is already correct. This avoids374 // spurious log messages when setEstablishTunnels is called375 // periodically with the same setting.376 if establish == (atomic.LoadInt32(&sshServer.establishTunnels) == 1) {377 return378 }379 establishFlag := int32(1)380 if !establish {381 establishFlag = 0382 }383 atomic.StoreInt32(&sshServer.establishTunnels, establishFlag)384 log.WithTraceFields(385 LogFields{"establish": establish}).Info("establishing tunnels")386}387func (sshServer *sshServer) checkEstablishTunnels() bool {388 establishTunnels := atomic.LoadInt32(&sshServer.establishTunnels) == 1389 if !establishTunnels {390 atomic.AddInt64(&sshServer.establishLimitedCount, 1)391 }392 return establishTunnels393}394func (sshServer *sshServer) getEstablishTunnelsMetrics() (bool, int64) {395 return atomic.LoadInt32(&sshServer.establishTunnels) == 1,396 atomic.SwapInt64(&sshServer.establishLimitedCount, 0)397}398// runListener is intended to run an a goroutine; it blocks399// running a particular listener. 
If an unrecoverable error400// occurs, it will send the error to the listenerError channel.401func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError chan<- error) {402 handleClient := func(clientTunnelProtocol string, clientConn net.Conn) {403 // Note: establish tunnel limiter cannot simply stop TCP404 // listeners in all cases (e.g., meek) since SSH tunnels can405 // span multiple TCP connections.406 if !sshServer.checkEstablishTunnels() {407 log.WithTrace().Debug("not establishing tunnels")408 clientConn.Close()409 return410 }411 // tunnelProtocol is used for stats and traffic rules. In many cases, its412 // value is unambiguously determined by the listener port. In certain cases,413 // such as multiple fronted protocols with a single backend listener, the414 // client's reported tunnel protocol value is used. The caller must validate415 // clientTunnelProtocol with protocol.IsValidClientTunnelProtocol.416 tunnelProtocol := sshListener.tunnelProtocol417 if clientTunnelProtocol != "" {418 tunnelProtocol = clientTunnelProtocol419 }420 // sshListener.tunnelProtocol indictes the tunnel protocol run by the421 // listener. For direct protocols, this is also the client tunnel protocol.422 // For fronted protocols, the client may use a different protocol to connect423 // to the front and then only the front-to-Psiphon server will use the424 // listener protocol.425 //426 // A fronted meek client, for example, reports its first hop protocol in427 // protocol.MeekCookieData.ClientTunnelProtocol. Most metrics record this428 // value as relay_protocol, since the first hop is the one subject to429 // adversarial conditions. 
In some cases, such as irregular tunnels, there430 // is no ClientTunnelProtocol value available and the listener tunnel431 // protocol will be logged.432 //433 // Similarly, listenerPort indicates the listening port, which is the dialed434 // port number for direct protocols; while, for fronted protocols, the435 // client may dial a different port for its first hop.436 // Process each client connection concurrently.437 go sshServer.handleClient(sshListener, tunnelProtocol, clientConn)438 }439 // Note: when exiting due to a unrecoverable error, be sure440 // to try to send the error to listenerError so that the outer441 // TunnelServer.Run will properly shut down instead of remaining442 // running.443 if protocol.TunnelProtocolUsesMeekHTTP(sshListener.tunnelProtocol) ||444 protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol) {445 meekServer, err := NewMeekServer(446 sshServer.support,447 sshListener.Listener,448 sshListener.tunnelProtocol,449 sshListener.port,450 protocol.TunnelProtocolUsesMeekHTTPS(sshListener.tunnelProtocol),451 protocol.TunnelProtocolUsesFrontedMeek(sshListener.tunnelProtocol),452 protocol.TunnelProtocolUsesObfuscatedSessionTickets(sshListener.tunnelProtocol),453 handleClient,454 sshServer.shutdownBroadcast)455 if err == nil {456 err = meekServer.Run()457 }458 if err != nil {459 select {460 case listenerError <- errors.Trace(err):461 default:462 }463 return464 }465 } else {466 for {467 conn, err := sshListener.Listener.Accept()468 select {469 case <-sshServer.shutdownBroadcast:470 if err == nil {471 conn.Close()472 }473 return474 default:475 }476 if err != nil {477 if e, ok := err.(net.Error); ok && e.Temporary() {478 log.WithTraceFields(LogFields{"error": err}).Error("accept failed")479 // Temporary error, keep running480 continue481 }482 select {483 case listenerError <- errors.Trace(err):484 default:485 }486 return487 }488 handleClient("", conn)489 }490 }491}492// An accepted client has completed a direct TCP or meek connection 
and has a net.Conn. Registration493// is for tracking the number of connections.494func (sshServer *sshServer) registerAcceptedClient(tunnelProtocol, region string) {495 sshServer.clientsMutex.Lock()496 defer sshServer.clientsMutex.Unlock()497 if sshServer.acceptedClientCounts[tunnelProtocol] == nil {498 sshServer.acceptedClientCounts[tunnelProtocol] = make(map[string]int64)499 }500 sshServer.acceptedClientCounts[tunnelProtocol][region] += 1501}502func (sshServer *sshServer) unregisterAcceptedClient(tunnelProtocol, region string) {503 sshServer.clientsMutex.Lock()504 defer sshServer.clientsMutex.Unlock()505 sshServer.acceptedClientCounts[tunnelProtocol][region] -= 1506}507// An established client has completed its SSH handshake and has a ssh.Conn. Registration is508// for tracking the number of fully established clients and for maintaining a list of running509// clients (for stopping at shutdown time).510func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool {511 sshServer.clientsMutex.Lock()512 if sshServer.stoppingClients {513 sshServer.clientsMutex.Unlock()514 return false515 }516 // In the case of a duplicate client sessionID, the previous client is closed.517 // - Well-behaved clients generate a random sessionID that should be unique (won't518 // accidentally conflict) and hard to guess (can't be targeted by a malicious519 // client).520 // - Clients reuse the same sessionID when a tunnel is unexpectedly disconnected521 // and reestablished. 
In this case, when the same server is selected, this logic522 // will be hit; closing the old, dangling client is desirable.523 // - Multi-tunnel clients should not normally use one server for multiple tunnels.524 existingClient := sshServer.clients[client.sessionID]525 sshServer.clientsMutex.Unlock()526 if existingClient != nil {527 // This case is expected to be common, and so logged at the lowest severity528 // level.529 log.WithTrace().Debug(530 "stopping existing client with duplicate session ID")531 existingClient.stop()532 // Block until the existingClient is fully terminated. This is necessary to533 // avoid this scenario:534 // - existingClient is invoking handshakeAPIRequestHandler535 // - sshServer.clients[client.sessionID] is updated to point to new client536 // - existingClient's handshakeAPIRequestHandler invokes537 // SetClientHandshakeState but sets the handshake parameters for new538 // client539 // - as a result, the new client handshake will fail (only a single handshake540 // is permitted) and the new client server_tunnel log will contain an541 // invalid mix of existing/new client fields542 //543 // Once existingClient.awaitStopped returns, all existingClient port544 // forwards and request handlers have terminated, so no API handler, either545 // tunneled web API or SSH API, will remain and it is safe to point546 // sshServer.clients[client.sessionID] to the new client.547 // Limitation: this scenario remains possible with _untunneled_ web API548 // requests.549 //550 // Blocking also ensures existingClient.releaseAuthorizations is invoked before551 // the new client attempts to submit the same authorizations.552 //553 // Perform blocking awaitStopped operation outside the554 // sshServer.clientsMutex mutex to avoid blocking all other clients for the555 // duration. 
We still expect and require that the stop process completes556 // rapidly, e.g., does not block on network I/O, allowing the new client557 // connection to proceed without delay.558 //559 // In addition, operations triggered by stop, and which must complete before560 // awaitStopped returns, will attempt to lock sshServer.clientsMutex,561 // including unregisterEstablishedClient.562 existingClient.awaitStopped()563 }564 sshServer.clientsMutex.Lock()565 defer sshServer.clientsMutex.Unlock()566 // existingClient's stop will have removed it from sshServer.clients via567 // unregisterEstablishedClient, so sshServer.clients[client.sessionID] should568 // be nil -- unless yet another client instance using the same sessionID has569 // connected in the meantime while awaiting existingClient stop. In this570 // case, it's not clear which is the most recent connection from the client,571 // so instead of this connection terminating more peers, it aborts.572 if sshServer.clients[client.sessionID] != nil {573 // As this is expected to be rare case, it's logged at a higher severity574 // level.575 log.WithTrace().Warning(576 "aborting new client with duplicate session ID")577 return false578 }579 sshServer.clients[client.sessionID] = client580 return true581}582func (sshServer *sshServer) unregisterEstablishedClient(client *sshClient) {583 sshServer.clientsMutex.Lock()584 registeredClient := sshServer.clients[client.sessionID]585 // registeredClient will differ from client when client is the existingClient586 // terminated in registerEstablishedClient. 
In that case, registeredClient587 // remains connected, and the sshServer.clients entry should be retained.588 if registeredClient == client {589 delete(sshServer.clients, client.sessionID)590 }591 sshServer.clientsMutex.Unlock()592 client.stop()593}594type UpstreamStats map[string]interface{}595type ProtocolStats map[string]map[string]interface{}596type RegionStats map[string]map[string]map[string]interface{}597func (sshServer *sshServer) getLoadStats() (598 UpstreamStats, ProtocolStats, RegionStats) {599 sshServer.clientsMutex.Lock()600 defer sshServer.clientsMutex.Unlock()601 // Explicitly populate with zeros to ensure 0 counts in log messages.602 zeroClientStats := func() map[string]interface{} {603 stats := make(map[string]interface{})604 stats["accepted_clients"] = int64(0)605 stats["established_clients"] = int64(0)606 return stats607 }608 // Due to hot reload and changes to the underlying system configuration, the609 // set of resolver IPs may change between getLoadStats calls, so this610 // enumeration for zeroing is a best effort.611 resolverIPs := sshServer.support.DNSResolver.GetAll()612 // Fields which are primarily concerned with upstream/egress performance.613 zeroUpstreamStats := func() map[string]interface{} {614 stats := make(map[string]interface{})615 stats["dialing_tcp_port_forwards"] = int64(0)616 stats["tcp_port_forwards"] = int64(0)617 stats["total_tcp_port_forwards"] = int64(0)618 stats["udp_port_forwards"] = int64(0)619 stats["total_udp_port_forwards"] = int64(0)620 stats["tcp_port_forward_dialed_count"] = int64(0)621 stats["tcp_port_forward_dialed_duration"] = int64(0)622 stats["tcp_port_forward_failed_count"] = int64(0)623 stats["tcp_port_forward_failed_duration"] = int64(0)624 stats["tcp_port_forward_rejected_dialing_limit_count"] = int64(0)625 stats["tcp_port_forward_rejected_disallowed_count"] = int64(0)626 stats["udp_port_forward_rejected_disallowed_count"] = int64(0)627 stats["tcp_ipv4_port_forward_dialed_count"] = int64(0)628 
stats["tcp_ipv4_port_forward_dialed_duration"] = int64(0)629 stats["tcp_ipv4_port_forward_failed_count"] = int64(0)630 stats["tcp_ipv4_port_forward_failed_duration"] = int64(0)631 stats["tcp_ipv6_port_forward_dialed_count"] = int64(0)632 stats["tcp_ipv6_port_forward_dialed_duration"] = int64(0)633 stats["tcp_ipv6_port_forward_failed_count"] = int64(0)634 stats["tcp_ipv6_port_forward_failed_duration"] = int64(0)635 zeroDNSStats := func() map[string]int64 {636 m := map[string]int64{"ALL": 0}637 for _, resolverIP := range resolverIPs {638 m[resolverIP.String()] = 0639 }640 return m641 }642 stats["dns_count"] = zeroDNSStats()643 stats["dns_duration"] = zeroDNSStats()644 stats["dns_failed_count"] = zeroDNSStats()645 stats["dns_failed_duration"] = zeroDNSStats()646 return stats647 }648 zeroProtocolStats := func() map[string]map[string]interface{} {649 stats := make(map[string]map[string]interface{})650 stats["ALL"] = zeroClientStats()651 for tunnelProtocol := range sshServer.support.Config.TunnelProtocolPorts {652 stats[tunnelProtocol] = zeroClientStats()653 }654 return stats655 }656 addInt64 := func(stats map[string]interface{}, name string, value int64) {657 stats[name] = stats[name].(int64) + value658 }659 upstreamStats := zeroUpstreamStats()660 // [<protocol or ALL>][<stat name>] -> count661 protocolStats := zeroProtocolStats()662 // [<region][<protocol or ALL>][<stat name>] -> count663 regionStats := make(RegionStats)664 // Note: as currently tracked/counted, each established client is also an accepted client665 for tunnelProtocol, regionAcceptedClientCounts := range sshServer.acceptedClientCounts {666 for region, acceptedClientCount := range regionAcceptedClientCounts {667 if acceptedClientCount > 0 {668 if regionStats[region] == nil {669 regionStats[region] = zeroProtocolStats()670 }671 addInt64(protocolStats["ALL"], "accepted_clients", acceptedClientCount)672 addInt64(protocolStats[tunnelProtocol], "accepted_clients", acceptedClientCount)673 
addInt64(regionStats[region]["ALL"], "accepted_clients", acceptedClientCount)674 addInt64(regionStats[region][tunnelProtocol], "accepted_clients", acceptedClientCount)675 }676 }677 }678 for _, client := range sshServer.clients {679 client.Lock()680 tunnelProtocol := client.tunnelProtocol681 region := client.geoIPData.Country682 if regionStats[region] == nil {683 regionStats[region] = zeroProtocolStats()684 }685 for _, stats := range []map[string]interface{}{686 protocolStats["ALL"],687 protocolStats[tunnelProtocol],688 regionStats[region]["ALL"],689 regionStats[region][tunnelProtocol]} {690 addInt64(stats, "established_clients", 1)691 }692 // Note:693 // - can't sum trafficState.peakConcurrentPortForwardCount to get a global peak694 // - client.udpTrafficState.concurrentDialingPortForwardCount isn't meaningful695 addInt64(upstreamStats, "dialing_tcp_port_forwards",696 client.tcpTrafficState.concurrentDialingPortForwardCount)697 addInt64(upstreamStats, "tcp_port_forwards",698 client.tcpTrafficState.concurrentPortForwardCount)699 addInt64(upstreamStats, "total_tcp_port_forwards",700 client.tcpTrafficState.totalPortForwardCount)701 addInt64(upstreamStats, "udp_port_forwards",702 client.udpTrafficState.concurrentPortForwardCount)703 addInt64(upstreamStats, "total_udp_port_forwards",704 client.udpTrafficState.totalPortForwardCount)705 addInt64(upstreamStats, "tcp_port_forward_dialed_count",706 client.qualityMetrics.TCPPortForwardDialedCount)707 addInt64(upstreamStats, "tcp_port_forward_dialed_duration",708 int64(client.qualityMetrics.TCPPortForwardDialedDuration/time.Millisecond))709 addInt64(upstreamStats, "tcp_port_forward_failed_count",710 client.qualityMetrics.TCPPortForwardFailedCount)711 addInt64(upstreamStats, "tcp_port_forward_failed_duration",712 int64(client.qualityMetrics.TCPPortForwardFailedDuration/time.Millisecond))713 addInt64(upstreamStats, "tcp_port_forward_rejected_dialing_limit_count",714 
client.qualityMetrics.TCPPortForwardRejectedDialingLimitCount)715 addInt64(upstreamStats, "tcp_port_forward_rejected_disallowed_count",716 client.qualityMetrics.TCPPortForwardRejectedDisallowedCount)717 addInt64(upstreamStats, "udp_port_forward_rejected_disallowed_count",718 client.qualityMetrics.UDPPortForwardRejectedDisallowedCount)719 addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_count",720 client.qualityMetrics.TCPIPv4PortForwardDialedCount)721 addInt64(upstreamStats, "tcp_ipv4_port_forward_dialed_duration",722 int64(client.qualityMetrics.TCPIPv4PortForwardDialedDuration/time.Millisecond))723 addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_count",724 client.qualityMetrics.TCPIPv4PortForwardFailedCount)725 addInt64(upstreamStats, "tcp_ipv4_port_forward_failed_duration",726 int64(client.qualityMetrics.TCPIPv4PortForwardFailedDuration/time.Millisecond))727 addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_count",728 client.qualityMetrics.TCPIPv6PortForwardDialedCount)729 addInt64(upstreamStats, "tcp_ipv6_port_forward_dialed_duration",730 int64(client.qualityMetrics.TCPIPv6PortForwardDialedDuration/time.Millisecond))731 addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_count",732 client.qualityMetrics.TCPIPv6PortForwardFailedCount)733 addInt64(upstreamStats, "tcp_ipv6_port_forward_failed_duration",734 int64(client.qualityMetrics.TCPIPv6PortForwardFailedDuration/time.Millisecond))735 // DNS metrics limitations:736 // - port forwards (sshClient.handleTCPChannel) don't know or log the resolver IP.737 // - udpgw and packet tunnel transparent DNS use a heuristic to classify success/failure,738 // and there may be some delay before these code paths report DNS metrics.739 // Every client.qualityMetrics DNS map has an "ALL" entry.740 totalDNSCount := int64(0)741 totalDNSFailedCount := int64(0)742 for key, value := range client.qualityMetrics.DNSCount {743 upstreamStats["dns_count"].(map[string]int64)[key] += value744 totalDNSCount += value745 }746 for 
key, value := range client.qualityMetrics.DNSDuration {747 upstreamStats["dns_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)748 }749 for key, value := range client.qualityMetrics.DNSFailedCount {750 upstreamStats["dns_failed_count"].(map[string]int64)[key] += value751 totalDNSFailedCount += value752 }753 for key, value := range client.qualityMetrics.DNSFailedDuration {754 upstreamStats["dns_failed_duration"].(map[string]int64)[key] += int64(value / time.Millisecond)755 }756 // Update client peak failure rate metrics, to be recorded in757 // server_tunnel.758 //759 // Limitations:760 //761 // - This is a simple data sampling that doesn't require additional762 // timers or tracking logic. Since the rates are calculated on763 // getLoadStats events and using accumulated counts, these peaks764 // only represent the highest failure rate within a765 // Config.LoadMonitorPeriodSeconds non-sliding window. There is no766 // sample recorded for short tunnels with no overlapping767 // getLoadStats event.768 //769 // - There is no minimum sample window, as a getLoadStats event may770 // occur immediately after a client first connects. 
This may be771 // compensated for by adjusting772 // Config.PeakUpstreamFailureRateMinimumSampleSize, so as to only773 // consider failure rates with a larger number of samples.774 //775 // - Non-UDP "failures" are not currently tracked.776 minimumSampleSize := int64(sshServer.support.Config.peakUpstreamFailureRateMinimumSampleSize)777 sampleSize := client.qualityMetrics.TCPPortForwardDialedCount +778 client.qualityMetrics.TCPPortForwardFailedCount779 if sampleSize >= minimumSampleSize {780 TCPPortForwardFailureRate := float64(client.qualityMetrics.TCPPortForwardFailedCount) /781 float64(sampleSize)782 if client.peakMetrics.TCPPortForwardFailureRate == nil {783 client.peakMetrics.TCPPortForwardFailureRate = new(float64)784 *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate785 client.peakMetrics.TCPPortForwardFailureRateSampleSize = new(int64)786 *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize787 } else if *client.peakMetrics.TCPPortForwardFailureRate < TCPPortForwardFailureRate {788 *client.peakMetrics.TCPPortForwardFailureRate = TCPPortForwardFailureRate789 *client.peakMetrics.TCPPortForwardFailureRateSampleSize = sampleSize790 }791 }792 sampleSize = totalDNSCount + totalDNSFailedCount793 if sampleSize >= minimumSampleSize {794 DNSFailureRate := float64(totalDNSFailedCount) / float64(sampleSize)795 if client.peakMetrics.DNSFailureRate == nil {796 client.peakMetrics.DNSFailureRate = new(float64)797 *client.peakMetrics.DNSFailureRate = DNSFailureRate798 client.peakMetrics.DNSFailureRateSampleSize = new(int64)799 *client.peakMetrics.DNSFailureRateSampleSize = sampleSize800 } else if *client.peakMetrics.DNSFailureRate < DNSFailureRate {801 *client.peakMetrics.DNSFailureRate = DNSFailureRate802 *client.peakMetrics.DNSFailureRateSampleSize = sampleSize803 }804 }805 // Reset quality metrics counters806 client.qualityMetrics.reset()807 client.Unlock()808 }809 for _, client := range sshServer.clients {810 client.Lock()811 // 
Update client peak proximate (same region) concurrently connected812 // (other clients) client metrics, to be recorded in server_tunnel.813 // This operation requires a second loop over sshServer.clients since814 // established_clients is calculated in the first loop.815 //816 // Limitations:817 //818 // - This is an approximation, not a true peak, as it only samples819 // data every Config.LoadMonitorPeriodSeconds period. There is no820 // sample recorded for short tunnels with no overlapping821 // getLoadStats event.822 //823 // - The "-1" calculation counts all but the current client as other824 // clients; it can be the case that the same client has a dangling825 // accepted connection that has yet to time-out server side. Due to826 // NAT, we can't determine if the client is the same based on827 // network address. For established clients,828 // registerEstablishedClient ensures that any previous connection829 // is first terminated, although this is only for the same830 // session_id. 
Concurrent proximate clients may be considered an831 // exact number of other _network connections_, even from the same832 // client.833 region := client.geoIPData.Country834 stats := regionStats[region]["ALL"]835 n := stats["accepted_clients"].(int64) - 1836 if n >= 0 {837 if client.peakMetrics.concurrentProximateAcceptedClients == nil {838 client.peakMetrics.concurrentProximateAcceptedClients = new(int64)839 *client.peakMetrics.concurrentProximateAcceptedClients = n840 } else if *client.peakMetrics.concurrentProximateAcceptedClients < n {841 *client.peakMetrics.concurrentProximateAcceptedClients = n842 }843 }844 n = stats["established_clients"].(int64) - 1845 if n >= 0 {846 if client.peakMetrics.concurrentProximateEstablishedClients == nil {847 client.peakMetrics.concurrentProximateEstablishedClients = new(int64)848 *client.peakMetrics.concurrentProximateEstablishedClients = n849 } else if *client.peakMetrics.concurrentProximateEstablishedClients < n {850 *client.peakMetrics.concurrentProximateEstablishedClients = n851 }852 }853 client.Unlock()854 }855 return upstreamStats, protocolStats, regionStats856}857func (sshServer *sshServer) getEstablishedClientCount() int {858 sshServer.clientsMutex.Lock()859 defer sshServer.clientsMutex.Unlock()860 establishedClients := len(sshServer.clients)861 return establishedClients862}863func (sshServer *sshServer) resetAllClientTrafficRules() {864 sshServer.clientsMutex.Lock()865 clients := make(map[string]*sshClient)866 for sessionID, client := range sshServer.clients {867 clients[sessionID] = client868 }869 sshServer.clientsMutex.Unlock()870 for _, client := range clients {871 client.setTrafficRules()872 }873}874func (sshServer *sshServer) resetAllClientOSLConfigs() {875 // Flush cached seed state. 
This has the same effect876 // and same limitations as calling setOSLConfig for877 // currently connected clients -- all progress is lost.878 sshServer.oslSessionCacheMutex.Lock()879 sshServer.oslSessionCache.Flush()880 sshServer.oslSessionCacheMutex.Unlock()881 sshServer.clientsMutex.Lock()882 clients := make(map[string]*sshClient)883 for sessionID, client := range sshServer.clients {884 clients[sessionID] = client885 }886 sshServer.clientsMutex.Unlock()887 for _, client := range clients {888 client.setOSLConfig()889 }890}891func (sshServer *sshServer) setClientHandshakeState(892 sessionID string,893 state handshakeState,894 authorizations []string) (*handshakeStateInfo, error) {895 sshServer.clientsMutex.Lock()896 client := sshServer.clients[sessionID]897 sshServer.clientsMutex.Unlock()898 if client == nil {899 return nil, errors.TraceNew("unknown session ID")900 }901 handshakeStateInfo, err := client.setHandshakeState(902 state, authorizations)903 if err != nil {904 return nil, errors.Trace(err)905 }906 return handshakeStateInfo, nil907}908func (sshServer *sshServer) getClientHandshaked(909 sessionID string) (bool, bool, error) {910 sshServer.clientsMutex.Lock()911 client := sshServer.clients[sessionID]912 sshServer.clientsMutex.Unlock()913 if client == nil {914 return false, false, errors.TraceNew("unknown session ID")915 }916 completed, exhausted := client.getHandshaked()917 return completed, exhausted, nil918}919func (sshServer *sshServer) getClientDisableDiscovery(920 sessionID string) (bool, error) {921 sshServer.clientsMutex.Lock()922 client := sshServer.clients[sessionID]923 sshServer.clientsMutex.Unlock()924 if client == nil {925 return false, errors.TraceNew("unknown session ID")926 }927 return client.getDisableDiscovery(), nil928}929func (sshServer *sshServer) updateClientAPIParameters(930 sessionID string,931 apiParams common.APIParameters) error {932 sshServer.clientsMutex.Lock()933 client := sshServer.clients[sessionID]934 
sshServer.clientsMutex.Unlock()935 if client == nil {936 return errors.TraceNew("unknown session ID")937 }938 client.updateAPIParameters(apiParams)939 return nil940}941func (sshServer *sshServer) revokeClientAuthorizations(sessionID string) {942 sshServer.clientsMutex.Lock()943 client := sshServer.clients[sessionID]944 sshServer.clientsMutex.Unlock()945 if client == nil {946 return947 }948 // sshClient.handshakeState.authorizedAccessTypes is not cleared. Clearing949 // authorizedAccessTypes may cause sshClient.logTunnel to fail to log950 // access types. As the revocation may be due to legitimate use of an951 // authorization in multiple sessions by a single client, useful metrics952 // would be lost.953 client.Lock()954 client.handshakeState.authorizationsRevoked = true955 client.Unlock()956 // Select and apply new traffic rules, as filtered by the client's new957 // authorization state.958 client.setTrafficRules()959}960func (sshServer *sshServer) acceptClientDomainBytes(961 sessionID string) (bool, error) {962 sshServer.clientsMutex.Lock()963 client := sshServer.clients[sessionID]964 sshServer.clientsMutex.Unlock()965 if client == nil {966 return false, errors.TraceNew("unknown session ID")967 }968 return client.acceptDomainBytes(), nil969}970func (sshServer *sshServer) stopClients() {971 sshServer.clientsMutex.Lock()972 sshServer.stoppingClients = true973 clients := sshServer.clients974 sshServer.clients = make(map[string]*sshClient)975 sshServer.clientsMutex.Unlock()976 for _, client := range clients {977 client.stop()978 }979}980func (sshServer *sshServer) handleClient(981 sshListener *sshListener, tunnelProtocol string, clientConn net.Conn) {982 // Calling clientConn.RemoteAddr at this point, before any Read calls,983 // satisfies the constraint documented in tapdance.Listen.984 clientAddr := clientConn.RemoteAddr()985 // Check if there were irregularities during the network connection986 // establishment. 
When present, log and then behave as Obfuscated SSH does987 // when the client fails to provide a valid seed message.988 //989 // One concrete irregular case is failure to send a PROXY protocol header for990 // TAPDANCE-OSSH.991 if indicator, ok := clientConn.(common.IrregularIndicator); ok {992 tunnelErr := indicator.IrregularTunnelError()993 if tunnelErr != nil {994 logIrregularTunnel(995 sshServer.support,996 sshListener.tunnelProtocol,997 sshListener.port,998 common.IPAddressFromAddr(clientAddr),999 errors.Trace(tunnelErr),1000 nil)1001 var afterFunc *time.Timer1002 if sshServer.support.Config.sshHandshakeTimeout > 0 {1003 afterFunc = time.AfterFunc(sshServer.support.Config.sshHandshakeTimeout, func() {1004 clientConn.Close()1005 })1006 }1007 io.Copy(ioutil.Discard, clientConn)1008 clientConn.Close()1009 afterFunc.Stop()1010 return1011 }1012 }1013 // Get any packet manipulation values from GetAppliedSpecName as soon as1014 // possible due to the expiring TTL.1015 serverPacketManipulation := ""1016 replayedServerPacketManipulation := false1017 if sshServer.support.Config.RunPacketManipulator &&1018 protocol.TunnelProtocolMayUseServerPacketManipulation(tunnelProtocol) {1019 // A meekConn has synthetic address values, including the original client1020 // address in cases where the client uses an upstream proxy to connect to1021 // Psiphon. For meekConn, and any other conn implementing1022 // UnderlyingTCPAddrSource, get the underlying TCP connection addresses.1023 //1024 // Limitation: a meek tunnel may consist of several TCP connections. 
The1025 // server_packet_manipulation metric will reflect the packet manipulation1026 // applied to the _first_ TCP connection only.1027 var localAddr, remoteAddr *net.TCPAddr1028 var ok bool1029 underlying, ok := clientConn.(common.UnderlyingTCPAddrSource)1030 if ok {1031 localAddr, remoteAddr, ok = underlying.GetUnderlyingTCPAddrs()1032 } else {1033 localAddr, ok = clientConn.LocalAddr().(*net.TCPAddr)1034 if ok {1035 remoteAddr, ok = clientConn.RemoteAddr().(*net.TCPAddr)1036 }1037 }1038 if ok {1039 specName, extraData, err := sshServer.support.PacketManipulator.1040 GetAppliedSpecName(localAddr, remoteAddr)1041 if err == nil {1042 serverPacketManipulation = specName1043 replayedServerPacketManipulation, _ = extraData.(bool)1044 }1045 }1046 }1047 geoIPData := sshServer.support.GeoIPService.Lookup(1048 common.IPAddressFromAddr(clientAddr))1049 sshServer.registerAcceptedClient(tunnelProtocol, geoIPData.Country)1050 defer sshServer.unregisterAcceptedClient(tunnelProtocol, geoIPData.Country)1051 // When configured, enforce a cap on the number of concurrent SSH1052 // handshakes. This limits load spikes on busy servers when many clients1053 // attempt to connect at once. Wait a short time, SSH_BEGIN_HANDSHAKE_TIMEOUT,1054 // to acquire; waiting will avoid immediately creating more load on another1055 // server in the network when the client tries a new candidate. Disconnect the1056 // client when that wait time is exceeded.1057 //1058 // This mechanism limits memory allocations and CPU usage associated with the1059 // SSH handshake. At this point, new direct TCP connections or new meek1060 // connections, with associated resource usage, are already established. 
Those1061 // connections are expected to be rate or load limited using other mechanisms.1062 //1063 // TODO:1064 //1065 // - deduct time spent acquiring the semaphore from SSH_HANDSHAKE_TIMEOUT in1066 // sshClient.run, since the client is also applying an SSH handshake timeout1067 // and won't exclude time spent waiting.1068 // - each call to sshServer.handleClient (in sshServer.runListener) is invoked1069 // in its own goroutine, but shutdown doesn't synchronously await these1070 // goroutnes. Once this is synchronizes, the following context.WithTimeout1071 // should use an sshServer parent context to ensure blocking acquires1072 // interrupt immediately upon shutdown.1073 var onSSHHandshakeFinished func()1074 if sshServer.support.Config.MaxConcurrentSSHHandshakes > 0 {1075 ctx, cancelFunc := context.WithTimeout(1076 context.Background(),1077 sshServer.support.Config.sshBeginHandshakeTimeout)1078 defer cancelFunc()1079 err := sshServer.concurrentSSHHandshakes.Acquire(ctx, 1)1080 if err != nil {1081 clientConn.Close()1082 // This is a debug log as the only possible error is context timeout.1083 log.WithTraceFields(LogFields{"error": err}).Debug(1084 "acquire SSH handshake semaphore failed")1085 return1086 }1087 onSSHHandshakeFinished = func() {1088 sshServer.concurrentSSHHandshakes.Release(1)1089 }1090 }1091 sshClient := newSshClient(1092 sshServer,1093 sshListener,1094 tunnelProtocol,1095 serverPacketManipulation,1096 replayedServerPacketManipulation,1097 clientAddr,1098 geoIPData)1099 // sshClient.run _must_ call onSSHHandshakeFinished to release the semaphore:1100 // in any error case; or, as soon as the SSH handshake phase has successfully1101 // completed.1102 sshClient.run(clientConn, onSSHHandshakeFinished)1103}1104func (sshServer *sshServer) monitorPortForwardDialError(err error) {1105 // "err" is the error returned from a failed TCP or UDP port1106 // forward dial. 
Certain system error codes indicate low resource1107 // conditions: insufficient file descriptors, ephemeral ports, or1108 // memory. For these cases, log an alert.1109 // TODO: also temporarily suspend new clients1110 // Note: don't log net.OpError.Error() as the full error string1111 // may contain client destination addresses.1112 opErr, ok := err.(*net.OpError)1113 if ok {1114 if opErr.Err == syscall.EADDRNOTAVAIL ||1115 opErr.Err == syscall.EAGAIN ||1116 opErr.Err == syscall.ENOMEM ||1117 opErr.Err == syscall.EMFILE ||1118 opErr.Err == syscall.ENFILE {1119 log.WithTraceFields(1120 LogFields{"error": opErr.Err}).Error(1121 "port forward dial failed due to unavailable resource")1122 }1123 }1124}1125type sshClient struct {1126 sync.Mutex1127 sshServer *sshServer1128 sshListener *sshListener1129 tunnelProtocol string1130 sshConn ssh.Conn1131 throttledConn *common.ThrottledConn1132 serverPacketManipulation string1133 replayedServerPacketManipulation bool1134 clientAddr net.Addr1135 geoIPData GeoIPData1136 sessionID string1137 isFirstTunnelInSession bool1138 supportsServerRequests bool1139 handshakeState handshakeState1140 udpgwChannelHandler *udpgwPortForwardMultiplexer1141 totalUdpgwChannelCount int1142 packetTunnelChannel ssh.Channel1143 totalPacketTunnelChannelCount int1144 trafficRules TrafficRules1145 tcpTrafficState trafficState1146 udpTrafficState trafficState1147 qualityMetrics *qualityMetrics1148 tcpPortForwardLRU *common.LRUConns1149 oslClientSeedState *osl.ClientSeedState1150 signalIssueSLOKs chan struct{}1151 runCtx context.Context1152 stopRunning context.CancelFunc1153 stopped chan struct{}1154 tcpPortForwardDialingAvailableSignal context.CancelFunc1155 releaseAuthorizations func()1156 stopTimer *time.Timer1157 preHandshakeRandomStreamMetrics randomStreamMetrics1158 postHandshakeRandomStreamMetrics randomStreamMetrics1159 sendAlertRequests chan protocol.AlertRequest1160 sentAlertRequests map[string]bool1161 peakMetrics peakMetrics1162 
destinationBytesMetricsASN string1163 tcpDestinationBytesMetrics destinationBytesMetrics1164 udpDestinationBytesMetrics destinationBytesMetrics1165}1166type trafficState struct {1167 bytesUp int641168 bytesDown int641169 concurrentDialingPortForwardCount int641170 peakConcurrentDialingPortForwardCount int641171 concurrentPortForwardCount int641172 peakConcurrentPortForwardCount int641173 totalPortForwardCount int641174 availablePortForwardCond *sync.Cond1175}1176type randomStreamMetrics struct {1177 count int641178 upstreamBytes int641179 receivedUpstreamBytes int641180 downstreamBytes int641181 sentDownstreamBytes int641182}1183type peakMetrics struct {1184 concurrentProximateAcceptedClients *int641185 concurrentProximateEstablishedClients *int641186 TCPPortForwardFailureRate *float641187 TCPPortForwardFailureRateSampleSize *int641188 DNSFailureRate *float641189 DNSFailureRateSampleSize *int641190}1191// qualityMetrics records upstream TCP dial attempts and1192// elapsed time. Elapsed time includes the full TCP handshake1193// and, in aggregate, is a measure of the quality of the1194// upstream link. 
These stats are recorded by each sshClient1195// and then reported and reset in sshServer.getLoadStats().1196type qualityMetrics struct {1197 TCPPortForwardDialedCount int641198 TCPPortForwardDialedDuration time.Duration1199 TCPPortForwardFailedCount int641200 TCPPortForwardFailedDuration time.Duration1201 TCPPortForwardRejectedDialingLimitCount int641202 TCPPortForwardRejectedDisallowedCount int641203 UDPPortForwardRejectedDisallowedCount int641204 TCPIPv4PortForwardDialedCount int641205 TCPIPv4PortForwardDialedDuration time.Duration1206 TCPIPv4PortForwardFailedCount int641207 TCPIPv4PortForwardFailedDuration time.Duration1208 TCPIPv6PortForwardDialedCount int641209 TCPIPv6PortForwardDialedDuration time.Duration1210 TCPIPv6PortForwardFailedCount int641211 TCPIPv6PortForwardFailedDuration time.Duration1212 DNSCount map[string]int641213 DNSDuration map[string]time.Duration1214 DNSFailedCount map[string]int641215 DNSFailedDuration map[string]time.Duration1216}1217func newQualityMetrics() *qualityMetrics {1218 return &qualityMetrics{1219 DNSCount: make(map[string]int64),1220 DNSDuration: make(map[string]time.Duration),1221 DNSFailedCount: make(map[string]int64),1222 DNSFailedDuration: make(map[string]time.Duration),1223 }1224}1225func (q *qualityMetrics) reset() {1226 q.TCPPortForwardDialedCount = 01227 q.TCPPortForwardDialedDuration = 01228 q.TCPPortForwardFailedCount = 01229 q.TCPPortForwardFailedDuration = 01230 q.TCPPortForwardRejectedDialingLimitCount = 01231 q.TCPPortForwardRejectedDisallowedCount = 01232 q.UDPPortForwardRejectedDisallowedCount = 01233 q.TCPIPv4PortForwardDialedCount = 01234 q.TCPIPv4PortForwardDialedDuration = 01235 q.TCPIPv4PortForwardFailedCount = 01236 q.TCPIPv4PortForwardFailedDuration = 01237 q.TCPIPv6PortForwardDialedCount = 01238 q.TCPIPv6PortForwardDialedDuration = 01239 q.TCPIPv6PortForwardFailedCount = 01240 q.TCPIPv6PortForwardFailedDuration = 01241 // Retain existing maps to avoid memory churn. 
The Go compiler optimizes map1242 // clearing operations of the following form.1243 for k := range q.DNSCount {1244 delete(q.DNSCount, k)1245 }1246 for k := range q.DNSDuration {1247 delete(q.DNSDuration, k)1248 }1249 for k := range q.DNSFailedCount {1250 delete(q.DNSFailedCount, k)1251 }1252 for k := range q.DNSFailedDuration {1253 delete(q.DNSFailedDuration, k)1254 }1255}1256type handshakeStateInfo struct {1257 activeAuthorizationIDs []string1258 authorizedAccessTypes []string1259 upstreamBytesPerSecond int641260 downstreamBytesPerSecond int641261}1262type handshakeState struct {1263 completed bool1264 apiProtocol string1265 apiParams common.APIParameters1266 activeAuthorizationIDs []string1267 authorizedAccessTypes []string1268 authorizationsRevoked bool1269 domainBytesChecksum []byte1270 establishedTunnelsCount int1271 splitTunnelLookup *splitTunnelLookup1272}1273type destinationBytesMetrics struct {1274 bytesUp int641275 bytesDown int641276}1277func (d *destinationBytesMetrics) UpdateProgress(1278 downstreamBytes, upstreamBytes, _ int64) {1279 // Concurrency: UpdateProgress may be called without holding the sshClient1280 // lock; all accesses to bytesUp/bytesDown must use atomic operations.1281 atomic.AddInt64(&d.bytesUp, upstreamBytes)1282 atomic.AddInt64(&d.bytesDown, downstreamBytes)1283}1284func (d *destinationBytesMetrics) getBytesUp() int64 {1285 return atomic.LoadInt64(&d.bytesUp)1286}1287func (d *destinationBytesMetrics) getBytesDown() int64 {1288 return atomic.LoadInt64(&d.bytesDown)1289}1290type splitTunnelLookup struct {1291 regions []string1292 regionsLookup map[string]bool1293}1294func newSplitTunnelLookup(1295 ownRegion string,1296 otherRegions []string) (*splitTunnelLookup, error) {1297 length := len(otherRegions)1298 if ownRegion != "" {1299 length += 11300 }1301 // This length check is a sanity check and prevents clients shipping1302 // excessively long lists which could impact performance.1303 if length > 250 {1304 return nil, 
errors.Tracef("too many regions: %d", length)1305 }1306 // Create map lookups for lists where the number of values to compare1307 // against exceeds a threshold where benchmarks show maps are faster than1308 // looping through a slice. Otherwise use a slice for lookups. In both1309 // cases, the input slice is no longer referenced.1310 if length >= stringLookupThreshold {1311 regionsLookup := make(map[string]bool)1312 if ownRegion != "" {1313 regionsLookup[ownRegion] = true1314 }1315 for _, region := range otherRegions {1316 regionsLookup[region] = true1317 }1318 return &splitTunnelLookup{1319 regionsLookup: regionsLookup,1320 }, nil1321 } else {1322 regions := []string{}1323 if ownRegion != "" && !common.Contains(otherRegions, ownRegion) {1324 regions = append(regions, ownRegion)1325 }1326 // TODO: check for other duplicate regions?1327 regions = append(regions, otherRegions...)1328 return &splitTunnelLookup{1329 regions: regions,1330 }, nil1331 }1332}1333func (lookup *splitTunnelLookup) lookup(region string) bool {1334 if lookup.regionsLookup != nil {1335 return lookup.regionsLookup[region]1336 } else {1337 return common.Contains(lookup.regions, region)1338 }1339}1340func newSshClient(1341 sshServer *sshServer,1342 sshListener *sshListener,1343 tunnelProtocol string,1344 serverPacketManipulation string,1345 replayedServerPacketManipulation bool,1346 clientAddr net.Addr,1347 geoIPData GeoIPData) *sshClient {1348 runCtx, stopRunning := context.WithCancel(context.Background())1349 // isFirstTunnelInSession is defaulted to true so that the pre-handshake1350 // traffic rules won't apply UnthrottleFirstTunnelOnly and negate any1351 // unthrottled bytes during the initial protocol negotiation.1352 client := &sshClient{1353 sshServer: sshServer,1354 sshListener: sshListener,1355 tunnelProtocol: tunnelProtocol,1356 serverPacketManipulation: serverPacketManipulation,1357 replayedServerPacketManipulation: replayedServerPacketManipulation,1358 clientAddr: clientAddr,1359 
geoIPData: geoIPData,1360 isFirstTunnelInSession: true,1361 qualityMetrics: newQualityMetrics(),1362 tcpPortForwardLRU: common.NewLRUConns(),1363 signalIssueSLOKs: make(chan struct{}, 1),1364 runCtx: runCtx,1365 stopRunning: stopRunning,1366 stopped: make(chan struct{}),1367 sendAlertRequests: make(chan protocol.AlertRequest, ALERT_REQUEST_QUEUE_BUFFER_SIZE),1368 sentAlertRequests: make(map[string]bool),1369 }1370 client.tcpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))1371 client.udpTrafficState.availablePortForwardCond = sync.NewCond(new(sync.Mutex))1372 return client1373}1374func (sshClient *sshClient) run(1375 baseConn net.Conn, onSSHHandshakeFinished func()) {1376 // When run returns, the client has fully stopped, with all SSH state torn1377 // down and no port forwards or API requests in progress.1378 defer close(sshClient.stopped)1379 // onSSHHandshakeFinished must be called even if the SSH handshake is aborted.1380 defer func() {1381 if onSSHHandshakeFinished != nil {1382 onSSHHandshakeFinished()1383 }1384 }()1385 // Set initial traffic rules, pre-handshake, based on currently known info.1386 sshClient.setTrafficRules()1387 conn := baseConn1388 // Wrap the base client connection with an ActivityMonitoredConn which will1389 // terminate the connection if no data is received before the deadline. This1390 // timeout is in effect for the entire duration of the SSH connection. Clients1391 // must actively use the connection or send SSH keep alive requests to keep1392 // the connection active. 
Writes are not considered reliable activity indicators1393 // due to buffering.1394 activityConn, err := common.NewActivityMonitoredConn(1395 conn,1396 SSH_CONNECTION_READ_DEADLINE,1397 false,1398 nil)1399 if err != nil {1400 conn.Close()1401 if !isExpectedTunnelIOError(err) {1402 log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")1403 }1404 return1405 }1406 conn = activityConn1407 // Further wrap the connection with burst monitoring, when enabled.1408 //1409 // Limitation: burst parameters are fixed for the duration of the tunnel1410 // and do not change after a tactics hot reload.1411 var burstConn *common.BurstMonitoredConn1412 p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)1413 if err != nil {1414 log.WithTraceFields(LogFields{"error": errors.Trace(err)}).Warning(1415 "ServerTacticsParametersCache.Get failed")1416 return1417 }1418 if !p.IsNil() {1419 upstreamTargetBytes := int64(p.Int(parameters.ServerBurstUpstreamTargetBytes))1420 upstreamDeadline := p.Duration(parameters.ServerBurstUpstreamDeadline)1421 downstreamTargetBytes := int64(p.Int(parameters.ServerBurstDownstreamTargetBytes))1422 downstreamDeadline := p.Duration(parameters.ServerBurstDownstreamDeadline)1423 if (upstreamDeadline != 0 && upstreamTargetBytes != 0) ||1424 (downstreamDeadline != 0 && downstreamTargetBytes != 0) {1425 burstConn = common.NewBurstMonitoredConn(1426 conn,1427 true,1428 upstreamTargetBytes, upstreamDeadline,1429 downstreamTargetBytes, downstreamDeadline)1430 conn = burstConn1431 }1432 }1433 // Allow garbage collection.1434 p.Close()1435 // Further wrap the connection in a rate limiting ThrottledConn.1436 throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits())1437 conn = throttledConn1438 // Replay of server-side parameters is set or extended after a new tunnel1439 // meets duration and bytes transferred targets. Set a timer now that expires1440 // shortly after the target duration. 
When the timer fires, check the time of1441 // last byte read (a read indicating a live connection with the client),1442 // along with total bytes transferred and set or extend replay if the targets1443 // are met.1444 //1445 // Both target checks are conservative: the tunnel may be healthy, but a byte1446 // may not have been read in the last second when the timer fires. Or bytes1447 // may be transferring, but not at the target level. Only clients that meet1448 // the strict targets at the single check time will trigger replay; however,1449 // this replay will impact all clients with similar GeoIP data.1450 //1451 // A deferred function cancels the timer and also increments the replay1452 // failure counter, which will ultimately clear replay parameters, when the1453 // tunnel fails before the API handshake is completed (this includes any1454 // liveness test).1455 //1456 // A tunnel which fails to meet the targets but successfully completes any1457 // liveness test and the API handshake is ignored in terms of replay scoring.1458 isReplayCandidate, replayWaitDuration, replayTargetDuration :=1459 sshClient.sshServer.support.ReplayCache.GetReplayTargetDuration(sshClient.geoIPData)1460 if isReplayCandidate {1461 getFragmentorSeed := func() *prng.Seed {1462 fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)1463 if ok {1464 fragmentorSeed, _ := fragmentor.GetReplay()1465 return fragmentorSeed1466 }1467 return nil1468 }1469 setReplayAfterFunc := time.AfterFunc(1470 replayWaitDuration,1471 func() {1472 if activityConn.GetActiveDuration() >= replayTargetDuration {1473 sshClient.Lock()1474 bytesUp := sshClient.tcpTrafficState.bytesUp + sshClient.udpTrafficState.bytesUp1475 bytesDown := sshClient.tcpTrafficState.bytesDown + sshClient.udpTrafficState.bytesDown1476 sshClient.Unlock()1477 sshClient.sshServer.support.ReplayCache.SetReplayParameters(1478 sshClient.tunnelProtocol,1479 sshClient.geoIPData,1480 sshClient.serverPacketManipulation,1481 
getFragmentorSeed(),1482 bytesUp,1483 bytesDown)1484 }1485 })1486 defer func() {1487 setReplayAfterFunc.Stop()1488 completed, _ := sshClient.getHandshaked()1489 if !completed {1490 // Count a replay failure case when a tunnel used replay parameters1491 // (excluding OSSH fragmentation, which doesn't use the ReplayCache) and1492 // failed to complete the API handshake.1493 replayedFragmentation := false1494 if sshClient.tunnelProtocol != protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {1495 fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)1496 if ok {1497 _, replayedFragmentation = fragmentor.GetReplay()1498 }1499 }1500 usedReplay := replayedFragmentation || sshClient.replayedServerPacketManipulation1501 if usedReplay {1502 sshClient.sshServer.support.ReplayCache.FailedReplayParameters(1503 sshClient.tunnelProtocol,1504 sshClient.geoIPData,1505 sshClient.serverPacketManipulation,1506 getFragmentorSeed())1507 }1508 }1509 }()1510 }1511 // Run the initial [obfuscated] SSH handshake in a goroutine so we can both1512 // respect shutdownBroadcast and implement a specific handshake timeout.1513 // The timeout is to reclaim network resources in case the handshake takes1514 // too long.1515 type sshNewServerConnResult struct {1516 obfuscatedSSHConn *obfuscator.ObfuscatedSSHConn1517 sshConn *ssh.ServerConn1518 channels <-chan ssh.NewChannel1519 requests <-chan *ssh.Request1520 err error1521 }1522 resultChannel := make(chan *sshNewServerConnResult, 2)1523 var sshHandshakeAfterFunc *time.Timer1524 if sshClient.sshServer.support.Config.sshHandshakeTimeout > 0 {1525 sshHandshakeAfterFunc = time.AfterFunc(sshClient.sshServer.support.Config.sshHandshakeTimeout, func() {1526 resultChannel <- &sshNewServerConnResult{err: std_errors.New("ssh handshake timeout")}1527 })1528 }1529 go func(baseConn, conn net.Conn) {1530 sshServerConfig := &ssh.ServerConfig{1531 PasswordCallback: sshClient.passwordCallback,1532 AuthLogCallback: sshClient.authLogCallback,1533 ServerVersion: 
sshClient.sshServer.support.Config.SSHServerVersion,1534 }1535 sshServerConfig.AddHostKey(sshClient.sshServer.sshHostKey)1536 var err error1537 if protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {1538 // With Encrypt-then-MAC hash algorithms, packet length is1539 // transmitted in plaintext, which aids in traffic analysis;1540 // clients may still send Encrypt-then-MAC algorithms in their1541 // KEX_INIT message, but do not select these algorithms.1542 //1543 // The exception is TUNNEL_PROTOCOL_SSH, which is intended to appear1544 // like SSH on the wire.1545 sshServerConfig.NoEncryptThenMACHash = true1546 } else {1547 // For TUNNEL_PROTOCOL_SSH only, randomize KEX.1548 if sshClient.sshServer.support.Config.ObfuscatedSSHKey != "" {1549 sshServerConfig.KEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed(1550 sshClient.sshServer.support.Config.ObfuscatedSSHKey)1551 if err != nil {1552 err = errors.Trace(err)1553 }1554 }1555 }1556 result := &sshNewServerConnResult{}1557 // Wrap the connection in an SSH deobfuscator when required.1558 if err == nil && protocol.TunnelProtocolUsesObfuscatedSSH(sshClient.tunnelProtocol) {1559 // Note: NewServerObfuscatedSSHConn blocks on network I/O1560 // TODO: ensure this won't block shutdown1561 result.obfuscatedSSHConn, err = obfuscator.NewServerObfuscatedSSHConn(1562 conn,1563 sshClient.sshServer.support.Config.ObfuscatedSSHKey,1564 sshClient.sshServer.obfuscatorSeedHistory,1565 func(clientIP string, err error, logFields common.LogFields) {1566 logIrregularTunnel(1567 sshClient.sshServer.support,1568 sshClient.sshListener.tunnelProtocol,1569 sshClient.sshListener.port,1570 clientIP,1571 errors.Trace(err),1572 LogFields(logFields))1573 })1574 if err != nil {1575 err = errors.Trace(err)1576 } else {1577 conn = result.obfuscatedSSHConn1578 }1579 // Seed the fragmentor, when present, with seed derived from initial1580 // obfuscator message. See tactics.Listener.Accept. 
This must preceed1581 // ssh.NewServerConn to ensure fragmentor is seeded before downstream bytes1582 // are written.1583 if err == nil && sshClient.tunnelProtocol == protocol.TUNNEL_PROTOCOL_OBFUSCATED_SSH {1584 fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)1585 if ok {1586 var fragmentorPRNG *prng.PRNG1587 fragmentorPRNG, err = result.obfuscatedSSHConn.GetDerivedPRNG("server-side-fragmentor")1588 if err != nil {1589 err = errors.Trace(err)1590 } else {1591 fragmentor.SetReplay(fragmentorPRNG)1592 }1593 }1594 }1595 }1596 if err == nil {1597 result.sshConn, result.channels, result.requests, err =1598 ssh.NewServerConn(conn, sshServerConfig)1599 if err != nil {1600 err = errors.Trace(err)1601 }1602 }1603 result.err = err1604 resultChannel <- result1605 }(baseConn, conn)1606 var result *sshNewServerConnResult1607 select {1608 case result = <-resultChannel:1609 case <-sshClient.sshServer.shutdownBroadcast:1610 // Close() will interrupt an ongoing handshake1611 // TODO: wait for SSH handshake goroutines to exit before returning?1612 conn.Close()1613 return1614 }1615 if sshHandshakeAfterFunc != nil {1616 sshHandshakeAfterFunc.Stop()1617 }1618 if result.err != nil {1619 conn.Close()1620 // This is a Debug log due to noise. 
The handshake often fails due to I/O1621 // errors as clients frequently interrupt connections in progress when1622 // client-side load balancing completes a connection to a different server.1623 log.WithTraceFields(LogFields{"error": result.err}).Debug("SSH handshake failed")1624 return1625 }1626 // The SSH handshake has finished successfully; notify now to allow other1627 // blocked SSH handshakes to proceed.1628 if onSSHHandshakeFinished != nil {1629 onSSHHandshakeFinished()1630 }1631 onSSHHandshakeFinished = nil1632 sshClient.Lock()1633 sshClient.sshConn = result.sshConn1634 sshClient.throttledConn = throttledConn1635 sshClient.Unlock()1636 if !sshClient.sshServer.registerEstablishedClient(sshClient) {1637 conn.Close()1638 log.WithTrace().Warning("register failed")1639 return1640 }1641 sshClient.runTunnel(result.channels, result.requests)1642 // Note: sshServer.unregisterEstablishedClient calls sshClient.stop(),1643 // which also closes underlying transport Conn.1644 sshClient.sshServer.unregisterEstablishedClient(sshClient)1645 // Log tunnel metrics.1646 var additionalMetrics []LogFields1647 // Add activity and burst metrics.1648 //1649 // The reported duration is based on last confirmed data transfer, which for1650 // sshClient.activityConn.GetActiveDuration() is time of last read byte and1651 // not conn close time. This is important for protocols such as meek. For1652 // meek, the connection remains open until the HTTP session expires, which1653 // may be some time after the tunnel has closed. 
(The meek protocol has no1654 // allowance for signalling payload EOF, and even if it did the client may1655 // not have the opportunity to send a final request with an EOF flag set.)1656 activityMetrics := make(LogFields)1657 activityMetrics["start_time"] = activityConn.GetStartTime()1658 activityMetrics["duration"] = int64(activityConn.GetActiveDuration() / time.Millisecond)1659 additionalMetrics = append(additionalMetrics, activityMetrics)1660 if burstConn != nil {1661 // Any outstanding burst should be recorded by burstConn.Close which should1662 // be called by unregisterEstablishedClient.1663 additionalMetrics = append(1664 additionalMetrics, LogFields(burstConn.GetMetrics(activityConn.GetStartTime())))1665 }1666 // Some conns report additional metrics. Meek conns report resiliency1667 // metrics and fragmentor.Conns report fragmentor configs.1668 if metricsSource, ok := baseConn.(common.MetricsSource); ok {1669 additionalMetrics = append(1670 additionalMetrics, LogFields(metricsSource.GetMetrics()))1671 }1672 if result.obfuscatedSSHConn != nil {1673 additionalMetrics = append(1674 additionalMetrics, LogFields(result.obfuscatedSSHConn.GetMetrics()))1675 }1676 // Add server-replay metrics.1677 replayMetrics := make(LogFields)1678 replayedFragmentation := false1679 fragmentor, ok := baseConn.(common.FragmentorReplayAccessor)1680 if ok {1681 _, replayedFragmentation = fragmentor.GetReplay()1682 }1683 replayMetrics["server_replay_fragmentation"] = replayedFragmentation1684 replayMetrics["server_replay_packet_manipulation"] = sshClient.replayedServerPacketManipulation1685 additionalMetrics = append(additionalMetrics, replayMetrics)1686 // Limitation: there's only one log per tunnel with bytes transferred1687 // metrics, so the byte count can't be attributed to a certain day for1688 // tunnels that remain connected for well over 24h. 
// In practice, most
	// tunnels are short-lived, especially on mobile devices.
	sshClient.logTunnel(additionalMetrics)

	// Transfer OSL seed state -- the OSL progress -- from the closing
	// client to the session cache so the client can resume its progress
	// if it reconnects to this same server.
	// Note: following setOSLConfig order of locking.
	sshClient.Lock()
	if sshClient.oslClientSeedState != nil {
		sshClient.sshServer.oslSessionCacheMutex.Lock()
		sshClient.oslClientSeedState.Hibernate()
		sshClient.sshServer.oslSessionCache.Set(
			sshClient.sessionID, sshClient.oslClientSeedState, cache.DefaultExpiration)
		sshClient.sshServer.oslSessionCacheMutex.Unlock()
		sshClient.oslClientSeedState = nil
	}
	sshClient.Unlock()

	// Initiate cleanup of the GeoIP session cache. To allow for post-tunnel
	// final status requests, the lifetime of cached GeoIP records exceeds the
	// lifetime of the sshClient.
	sshClient.sshServer.support.GeoIPService.MarkSessionCacheToExpire(sshClient.sessionID)
}

// passwordCallback validates the SSH password authentication payload.
//
// The password is first parsed as a JSON protocol.SSHPasswordPayload. When
// that fails and the password has exactly the expected combined length, it is
// instead split into a hex session ID followed by the SSH password.
//
// The session ID must consist of hex digits and have the expected length. The
// user name and SSH password are compared against the configured values using
// constant-time comparisons. Any validation failure returns a nil
// *ssh.Permissions and a traced error.
//
// On success, the session ID, the first-tunnel-in-session flag and the
// server-requests capability are recorded on sshClient, the client's GeoIP
// data is stored in the GeoIP session cache under the session ID, and
// (nil, nil) is returned.
func (sshClient *sshClient) passwordCallback(conn ssh.ConnMetadata, password []byte) (*ssh.Permissions, error) {

	// Both values are doubled: the session ID and the SSH password are hex
	// encoded in the legacy payload format handled below.
	expectedSessionIDLength := 2 * protocol.PSIPHON_API_CLIENT_SESSION_ID_LENGTH
	expectedSSHPasswordLength := 2 * SSH_PASSWORD_BYTE_LENGTH

	var sshPasswordPayload protocol.SSHPasswordPayload
	err := json.Unmarshal(password, &sshPasswordPayload)
	if err != nil {

		// Backwards compatibility case: instead of a JSON payload, older clients
		// send the hex encoded session ID prepended to the SSH password.
		// Note: there's an even older case where clients don't send any session ID,
		// but that's no longer supported.
		if len(password) == expectedSessionIDLength+expectedSSHPasswordLength {
			sshPasswordPayload.SessionId = string(password[0:expectedSessionIDLength])
			sshPasswordPayload.SshPassword = string(password[expectedSessionIDLength:])
		} else {
			return nil, errors.Tracef("invalid password payload for %q", conn.User())
		}
	}

	if !isHexDigits(sshClient.sshServer.support.Config, sshPasswordPayload.SessionId) ||
		len(sshPasswordPayload.SessionId) != expectedSessionIDLength {
		return nil, errors.Tracef("invalid session ID for %q", conn.User())
	}

	// Both comparisons are always evaluated before the result is checked.
	userOk := (subtle.ConstantTimeCompare(
		[]byte(conn.User()), []byte(sshClient.sshServer.support.Config.SSHUserName)) == 1)

	passwordOk := (subtle.ConstantTimeCompare(
		[]byte(sshPasswordPayload.SshPassword), []byte(sshClient.sshServer.support.Config.SSHPassword)) == 1)

	if !userOk || !passwordOk {
		return nil, errors.Tracef("invalid password for %q", conn.User())
	}

	sessionID := sshPasswordPayload.SessionId

	// The GeoIP session cache will be populated if there was a previous tunnel
	// with this session ID. This will be true up to GEOIP_SESSION_CACHE_TTL, which
	// is currently much longer than the OSL session cache, another option to use if
	// the GeoIP session cache is retired (the GeoIP session cache currently only
	// supports legacy use cases).
	isFirstTunnelInSession := !sshClient.sshServer.support.GeoIPService.InSessionCache(sessionID)

	supportsServerRequests := common.Contains(
		sshPasswordPayload.ClientCapabilities, protocol.CLIENT_CAPABILITY_SERVER_REQUESTS)

	sshClient.Lock()

	// After this point, these values are read-only as they are read
	// without obtaining sshClient.Lock.
	sshClient.sessionID = sessionID
	sshClient.isFirstTunnelInSession = isFirstTunnelInSession
	sshClient.supportsServerRequests = supportsServerRequests

	geoIPData := sshClient.geoIPData

	sshClient.Unlock()

	// Store the GeoIP data associated with the session ID. This makes
	// the GeoIP data available to the web server for web API requests.
	// A cache that's distinct from the sshClient record is used to allow
	// for post-tunnel final status requests.
	// If the client is reconnecting with the same session ID, this call
	// will undo the expiry set by MarkSessionCacheToExpire.
	sshClient.sshServer.support.GeoIPService.SetSessionCache(sessionID, geoIPData)

	return nil, nil
}

// authLogCallback handles the outcome of an authentication attempt, as
// reported by method and err.
//
// When err is nil, a Debug "authentication success" log is emitted. When err
// is non-nil, the server-wide failure counter is incremented, a summary
// Warning is emitted at most once per SSH_AUTH_LOG_PERIOD, and a Debug
// "authentication failed" log is emitted. The one exception is a failure for
// method "none" whose error text is "ssh: no auth passed yet", which is
// ignored entirely.
func (sshClient *sshClient) authLogCallback(conn ssh.ConnMetadata, method string, err error) {

	if err != nil {

		if method == "none" && err.Error() == "ssh: no auth passed yet" {
			// In this case, the callback invocation is noise from auth negotiation
			return
		}

		// Note: here we previously logged messages for fail2ban to act on. This is no longer
		// done as the complexity outweighs the benefits.
		//
		// - The SSH credential is not secret -- it's in the server entry. Attackers targeting
		// the server likely already have the credential. On the other hand, random scanning and
		// brute forcing is mitigated with high entropy random passwords, rate limiting
		// (implemented on the host via iptables), and limited capabilities (the SSH session can
		// only port forward).
		//
		// - fail2ban coverage was inconsistent; in the case of an unfronted meek protocol through
		// an upstream proxy, the remote address is the upstream proxy, which should not be blocked.
		// The X-Forwarded-For header can't be used instead as it may be forged and used to get IPs
		// deliberately blocked; and in any case fail2ban adds iptables rules which can only block
		// by direct remote IP, not by original client IP. Fronted meek has the same iptables issue.
		//
		// Random scanning and brute forcing of port 22 will result in log noise. To mitigate this,
		// not every authentication failure is logged. A summary log is emitted periodically to
		// retain some record of this activity in case this is relevant to, e.g., a performance
		// investigation.

		atomic.AddInt64(&sshClient.sshServer.authFailedCount, 1)

		lastAuthLog := monotime.Time(atomic.LoadInt64(&sshClient.sshServer.lastAuthLog))
		if monotime.Since(lastAuthLog) > SSH_AUTH_LOG_PERIOD {
			now := int64(monotime.Now())
			// The compare-and-swap ensures that only the caller which advances
			// lastAuthLog from the value it observed emits the summary and
			// resets the counter.
			if atomic.CompareAndSwapInt64(&sshClient.sshServer.lastAuthLog, int64(lastAuthLog), now) {
				count := atomic.SwapInt64(&sshClient.sshServer.authFailedCount, 0)
				log.WithTraceFields(
					LogFields{"lastError": err, "failedCount": count}).Warning("authentication failures")
			}
		}

		log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication failed")

	} else {

		log.WithTraceFields(LogFields{"error": err, "method": method}).Debug("authentication success")
	}
}

// stop signals the ssh connection to shutdown. After sshConn.Wait returns,
// the SSH connection has terminated but sshClient.run may still be running and
// in the process of exiting.
//
// The shutdown process must complete rapidly and not, e.g., block on network
// I/O, as newly connecting clients need to await stop completion of any
// existing connection that shares the same session ID.
func (sshClient *sshClient) stop() {
	sshClient.sshConn.Close()
	sshClient.sshConn.Wait()
}

// awaitStopped will block until sshClient.run has exited, at which point all
// worker goroutines associated with the sshClient, including any in-flight
// API handlers, will have exited.
func (sshClient *sshClient) awaitStopped() {
	<-sshClient.stopped
}

// runTunnel handles/dispatches new channels and new requests from the client.
// When the SSH client connection closes, both the channels and requests channels
// will close and runTunnel will exit.
func (sshClient *sshClient) runTunnel(
	channels <-chan ssh.NewChannel,
	requests <-chan *ssh.Request) {

	// waitGroup tracks every worker goroutine started here and is also passed
	// to the channel handlers; runTunnel waits on it before returning.
	waitGroup := new(sync.WaitGroup)

	// Start client SSH API request handler

	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		sshClient.handleSSHRequests(requests)
	}()

	// Start request senders

	if sshClient.supportsServerRequests {

		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()
			sshClient.runOSLSender()
		}()

		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()
			sshClient.runAlertSender()
		}()
	}

	// Start the TCP port forward manager

	// The queue size is set to the traffic rules (MaxTCPPortForwardCount +
	// MaxTCPDialingPortForwardCount), which is a reasonable indication of resource
	// limits per client; when that value is not set, a default is used.
	// A limitation: this queue size is set once and doesn't change, for this client,
	// when traffic rules are reloaded.
	queueSize := sshClient.getTCPPortForwardQueueSize()
	if queueSize == 0 {
		queueSize = SSH_TCP_PORT_FORWARD_QUEUE_SIZE
	}
	newTCPPortForwards := make(chan *newTCPPortForward, queueSize)

	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()
		sshClient.handleTCPPortForwards(waitGroup, newTCPPortForwards)
	}()

	// Handle new channel (port forward) requests from the client.

	for newChannel := range channels {
		switch newChannel.ChannelType() {
		case protocol.RANDOM_STREAM_CHANNEL_TYPE:
			sshClient.handleNewRandomStreamChannel(waitGroup, newChannel)
		case protocol.PACKET_TUNNEL_CHANNEL_TYPE:
			sshClient.handleNewPacketTunnelChannel(waitGroup, newChannel)
		case protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE:
			// The protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE is the same as
			// "direct-tcpip", except split tunnel channel rejections are disallowed
			// even if the client has enabled split tunnel. This channel type allows
			// the client to ensure tunneling for certain cases while split tunnel is
			// enabled.
			sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, false, newTCPPortForwards)
		case "direct-tcpip":
			sshClient.handleNewTCPPortForwardChannel(waitGroup, newChannel, true, newTCPPortForwards)
		default:
			sshClient.rejectNewChannel(newChannel,
				fmt.Sprintf("unknown or unsupported channel type: %s", newChannel.ChannelType()))
		}
	}

	// The channel loop is interrupted by a client
	// disconnect or by calling sshClient.stop().

	// Stop the TCP port forward manager
	close(newTCPPortForwards)

	// Stop all other worker goroutines
	sshClient.stopRunning()

	if sshClient.sshServer.support.Config.RunPacketTunnel {
		// PacketTunnelServer.ClientDisconnected stops packet tunnel workers.
		sshClient.sshServer.support.PacketTunnelServer.ClientDisconnected(
			sshClient.sessionID)
	}

	waitGroup.Wait()

	sshClient.cleanupAuthorizations()
}

// handleSSHRequests processes client requests one at a time until the
// requests channel is closed.
//
// A "keepalive@openssh.com" request is answered with a payload produced by
// tactics.MakeSpeedTestResponse. Every other request type is passed to
// sshAPIRequestHandler. Each request receives a reply: success with the
// response payload when no error occurred, otherwise failure with a nil
// payload. A failure to send the reply is logged unless
// isExpectedTunnelIOError reports it as expected.
func (sshClient *sshClient) handleSSHRequests(requests <-chan *ssh.Request) {

	for request := range requests {

		// Requests are processed serially; API responses must be sent in request order.

		var responsePayload []byte
		var err error

		if request.Type == "keepalive@openssh.com" {

			// SSH keep alive round trips are used as speed test samples.
			responsePayload, err = tactics.MakeSpeedTestResponse(
				SSH_KEEP_ALIVE_PAYLOAD_MIN_BYTES, SSH_KEEP_ALIVE_PAYLOAD_MAX_BYTES)

		} else {

			// All other requests are assumed to be API requests.

			sshClient.Lock()
			authorizedAccessTypes := sshClient.handshakeState.authorizedAccessTypes
			sshClient.Unlock()

			// Note: unlock before use is only safe as long as referenced sshClient data,
			// such as slices in handshakeState, is read-only after initially set.

			clientAddr := ""
			if sshClient.clientAddr != nil {
				clientAddr = sshClient.clientAddr.String()
			}

			responsePayload, err = sshAPIRequestHandler(
				sshClient.sshServer.support,
				clientAddr,
				sshClient.geoIPData,
				authorizedAccessTypes,
				request.Type,
				request.Payload)
		}

		// From here on, err is reused to hold the result of sending the reply.
		if err == nil {
			err = request.Reply(true, responsePayload)
		} else {
			log.WithTraceFields(LogFields{"error": err}).Warning("request failed")
			err = request.Reply(false, nil)
		}
		if err != nil {
			if !isExpectedTunnelIOError(err) {
				log.WithTraceFields(LogFields{"error": err}).Warning("response failed")
			}
		}
	}
}

// newTCPPortForward is a queued TCP port forward request. Values are created
// and enqueued by handleNewTCPPortForwardChannel and dequeued by
// handleTCPPortForwards.
type newTCPPortForward struct {
	// enqueueTime is set when the request is enqueued; the time elapsed
	// since then is deducted from the dial timeout.
	enqueueTime   time.Time
	hostToConnect string
	portToConnect int
	doSplitTunnel bool
	newChannel    ssh.NewChannel
}

// handleTCPPortForwards is the TCP port forward manager. It dequeues requests
// from newTCPPortForwards until that channel is closed. For each request it
// deducts queue and blocking time from the dial timeout, reserves a dialing
// slot, waits while the concurrent dialing limit is exceeded, and then either
// rejects the channel or dispatches it to handleTCPChannel in a new goroutine
// tracked by waitGroup.
func (sshClient *sshClient) handleTCPPortForwards(
	waitGroup *sync.WaitGroup,
	newTCPPortForwards chan *newTCPPortForward) {

	// Lifecycle of a TCP port forward:
	//
	// 1. A "direct-tcpip" SSH request is received from the client.
	//
	//    A new TCP port forward request is enqueued. The queue delivers TCP port
	//    forward requests to the TCP port forward manager, which enforces the TCP
	//    port forward dial limit.
	//
	//    Enqueuing new requests allows for reading further SSH requests from the
	//    client without blocking when the dial limit is hit; this is to permit new
	//    UDP/udpgw port forwards to be reestablished without delay. The maximum size
	//    of the queue enforces a hard cap on resources consumed by a client in the
	//    pre-dial phase. When the queue is full, new TCP port forwards are
	//    immediately rejected.
	//
	// 2. The TCP port forward manager dequeues the request.
	//
	//    The manager calls dialingTCPPortForward(), which increments
	//    concurrentDialingPortForwardCount, and calls
	//    isTCPDialingPortForwardLimitExceeded() to check the concurrent dialing
	//    count.
	//
	//    The manager enforces the concurrent TCP dial limit: when at the limit, the
	//    manager blocks waiting for the number of dials to drop below the limit before
	//    dispatching the request to handleTCPChannel(), which will run in its own
	//    goroutine and will dial and relay the port forward.
	//
	//    The block delays the current request and also halts dequeuing of subsequent
	//    requests and could ultimately cause requests to be immediately rejected if
	//    the queue fills. These actions are intended to apply back pressure when
	//    upstream network resources are impaired.
	//
	//    The time spent in the queue is deducted from the port forward's dial timeout.
	//    The time spent blocking while at the dial limit is similarly deducted from
	//    the dial timeout. If the dial timeout has expired before the dial begins, the
	//    port forward is rejected and a stat is recorded.
	//
	// 3. handleTCPChannel() performs the port forward dial and relaying.
	//
	//     a. Dial the target, using the dial timeout remaining after queue and blocking
	//        time is deducted.
	//
	//     b. If the dial fails, call abortedTCPPortForward() to decrement
	//        concurrentDialingPortForwardCount, freeing up a dial slot.
	//
	//     c. If the dial succeeds, call establishedPortForward(), which decrements
	//        concurrentDialingPortForwardCount and increments concurrentPortForwardCount,
	//        the "established" port forward count.
	//
	//     d. Check isPortForwardLimitExceeded(), which enforces the configurable limit on
	//        concurrentPortForwardCount, the number of _established_ TCP port forwards.
	//        If the limit is exceeded, the LRU established TCP port forward is closed and
	//        the newly established TCP port forward proceeds. This LRU logic allows some
	//        dangling resource consumption (e.g., TIME_WAIT) while providing a better
	//        experience for clients.
	//
	//     e. Relay data.
	//
	//     f. Call closedPortForward() which decrements concurrentPortForwardCount and
	//        records bytes transferred.

	for newPortForward := range newTCPPortForwards {

		remainingDialTimeout :=
			time.Duration(sshClient.getDialTCPPortForwardTimeoutMilliseconds())*time.Millisecond -
				time.Since(newPortForward.enqueueTime)

		if remainingDialTimeout <= 0 {
			sshClient.updateQualityMetricsWithRejectedDialingLimit()
			sshClient.rejectNewChannel(
				newPortForward.newChannel, "TCP port forward timed out in queue")
			continue
		}

		// Reserve a TCP dialing slot.
		//
		// TOCTOU note: important to increment counts _before_ checking limits; otherwise,
		// the client could potentially consume excess resources by initiating many port
		// forwards concurrently.

		sshClient.dialingTCPPortForward()

		// When max dials are in progress, wait up to remainingDialTimeout for dialing
		// to become available. This blocks all dequeuing.

		if sshClient.isTCPDialingPortForwardLimitExceeded() {
			blockStartTime := time.Now()
			// ctx is done when the timeout elapses, when runCtx is done, or
			// when cancelCtx is invoked; cancelCtx is registered below as the
			// "dialing available" signal.
			ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)
			sshClient.setTCPPortForwardDialingAvailableSignal(cancelCtx)
			<-ctx.Done()
			sshClient.setTCPPortForwardDialingAvailableSignal(nil)
			cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"
			remainingDialTimeout -= time.Since(blockStartTime)
		}

		if remainingDialTimeout <= 0 {

			// Release the dialing slot here since handleTCPChannel() won't be called.
			sshClient.abortedTCPPortForward()

			sshClient.updateQualityMetricsWithRejectedDialingLimit()
			sshClient.rejectNewChannel(
				newPortForward.newChannel, "TCP port forward timed out before dialing")
			continue
		}

		// Dial and relay the TCP port forward. handleTCPChannel is run in its own worker goroutine.
		// handleTCPChannel will release the dialing slot reserved by dialingTCPPortForward(); and
		// will deal with remainingDialTimeout <= 0.

		waitGroup.Add(1)
		go func(remainingDialTimeout time.Duration, newPortForward *newTCPPortForward) {
			defer waitGroup.Done()
			sshClient.handleTCPChannel(
				remainingDialTimeout,
				newPortForward.hostToConnect,
				newPortForward.portToConnect,
				newPortForward.doSplitTunnel,
				newPortForward.newChannel)
		}(remainingDialTimeout, newPortForward)
	}
}

// handleNewRandomStreamChannel services a random stream channel request.
//
// The channel is rejected when its extra data does not unmarshal into a
// protocol.RandomStreamRequest, when the requested upstream or downstream
// byte count exceeds RANDOM_STREAM_MAX_BYTES, or when the handshake has not
// completed and PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT has been reached.
//
// Otherwise the channel is accepted and a goroutine tracked by waitGroup
// discards up to the requested number of upstream bytes while writing the
// requested number of random downstream bytes, records the requested and
// actual byte counts in the pre- or post-handshake random stream metrics,
// and closes the channel.
func (sshClient *sshClient) handleNewRandomStreamChannel(
	waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {

	// A random stream channel returns the requested number of bytes -- random
	// bytes -- to the client while also consuming and discarding bytes sent
	// by the client.
	//
	// One use case for the random stream channel is a liveness test that the
	// client performs to confirm that the tunnel is live. As the liveness
	// test is performed in the concurrent establishment phase, before
	// selecting a single candidate for handshake, the random stream channel
	// is available pre-handshake, albeit with additional restrictions.
	//
	// The random stream is subject to throttling in traffic rules; for
	// unthrottled liveness tests, set EstablishmentRead/WriteBytesPerSecond as
	// required. The random stream maximum count and response size cap mitigate
	// clients abusing the facility to waste server resources.
	//
	// Like all other channels, this channel type is handled asynchronously,
	// so it's possible to run at any point in the tunnel lifecycle.
	//
	// Up/downstream byte counts don't include SSH packet and request
	// marshalling overhead.

	var request protocol.RandomStreamRequest
	err := json.Unmarshal(newChannel.ExtraData(), &request)
	if err != nil {
		sshClient.rejectNewChannel(newChannel, fmt.Sprintf("invalid request: %s", err))
		return
	}

	if request.UpstreamBytes > RANDOM_STREAM_MAX_BYTES {
		sshClient.rejectNewChannel(newChannel,
			fmt.Sprintf("invalid upstream bytes: %d", request.UpstreamBytes))
		return
	}

	if request.DownstreamBytes > RANDOM_STREAM_MAX_BYTES {
		sshClient.rejectNewChannel(newChannel,
			fmt.Sprintf("invalid downstream bytes: %d", request.DownstreamBytes))
		return
	}

	// metrics points at the pre- or post-handshake counters, selected by the
	// handshake state observed under the lock; the same counters are updated
	// when the stream finishes.
	var metrics *randomStreamMetrics

	sshClient.Lock()

	if !sshClient.handshakeState.completed {
		metrics = &sshClient.preHandshakeRandomStreamMetrics
	} else {
		metrics = &sshClient.postHandshakeRandomStreamMetrics
	}

	// The count limit applies only before the handshake has completed; the
	// count is incremented only when the request is allowed.
	countOk := true
	if !sshClient.handshakeState.completed &&
		metrics.count >= PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT {
		countOk = false
	} else {
		metrics.count++
	}

	sshClient.Unlock()

	if !countOk {
		sshClient.rejectNewChannel(newChannel, "max count exceeded")
		return
	}

	channel, requests, err := newChannel.Accept()
	if err != nil {
		if !isExpectedTunnelIOError(err) {
			log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
		}
		return
	}
	go ssh.DiscardRequests(requests)

	waitGroup.Add(1)
	go func() {
		defer waitGroup.Done()

		upstream := new(sync.WaitGroup)
		received := 0
		sent := 0

		if request.UpstreamBytes > 0 {

			// Process streams concurrently to minimize elapsed time. This also
			// avoids a unidirectional flow burst early in the tunnel lifecycle.

			upstream.Add(1)
			go func() {
				defer upstream.Done()
				n, err := io.CopyN(ioutil.Discard, channel, int64(request.UpstreamBytes))
				received = int(n)
				if err != nil {
					if !isExpectedTunnelIOError(err) {
						log.WithTraceFields(LogFields{"error": err}).Warning("receive failed")
					}
				}
			}()
		}

		if request.DownstreamBytes > 0 {
			n, err := io.CopyN(channel, rand.Reader, int64(request.DownstreamBytes))
			sent = int(n)
			if err != nil {
				if !isExpectedTunnelIOError(err) {
					log.WithTraceFields(LogFields{"error": err}).Warning("send failed")
				}
			}
		}

		// received is written by the upstream goroutine; wait for it to
		// finish before reading the value.
		upstream.Wait()

		sshClient.Lock()
		metrics.upstreamBytes += int64(request.UpstreamBytes)
		metrics.receivedUpstreamBytes += int64(received)
		metrics.downstreamBytes += int64(request.DownstreamBytes)
		metrics.sentDownstreamBytes += int64(sent)
		sshClient.Unlock()

		channel.Close()
	}()
}

// handleNewPacketTunnelChannel accepts a packet tunnel channel and hands it
// to PacketTunnelServer.ClientConnected, together with callbacks for TCP/UDP
// port permission checks, domain permission checks, flow activity updaters,
// traffic byte accounting and DNS quality reporting.
//
// The channel is rejected when Config.RunPacketTunnel is not set. When
// ClientConnected returns an error, the error is logged and the client's
// packet tunnel channel is reset to nil.
//
// The waitGroup parameter is not used by this function.
func (sshClient *sshClient) handleNewPacketTunnelChannel(
	waitGroup *sync.WaitGroup, newChannel ssh.NewChannel) {

	// packet tunnel channels are handled by the packet tunnel server
	// component. Each client may have at most one packet tunnel channel.

	if !sshClient.sshServer.support.Config.RunPacketTunnel {
		sshClient.rejectNewChannel(newChannel, "unsupported packet tunnel channel type")
		return
	}

	// Accept this channel immediately. This channel will replace any
	// previously existing packet tunnel channel for this client.

	packetTunnelChannel, requests, err := newChannel.Accept()
	if err != nil {
		if !isExpectedTunnelIOError(err) {
			log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")
		}
		return
	}
	go ssh.DiscardRequests(requests)

	sshClient.setPacketTunnelChannel(packetTunnelChannel)

	// PacketTunnelServer will run the client's packet tunnel. If necessary, ClientConnected
	// will stop packet tunnel workers for any previous packet tunnel channel.

	checkAllowedTCPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
		return sshClient.isPortForwardPermitted(portForwardTypeTCP, upstreamIPAddress, port)
	}

	checkAllowedUDPPortFunc := func(upstreamIPAddress net.IP, port int) bool {
		return sshClient.isPortForwardPermitted(portForwardTypeUDP, upstreamIPAddress, port)
	}

	checkAllowedDomainFunc := func(domain string) bool {
		ok, _ := sshClient.isDomainPermitted(domain)
		return ok
	}

	flowActivityUpdaterMaker := func(
		isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []tun.FlowActivityUpdater {

		trafficType := portForwardTypeTCP
		if !isTCP {
			trafficType = portForwardTypeUDP
		}

		// Copy element by element to convert the updater slice into a slice
		// of the tun.FlowActivityUpdater interface type.
		activityUpdaters := sshClient.getActivityUpdaters(trafficType, upstreamIPAddress)

		flowUpdaters := make([]tun.FlowActivityUpdater, len(activityUpdaters))
		for i, activityUpdater := range activityUpdaters {
			flowUpdaters[i] = activityUpdater
		}

		return flowUpdaters
	}

	metricUpdater := func(
		TCPApplicationBytesDown, TCPApplicationBytesUp,
		UDPApplicationBytesDown, UDPApplicationBytesUp int64) {

		sshClient.Lock()
		sshClient.tcpTrafficState.bytesDown += TCPApplicationBytesDown
		sshClient.tcpTrafficState.bytesUp += TCPApplicationBytesUp
		sshClient.udpTrafficState.bytesDown += UDPApplicationBytesDown
		sshClient.udpTrafficState.bytesUp += UDPApplicationBytesUp
		sshClient.Unlock()
	}

	dnsQualityReporter := sshClient.updateQualityMetricsWithDNSResult

	err = sshClient.sshServer.support.PacketTunnelServer.ClientConnected(
		sshClient.sessionID,
		packetTunnelChannel,
		checkAllowedTCPPortFunc,
		checkAllowedUDPPortFunc,
		checkAllowedDomainFunc,
		flowActivityUpdaterMaker,
		metricUpdater,
		dnsQualityReporter)
	if err != nil {
		log.WithTraceFields(LogFields{"error": err}).Warning("start packet tunnel client failed")
		sshClient.setPacketTunnelChannel(nil)
	}
}

// handleNewTCPPortForwardChannel dispatches a new TCP port forward channel.
//
// The channel's extra data is unmarshaled as a "direct-tcpip" payload; the
// channel is rejected when that fails. When the requested destination equals
// the configured UDPInterceptUdpgwServerAddress, the channel is handled by
// handleUdpgwChannel in a new goroutine tracked by waitGroup. Otherwise a
// newTCPPortForward is sent to newTCPPortForwards without blocking, and the
// channel is rejected when that queue is full.
//
// allowSplitTunnel indicates whether split tunnel logic may be applied to
// this channel.
func (sshClient *sshClient) handleNewTCPPortForwardChannel(
	waitGroup *sync.WaitGroup,
	newChannel ssh.NewChannel,
	allowSplitTunnel bool,
	newTCPPortForwards chan *newTCPPortForward) {

	// udpgw client connections are dispatched immediately (clients use this for
	// DNS, so it's essential to not block; and only one udpgw connection is
	// retained at a time).
	//
	// All other TCP port forwards are dispatched via the TCP port forward
	// manager queue.

	// http://tools.ietf.org/html/rfc4254#section-7.2
	var directTcpipExtraData struct {
		HostToConnect       string
		PortToConnect       uint32
		OriginatorIPAddress string
		OriginatorPort      uint32
	}

	err := ssh.Unmarshal(newChannel.ExtraData(), &directTcpipExtraData)
	if err != nil {
		sshClient.rejectNewChannel(newChannel, "invalid extra data")
		return
	}

	// Intercept TCP port forwards to a specified udpgw server and handle directly.
	// TODO: also support UDP explicitly, e.g. with a custom "direct-udp" channel type?
	isUdpgwChannel := sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress != "" &&
		sshClient.sshServer.support.Config.UDPInterceptUdpgwServerAddress ==
			net.JoinHostPort(directTcpipExtraData.HostToConnect, strconv.Itoa(int(directTcpipExtraData.PortToConnect)))

	if isUdpgwChannel {

		// Dispatch immediately. handleUdpgwChannel runs the udpgw protocol in its
		// own worker goroutine.

		waitGroup.Add(1)
		go func(channel ssh.NewChannel) {
			defer waitGroup.Done()
			sshClient.handleUdpgwChannel(channel)
		}(newChannel)

	} else {

		// Dispatch via TCP port forward manager. When the queue is full, the channel
		// is immediately rejected.
		//
		// Split tunnel logic is enabled for this TCP port forward when the client
		// has enabled split tunnel mode and the channel type allows it.

		doSplitTunnel := sshClient.handshakeState.splitTunnelLookup != nil && allowSplitTunnel

		tcpPortForward := &newTCPPortForward{
			enqueueTime:   time.Now(),
			hostToConnect: directTcpipExtraData.HostToConnect,
			portToConnect: int(directTcpipExtraData.PortToConnect),
			doSplitTunnel: doSplitTunnel,
			newChannel:    newChannel,
		}

		// Non-blocking send: the default case is taken when the queue is full.
		select {
		case newTCPPortForwards <- tcpPortForward:
		default:
			sshClient.updateQualityMetricsWithRejectedDialingLimit()
			sshClient.rejectNewChannel(newChannel, "TCP port forward dial queue full")
		}
	}
}

// cleanupAuthorizations, while holding the sshClient lock, calls
// releaseAuthorizations when it is set and stops stopTimer when it is set.
func (sshClient *sshClient) cleanupAuthorizations() {
	sshClient.Lock()

	if sshClient.releaseAuthorizations != nil {
		sshClient.releaseAuthorizations()
	}

	if sshClient.stopTimer != nil {
		sshClient.stopTimer.Stop()
	}

	sshClient.Unlock()
}

// setPacketTunnelChannel sets the single packet tunnel channel
// for this sshClient. Any existing packet tunnel channel is
// closed.
//
// NOTE(review): totalPacketTunnelChannelCount is incremented on every call,
// including calls that pass a nil channel -- confirm that this is intended.
func (sshClient *sshClient) setPacketTunnelChannel(channel ssh.Channel) {
	sshClient.Lock()
	if sshClient.packetTunnelChannel != nil {
		sshClient.packetTunnelChannel.Close()
	}
	sshClient.packetTunnelChannel = channel
	sshClient.totalPacketTunnelChannelCount += 1
	sshClient.Unlock()
}

// setUdpgwChannelHandler sets the single udpgw channel handler for this
// sshClient. Each sshClient may have only one concurrent udpgw
// channel/handler. Each udpgw channel multiplexes many UDP port forwards via
// the udpgw protocol. Any existing udpgw channel/handler is closed.
//
// It returns true when udpgwChannelHandler was installed. It returns false
// when, after stopping a previous handler, another handler was found to have
// been set in the meantime; in that case nothing is installed.
func (sshClient *sshClient) setUdpgwChannelHandler(udpgwChannelHandler *udpgwPortForwardMultiplexer) bool {
	sshClient.Lock()
	if sshClient.udpgwChannelHandler != nil {
		previousHandler := sshClient.udpgwChannelHandler
		sshClient.udpgwChannelHandler = nil

		// stop must be run without holding the sshClient mutex lock, as the
		// udpgw goroutines may attempt to lock the same mutex. For example,
		// udpgwPortForwardMultiplexer.run calls sshClient.establishedPortForward
		// which calls sshClient.allocatePortForward.
		sshClient.Unlock()
		previousHandler.stop()
		sshClient.Lock()

		// In case some other channel has set the sshClient.udpgwChannelHandler
		// in the meantime, fail. The caller should discard this channel/handler.
		if sshClient.udpgwChannelHandler != nil {
			sshClient.Unlock()
			return false
		}
	}
	sshClient.udpgwChannelHandler = udpgwChannelHandler
	sshClient.totalUdpgwChannelCount += 1
	sshClient.Unlock()
	return true
}

// serverTunnelStatParams is the parameter spec list that logTunnel passes to
// getRequestLogFields: two optional parameters, "last_connected" and
// "establishment_duration", followed by baseSessionAndDialParams.
var serverTunnelStatParams = append(
	[]requestParamSpec{
		{"last_connected", isLastConnected, requestParamOptional},
		{"establishment_duration", isIntString, requestParamOptional}},
	baseSessionAndDialParams...)

func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) {

	sshClient.Lock()

	logFields := getRequestLogFields(
		"server_tunnel",
		sshClient.geoIPData,
		sshClient.handshakeState.authorizedAccessTypes,
		sshClient.handshakeState.apiParams,
		serverTunnelStatParams)

	// "relay_protocol" is sent with handshake API parameters. In pre-
	// handshake logTunnel cases, this value is not yet known.
As2356 // sshClient.tunnelProtocol is authoritative, set this value2357 // unconditionally, overwriting any value from handshake.2358 logFields["relay_protocol"] = sshClient.tunnelProtocol2359 if sshClient.serverPacketManipulation != "" {2360 logFields["server_packet_manipulation"] = sshClient.serverPacketManipulation2361 }2362 if sshClient.sshListener.BPFProgramName != "" {2363 logFields["server_bpf"] = sshClient.sshListener.BPFProgramName2364 }2365 logFields["session_id"] = sshClient.sessionID2366 logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession2367 logFields["handshake_completed"] = sshClient.handshakeState.completed2368 logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp2369 logFields["bytes_down_tcp"] = sshClient.tcpTrafficState.bytesDown2370 logFields["peak_concurrent_dialing_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentDialingPortForwardCount2371 logFields["peak_concurrent_port_forward_count_tcp"] = sshClient.tcpTrafficState.peakConcurrentPortForwardCount2372 logFields["total_port_forward_count_tcp"] = sshClient.tcpTrafficState.totalPortForwardCount2373 logFields["bytes_up_udp"] = sshClient.udpTrafficState.bytesUp2374 logFields["bytes_down_udp"] = sshClient.udpTrafficState.bytesDown2375 // sshClient.udpTrafficState.peakConcurrentDialingPortForwardCount isn't meaningful2376 logFields["peak_concurrent_port_forward_count_udp"] = sshClient.udpTrafficState.peakConcurrentPortForwardCount2377 logFields["total_port_forward_count_udp"] = sshClient.udpTrafficState.totalPortForwardCount2378 logFields["total_udpgw_channel_count"] = sshClient.totalUdpgwChannelCount2379 logFields["total_packet_tunnel_channel_count"] = sshClient.totalPacketTunnelChannelCount2380 logFields["pre_handshake_random_stream_count"] = sshClient.preHandshakeRandomStreamMetrics.count2381 logFields["pre_handshake_random_stream_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.upstreamBytes2382 
logFields["pre_handshake_random_stream_received_upstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.receivedUpstreamBytes2383 logFields["pre_handshake_random_stream_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.downstreamBytes2384 logFields["pre_handshake_random_stream_sent_downstream_bytes"] = sshClient.preHandshakeRandomStreamMetrics.sentDownstreamBytes2385 logFields["random_stream_count"] = sshClient.postHandshakeRandomStreamMetrics.count2386 logFields["random_stream_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.upstreamBytes2387 logFields["random_stream_received_upstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.receivedUpstreamBytes2388 logFields["random_stream_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.downstreamBytes2389 logFields["random_stream_sent_downstream_bytes"] = sshClient.postHandshakeRandomStreamMetrics.sentDownstreamBytes2390 if sshClient.destinationBytesMetricsASN != "" {2391 // Check if the configured DestinationBytesMetricsASN has changed2392 // (or been cleared). If so, don't log and discard the accumulated2393 // bytes to ensure we don't continue to record stats as previously2394 // configured.2395 //2396 // Any counts accumulated before the DestinationBytesMetricsASN change2397 // are lost. 
At this time we can't change2398 // sshClient.destinationBytesMetricsASN dynamically, after a tactics2399 // hot reload, as there may be destination bytes port forwards that2400 // were in place before the change, which will continue to count.2401 logDestBytes := true2402 if sshClient.sshServer.support.ServerTacticsParametersCache != nil {2403 p, err := sshClient.sshServer.support.ServerTacticsParametersCache.Get(sshClient.geoIPData)2404 if err != nil || p.IsNil() ||2405 sshClient.destinationBytesMetricsASN != p.String(parameters.DestinationBytesMetricsASN) {2406 logDestBytes = false2407 }2408 }2409 if logDestBytes {2410 bytesUpTCP := sshClient.tcpDestinationBytesMetrics.getBytesUp()2411 bytesDownTCP := sshClient.tcpDestinationBytesMetrics.getBytesDown()2412 bytesUpUDP := sshClient.udpDestinationBytesMetrics.getBytesUp()2413 bytesDownUDP := sshClient.udpDestinationBytesMetrics.getBytesDown()2414 logFields["dest_bytes_asn"] = sshClient.destinationBytesMetricsASN2415 logFields["dest_bytes_up_tcp"] = bytesUpTCP2416 logFields["dest_bytes_down_tcp"] = bytesDownTCP2417 logFields["dest_bytes_up_udp"] = bytesUpUDP2418 logFields["dest_bytes_down_udp"] = bytesDownUDP2419 logFields["dest_bytes"] = bytesUpTCP + bytesDownTCP + bytesUpUDP + bytesDownUDP2420 }2421 }2422 // Only log fields for peakMetrics when there is data recorded, otherwise2423 // omit the field.2424 if sshClient.peakMetrics.concurrentProximateAcceptedClients != nil {2425 logFields["peak_concurrent_proximate_accepted_clients"] = *sshClient.peakMetrics.concurrentProximateAcceptedClients2426 }2427 if sshClient.peakMetrics.concurrentProximateEstablishedClients != nil {2428 logFields["peak_concurrent_proximate_established_clients"] = *sshClient.peakMetrics.concurrentProximateEstablishedClients2429 }2430 if sshClient.peakMetrics.TCPPortForwardFailureRate != nil && sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize != nil {2431 logFields["peak_tcp_port_forward_failure_rate"] = 
*sshClient.peakMetrics.TCPPortForwardFailureRate2432 logFields["peak_tcp_port_forward_failure_rate_sample_size"] = *sshClient.peakMetrics.TCPPortForwardFailureRateSampleSize2433 }2434 if sshClient.peakMetrics.DNSFailureRate != nil && sshClient.peakMetrics.DNSFailureRateSampleSize != nil {2435 logFields["peak_dns_failure_rate"] = *sshClient.peakMetrics.DNSFailureRate2436 logFields["peak_dns_failure_rate_sample_size"] = *sshClient.peakMetrics.DNSFailureRateSampleSize2437 }2438 // Pre-calculate a total-tunneled-bytes field. This total is used2439 // extensively in analytics and is more performant when pre-calculated.2440 logFields["bytes"] = sshClient.tcpTrafficState.bytesUp +2441 sshClient.tcpTrafficState.bytesDown +2442 sshClient.udpTrafficState.bytesUp +2443 sshClient.udpTrafficState.bytesDown2444 // Merge in additional metrics from the optional metrics source2445 for _, metrics := range additionalMetrics {2446 for name, value := range metrics {2447 // Don't overwrite any basic fields2448 if logFields[name] == nil {2449 logFields[name] = value2450 }2451 }2452 }2453 // Retain lock when invoking LogRawFieldsWithTimestamp to block any2454 // concurrent writes to variables referenced by logFields.2455 log.LogRawFieldsWithTimestamp(logFields)2456 sshClient.Unlock()2457}2458var blocklistHitsStatParams = []requestParamSpec{2459 {"propagation_channel_id", isHexDigits, 0},2460 {"sponsor_id", isHexDigits, 0},2461 {"client_version", isIntString, requestParamLogStringAsInt},2462 {"client_platform", isClientPlatform, 0},2463 {"client_features", isAnyString, requestParamOptional | requestParamArray},2464 {"client_build_rev", isHexDigits, requestParamOptional},2465 {"device_region", isAnyString, requestParamOptional},2466 {"egress_region", isRegionCode, requestParamOptional},2467 {"session_id", isHexDigits, 0},2468 {"last_connected", isLastConnected, requestParamOptional},2469}2470func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []BlocklistTag) {2471 
sshClient.Lock()2472 logFields := getRequestLogFields(2473 "server_blocklist_hit",2474 sshClient.geoIPData,2475 sshClient.handshakeState.authorizedAccessTypes,2476 sshClient.handshakeState.apiParams,2477 blocklistHitsStatParams)2478 logFields["session_id"] = sshClient.sessionID2479 // Note: see comment in logTunnel regarding unlock and concurrent access.2480 sshClient.Unlock()2481 for _, tag := range tags {2482 if IP != nil {2483 logFields["blocklist_ip_address"] = IP.String()2484 }2485 if domain != "" {2486 logFields["blocklist_domain"] = domain2487 }2488 logFields["blocklist_source"] = tag.Source2489 logFields["blocklist_subject"] = tag.Subject2490 log.LogRawFieldsWithTimestamp(logFields)2491 }2492}2493func (sshClient *sshClient) runOSLSender() {2494 for {2495 // Await a signal that there are SLOKs to send2496 // TODO: use reflect.SelectCase, and optionally await timer here?2497 select {2498 case <-sshClient.signalIssueSLOKs:2499 case <-sshClient.runCtx.Done():2500 return2501 }2502 retryDelay := SSH_SEND_OSL_INITIAL_RETRY_DELAY2503 for {2504 err := sshClient.sendOSLRequest()2505 if err == nil {2506 break2507 }2508 if !isExpectedTunnelIOError(err) {2509 log.WithTraceFields(LogFields{"error": err}).Warning("sendOSLRequest failed")2510 }2511 // If the request failed, retry after a delay (with exponential backoff)2512 // or when signaled that there are additional SLOKs to send2513 retryTimer := time.NewTimer(retryDelay)2514 select {2515 case <-retryTimer.C:2516 case <-sshClient.signalIssueSLOKs:2517 case <-sshClient.runCtx.Done():2518 retryTimer.Stop()2519 return2520 }2521 retryTimer.Stop()2522 retryDelay *= SSH_SEND_OSL_RETRY_FACTOR2523 }2524 }2525}2526// sendOSLRequest will invoke osl.GetSeedPayload to issue SLOKs and2527// generate a payload, and send an OSL request to the client when2528// there are new SLOKs in the payload.2529func (sshClient *sshClient) sendOSLRequest() error {2530 seedPayload := sshClient.getOSLSeedPayload()2531 // Don't send when no SLOKs. 
This will happen when signalIssueSLOKs2532 // is received but no new SLOKs are issued.2533 if len(seedPayload.SLOKs) == 0 {2534 return nil2535 }2536 oslRequest := protocol.OSLRequest{2537 SeedPayload: seedPayload,2538 }2539 requestPayload, err := json.Marshal(oslRequest)2540 if err != nil {2541 return errors.Trace(err)2542 }2543 ok, _, err := sshClient.sshConn.SendRequest(2544 protocol.PSIPHON_API_OSL_REQUEST_NAME,2545 true,2546 requestPayload)2547 if err != nil {2548 return errors.Trace(err)2549 }2550 if !ok {2551 return errors.TraceNew("client rejected request")2552 }2553 sshClient.clearOSLSeedPayload()2554 return nil2555}2556// runAlertSender dequeues and sends alert requests to the client. As these2557// alerts are informational, there is no retry logic and no SSH client2558// acknowledgement (wantReply) is requested. This worker scheme allows2559// nonconcurrent components including udpgw and packet tunnel to enqueue2560// alerts without blocking their traffic processing.2561func (sshClient *sshClient) runAlertSender() {2562 for {2563 select {2564 case <-sshClient.runCtx.Done():2565 return2566 case request := <-sshClient.sendAlertRequests:2567 payload, err := json.Marshal(request)2568 if err != nil {2569 log.WithTraceFields(LogFields{"error": err}).Warning("Marshal failed")2570 break2571 }2572 _, _, err = sshClient.sshConn.SendRequest(2573 protocol.PSIPHON_API_ALERT_REQUEST_NAME,2574 false,2575 payload)2576 if err != nil && !isExpectedTunnelIOError(err) {2577 log.WithTraceFields(LogFields{"error": err}).Warning("SendRequest failed")2578 break2579 }2580 sshClient.Lock()2581 sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] = true2582 sshClient.Unlock()2583 }2584 }2585}2586// enqueueAlertRequest enqueues an alert request to be sent to the client.2587// Only one request is sent per tunnel per protocol.AlertRequest value;2588// subsequent alerts with the same value are dropped. 
enqueueAlertRequest will2589// not block until the queue exceeds ALERT_REQUEST_QUEUE_BUFFER_SIZE.2590func (sshClient *sshClient) enqueueAlertRequest(request protocol.AlertRequest) {2591 sshClient.Lock()2592 if sshClient.sentAlertRequests[fmt.Sprintf("%+v", request)] {2593 sshClient.Unlock()2594 return2595 }2596 sshClient.Unlock()2597 select {2598 case <-sshClient.runCtx.Done():2599 case sshClient.sendAlertRequests <- request:2600 }2601}2602func (sshClient *sshClient) enqueueDisallowedTrafficAlertRequest() {2603 reason := protocol.PSIPHON_API_ALERT_DISALLOWED_TRAFFIC2604 actionURLs := sshClient.getAlertActionURLs(reason)2605 sshClient.enqueueAlertRequest(2606 protocol.AlertRequest{2607 Reason: reason,2608 ActionURLs: actionURLs,2609 })2610}2611func (sshClient *sshClient) enqueueUnsafeTrafficAlertRequest(tags []BlocklistTag) {2612 reason := protocol.PSIPHON_API_ALERT_UNSAFE_TRAFFIC2613 actionURLs := sshClient.getAlertActionURLs(reason)2614 for _, tag := range tags {2615 sshClient.enqueueAlertRequest(2616 protocol.AlertRequest{2617 Reason: reason,2618 Subject: tag.Subject,2619 ActionURLs: actionURLs,2620 })2621 }2622}2623func (sshClient *sshClient) getAlertActionURLs(alertReason string) []string {2624 sshClient.Lock()2625 sponsorID, _ := getStringRequestParam(2626 sshClient.handshakeState.apiParams, "sponsor_id")2627 sshClient.Unlock()2628 return sshClient.sshServer.support.PsinetDatabase.GetAlertActionURLs(2629 alertReason,2630 sponsorID,2631 sshClient.geoIPData.Country,2632 sshClient.geoIPData.ASN)2633}2634func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessage string) {2635 // We always return the reject reason "Prohibited":2636 // - Traffic rules and connection limits may prohibit the connection.2637 // - External firewall rules may prohibit the connection, and this is not currently2638 // distinguishable from other failure modes.2639 // - We limit the failure information revealed to the client.2640 reason := ssh.Prohibited2641 // Note: 
Debug level, as logMessage may contain user traffic destination address information2642 log.WithTraceFields(2643 LogFields{2644 "channelType": newChannel.ChannelType(),2645 "logMessage": logMessage,2646 "rejectReason": reason.String(),2647 }).Debug("reject new channel")2648 // Note: logMessage is internal, for logging only; just the reject reason is sent to the client.2649 newChannel.Reject(reason, reason.String())2650}2651// setHandshakeState records that a client has completed a handshake API request.2652// Some parameters from the handshake request may be used in future traffic rule2653// selection. Port forwards are disallowed until a handshake is complete. The2654// handshake parameters are included in the session summary log recorded in2655// sshClient.stop().2656func (sshClient *sshClient) setHandshakeState(2657 state handshakeState,2658 authorizations []string) (*handshakeStateInfo, error) {2659 sshClient.Lock()2660 completed := sshClient.handshakeState.completed2661 if !completed {2662 sshClient.handshakeState = state2663 }2664 sshClient.Unlock()2665 // Client must only perform one handshake2666 if completed {2667 return nil, errors.TraceNew("handshake already completed")2668 }2669 // Verify the authorizations submitted by the client. 
Verified, active2670 // (non-expired) access types will be available for traffic rules2671 // filtering.2672 //2673 // When an authorization is active but expires while the client is2674 // connected, the client is disconnected to ensure the access is reset.2675 // This is implemented by setting a timer to perform the disconnect at the2676 // expiry time of the soonest expiring authorization.2677 //2678 // sshServer.authorizationSessionIDs tracks the unique mapping of active2679 // authorization IDs to client session IDs and is used to detect and2680 // prevent multiple malicious clients from reusing a single authorization2681 // (within the scope of this server).2682 // authorizationIDs and authorizedAccessTypes are returned to the client2683 // and logged, respectively; initialize to empty lists so the2684 // protocol/logs don't need to handle 'null' values.2685 authorizationIDs := make([]string, 0)2686 authorizedAccessTypes := make([]string, 0)2687 var stopTime time.Time2688 for i, authorization := range authorizations {2689 // This sanity check mitigates malicious clients causing excess CPU use.2690 if i >= MAX_AUTHORIZATIONS {2691 log.WithTrace().Warning("too many authorizations")2692 break2693 }2694 verifiedAuthorization, err := accesscontrol.VerifyAuthorization(2695 &sshClient.sshServer.support.Config.AccessControlVerificationKeyRing,2696 authorization)2697 if err != nil {2698 log.WithTraceFields(2699 LogFields{"error": err}).Warning("verify authorization failed")2700 continue2701 }2702 authorizationID := base64.StdEncoding.EncodeToString(verifiedAuthorization.ID)2703 if common.Contains(authorizedAccessTypes, verifiedAuthorization.AccessType) {2704 log.WithTraceFields(2705 LogFields{"accessType": verifiedAuthorization.AccessType}).Warning("duplicate authorization access type")2706 continue2707 }2708 authorizationIDs = append(authorizationIDs, authorizationID)2709 authorizedAccessTypes = append(authorizedAccessTypes, verifiedAuthorization.AccessType)2710 if 
stopTime.IsZero() || stopTime.After(verifiedAuthorization.Expires) {2711 stopTime = verifiedAuthorization.Expires2712 }2713 }2714 // Associate all verified authorizationIDs with this client's session ID.2715 // Handle cases where previous associations exist:2716 //2717 // - Multiple malicious clients reusing a single authorization. In this2718 // case, authorizations are revoked from the previous client.2719 //2720 // - The client reconnected with a new session ID due to user toggling.2721 // This case is expected due to server affinity. This cannot be2722 // distinguished from the previous case and the same action is taken;2723 // this will have no impact on a legitimate client as the previous2724 // session is dangling.2725 //2726 // - The client automatically reconnected with the same session ID. This2727 // case is not expected as sshServer.registerEstablishedClient2728 // synchronously calls sshClient.releaseAuthorizations; as a safe guard,2729 // this case is distinguished and no revocation action is taken.2730 sshClient.sshServer.authorizationSessionIDsMutex.Lock()2731 for _, authorizationID := range authorizationIDs {2732 sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]2733 if ok && sessionID != sshClient.sessionID {2734 logFields := LogFields{2735 "event_name": "irregular_tunnel",2736 "tunnel_error": "duplicate active authorization",2737 "duplicate_authorization_id": authorizationID,2738 }2739 sshClient.geoIPData.SetLogFields(logFields)2740 duplicateGeoIPData := sshClient.sshServer.support.GeoIPService.GetSessionCache(sessionID)2741 if duplicateGeoIPData != sshClient.geoIPData {2742 duplicateGeoIPData.SetLogFieldsWithPrefix("duplicate_authorization_", logFields)2743 }2744 log.LogRawFieldsWithTimestamp(logFields)2745 // Invoke asynchronously to avoid deadlocks.2746 // TODO: invoke only once for each distinct sessionID?2747 go sshClient.sshServer.revokeClientAuthorizations(sessionID)2748 }2749 
sshClient.sshServer.authorizationSessionIDs[authorizationID] = sshClient.sessionID2750 }2751 sshClient.sshServer.authorizationSessionIDsMutex.Unlock()2752 if len(authorizationIDs) > 0 {2753 sshClient.Lock()2754 // Make the authorizedAccessTypes available for traffic rules filtering.2755 sshClient.handshakeState.activeAuthorizationIDs = authorizationIDs2756 sshClient.handshakeState.authorizedAccessTypes = authorizedAccessTypes2757 // On exit, sshClient.runTunnel will call releaseAuthorizations, which2758 // will release the authorization IDs so the client can reconnect and2759 // present the same authorizations again. sshClient.runTunnel will2760 // also cancel the stopTimer in case it has not yet fired.2761 // Note: termination of the stopTimer goroutine is not synchronized.2762 sshClient.releaseAuthorizations = func() {2763 sshClient.sshServer.authorizationSessionIDsMutex.Lock()2764 for _, authorizationID := range authorizationIDs {2765 sessionID, ok := sshClient.sshServer.authorizationSessionIDs[authorizationID]2766 if ok && sessionID == sshClient.sessionID {2767 delete(sshClient.sshServer.authorizationSessionIDs, authorizationID)2768 }2769 }2770 sshClient.sshServer.authorizationSessionIDsMutex.Unlock()2771 }2772 sshClient.stopTimer = time.AfterFunc(2773 time.Until(stopTime),2774 func() {2775 sshClient.stop()2776 })2777 sshClient.Unlock()2778 }2779 upstreamBytesPerSecond, downstreamBytesPerSecond := sshClient.setTrafficRules()2780 sshClient.setOSLConfig()2781 // Set destination bytes metrics.2782 //2783 // Limitation: this is a one-time operation and doesn't get reset when2784 // tactics are hot-reloaded. This allows us to simply retain any2785 // destination byte counts accumulated and eventually log in2786 // server_tunnel, without having to deal with a destination change2787 // mid-tunnel. 
As typical tunnels are short, and destination changes can2788 // be applied gradually, handling mid-tunnel changes is not a priority.2789 sshClient.setDestinationBytesMetrics()2790 return &handshakeStateInfo{2791 activeAuthorizationIDs: authorizationIDs,2792 authorizedAccessTypes: authorizedAccessTypes,2793 upstreamBytesPerSecond: upstreamBytesPerSecond,2794 downstreamBytesPerSecond: downstreamBytesPerSecond,2795 }, nil2796}2797// getHandshaked returns whether the client has completed a handshake API2798// request and whether the traffic rules that were selected after the2799// handshake immediately exhaust the client.2800//2801// When the client is immediately exhausted it will be closed; but this2802// takes effect asynchronously. The "exhausted" return value is used to2803// prevent API requests by clients that will close.2804func (sshClient *sshClient) getHandshaked() (bool, bool) {2805 sshClient.Lock()2806 defer sshClient.Unlock()2807 completed := sshClient.handshakeState.completed2808 exhausted := false2809 // Notes:2810 // - "Immediately exhausted" is when CloseAfterExhausted is set and2811 // either ReadUnthrottledBytes or WriteUnthrottledBytes starts from2812 // 0, so no bytes would be read or written. 
This check does not2813 // examine whether 0 bytes _remain_ in the ThrottledConn.2814 // - This check is made against the current traffic rules, which2815 // could have changed in a hot reload since the handshake.2816 if completed &&2817 *sshClient.trafficRules.RateLimits.CloseAfterExhausted &&2818 (*sshClient.trafficRules.RateLimits.ReadUnthrottledBytes == 0 ||2819 *sshClient.trafficRules.RateLimits.WriteUnthrottledBytes == 0) {2820 exhausted = true2821 }2822 return completed, exhausted2823}2824func (sshClient *sshClient) getDisableDiscovery() bool {2825 sshClient.Lock()2826 defer sshClient.Unlock()2827 return *sshClient.trafficRules.DisableDiscovery2828}2829func (sshClient *sshClient) updateAPIParameters(2830 apiParams common.APIParameters) {2831 sshClient.Lock()2832 defer sshClient.Unlock()2833 // Only update after handshake has initialized API params.2834 if !sshClient.handshakeState.completed {2835 return2836 }2837 for name, value := range apiParams {2838 sshClient.handshakeState.apiParams[name] = value2839 }2840}2841func (sshClient *sshClient) acceptDomainBytes() bool {2842 sshClient.Lock()2843 defer sshClient.Unlock()2844 // When the domain bytes checksum differs from the checksum sent to the2845 // client in the handshake response, the psinet regex configuration has2846 // changed. 
In this case, drop the stats so we don't continue to record2847 // stats as previously configured.2848 //2849 // Limitations:2850 // - The checksum comparison may result in dropping some stats for a2851 // domain that remains in the new configuration.2852 // - We don't push new regexs to the clients, so clients that remain2853 // connected will continue to send stats that will be dropped; and2854 // those clients will not send stats as newly configured until after2855 // reconnecting.2856 // - Due to the design of2857 // transferstats.ReportRecentBytesTransferredForServer in the client,2858 // the client may accumulate stats, reconnect before its next status2859 // request, get a new regex configuration, and then send the previously2860 // accumulated stats in its next status request. The checksum scheme2861 // won't prevent the reporting of those stats.2862 sponsorID, _ := getStringRequestParam(sshClient.handshakeState.apiParams, "sponsor_id")2863 domainBytesChecksum := sshClient.sshServer.support.PsinetDatabase.GetDomainBytesChecksum(sponsorID)2864 return bytes.Equal(sshClient.handshakeState.domainBytesChecksum, domainBytesChecksum)2865}2866// setOSLConfig resets the client's OSL seed state based on the latest OSL config2867// As sshClient.oslClientSeedState may be reset by a concurrent goroutine,2868// oslClientSeedState must only be accessed within the sshClient mutex.2869func (sshClient *sshClient) setOSLConfig() {2870 sshClient.Lock()2871 defer sshClient.Unlock()2872 propagationChannelID, err := getStringRequestParam(2873 sshClient.handshakeState.apiParams, "propagation_channel_id")2874 if err != nil {2875 // This should not fail as long as client has sent valid handshake2876 return2877 }2878 // Use a cached seed state if one is found for the client's2879 // session ID. 
This enables resuming progress made in a previous2880 // tunnel.2881 // Note: go-cache is already concurency safe; the additional mutex2882 // is necessary to guarantee that Get/Delete is atomic; although in2883 // practice no two concurrent clients should ever supply the same2884 // session ID.2885 sshClient.sshServer.oslSessionCacheMutex.Lock()2886 oslClientSeedState, found := sshClient.sshServer.oslSessionCache.Get(sshClient.sessionID)2887 if found {2888 sshClient.sshServer.oslSessionCache.Delete(sshClient.sessionID)2889 sshClient.sshServer.oslSessionCacheMutex.Unlock()2890 sshClient.oslClientSeedState = oslClientSeedState.(*osl.ClientSeedState)2891 sshClient.oslClientSeedState.Resume(sshClient.signalIssueSLOKs)2892 return2893 }2894 sshClient.sshServer.oslSessionCacheMutex.Unlock()2895 // Two limitations when setOSLConfig() is invoked due to an2896 // OSL config hot reload:2897 //2898 // 1. any partial progress towards SLOKs is lost.2899 //2900 // 2. all existing osl.ClientSeedPortForwards for existing2901 // port forwards will not send progress to the new client2902 // seed state.2903 sshClient.oslClientSeedState = sshClient.sshServer.support.OSLConfig.NewClientSeedState(2904 sshClient.geoIPData.Country,2905 propagationChannelID,2906 sshClient.signalIssueSLOKs)2907}2908// newClientSeedPortForward will return nil when no seeding is2909// associated with the specified ipAddress.2910func (sshClient *sshClient) newClientSeedPortForward(IPAddress net.IP) *osl.ClientSeedPortForward {2911 sshClient.Lock()2912 defer sshClient.Unlock()2913 // Will not be initialized before handshake.2914 if sshClient.oslClientSeedState == nil {2915 return nil2916 }2917 return sshClient.oslClientSeedState.NewClientSeedPortForward(IPAddress)2918}2919// getOSLSeedPayload returns a payload containing all seeded SLOKs for2920// this client's session.2921func (sshClient *sshClient) getOSLSeedPayload() *osl.SeedPayload {2922 sshClient.Lock()2923 defer sshClient.Unlock()2924 // Will not be 
initialized before handshake.2925 if sshClient.oslClientSeedState == nil {2926 return &osl.SeedPayload{SLOKs: make([]*osl.SLOK, 0)}2927 }2928 return sshClient.oslClientSeedState.GetSeedPayload()2929}2930func (sshClient *sshClient) clearOSLSeedPayload() {2931 sshClient.Lock()2932 defer sshClient.Unlock()2933 sshClient.oslClientSeedState.ClearSeedPayload()2934}2935func (sshClient *sshClient) setDestinationBytesMetrics() {2936 sshClient.Lock()2937 defer sshClient.Unlock()2938 // Limitation: the server-side tactics cache is used to avoid the overhead2939 // of an additional tactics filtering per tunnel. As this cache is2940 // designed for GeoIP filtering only, handshake API parameters are not2941 // applied to tactics filtering in this case.2942 tacticsCache := sshClient.sshServer.support.ServerTacticsParametersCache2943 if tacticsCache == nil {2944 return2945 }2946 p, err := tacticsCache.Get(sshClient.geoIPData)2947 if err != nil {2948 log.WithTraceFields(LogFields{"error": err}).Warning("get tactics failed")2949 return2950 }2951 if p.IsNil() {2952 return2953 }2954 sshClient.destinationBytesMetricsASN = p.String(parameters.DestinationBytesMetricsASN)2955}2956func (sshClient *sshClient) newDestinationBytesMetricsUpdater(portForwardType int, IPAddress net.IP) *destinationBytesMetrics {2957 sshClient.Lock()2958 defer sshClient.Unlock()2959 if sshClient.destinationBytesMetricsASN == "" {2960 return nil2961 }2962 if sshClient.sshServer.support.GeoIPService.LookupISPForIP(IPAddress).ASN != sshClient.destinationBytesMetricsASN {2963 return nil2964 }2965 if portForwardType == portForwardTypeTCP {2966 return &sshClient.tcpDestinationBytesMetrics2967 }2968 return &sshClient.udpDestinationBytesMetrics2969}2970func (sshClient *sshClient) getActivityUpdaters(portForwardType int, IPAddress net.IP) []common.ActivityUpdater {2971 var updaters []common.ActivityUpdater2972 clientSeedPortForward := sshClient.newClientSeedPortForward(IPAddress)2973 if clientSeedPortForward != nil {2974 
updaters = append(updaters, clientSeedPortForward)2975 }2976 destinationBytesMetrics := sshClient.newDestinationBytesMetricsUpdater(portForwardType, IPAddress)2977 if destinationBytesMetrics != nil {2978 updaters = append(updaters, destinationBytesMetrics)2979 }2980 return updaters2981}2982// setTrafficRules resets the client's traffic rules based on the latest server config2983// and client properties. As sshClient.trafficRules may be reset by a concurrent2984// goroutine, trafficRules must only be accessed within the sshClient mutex.2985func (sshClient *sshClient) setTrafficRules() (int64, int64) {2986 sshClient.Lock()2987 defer sshClient.Unlock()2988 isFirstTunnelInSession := sshClient.isFirstTunnelInSession &&2989 sshClient.handshakeState.establishedTunnelsCount == 02990 sshClient.trafficRules = sshClient.sshServer.support.TrafficRulesSet.GetTrafficRules(2991 isFirstTunnelInSession,2992 sshClient.tunnelProtocol,2993 sshClient.geoIPData,2994 sshClient.handshakeState)2995 if sshClient.throttledConn != nil {2996 // Any existing throttling state is reset.2997 sshClient.throttledConn.SetLimits(2998 sshClient.trafficRules.RateLimits.CommonRateLimits(2999 sshClient.handshakeState.completed))3000 }3001 return *sshClient.trafficRules.RateLimits.ReadBytesPerSecond,3002 *sshClient.trafficRules.RateLimits.WriteBytesPerSecond3003}3004func (sshClient *sshClient) rateLimits() common.RateLimits {3005 sshClient.Lock()3006 defer sshClient.Unlock()3007 return sshClient.trafficRules.RateLimits.CommonRateLimits(3008 sshClient.handshakeState.completed)3009}3010func (sshClient *sshClient) idleTCPPortForwardTimeout() time.Duration {3011 sshClient.Lock()3012 defer sshClient.Unlock()3013 return time.Duration(*sshClient.trafficRules.IdleTCPPortForwardTimeoutMilliseconds) * time.Millisecond3014}3015func (sshClient *sshClient) idleUDPPortForwardTimeout() time.Duration {3016 sshClient.Lock()3017 defer sshClient.Unlock()3018 return 
time.Duration(*sshClient.trafficRules.IdleUDPPortForwardTimeoutMilliseconds) * time.Millisecond3019}3020func (sshClient *sshClient) setTCPPortForwardDialingAvailableSignal(signal context.CancelFunc) {3021 sshClient.Lock()3022 defer sshClient.Unlock()3023 sshClient.tcpPortForwardDialingAvailableSignal = signal3024}3025const (3026 portForwardTypeTCP = iota3027 portForwardTypeUDP3028)3029func (sshClient *sshClient) isPortForwardPermitted(3030 portForwardType int,3031 remoteIP net.IP,3032 port int) bool {3033 // Disallow connection to bogons.3034 //3035 // As a security measure, this is a failsafe. The server should be run on a3036 // host with correctly configured firewall rules.3037 //3038 // This check also avoids spurious disallowed traffic alerts for destinations3039 // that are impossible to reach.3040 if !sshClient.sshServer.support.Config.AllowBogons && common.IsBogon(remoteIP) {3041 return false3042 }3043 // Blocklist check.3044 //3045 // Limitation: isPortForwardPermitted is not called in transparent DNS3046 // forwarding cases. As the destination IP address is rewritten in these3047 // cases, a blocklist entry won't be dialed in any case. 
However, no logs3048 // will be recorded.3049 if !sshClient.isIPPermitted(remoteIP) {3050 return false3051 }3052 // Don't lock before calling logBlocklistHits.3053 // Unlock before calling enqueueDisallowedTrafficAlertRequest/log.3054 sshClient.Lock()3055 allowed := true3056 // Client must complete handshake before port forwards are permitted.3057 if !sshClient.handshakeState.completed {3058 allowed = false3059 }3060 if allowed {3061 // Traffic rules checks.3062 switch portForwardType {3063 case portForwardTypeTCP:3064 if !sshClient.trafficRules.AllowTCPPort(remoteIP, port) {3065 allowed = false3066 }3067 case portForwardTypeUDP:3068 if !sshClient.trafficRules.AllowUDPPort(remoteIP, port) {3069 allowed = false3070 }3071 }3072 }3073 sshClient.Unlock()3074 if allowed {3075 return true3076 }3077 switch portForwardType {3078 case portForwardTypeTCP:3079 sshClient.updateQualityMetricsWithTCPRejectedDisallowed()3080 case portForwardTypeUDP:3081 sshClient.updateQualityMetricsWithUDPRejectedDisallowed()3082 }3083 sshClient.enqueueDisallowedTrafficAlertRequest()3084 log.WithTraceFields(3085 LogFields{3086 "type": portForwardType,3087 "port": port,3088 }).Debug("port forward denied by traffic rules")3089 return false3090}3091// isDomainPermitted returns true when the specified domain may be resolved3092// and returns false and a reject reason otherwise.3093func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) {3094 // We're not doing comprehensive validation, to avoid overhead per port3095 // forward. 
This is a simple sanity check to ensure we don't process3096 // blantantly invalid input.3097 //3098 // TODO: validate with dns.IsDomainName?3099 if len(domain) > 255 {3100 return false, "invalid domain name"3101 }3102 tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain)3103 if len(tags) > 0 {3104 sshClient.logBlocklistHits(nil, domain, tags)3105 if sshClient.sshServer.support.Config.BlocklistActive {3106 // Actively alert and block3107 sshClient.enqueueUnsafeTrafficAlertRequest(tags)3108 return false, "port forward not permitted"3109 }3110 }3111 return true, ""3112}3113func (sshClient *sshClient) isIPPermitted(remoteIP net.IP) bool {3114 tags := sshClient.sshServer.support.Blocklist.LookupIP(remoteIP)3115 if len(tags) > 0 {3116 sshClient.logBlocklistHits(remoteIP, "", tags)3117 if sshClient.sshServer.support.Config.BlocklistActive {3118 // Actively alert and block3119 sshClient.enqueueUnsafeTrafficAlertRequest(tags)3120 return false3121 }3122 }3123 return true3124}3125func (sshClient *sshClient) isTCPDialingPortForwardLimitExceeded() bool {3126 sshClient.Lock()3127 defer sshClient.Unlock()3128 state := &sshClient.tcpTrafficState3129 max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount3130 if max > 0 && state.concurrentDialingPortForwardCount >= int64(max) {3131 return true3132 }3133 return false3134}3135func (sshClient *sshClient) getTCPPortForwardQueueSize() int {3136 sshClient.Lock()3137 defer sshClient.Unlock()3138 return *sshClient.trafficRules.MaxTCPPortForwardCount +3139 *sshClient.trafficRules.MaxTCPDialingPortForwardCount3140}3141func (sshClient *sshClient) getDialTCPPortForwardTimeoutMilliseconds() int {3142 sshClient.Lock()3143 defer sshClient.Unlock()3144 return *sshClient.trafficRules.DialTCPPortForwardTimeoutMilliseconds3145}3146func (sshClient *sshClient) dialingTCPPortForward() {3147 sshClient.Lock()3148 defer sshClient.Unlock()3149 state := &sshClient.tcpTrafficState3150 state.concurrentDialingPortForwardCount += 13151 if 
state.concurrentDialingPortForwardCount > state.peakConcurrentDialingPortForwardCount {3152 state.peakConcurrentDialingPortForwardCount = state.concurrentDialingPortForwardCount3153 }3154}3155func (sshClient *sshClient) abortedTCPPortForward() {3156 sshClient.Lock()3157 defer sshClient.Unlock()3158 sshClient.tcpTrafficState.concurrentDialingPortForwardCount -= 13159}3160func (sshClient *sshClient) allocatePortForward(portForwardType int) bool {3161 sshClient.Lock()3162 defer sshClient.Unlock()3163 // Check if at port forward limit. The subsequent counter3164 // changes must be atomic with the limit check to ensure3165 // the counter never exceeds the limit in the case of3166 // concurrent allocations.3167 var max int3168 var state *trafficState3169 if portForwardType == portForwardTypeTCP {3170 max = *sshClient.trafficRules.MaxTCPPortForwardCount3171 state = &sshClient.tcpTrafficState3172 } else {3173 max = *sshClient.trafficRules.MaxUDPPortForwardCount3174 state = &sshClient.udpTrafficState3175 }3176 if max > 0 && state.concurrentPortForwardCount >= int64(max) {3177 return false3178 }3179 // Update port forward counters.3180 if portForwardType == portForwardTypeTCP {3181 // Assumes TCP port forwards called dialingTCPPortForward3182 state.concurrentDialingPortForwardCount -= 13183 if sshClient.tcpPortForwardDialingAvailableSignal != nil {3184 max := *sshClient.trafficRules.MaxTCPDialingPortForwardCount3185 if max <= 0 || state.concurrentDialingPortForwardCount < int64(max) {3186 sshClient.tcpPortForwardDialingAvailableSignal()3187 }3188 }3189 }3190 state.concurrentPortForwardCount += 13191 if state.concurrentPortForwardCount > state.peakConcurrentPortForwardCount {3192 state.peakConcurrentPortForwardCount = state.concurrentPortForwardCount3193 }3194 state.totalPortForwardCount += 13195 return true3196}3197// establishedPortForward increments the concurrent port3198// forward counter. 
closedPortForward decrements it, so it3199// must always be called for each establishedPortForward3200// call.3201//3202// When at the limit of established port forwards, the LRU3203// existing port forward is closed to make way for the newly3204// established one. There can be a minor delay as, in addition3205// to calling Close() on the port forward net.Conn,3206// establishedPortForward waits for the LRU's closedPortForward()3207// call which will decrement the concurrent counter. This3208// ensures all resources associated with the LRU (socket,3209// goroutine) are released or will very soon be released before3210// proceeding.3211func (sshClient *sshClient) establishedPortForward(3212 portForwardType int, portForwardLRU *common.LRUConns) {3213 // Do not lock sshClient here.3214 var state *trafficState3215 if portForwardType == portForwardTypeTCP {3216 state = &sshClient.tcpTrafficState3217 } else {3218 state = &sshClient.udpTrafficState3219 }3220 // When the maximum number of port forwards is already3221 // established, close the LRU. CloseOldest will call3222 // Close on the port forward net.Conn. Both TCP and3223 // UDP port forwards have handler goroutines that may3224 // be blocked calling Read on the net.Conn. Close will3225 // eventually interrupt the Read and cause the handlers3226 // to exit, but not immediately. So the following logic3227 // waits for a LRU handler to be interrupted and signal3228 // availability.3229 //3230 // Notes:3231 //3232 // - the port forward limit can change via a traffic3233 // rules hot reload; the condition variable handles3234 // this case whereas a channel-based semaphore would3235 // not.3236 //3237 // - if a number of goroutines exceeding the total limit3238 // arrive here all concurrently, some CloseOldest() calls3239 // will have no effect as there can be less existing port3240 // forwards than new ones. In this case, the new port3241 // forward will be delayed. 
This is highly unlikely in3242 // practise since UDP calls to establishedPortForward are3243 // serialized and TCP calls are limited by the dial3244 // queue/count.3245 if !sshClient.allocatePortForward(portForwardType) {3246 portForwardLRU.CloseOldest()3247 log.WithTrace().Debug("closed LRU port forward")3248 state.availablePortForwardCond.L.Lock()3249 for !sshClient.allocatePortForward(portForwardType) {3250 state.availablePortForwardCond.Wait()3251 }3252 state.availablePortForwardCond.L.Unlock()3253 }3254}3255func (sshClient *sshClient) closedPortForward(3256 portForwardType int, bytesUp, bytesDown int64) {3257 sshClient.Lock()3258 var state *trafficState3259 if portForwardType == portForwardTypeTCP {3260 state = &sshClient.tcpTrafficState3261 } else {3262 state = &sshClient.udpTrafficState3263 }3264 state.concurrentPortForwardCount -= 13265 state.bytesUp += bytesUp3266 state.bytesDown += bytesDown3267 sshClient.Unlock()3268 // Signal any goroutine waiting in establishedPortForward3269 // that an established port forward slot is available.3270 state.availablePortForwardCond.Signal()3271}3272func (sshClient *sshClient) updateQualityMetricsWithDialResult(3273 tcpPortForwardDialSuccess bool, dialDuration time.Duration, IP net.IP) {3274 sshClient.Lock()3275 defer sshClient.Unlock()3276 if tcpPortForwardDialSuccess {3277 sshClient.qualityMetrics.TCPPortForwardDialedCount += 13278 sshClient.qualityMetrics.TCPPortForwardDialedDuration += dialDuration3279 if IP.To4() != nil {3280 sshClient.qualityMetrics.TCPIPv4PortForwardDialedCount += 13281 sshClient.qualityMetrics.TCPIPv4PortForwardDialedDuration += dialDuration3282 } else if IP != nil {3283 sshClient.qualityMetrics.TCPIPv6PortForwardDialedCount += 13284 sshClient.qualityMetrics.TCPIPv6PortForwardDialedDuration += dialDuration3285 }3286 } else {3287 sshClient.qualityMetrics.TCPPortForwardFailedCount += 13288 sshClient.qualityMetrics.TCPPortForwardFailedDuration += dialDuration3289 if IP.To4() != nil {3290 
sshClient.qualityMetrics.TCPIPv4PortForwardFailedCount += 13291 sshClient.qualityMetrics.TCPIPv4PortForwardFailedDuration += dialDuration3292 } else if IP != nil {3293 sshClient.qualityMetrics.TCPIPv6PortForwardFailedCount += 13294 sshClient.qualityMetrics.TCPIPv6PortForwardFailedDuration += dialDuration3295 }3296 }3297}3298func (sshClient *sshClient) updateQualityMetricsWithRejectedDialingLimit() {3299 sshClient.Lock()3300 defer sshClient.Unlock()3301 sshClient.qualityMetrics.TCPPortForwardRejectedDialingLimitCount += 13302}3303func (sshClient *sshClient) updateQualityMetricsWithTCPRejectedDisallowed() {3304 sshClient.Lock()3305 defer sshClient.Unlock()3306 sshClient.qualityMetrics.TCPPortForwardRejectedDisallowedCount += 13307}3308func (sshClient *sshClient) updateQualityMetricsWithUDPRejectedDisallowed() {3309 sshClient.Lock()3310 defer sshClient.Unlock()3311 sshClient.qualityMetrics.UDPPortForwardRejectedDisallowedCount += 13312}3313func (sshClient *sshClient) updateQualityMetricsWithDNSResult(3314 success bool, duration time.Duration, resolverIP net.IP) {3315 sshClient.Lock()3316 defer sshClient.Unlock()3317 resolver := ""3318 if resolverIP != nil {3319 resolver = resolverIP.String()3320 }3321 if success {3322 sshClient.qualityMetrics.DNSCount["ALL"] += 13323 sshClient.qualityMetrics.DNSDuration["ALL"] += duration3324 if resolver != "" {3325 sshClient.qualityMetrics.DNSCount[resolver] += 13326 sshClient.qualityMetrics.DNSDuration[resolver] += duration3327 }3328 } else {3329 sshClient.qualityMetrics.DNSFailedCount["ALL"] += 13330 sshClient.qualityMetrics.DNSFailedDuration["ALL"] += duration3331 if resolver != "" {3332 sshClient.qualityMetrics.DNSFailedCount[resolver] += 13333 sshClient.qualityMetrics.DNSFailedDuration[resolver] += duration3334 }3335 }3336}3337func (sshClient *sshClient) handleTCPChannel(3338 remainingDialTimeout time.Duration,3339 hostToConnect string,3340 portToConnect int,3341 doSplitTunnel bool,3342 newChannel ssh.NewChannel) {3343 // 
Assumptions:3344 // - sshClient.dialingTCPPortForward() has been called3345 // - remainingDialTimeout > 03346 established := false3347 defer func() {3348 if !established {3349 sshClient.abortedTCPPortForward()3350 }3351 }()3352 // Transparently redirect web API request connections.3353 isWebServerPortForward := false3354 config := sshClient.sshServer.support.Config3355 if config.WebServerPortForwardAddress != "" {3356 destination := net.JoinHostPort(hostToConnect, strconv.Itoa(portToConnect))3357 if destination == config.WebServerPortForwardAddress {3358 isWebServerPortForward = true3359 if config.WebServerPortForwardRedirectAddress != "" {3360 // Note: redirect format is validated when config is loaded3361 host, portStr, _ := net.SplitHostPort(config.WebServerPortForwardRedirectAddress)3362 port, _ := strconv.Atoi(portStr)3363 hostToConnect = host3364 portToConnect = port3365 }3366 }3367 }3368 // Validate the domain name and check the domain blocklist before dialing.3369 //3370 // The IP blocklist is checked in isPortForwardPermitted, which also provides3371 // IP blocklist checking for the packet tunnel code path. 
When hostToConnect3372 // is an IP address, the following hostname resolution step effectively3373 // performs no actions and next immediate step is the isPortForwardPermitted3374 // check.3375 //3376 // Limitation: this case handles port forwards where the client sends the3377 // destination domain in the SSH port forward request but does not currently3378 // handle DNS-over-TCP; in the DNS-over-TCP case, a client may bypass the3379 // block list check.3380 if !isWebServerPortForward &&3381 net.ParseIP(hostToConnect) == nil {3382 ok, rejectMessage := sshClient.isDomainPermitted(hostToConnect)3383 if !ok {3384 // Note: not recording a port forward failure in this case3385 sshClient.rejectNewChannel(newChannel, rejectMessage)3386 return3387 }3388 }3389 // Dial the remote address.3390 //3391 // Hostname resolution is performed explicitly, as a separate step, as the3392 // target IP address is used for traffic rules (AllowSubnets), OSL seed3393 // progress, and IP address blocklists.3394 //3395 // Contexts are used for cancellation (via sshClient.runCtx, which is3396 // cancelled when the client is stopping) and timeouts.3397 dialStartTime := time.Now()3398 IP := net.ParseIP(hostToConnect)3399 if IP == nil {3400 // Resolve the hostname3401 log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving")3402 ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)3403 IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect)3404 cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"3405 resolveElapsedTime := time.Since(dialStartTime)3406 // Record DNS metrics. If LookupIPAddr returns net.DNSError.IsNotFound, this3407 // is "no such host" and not a DNS failure. 
Limitation: the resolver IP is3408 // not known.3409 dnsErr, ok := err.(*net.DNSError)3410 dnsNotFound := ok && dnsErr.IsNotFound3411 dnsSuccess := err == nil || dnsNotFound3412 sshClient.updateQualityMetricsWithDNSResult(dnsSuccess, resolveElapsedTime, nil)3413 // IPv4 is preferred in case the host has limited IPv6 routing. IPv6 is3414 // selected and attempted only when there's no IPv4 option.3415 // TODO: shuffle list to try other IPs?3416 for _, ip := range IPs {3417 if ip.IP.To4() != nil {3418 IP = ip.IP3419 break3420 }3421 }3422 if IP == nil && len(IPs) > 0 {3423 // If there are no IPv4 IPs, the first IP is IPv6.3424 IP = IPs[0].IP3425 }3426 if err == nil && IP == nil {3427 err = std_errors.New("no IP address")3428 }3429 if err != nil {3430 // Record a port forward failure3431 sshClient.updateQualityMetricsWithDialResult(false, resolveElapsedTime, IP)3432 sshClient.rejectNewChannel(newChannel, fmt.Sprintf("LookupIP failed: %s", err))3433 return3434 }3435 remainingDialTimeout -= resolveElapsedTime3436 }3437 if remainingDialTimeout <= 0 {3438 sshClient.rejectNewChannel(newChannel, "TCP port forward timed out resolving")3439 return3440 }3441 // When the client has indicated split tunnel mode and when the channel is3442 // not of type protocol.TCP_PORT_FORWARD_NO_SPLIT_TUNNEL_TYPE, check if the3443 // client and the port forward destination are in the same GeoIP country. If3444 // so, reject the port forward with a distinct response code that indicates3445 // to the client that this port forward should be performed locally, direct3446 // and untunneled.3447 //3448 // Clients are expected to cache untunneled responses to avoid this round3449 // trip in the immediate future and reduce server load.3450 //3451 // When the countries differ, immediately proceed with the standard port3452 // forward. 
No additional round trip is required.3453 //3454 // If either GeoIP country is "None", one or both countries are unknown3455 // and there is no match.3456 //3457 // Traffic rules, such as allowed ports, are not enforced for port forward3458 // destinations classified as untunneled.3459 //3460 // Domain and IP blocklists still apply to port forward destinations3461 // classified as untunneled.3462 //3463 // The client's use of split tunnel mode is logged in server_tunnel metrics3464 // as the boolean value split_tunnel. As they may indicate some information3465 // about browsing activity, no other split tunnel metrics are logged.3466 if doSplitTunnel {3467 destinationGeoIPData := sshClient.sshServer.support.GeoIPService.LookupIP(IP)3468 if sshClient.geoIPData.Country != GEOIP_UNKNOWN_VALUE &&3469 sshClient.handshakeState.splitTunnelLookup.lookup(3470 destinationGeoIPData.Country) {3471 // Since isPortForwardPermitted is not called in this case, explicitly call3472 // ipBlocklistCheck. The domain blocklist case is handled above.3473 if !sshClient.isIPPermitted(IP) {3474 // Note: not recording a port forward failure in this case3475 sshClient.rejectNewChannel(newChannel, "port forward not permitted")3476 return3477 }3478 newChannel.Reject(protocol.CHANNEL_REJECT_REASON_SPLIT_TUNNEL, "")3479 return3480 }3481 }3482 // Enforce traffic rules, using the resolved IP address.3483 if !isWebServerPortForward &&3484 !sshClient.isPortForwardPermitted(3485 portForwardTypeTCP,3486 IP,3487 portToConnect) {3488 // Note: not recording a port forward failure in this case3489 sshClient.rejectNewChannel(newChannel, "port forward not permitted")3490 return3491 }3492 // TCP dial.3493 remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect))3494 log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing")3495 ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout)3496 fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr)3497 
cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled"3498 // Record port forward success or failure3499 sshClient.updateQualityMetricsWithDialResult(err == nil, time.Since(dialStartTime), IP)3500 if err != nil {3501 // Monitor for low resource error conditions3502 sshClient.sshServer.monitorPortForwardDialError(err)3503 sshClient.rejectNewChannel(newChannel, fmt.Sprintf("DialTimeout failed: %s", err))3504 return3505 }3506 // The upstream TCP port forward connection has been established. Schedule3507 // some cleanup and notify the SSH client that the channel is accepted.3508 defer fwdConn.Close()3509 fwdChannel, requests, err := newChannel.Accept()3510 if err != nil {3511 if !isExpectedTunnelIOError(err) {3512 log.WithTraceFields(LogFields{"error": err}).Warning("accept new channel failed")3513 }3514 return3515 }3516 go ssh.DiscardRequests(requests)3517 defer fwdChannel.Close()3518 // Release the dialing slot and acquire an established slot.3519 //3520 // establishedPortForward increments the concurrent TCP port3521 // forward counter and closes the LRU existing TCP port forward3522 // when already at the limit.3523 //3524 // Known limitations:3525 //3526 // - Closed LRU TCP sockets will enter the TIME_WAIT state,3527 // continuing to consume some resources.3528 sshClient.establishedPortForward(portForwardTypeTCP, sshClient.tcpPortForwardLRU)3529 // "established = true" cancels the deferred abortedTCPPortForward()3530 established = true3531 // TODO: 64-bit alignment? https://golang.org/pkg/sync/atomic/#pkg-note-BUG3532 var bytesUp, bytesDown int643533 defer func() {3534 sshClient.closedPortForward(3535 portForwardTypeTCP, atomic.LoadInt64(&bytesUp), atomic.LoadInt64(&bytesDown))3536 }()3537 lruEntry := sshClient.tcpPortForwardLRU.Add(fwdConn)3538 defer lruEntry.Remove()3539 // ActivityMonitoredConn monitors the TCP port forward I/O and updates3540 // its LRU status. 
ActivityMonitoredConn also times out I/O on the port3541 // forward if both reads and writes have been idle for the specified3542 // duration.3543 fwdConn, err = common.NewActivityMonitoredConn(3544 fwdConn,3545 sshClient.idleTCPPortForwardTimeout(),3546 true,3547 lruEntry,3548 sshClient.getActivityUpdaters(portForwardTypeTCP, IP)...)3549 if err != nil {3550 log.WithTraceFields(LogFields{"error": err}).Error("NewActivityMonitoredConn failed")3551 return3552 }3553 // Relay channel to forwarded connection.3554 log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying")3555 // TODO: relay errors to fwdChannel.Stderr()?3556 relayWaitGroup := new(sync.WaitGroup)3557 relayWaitGroup.Add(1)3558 go func() {3559 defer relayWaitGroup.Done()3560 // io.Copy allocates a 32K temporary buffer, and each port forward relay3561 // uses two of these buffers; using common.CopyBuffer with a smaller buffer3562 // reduces the overall memory footprint.3563 bytes, err := common.CopyBuffer(3564 fwdChannel, fwdConn, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE))...
rpcsessionfactory_test.go
Source:rpcsessionfactory_test.go
...12 "github.com/damianoneill/net/v2/netconf/testserver"13 assert "github.com/stretchr/testify/require"14)15func TestTransportFailure(t *testing.T) {16 s, err := NewRPCSession(context.Background(), &ssh.ClientConfig{}, "localhost:0")17 assert.Error(t, err, "Expecting new session to fail")18 assert.Nil(t, s, "Session should be nil")19}20func TestSessionSetupFailure(t *testing.T) {21 ts := testserver.NewSSHServer(t, testserver.TestUserName, testserver.TestPassword)22 defer ts.Close()23 sshConfig := &ssh.ClientConfig{24 User: testserver.TestUserName,25 Auth: []ssh.AuthMethod{ssh.Password(testserver.TestPassword)},26 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec27 }28 ctx := WithClientTrace(context.Background(), DefaultLoggingHooks)29 s, err := NewRPCSessionWithConfig(ctx, sshConfig, fmt.Sprintf("localhost:%d", ts.Port()), &Config{SetupTimeoutSecs: 1})30 assert.Error(t, err, "Expecting new session to fail - no hello from server")31 assert.Nil(t, s, "Session should be nil")32}33func TestSessionSetupSuccess(t *testing.T) {34 ts := testserver.NewTestNetconfServer(t)35 sshConfig := &ssh.ClientConfig{36 User: testserver.TestUserName,37 Auth: []ssh.AuthMethod{ssh.Password(testserver.TestPassword)},38 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec39 }40 s, err := NewRPCSessionWithConfig(context.Background(), sshConfig, fmt.Sprintf("localhost:%d", ts.Port()), &Config{SetupTimeoutSecs: 1})41 assert.NoError(t, err, "Expecting new session to succeed")42 assert.NotNil(t, s, "Session should not be nil")43}44func TestSessionWithHooks(t *testing.T) {45 logged := exerciseSession(t, NoOpLoggingHooks)46 assert.Equal(t, "", logged, "Nothing should be logged")47 logged = exerciseSession(t, DefaultLoggingHooks)48 assert.NotEqual(t, "", logged, "Something should be logged")49 assert.Contains(t, logged, "Error context", "Error should be logged")50 assert.NotContains(t, logged, "ConnectStart", "ConnectStart should not be logged")51 assert.NotContains(t, logged, 
"ReadDone", "ReadDone should not be logged")52 logged = exerciseSession(t, MetricLoggingHooks)53 assert.NotEqual(t, "", logged, "Something should be logged")54 assert.Contains(t, logged, "Error context", "Error should be logged")55 assert.NotContains(t, logged, "ConnectStart", "ConnectStart should not be logged")56 assert.Contains(t, logged, "ReadDone", "ReadDone should not be logged")57 logged = exerciseSession(t, DiagnosticLoggingHooks)58 assert.NotEqual(t, "", logged, "Something should be logged")59 assert.Contains(t, logged, "Error context", "Error should be logged")60 assert.Contains(t, logged, "ReadDone", "ReadDone should be logged")61}62func TestSessionSetupFromSSHClient(t *testing.T) {63 ts := testserver.NewTestNetconfServer(t)64 sshConfig := &ssh.ClientConfig{65 User: testserver.TestUserName,66 Auth: []ssh.AuthMethod{ssh.Password(testserver.TestPassword)},67 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec68 }69 sshClient, err := ssh.Dial("tcp", fmt.Sprintf("localhost:%d", ts.Port()), sshConfig)70 assert.NoError(t, err)71 s, err := NewRPCSessionFromSSHClient(context.Background(), sshClient)72 assert.NoError(t, err, "Expecting new session to succeed")73 assert.NotNil(t, s, "Session should not be nil")74}75func exerciseSession(t *testing.T, hooks *ClientTrace) string {76 var b bytes.Buffer77 w := bufio.NewWriter(&b)78 log.SetOutput(w)79 ts := testserver.NewTestNetconfServer(t).80 WithRequestHandler(testserver.EchoRequestHandler).81 WithRequestHandler(testserver.EchoRequestHandler).82 WithRequestHandler(testserver.EchoRequestHandler).83 WithRequestHandler(testserver.CloseRequestHandler)84 sshConfig := &ssh.ClientConfig{85 User: testserver.TestUserName,86 Auth: []ssh.AuthMethod{ssh.Password(testserver.TestPassword)},87 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec88 }89 ctx := context.Background()90 if hooks != nil {91 ctx = WithClientTrace(ctx, hooks)92 }93 s, _ := NewRPCSession(ctx, sshConfig, fmt.Sprintf("localhost:%d", 
ts.Port()))94 sh := ts.SessionHandler(s.ID())95 reply, _ := s.Execute(common.Request("<get/>"))96 assert.NotNil(t, reply, "Execute failed unexpectedly")97 rch := make(chan *common.RPCReply)98 _ = s.ExecuteAsync(common.Request("<get/>"), rch)99 reply = <-rch100 assert.NotNil(t, reply, "ExecuteAsync failed unexpectedly")101 nch := make(chan *common.Notification)102 reply, _ = s.Subscribe("<create-subscription/>", nch)103 assert.NotNil(t, reply, "Subscribe failed unexpectedly")104 time.AfterFunc(time.Duration(100)*time.Millisecond, func() { sh.SendNotification("<eventA/>") })105 nmsg := <-nch106 assert.NotNil(t, nmsg, "Failed to receive notification")107 sh.SendNotification("<eventB/>") // Should be dropped108 ts.WithRequestHandler(testserver.CloseRequestHandler) // Force error on next request109 reply, _ = s.Execute(common.Request("<get/>"))110 assert.Nil(t, reply, "Execute succeeded unexpectedly")111 s.Close()112 _ = w.Flush()113 return b.String()114}115// Simple real NE access test116// func TestRealNewSession(t *testing.T) {117// sshConfig := &ssh.Config{118// User: "XXxxxx",119// Auth: []ssh.AuthMethod{ssh.Password("XXxxxxxxx")},120// HostKeyCallback: ssh.InsecureIgnoreHostKey(),121// }122// s, err := NewRPCSession(WithClientTrace(context.Background(), DefaultLoggingHooks), sshConfig, fmt.Sprintf("172.26.138.57:%d", 830))123// assert.NoError(t, err, "Not expecting new session to fail")124// assert.NotNil(t, s, "Session should be non-nil")125// defer s.Close()126// reply, err := s.Execute(Request(`<get-config><source><running/></source></get-config>`))127// assert.NoError(t, err, "Not expecting exec to fail")128// assert.NotNil(t, reply, "Reply should be non-nil")129// }130// func TestRealNewSessionFromSSHClient(t *testing.T) {131// sshConfig := &ssh.ClientConfig{132// User: "xxxxxxx",133// Auth: []ssh.AuthMethod{ssh.Password("XxXxXxX")},134// HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec135// }136//137// sshClient, err := ssh.Dial("tcp", 
fmt.Sprintf("172.26.138.34:%d", 830), sshConfig)138// assert.NoError(t, err)139//140// s, err := NewRPCSessionFromSSHClient(WithClientTrace(context.Background(), DefaultLoggingHooks), sshClient)141// assert.NoError(t, err, "Not expecting new session to fail")142// assert.NotNil(t, s, "Session should be non-nil")143//144// defer s.Close()145//146// reply, err := s.Execute(common.Request(`<get-config><source><running/></source></get-config>`))147// assert.NoError(t, err, "Not expecting exec to fail")148// assert.NotNil(t, reply, "Reply should be non-nil")149//}...
transport_test.go
Source:transport_test.go
...10 "golang.org/x/crypto/ssh"11)12var dftContext = context.Background()13func TestSuccessfulConnection(t *testing.T) {14 ts := testserver.NewSSHServer(t, "testUser", "testPassword")15 defer ts.Close()16 sshConfig := &ssh.ClientConfig{17 User: "testUser",18 Auth: []ssh.AuthMethod{ssh.Password("testPassword")},19 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec20 }21 tr, err := newTransport(dftContext, ts.Port(), sshConfig)22 assert.NoError(t, err, "Not expecting new transport to fail")23 defer tr.Close()24}25func TestFailingConnection(t *testing.T) {26 ts := testserver.NewSSHServer(t, "testUser", "testPassword")27 defer ts.Close()28 sshConfig := &ssh.ClientConfig{29 User: "testUser",30 Auth: []ssh.AuthMethod{ssh.Password("wrongPassword")},31 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec32 }33 tr, err := newTransport(dftContext, ts.Port(), sshConfig)34 assert.Error(t, err, "Not expecting new transport to succeed")35 assert.Nil(t, tr, "Transport should not be defined")36}37func TestWriteRead(t *testing.T) {38 ts := testserver.NewSSHServer(t, "testUser", "testPassword")39 defer ts.Close()40 sshConfig := &ssh.ClientConfig{41 User: "testUser",42 Auth: []ssh.AuthMethod{ssh.Password("testPassword")},43 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec44 }45 tr, err := newTransport(dftContext, ts.Port(), sshConfig)46 assert.NoError(t, err, "Not expecting new transport to fail")47 defer tr.Close()48 rdr := bufio.NewReader(tr)49 _, _ = tr.Write([]byte("Message\n"))50 response, _ := rdr.ReadString('\n')51 assert.Equal(t, "GOT:Message\n", response, "Failed to get expected response")52}53func TestTrace(t *testing.T) {54 ts := testserver.NewSSHServer(t, "testUser", "testPassword")55 defer ts.Close()56 sshConfig := &ssh.ClientConfig{57 User: "testUser",58 Auth: []ssh.AuthMethod{ssh.Password("testPassword")},59 HostKeyCallback: ssh.InsecureIgnoreHostKey(), //nolint: gosec60 }61 var traces []string62 trace := &ClientTrace{63 ConnectStart: 
func(target string) {64 traces = append(traces, fmt.Sprintf("ConnectStart %s", target))65 },66 ConnectDone: func(target string, err error, d time.Duration) {67 traces = append(traces, fmt.Sprintf("ConnectDone %s error:%v", target, err))68 assert.True(t, d > 0, "Duration should be defined")69 },70 DialStart: func(clientConfig *ssh.ClientConfig, target string) {71 traces = append(traces, fmt.Sprintf("DialStart %s", target))72 },73 DialDone: func(clientConfig *ssh.ClientConfig, target string, err error, d time.Duration) {74 traces = append(traces, fmt.Sprintf("DialDone %s error:%v", target, err))75 assert.True(t, d > 0, "Duration should be defined")76 },77 ConnectionClosed: func(target string, err error) {78 traces = append(traces, fmt.Sprintf("ConnectionClosed target:%s error:%v", target, err))79 },80 ReadStart: func(p []byte) {81 traces = append(traces, "ReadStart called")82 },83 ReadDone: func(p []byte, c int, err error, d time.Duration) {84 traces = append(traces, fmt.Sprintf("ReadDone %s %d %v", string(p[:c]), c, err))85 assert.True(t, d > 0, "Duration should be defined")86 },87 WriteStart: func(p []byte) {88 traces = append(traces, fmt.Sprintf("WriteStart %s", p))89 },90 WriteDone: func(p []byte, c int, err error, d time.Duration) {91 traces = append(traces, fmt.Sprintf("WriteDone %s %d %v", string(p[:c]), c, err))92 assert.True(t, d > 0, "Duration should be defined")93 },94 }95 ctx := WithClientTrace(context.Background(), trace)96 tr, _ := newTransport(ctx, ts.Port(), sshConfig)97 _, _ = tr.Write([]byte("Message\n"))98 _, _ = bufio.NewReader(tr).ReadString('\n')99 tr.Close()100 assert.Equal(t, fmt.Sprintf("ConnectStart localhost:%d", ts.Port()), traces[0])101 assert.Equal(t, fmt.Sprintf("DialStart localhost:%d", ts.Port()), traces[1])102 assert.Equal(t, fmt.Sprintf("DialDone localhost:%d error:<nil>", ts.Port()), traces[2])103 assert.Equal(t, fmt.Sprintf("ConnectDone localhost:%d error:<nil>", ts.Port()), traces[3])104 assert.Equal(t, "WriteStart Message\n", 
traces[4])105 assert.Equal(t, "WriteDone Message\n 8 <nil>", traces[5])106 assert.Equal(t, "ReadStart called", traces[6])107 assert.Equal(t, "ReadDone GOT:Message\n 12 <nil>", traces[7])108 assert.Contains(t, traces[8], "ConnectionClosed target:localhost:")109}110func newTransport(ctx context.Context, port int, cfg *ssh.ClientConfig) (Transport, error) {111 target := fmt.Sprintf("localhost:%d", port)112 return NewSSHTransport(ctx, NewDialer(target, cfg), target)113}...
New
Using AI Code Generation
1import (2func main() {3 ssh.Handle(func(s ssh.Session) {4 cmd := exec.Command("bash")5 ptyReq, winCh, isPty := s.Pty()6 if isPty {7 cmd.Env = append(cmd.Env, fmt.Sprintf("TERM=%s", ptyReq.Term))8 f, err := pty.Start(cmd)9 if err != nil {10 log.Fatal(err)11 }12 go func() {13 for win := range winCh {14 log.Printf("new win size: %d %d", win.Width, win.Height)15 pty.Setsize(f, &pty.Winsize{Rows: uint16(win.Height), Cols: uint16(win.Width), X: 0, Y: 0})16 }17 }()18 go func() {19 defer f.Close()20 defer s.Close()21 }()22 go func() {23 defer f.Close()24 defer s.Close()25 }()26 } else {27 if err := cmd.Run(); err != nil {28 log.Fatal(err)29 }30 }31 })32 log.Println("starting ssh server on :2222...")33 log.Fatal(ssh.ListenAndServe(":2222", nil))34}35Welcome to Ubuntu 16.04.3 LTS (GNU/Linux 4.4.0-116-generic x86_64)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!