diff --git a/async_producer.go b/async_producer.go index e34eed544..8a65bc581 100644 --- a/async_producer.go +++ b/async_producer.go @@ -555,6 +555,22 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { return err } + // Select only partitions with leaders in this rack if configured so, falling back if none are available. + if tp.parent.conf.Producer.PartitionerRackAware { + clientRack := tp.parent.client.Config().RackID + + var partitionsInRack []int32 + for _, p := range partitions { + leader, err := tp.parent.client.Leader(msg.Topic, p) + if err == nil && leader.Rack() == clientRack { + partitionsInRack = append(partitionsInRack, p) + } + } + if len(partitionsInRack) > 0 { + partitions = partitionsInRack + } + } + numPartitions := int32(len(partitions)) if numPartitions == 0 { diff --git a/config.go b/config.go index 5bac2b50a..5e929261c 100644 --- a/config.go +++ b/config.go @@ -204,6 +204,8 @@ type Config struct { // (defaults to hashing the message key). Similar to the `partitioner.class` // setting for the JVM producer. Partitioner PartitionerConstructor + // Controls whether the partitioner is rack-aware. This also affects custom partitioners. + PartitionerRackAware bool // If enabled, the producer will ensure that exactly one copy of each message is // written. Idempotent bool @@ -555,6 +557,8 @@ func NewConfig() *Config { c.Producer.Return.Errors = true c.Producer.CompressionLevel = CompressionLevelDefault + c.Producer.PartitionerRackAware = false + c.Producer.Transaction.Timeout = 1 * time.Minute c.Producer.Transaction.Retry.Max = 50 c.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond diff --git a/tools/kafka-console-producer/kafka-console-producer.go b/tools/kafka-console-producer/kafka-console-producer.go index f3cc81de7..7c1380558 100644 --- a/tools/kafka-console-producer/kafka-console-producer.go +++ b/tools/kafka-console-producer/kafka-console-producer.go @@ -22,6 +22,7 @@ var ( value = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.") partitioner = flag.String("partitioner", "", "The partitioning scheme to use. Can be `hash`, `manual`, or `random`") partition = flag.Int("partition", -1, "The partition to produce to.") + rackID = flag.String("rackid", "", "Produce to leaders with the same client.rack") verbose = flag.Bool("verbose", false, "Turn on sarama logging to stderr") showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr") silent = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout") @@ -83,6 +84,11 @@ func main() { printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner)) } + if *rackID != "" { + config.Producer.PartitionerRackAware = true + config.RackID = *rackID + } + message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)} if *key != "" {