Making Storm fly with Netty

Yahoo recently submitted work to Storm that allows the messaging layer to be pluggable and provides an implementation based on Netty.  This is an important step for many reasons. It enables Storm to run without dependencies on native libraries, which simplifies deploying Storm onto any OS and into the cloud. It also opens up a path to adding authentication and authorization to the connections between worker processes.

But most important of all, it lets me pump twice as many messages per second through the same cluster.  Let me show you how we tuned Netty to do this.

The Test:

At Yahoo we eat our own dog food, but before making Netty the default messaging layer for our Storm clusters I needed some numbers to see how it compared to zeromq, which is the current default.  To do that I needed a benchmark that could make Storm’s messaging layer cry uncle, so I wrote one.  It is a simple speed-of-light test that sees how quickly Storm can push messages between bolts and spouts.  It lets us launch multiple topologies of varying complexity that send fixed-size messages.

I work on a team that provides hosted Storm clusters for other teams in Yahoo to use.  As such we have a large variety of topology configurations and message sizes on our clusters.  To be sure any new default was as good as the original, I tested many different configurations ranging from very small topologies with 3 workers, 3 bolts, and 3 spouts all the way up to filling the entire cluster with 408 workers using thousands of bolts and spouts. For each of these I varied the message size as well. I have only included a subset of the topologies that were representative of the tests as a whole. See Appendix B for more details of the setup used.  

In all cases acking was enabled, and each spout was allowed 1000 pending messages, so as to have some form of flow control in the system.  We also ran the tests multiple times.  I have not included error bounds on the results, because the differences between Netty and zeromq are quite dramatic and the differences between runs were all under 5%.  The benchmark excludes ack messages from the message counts, since they are system overhead.
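For reference, the flow-control setup corresponds to per-topology settings like the following (the benchmark actually sets these through its command-line flags, shown in Appendix A; key names are per Storm 0.9):

```yaml
# Flow control used in all tests, as the equivalent storm.yaml keys.
topology.acker.executors: 3        # one acker per worker in the small topology
topology.max.spout.pending: 1000   # cap on unacked tuples per spout task
```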

Results:

The first test is a very small one.  It has 3 workers running on 3 separate nodes, so there is essentially no resource contention at all.  Each worker has 1 spout, 1 bolt and 1 acker task.

As can be seen from Figure 1, Netty in this configuration is much faster than zeromq, sending between 40% and 100% more messages per second depending on the size of the message.



Figure 1 Small Topology: Netty vs zeromq

These results were very encouraging, but what about larger topologies with more contention?

The next topology is a much larger one with 100 workers, 100 spouts, 500 bolts spread out in 5 levels of 100 bolts each, and 100 ackers.

As can be seen from Figure 2, something very odd was happening with Netty.


Figure 2 Large Topology: Netty vs zeromq

Why in the world would messages per second go up as the message size increases?  That is totally counterintuitive, so I started looking at what was happening on the individual Supervisor nodes.  The worker processes in the section of the graph that slopes upward had relatively high CPU utilization, usually around 800%, or all 8 physical cores maxed, yet the load average was 0.2.  That is a smoking gun for context switching being the culprit.  Netty is an asynchronous I/O framework.  As each message is sent, control is handed back to the OS and the thread waits to be woken up for something else to do.  Once the data finishes being sent, some thread wakes up to see what else needs to be done with that message.  This constant sleeping and waking was eating all of the CPU time.  As the messages got bigger there was less context switching, so, oddly enough, more messages could be sent.  This trend continued until the network saturated at 2.5KB messages.  The downward slope corresponds to about 4.6 GB/s of messages being sent, and meets up with zeromq at about the 4KB message mark.
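One quick way to see this kind of sleep/wake churn on a node is the per-process context-switch counters the Linux kernel keeps in /proc.  A sketch (for a real worker you would substitute the worker's pid from `ps` or `jps`; here we use the shell's own pid so the commands run anywhere):

```shell
# Inspect context-switch counters for a process via /proc (Linux).
PID=$$   # illustrative: use a Storm worker pid on a real Supervisor node
grep ctxt_switches "/proc/$PID/status"
# voluntary_ctxt_switches climbing rapidly while CPU is pegged but the
# load average stays near zero is the sleep/wake pattern described above.
# System-wide, the `cs` column of `vmstat 1` tells the same story.
```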

So if context switching is killing us, let’s reduce the number of threads that are context switching.  By default, Netty assumes that it is the only thing running on a box and creates a handler thread for every core available.  When there are multiple workers on a box, this huge number of threads causes extra context switching.  To resolve this I tried two different approaches: first, reduce the number of workers so that a single worker runs per node; and second, apply a pull request that makes the number of threads configurable, with a default of 1, which matches the zeromq default.
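With that pull request applied, the second approach reduces to two lines of storm.yaml (the same settings listed in Appendix B):

```yaml
# Restrict Netty to a single handler thread per worker.
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
```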

As can be seen from Figure 2, Netty’s default setting is not that great for lots of small messages, even when it is the only worker on the node.  But when we restrict it to a single thread we are able to get between 85% and 111% more messages per second than zeromq, until the network saturates again.

To be sure that this change would not impact the other topologies, I reran all the tests with the new configuration; as you can see from Figure 1, the difference from the default Netty configuration appears to be negligible.

Conclusion:

The new default settings allow Netty to run twice as fast as zeromq with its default settings.  Netty is now the default for our Storm clusters.

Appendix A: Commands to Run the Tests

Small Topology:

storm jar ./target/storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --testTime 120 --pollFreq 5 --workers 3 --spout 3 --bolt 3 -l 1 -n 1 --ackers 3 --ack --maxSpoutPending 1000 --messageSize $MESSAGE_SIZE

Large Topology:

storm jar ./target/storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --testTime 120 --pollFreq 5 --workers 100 --spout 100 --bolt 100 -l 5 -n 1 --ackers 100 --ack --maxSpoutPending 1000 --messageSize $MESSAGE_SIZE


Appendix B: The Setup

Hardware:

38 nodes total: 3 nodes were dedicated to ZooKeeper, 1 to Nimbus and the UI, and 34 to Supervisor and Logviewer processes.  Each node had 2 x 2.4GHz Xeon processors, each with 4 cores and Hyperthreading enabled, for a total of 16 threads of execution.  Each box had 24 GB of main memory clocked at 1066 MHz, full-duplex Gigabit Ethernet, and several 7.2K SATA/300 drives.  They were all in a single rack with a top-of-rack switch supporting full-duplex Gigabit connections between the nodes.


Software:

OS: Red Hat Enterprise Linux Server 6.3

Java: 64-bit Oracle Java 1.7.

I left RHEL mostly stock.  For ZooKeeper I did remount the drive that held the edit logs with the nobarrier option, because it dramatically increased the number of IOPS ZooKeeper could handle.  We feel that the added IOPS in this case are worth the risk of data loss.

The version of Storm used is one that we have modified to optionally use Kerberos authentication with Nimbus, run the worker processes as the user that launched the topology, and have ZooKeeper restrict access to a topology’s data based on ACLs.  This version is based on storm-0.9.0-wip21 and is in the process of being submitted back to the Storm community.  In all cases I ran the tests with these extra security features disabled.

The messaging layer is more or less the same between the version under test and open source Storm with the caveat that we are using a slightly modified version of zeromq to work around some issues we were seeing on top of RHEL6 under heavy load.  It simply contains a backport of a fix from a newer version of zeromq.

Storm was configured to use 16 slots per box, or one per thread of execution.  This is higher than a typical Storm cluster would have, but I wanted to see what would happen under extremely heavy load.  Other important configurations in storm.yaml were:

worker.childopts: "-Xmx2048m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true"
supervisor.childopts: "-Xmx256m"
nimbus.childopts: "-Xmx1024m"
ui.childopts: "-Xmx768m"
nimbus.thrift.threads: 256

When running with Netty, Storm was configured with the following in storm.yaml:

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100