Skip to content
This repository was archived by the owner on Aug 3, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
iris
Makefile
47 changes: 34 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"io/ioutil"
"log"
rng "math/rand"
"net"
"os"
"os/signal"
"runtime"
Expand All @@ -38,14 +39,19 @@ import (
)

// Command line flags
var devMode = flag.Bool("dev", false, "start in local developer mode (random cluster and key)")
var relayPort = flag.Int("port", 55555, "relay endpoint for locally connecting clients")
var clusterName = flag.String("net", "", "name of the cluster to join or create")
var rsaKeyPath = flag.String("rsa", "", "path to the RSA private key to use for data security")

var cpuProfile = flag.String("cpuprof", "", "path to CPU profiling results")
var heapProfile = flag.String("heapprof", "", "path to memory heap profiling results")
var blockProfile = flag.String("blockprof", "", "path to lock contention profiling results")
var (
devMode = flag.Bool("dev", false, "start in local developer mode (random cluster and key)")
relayPort = flag.Int("port", 55555, "relay endpoint for locally connecting clients")
clusterName = flag.String("net", "", "name of the cluster to join or create")
rsaKeyPath = flag.String("rsa", "", "path to the RSA private key to use for data security")
interfaceAddr = flag.String("if_addr", "", "ip/mask of the interface which is used to search for other nodes. if nothing is specified, all interfaces will be used.")
pastryPort = flag.Int("pastry_port", 0, "port for incoming pastry connections. a value of 0 triggers makes iris choose a random port.")
tunnelPort = flag.Int("tunnel_port", 0, "port for incoming tunnel connections. a value of 0 triggers makes iris choose a random port.")

cpuProfile = flag.String("cpuprof", "", "path to CPU profiling results")
heapProfile = flag.String("heapprof", "", "path to memory heap profiling results")
blockProfile = flag.String("blockprof", "", "path to lock contention profiling results")
)

// Prints the usage of the Iris command and its options.
func usage() {
Expand Down Expand Up @@ -75,8 +81,11 @@ func usage() {
}

// Parses the command line flags and checks their validity
func parseFlags() (int, string, *rsa.PrivateKey) {
var rsaKey *rsa.PrivateKey
func parseFlags() (int, string, *rsa.PrivateKey, *net.IPNet, int, int) {
var (
rsaKey *rsa.PrivateKey
ifAddr *net.IPNet
)

// Read the command line arguments
flag.Usage = usage
Expand Down Expand Up @@ -136,13 +145,25 @@ func parseFlags() (int, string, *rsa.PrivateKey) {
}
}
}
if *interfaceAddr != "" {
var (
ip net.IP
err error
)
ip, ifAddr, err = net.ParseCIDR(*interfaceAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse the interface address.\n")
os.Exit(-1)
}
ifAddr.IP = ip
}
}
return *relayPort, *clusterName, rsaKey
return *relayPort, *clusterName, rsaKey, ifAddr, *pastryPort, *tunnelPort
}

func main() {
// Extract the command line arguments
relayPort, clusterId, rsaKey := parseFlags()
relayPort, clusterId, rsaKey, ifAddr, pastryPort, tunnelPort := parseFlags()

// Check for CPU profiling
if *cpuProfile != "" {
Expand Down Expand Up @@ -176,7 +197,7 @@ func main() {
// Create and boot a new carrier
log.Printf("main: booting iris overlay...")
overlay := iris.New(clusterId, rsaKey)
if peers, err := overlay.Boot(); err != nil {
if peers, err := overlay.Boot(ifAddr, pastryPort, tunnelPort); err != nil {
log.Fatalf("main: failed to boot iris overlay: %v.", err)
} else {
log.Printf("main: iris overlay converged with %v remote connections.", peers)
Expand Down
20 changes: 12 additions & 8 deletions proto/iris/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,21 @@ func New(overId string, key *rsa.PrivateKey) *Overlay {
return o
}

// Boots the overlay, returning the number of remote peers.
func (o *Overlay) Boot() (int, error) {
// Boot starts up the overlay, returning the number of remote peers.
func (o *Overlay) Boot(ifAddr *net.IPNet, pastryPort, tunnelPort int) (int, error) {
// Boot the underlay and wait until it converges
peers, err := o.scribe.Boot()
peers, err := o.scribe.Boot(ifAddr, pastryPort)
if err != nil {
return 0, err
}
// Start a tunnel acceptor on each network interface
addrs, err := net.InterfaceAddrs()
if err != nil {
return 0, err
var addrs []net.Addr
if ifAddr == nil {
// Start a tunnel acceptor on each network interface
if addrs, err = net.InterfaceAddrs(); err != nil {
return 0, err
}
} else {
addrs = append(addrs, ifAddr)
}
for _, addr := range addrs {
// Workaround for upstream Go issue #5395, extract IP from both IPNet and IPAddr
Expand All @@ -90,7 +94,7 @@ func (o *Overlay) Boot() (int, error) {

// Start and sync the acceptor
live := make(chan struct{})
go o.tunneler(ip, live, quit)
go o.tunneler(ip, tunnelPort, live, quit)
<-live
}
}
Expand Down
4 changes: 2 additions & 2 deletions proto/iris/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func init() {
gob.Register(&dataHeader{})
}

func (o *Overlay) tunneler(ip net.IP, live chan struct{}, quit chan chan error) {
func (o *Overlay) tunneler(ip net.IP, port int, live chan struct{}, quit chan chan error) {
// Listen for incoming streams on the given interface and random port.
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ip.String(), "0"))
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port)))
if err != nil {
panic(fmt.Sprintf("failed to resolve interface (%v): %v.", ip, err))
}
Expand Down
4 changes: 2 additions & 2 deletions proto/pastry/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func init() {

// Starts up the overlay networking on a specified interface and fans in all the
// inbound connections into the overlay-global channels.
func (o *Overlay) acceptor(ipnet *net.IPNet, quit chan chan error) {
func (o *Overlay) acceptor(ipnet *net.IPNet, port int, quit chan chan error) {
// Listen for incoming session on the given interface and random port.
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ipnet.IP.String(), "0"))
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ipnet.IP.String(), fmt.Sprintf("%d", port)))
if err != nil {
panic(fmt.Sprintf("failed to resolve interface (%v): %v.", ipnet.IP, err))
}
Expand Down
17 changes: 11 additions & 6 deletions proto/pastry/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,17 @@ func New(id string, key *rsa.PrivateKey, app Callback) *Overlay {
// Boots the overlay network: it starts up boostrappers and connection acceptors
// on all local IPv4 interfaces, after which the overlay management is booted.
// The method returns the number of remote peers after convergence is reached.
func (o *Overlay) Boot() (int, error) {
// Start the individual acceptors
addrs, err := net.InterfaceAddrs()
if err != nil {
return 0, err
func (o *Overlay) Boot(ifAddr *net.IPNet, port int) (int, error) {
var addrs []net.Addr
if ifAddr == nil {
var err error
if addrs, err = net.InterfaceAddrs(); err != nil {
return 0, err
}
} else {
addrs = append(addrs, ifAddr)
}

for _, addr := range addrs {
// Workaround for upstream Go issue #5395, construct an IPNet if IPAddr is returned
var ipnet *net.IPNet
Expand All @@ -152,7 +157,7 @@ func (o *Overlay) Boot() (int, error) {
// Create a quit channel and start the acceptor
quit := make(chan chan error)
o.acceptQuit = append(o.acceptQuit, quit)
go o.acceptor(ipnet, quit)
go o.acceptor(ipnet, port, quit)
}
}
// Start the overlay processes
Expand Down
5 changes: 3 additions & 2 deletions proto/scribe/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"log"
"math/big"
"net"
"sync"

"github.com/project-iris/iris/config"
Expand Down Expand Up @@ -71,14 +72,14 @@ func New(overId string, key *rsa.PrivateKey, app Callback) *Overlay {
}

// Boots the overlay, returning the number of remote peers.
func (o *Overlay) Boot() (int, error) {
func (o *Overlay) Boot(ifAddr *net.IPNet, pastryPort int) (int, error) {
log.Printf("scribe: booting with id %v.", o.pastry.Self())

// Start the heartbeat first since convergence can last long
o.heart.Start()

// Boot the overlay and wait until it converges
peers, err := o.pastry.Boot()
peers, err := o.pastry.Boot(ifAddr, pastryPort)
if err != nil {
return 0, err
}
Expand Down