From 2d7305ba1711bb296a0542978a27fa6cdefdcd21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Br=C3=BCll?= Date: Tue, 7 Apr 2015 14:44:40 +0200 Subject: [PATCH 1/4] added cmd line parameter to narrow down the use of interfaces --- .gitignore | 1 + main.go | 27 +++++++++++++++------------ proto/iris/overlay.go | 23 ++++++++++++++++------- proto/pastry/overlay.go | 19 ++++++++++++++----- proto/scribe/overlay.go | 4 ++-- 5 files changed, 48 insertions(+), 26 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..58616fa --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +iris diff --git a/main.go b/main.go index 3ddcb38..0a69fd6 100644 --- a/main.go +++ b/main.go @@ -38,14 +38,17 @@ import ( ) // Command line flags -var devMode = flag.Bool("dev", false, "start in local developer mode (random cluster and key)") -var relayPort = flag.Int("port", 55555, "relay endpoint for locally connecting clients") -var clusterName = flag.String("net", "", "name of the cluster to join or create") -var rsaKeyPath = flag.String("rsa", "", "path to the RSA private key to use for data security") - -var cpuProfile = flag.String("cpuprof", "", "path to CPU profiling results") -var heapProfile = flag.String("heapprof", "", "path to memory heap profiling results") -var blockProfile = flag.String("blockprof", "", "path to lock contention profiling results") +var ( + devMode = flag.Bool("dev", false, "start in local developer mode (random cluster and key)") + relayPort = flag.Int("port", 55555, "relay endpoint for locally connecting clients") + clusterName = flag.String("net", "", "name of the cluster to join or create") + rsaKeyPath = flag.String("rsa", "", "path to the RSA private key to use for data security") + interfaceAddr = flag.String("if_addr", "", "ip/mask of the interface which is used to search for other nodes. if nothing is specified, all interfaces will be used.") + + cpuProfile = flag.String("cpuprof", "", "path to CPU profiling results") + heapProfile = flag.String("heapprof", "", "path to memory heap profiling results") + blockProfile = flag.String("blockprof", "", "path to lock contention profiling results") +) // Prints the usage of the Iris command and its options. func usage() { @@ -75,7 +78,7 @@ func usage() { } // Parses the command line flags and checks their validity -func parseFlags() (int, string, *rsa.PrivateKey) { +func parseFlags() (int, string, *rsa.PrivateKey, string) { var rsaKey *rsa.PrivateKey // Read the command line arguments @@ -137,12 +140,12 @@ func parseFlags() (int, string, *rsa.PrivateKey) { } } } - return *relayPort, *clusterName, rsaKey + return *relayPort, *clusterName, rsaKey, *interfaceAddr } func main() { // Extract the command line arguments - relayPort, clusterId, rsaKey := parseFlags() + relayPort, clusterId, rsaKey, interfaceAddr := parseFlags() // Check for CPU profiling if *cpuProfile != "" { @@ -176,7 +179,7 @@ func main() { // Create and boot a new carrier log.Printf("main: booting iris overlay...") overlay := iris.New(clusterId, rsaKey) - if peers, err := overlay.Boot(); err != nil { + if peers, err := overlay.Boot(interfaceAddr); err != nil { log.Fatalf("main: failed to boot iris overlay: %v.", err) } else { log.Printf("main: iris overlay converged with %v remote connections.", peers) diff --git a/proto/iris/overlay.go b/proto/iris/overlay.go index d38aca4..7ea6b30 100644 --- a/proto/iris/overlay.go +++ b/proto/iris/overlay.go @@ -58,17 +58,26 @@ func New(overId string, key *rsa.PrivateKey) *Overlay { return o } -// Boots the overlay, returning the number of remote peers. -func (o *Overlay) Boot() (int, error) { +// Boot starts up the overlay, returning the number of remote peers. +func (o *Overlay) Boot(interfaceAddr string) (int, error) { // Boot the underlay and wait until it converges - peers, err := o.scribe.Boot() + peers, err := o.scribe.Boot(interfaceAddr) if err != nil { return 0, err } - // Start a tunnel acceptor on each network interface - addrs, err := net.InterfaceAddrs() - if err != nil { - return 0, err + var addrs []net.Addr + if interfaceAddr == "" { + // Start a tunnel acceptor on each network interface + if addrs, err = net.InterfaceAddrs(); err != nil { + return 0, err + } + } else { + // Start a tunnel acceptor on the specified network interface + _, addr, err := net.ParseCIDR(interfaceAddr) + if err != nil { + return 0, err + } + addrs = append(addrs, addr) } for _, addr := range addrs { // Workaround for upstream Go issue #5395, extract IP from both IPNet and IPAddr diff --git a/proto/pastry/overlay.go b/proto/pastry/overlay.go index 0a062d3..5620ec6 100644 --- a/proto/pastry/overlay.go +++ b/proto/pastry/overlay.go @@ -125,12 +125,21 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay { // Boots the overlay network: it starts up boostrappers and connection acceptors // on all local IPv4 interfaces, after which the overlay management is booted. // The method returns the number of remote peers after convergence is reached. -func (o *Overlay) Boot() (int, error) { - // Start the individual acceptors - addrs, err := net.InterfaceAddrs() - if err != nil { - return 0, err +func (o *Overlay) Boot(interfaceAddr string) (int, error) { + var addrs []net.Addr + if interfaceAddr == "" { + var err error + if addrs, err = net.InterfaceAddrs(); err != nil { + return 0, err + } + } else { + _, addr, err := net.ParseCIDR(interfaceAddr) + if err != nil { + return 0, err + } + addrs = append(addrs, addr) } + for _, addr := range addrs { // Workaround for upstream Go issue #5395, construct an IPNet if IPAddr is returned var ipnet *net.IPNet diff --git a/proto/scribe/overlay.go b/proto/scribe/overlay.go index fb4b4b1..762981a 100644 --- a/proto/scribe/overlay.go +++ b/proto/scribe/overlay.go @@ -71,14 +71,14 @@ func New(overId string, key *rsa.PrivateKey, app Callback) *Overlay { } // Boots the overlay, returning the number of remote peers. -func (o *Overlay) Boot() (int, error) { +func (o *Overlay) Boot(interfaceAddr string) (int, error) { log.Printf("scribe: booting with id %v.", o.pastry.Self()) // Start the heartbeat first since convergence can last long o.heart.Start() // Boot the overlay and wait until it converges - peers, err := o.pastry.Boot() + peers, err := o.pastry.Boot(interfaceAddr) if err != nil { return 0, err } From aead150080cf34779e7bda280260003bfb4a2067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Br=C3=BCll?= Date: Tue, 28 Apr 2015 12:31:29 +0200 Subject: [PATCH 2/4] fixed if addr parsing --- main.go | 26 +++++++++++++++++++++----- proto/iris/overlay.go | 13 ++++--------- proto/pastry/overlay.go | 10 +++------- proto/scribe/overlay.go | 5 +++-- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index 0a69fd6..965b563 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ import ( "fmt" "io/ioutil" "log" + "net" rng "math/rand" "os" "os/signal" @@ -78,8 +79,11 @@ func usage() { } // Parses the command line flags and checks their validity -func parseFlags() (int, string, *rsa.PrivateKey, string) { - var rsaKey *rsa.PrivateKey +func parseFlags() (int, string, *rsa.PrivateKey, net.Addr) { + var ( + rsaKey *rsa.PrivateKey + ifAddr *net.IPNet + ) // Read the command line arguments flag.Usage = usage @@ -139,13 +143,25 @@ func parseFlags() (int, string, *rsa.PrivateKey, string) { } } } + if *interfaceAddr != "" { + var ( + ip net.IP + err error + ) + ip, ifAddr, err = net.ParseCIDR(*interfaceAddr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to parse the interface address.\n") + os.Exit(-1) + } + ifAddr.IP = ip + } } - return *relayPort, *clusterName, rsaKey, *interfaceAddr + return *relayPort, *clusterName, rsaKey, ifAddr } func main() { // Extract the command line arguments - relayPort, clusterId, rsaKey, interfaceAddr := parseFlags() + relayPort, clusterId, rsaKey, ifAddr := parseFlags() // Check for CPU profiling if *cpuProfile != "" { @@ -179,7 +195,7 @@ func main() { // Create and boot a new carrier log.Printf("main: booting iris overlay...") overlay := iris.New(clusterId, rsaKey) - if peers, err := overlay.Boot(interfaceAddr); err != nil { + if peers, err := overlay.Boot(ifAddr); err != nil { log.Fatalf("main: failed to boot iris overlay: %v.", err) } else { log.Printf("main: iris overlay converged with %v remote connections.", peers) diff --git a/proto/iris/overlay.go b/proto/iris/overlay.go index 7ea6b30..3c5d5d9 100644 --- a/proto/iris/overlay.go +++ b/proto/iris/overlay.go @@ -59,25 +59,20 @@ func New(overId string, key *rsa.PrivateKey) *Overlay { } // Boot starts up the overlay, returning the number of remote peers. -func (o *Overlay) Boot(interfaceAddr string) (int, error) { +func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { // Boot the underlay and wait until it converges - peers, err := o.scribe.Boot(interfaceAddr) + peers, err := o.scribe.Boot(ifAddr) if err != nil { return 0, err } var addrs []net.Addr - if interfaceAddr == "" { + if ifAddr == nil { // Start a tunnel acceptor on each network interface if addrs, err = net.InterfaceAddrs(); err != nil { return 0, err } } else { - // Start a tunnel acceptor on the specified network interface - _, addr, err := net.ParseCIDR(interfaceAddr) - if err != nil { - return 0, err - } - addrs = append(addrs, addr) + addrs = append(addrs, ifAddr) } for _, addr := range addrs { // Workaround for upstream Go issue #5395, extract IP from both IPNet and IPAddr diff --git a/proto/pastry/overlay.go b/proto/pastry/overlay.go index 5620ec6..346a076 100644 --- a/proto/pastry/overlay.go +++ b/proto/pastry/overlay.go @@ -125,19 +125,15 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay { // Boots the overlay network: it starts up boostrappers and connection acceptors // on all local IPv4 interfaces, after which the overlay management is booted. // The method returns the number of remote peers after convergence is reached. -func (o *Overlay) Boot(interfaceAddr string) (int, error) { +func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { var addrs []net.Addr - if interfaceAddr == "" { + if ifAddr == nil { var err error if addrs, err = net.InterfaceAddrs(); err != nil { return 0, err } } else { - _, addr, err := net.ParseCIDR(interfaceAddr) - if err != nil { - return 0, err - } - addrs = append(addrs, addr) + addrs = append(addrs, ifAddr) } for _, addr := range addrs { diff --git a/proto/scribe/overlay.go b/proto/scribe/overlay.go index 762981a..10ca89f 100644 --- a/proto/scribe/overlay.go +++ b/proto/scribe/overlay.go @@ -23,6 +23,7 @@ import ( "crypto/rsa" "errors" "log" + "net" "math/big" "sync" @@ -71,14 +72,14 @@ func New(overId string, key *rsa.PrivateKey, app Callback) *Overlay { } // Boots the overlay, returning the number of remote peers. -func (o *Overlay) Boot(interfaceAddr string) (int, error) { +func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { log.Printf("scribe: booting with id %v.", o.pastry.Self()) // Start the heartbeat first since convergence can last long o.heart.Start() // Boot the overlay and wait until it converges - peers, err := o.pastry.Boot(interfaceAddr) + peers, err := o.pastry.Boot(ifAddr) if err != nil { return 0, err } From 303243aaab5b414550164000cb406096349ed327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Br=C3=BCll?= Date: Tue, 28 Apr 2015 15:28:58 +0200 Subject: [PATCH 3/4] added another small fix --- main.go | 2 +- proto/iris/overlay.go | 2 +- proto/pastry/overlay.go | 3 ++- proto/scribe/overlay.go | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 965b563..797f4c3 100644 --- a/main.go +++ b/main.go @@ -79,7 +79,7 @@ func usage() { } // Parses the command line flags and checks their validity -func parseFlags() (int, string, *rsa.PrivateKey, net.Addr) { +func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet) { var ( rsaKey *rsa.PrivateKey ifAddr *net.IPNet diff --git a/proto/iris/overlay.go b/proto/iris/overlay.go index 3c5d5d9..586b302 100644 --- a/proto/iris/overlay.go +++ b/proto/iris/overlay.go @@ -59,7 +59,7 @@ func New(overId string, key *rsa.PrivateKey) *Overlay { } // Boot starts up the overlay, returning the number of remote peers. -func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { // Boot the underlay and wait until it converges peers, err := o.scribe.Boot(ifAddr) if err != nil { diff --git a/proto/pastry/overlay.go b/proto/pastry/overlay.go index 346a076..5e18e15 100644 --- a/proto/pastry/overlay.go +++ b/proto/pastry/overlay.go @@ -125,7 +125,7 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay { // Boots the overlay network: it starts up boostrappers and connection acceptors // on all local IPv4 interfaces, after which the overlay management is booted. // The method returns the number of remote peers after convergence is reached. -func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { var addrs []net.Addr if ifAddr == nil { var err error @@ -133,6 +133,7 @@ func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { return 0, err } } else { + log.Printf("ip net %#v", ifAddr) addrs = append(addrs, ifAddr) } diff --git a/proto/scribe/overlay.go b/proto/scribe/overlay.go index 10ca89f..34ca35a 100644 --- a/proto/scribe/overlay.go +++ b/proto/scribe/overlay.go @@ -72,7 +72,7 @@ func New(overId string, key *rsa.PrivateKey, app Callback) *Overlay { } // Boots the overlay, returning the number of remote peers. -func (o *Overlay) Boot(ifAddr net.Addr) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { log.Printf("scribe: booting with id %v.", o.pastry.Self()) // Start the heartbeat first since convergence can last long From dc8e7099869e8ad32f6d721d4dcfa15b4060f161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Br=C3=BCll?= Date: Tue, 26 May 2015 17:26:35 +0200 Subject: [PATCH 4/4] made pastry and tunnel ports configurable --- .gitignore | 1 + main.go | 14 ++++++++------ proto/iris/overlay.go | 6 +++--- proto/iris/tunnel.go | 4 ++-- proto/pastry/handshake.go | 4 ++-- proto/pastry/overlay.go | 5 ++--- proto/scribe/overlay.go | 6 +++--- 7 files changed, 21 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index 58616fa..2f3776d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ iris +Makefile diff --git a/main.go b/main.go index 797f4c3..2bf2ce6 100644 --- a/main.go +++ b/main.go @@ -26,8 +26,8 @@ import ( "fmt" "io/ioutil" "log" - "net" rng "math/rand" + "net" "os" "os/signal" "runtime" @@ -45,6 +45,8 @@ var ( clusterName = flag.String("net", "", "name of the cluster to join or create") rsaKeyPath = flag.String("rsa", "", "path to the RSA private key to use for data security") interfaceAddr = flag.String("if_addr", "", "ip/mask of the interface which is used to search for other nodes. if nothing is specified, all interfaces will be used.") + pastryPort = flag.Int("pastry_port", 0, "port for incoming pastry connections. a value of 0 triggers makes iris choose a random port.") + tunnelPort = flag.Int("tunnel_port", 0, "port for incoming tunnel connections. a value of 0 triggers makes iris choose a random port.") cpuProfile = flag.String("cpuprof", "", "path to CPU profiling results") heapProfile = flag.String("heapprof", "", "path to memory heap profiling results") @@ -79,7 +81,7 @@ func usage() { } // Parses the command line flags and checks their validity -func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet) { +func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet, int, int) { var ( rsaKey *rsa.PrivateKey ifAddr *net.IPNet @@ -145,7 +147,7 @@ func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet) { } if *interfaceAddr != "" { var ( - ip net.IP + ip net.IP err error ) ip, ifAddr, err = net.ParseCIDR(*interfaceAddr) @@ -156,12 +158,12 @@ func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet) { ifAddr.IP = ip } } - return *relayPort, *clusterName, rsaKey, ifAddr + return *relayPort, *clusterName, rsaKey, ifAddr, *pastryPort, *tunnelPort } func main() { // Extract the command line arguments - relayPort, clusterId, rsaKey, ifAddr := parseFlags() + relayPort, clusterId, rsaKey, ifAddr, pastryPort, tunnelPort := parseFlags() // Check for CPU profiling if *cpuProfile != "" { @@ -195,7 +197,7 @@ func main() { // Create and boot a new carrier log.Printf("main: booting iris overlay...") overlay := iris.New(clusterId, rsaKey) - if peers, err := overlay.Boot(ifAddr); err != nil { + if peers, err := overlay.Boot(ifAddr, pastryPort, tunnelPort); err != nil { log.Fatalf("main: failed to boot iris overlay: %v.", err) } else { log.Printf("main: iris overlay converged with %v remote connections.", peers) diff --git a/proto/iris/overlay.go b/proto/iris/overlay.go index 586b302..b1c6c80 100644 --- a/proto/iris/overlay.go +++ b/proto/iris/overlay.go @@ -59,9 +59,9 @@ func New(overId string, key *rsa.PrivateKey) *Overlay { } // Boot starts up the overlay, returning the number of remote peers. -func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet, pastryPort, tunnelPort int) (int, error) { // Boot the underlay and wait until it converges - peers, err := o.scribe.Boot(ifAddr) + peers, err := o.scribe.Boot(ifAddr, pastryPort) if err != nil { return 0, err } @@ -94,7 +94,7 @@ func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { // Start and sync the acceptor live := make(chan struct{}) - go o.tunneler(ip, live, quit) + go o.tunneler(ip, tunnelPort, live, quit) <-live } } diff --git a/proto/iris/tunnel.go b/proto/iris/tunnel.go index 1a42797..323711b 100644 --- a/proto/iris/tunnel.go +++ b/proto/iris/tunnel.go @@ -60,9 +60,9 @@ func init() { gob.Register(&dataHeader{}) } -func (o *Overlay) tunneler(ip net.IP, live chan struct{}, quit chan chan error) { +func (o *Overlay) tunneler(ip net.IP, port int, live chan struct{}, quit chan chan error) { // Listen for incoming streams on the given interface and random port. - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ip.String(), "0")) + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port))) if err != nil { panic(fmt.Sprintf("failed to resolve interface (%v): %v.", ip, err)) } diff --git a/proto/pastry/handshake.go b/proto/pastry/handshake.go index 6e78d49..5307f28 100644 --- a/proto/pastry/handshake.go +++ b/proto/pastry/handshake.go @@ -49,9 +49,9 @@ func init() { // Starts up the overlay networking on a specified interface and fans in all the // inbound connections into the overlay-global channels. -func (o *Overlay) acceptor(ipnet *net.IPNet, quit chan chan error) { +func (o *Overlay) acceptor(ipnet *net.IPNet, port int, quit chan chan error) { // Listen for incoming session on the given interface and random port. - addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ipnet.IP.String(), "0")) + addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ipnet.IP.String(), fmt.Sprintf("%d", port))) if err != nil { panic(fmt.Sprintf("failed to resolve interface (%v): %v.", ipnet.IP, err)) } diff --git a/proto/pastry/overlay.go b/proto/pastry/overlay.go index 5e18e15..059a032 100644 --- a/proto/pastry/overlay.go +++ b/proto/pastry/overlay.go @@ -125,7 +125,7 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay { // Boots the overlay network: it starts up boostrappers and connection acceptors // on all local IPv4 interfaces, after which the overlay management is booted. // The method returns the number of remote peers after convergence is reached. -func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet, port int) (int, error) { var addrs []net.Addr if ifAddr == nil { var err error @@ -133,7 +133,6 @@ func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { return 0, err } } else { - log.Printf("ip net %#v", ifAddr) addrs = append(addrs, ifAddr) } @@ -158,7 +157,7 @@ func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { // Create a quit channel and start the acceptor quit := make(chan chan error) o.acceptQuit = append(o.acceptQuit, quit) - go o.acceptor(ipnet, quit) + go o.acceptor(ipnet, port, quit) } } // Start the overlay processes diff --git a/proto/scribe/overlay.go b/proto/scribe/overlay.go index 34ca35a..d9b50d3 100644 --- a/proto/scribe/overlay.go +++ b/proto/scribe/overlay.go @@ -23,8 +23,8 @@ import ( "crypto/rsa" "errors" "log" - "net" "math/big" + "net" "sync" "github.com/project-iris/iris/config" @@ -72,14 +72,14 @@ func New(overId string, key *rsa.PrivateKey, app Callback) *Overlay { } // Boots the overlay, returning the number of remote peers. -func (o *Overlay) Boot(ifAddr *net.IPNet) (int, error) { +func (o *Overlay) Boot(ifAddr *net.IPNet, pastryPort int) (int, error) { log.Printf("scribe: booting with id %v.", o.pastry.Self()) // Start the heartbeat first since convergence can last long o.heart.Start() // Boot the overlay and wait until it converges - peers, err := o.pastry.Boot(ifAddr) + peers, err := o.pastry.Boot(ifAddr, pastryPort) if err != nil { return 0, err }