
Bullet Updates - Windowing, Apache Pulsar PubSub, Configuration-based Data Ingestion, and More

By Akshay Sarma, Principal Engineer, Verizon Media & Brian Xiao, Software Engineer, Verizon Media

This is the first of an ongoing series of blog posts sharing releases and announcements for Bullet, an open-source, lightweight, scalable, pluggable, multi-tenant query system.

Bullet allows you to query, through its UI or API, any data flowing through a streaming system without having to store it first. The queries are injected into the running system and have minimal overhead; running hundreds of queries generally fits into the overhead of just reading the streaming data. Bullet requires running an instance of its backend on your data. This backend runs on common stream processing frameworks (Storm and Spark Streaming are currently supported).

The data on which Bullet sits determines what it is used for. For example, our team runs an instance of Bullet on user engagement data (~1M events/sec) to let developers find their own events to validate their code that produces this data. We also use this instance to interactively explore data, throw up quick dashboards to monitor live releases, count unique users, debug issues, and more.

Since open sourcing Bullet in 2017, we’ve been hard at work adding many new features! We’ll highlight some of these here and continue sharing update posts for future releases.

Windowing

Bullet used to operate in a request-response fashion - you would submit a query and wait for the query to meet its termination conditions (usually duration) before receiving results. For short-lived queries - say, a few seconds - this was fine. But as we started fielding more interactive and iterative queries, waiting even a minute for results became too cumbersome.

Enter windowing! Bullet now supports time- and record-based windowing. With time windowing, you can break up your query into chunks of time over its duration and retrieve results for each chunk. For example, you can calculate the average of a field and stream back results every second.

In this example, the aggregation operates on all the data received since the beginning of the query, but you can also aggregate over just the windows themselves. This is often called a Tumbling window.

With record windowing, you can get the intermediate aggregation for each record that matches your query (a Sliding window), or you can do a Tumbling window on records rather than time. For example, you could get results back every three records.

Overlapping windows in other ways (Hopping windows) or windows that reset based on different criteria (Session windows, Cascading windows) are currently being worked on. Stay tuned!

Apache Pulsar support as a native PubSub

Bullet uses a PubSub (publish-subscribe) message queue to send queries and results between the Web Service and Backend. As with everything else in Bullet, the PubSub is pluggable. You can use your favorite PubSub by implementing a few interfaces if you don’t want to use the ones we provide. Until now, we’ve maintained and supported a REST-based PubSub and an Apache Kafka PubSub. Now we are excited to announce support for Apache Pulsar as well! Bullet Pulsar will be useful to those users who want to use Pulsar as their underlying messaging service.
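
To make the pluggable PubSub concrete, here is a minimal sketch of what a custom implementation might look like. The class and method names below (PubSub, BulletConfig, Publisher, Subscriber, and the MyQueue* helper classes) are assumptions made for illustration rather than a verified copy of the bullet-core API, so check the Bullet PubSub documentation for the exact interfaces before writing one.

// Sketch only: names and signatures here are assumptions, not the exact bullet-core API.
public class MyQueuePubSub extends PubSub {

    private final BulletConfig myConfig;

    public MyQueuePubSub(BulletConfig config) throws PubSubException {
        super(config);
        // Keep a handle on the config to read custom settings (broker URLs, topics, etc.)
        this.myConfig = config;
    }

    @Override
    public Publisher getPublisher() throws PubSubException {
        // Publishes queries (when running in the Web Service) or results (when running
        // in the Backend). MyQueuePublisher is a hypothetical wrapper around your
        // messaging system's producer client.
        return new MyQueuePublisher(myConfig);
    }

    @Override
    public Subscriber getSubscriber() throws PubSubException {
        // The reading counterpart. MyQueueSubscriber is a hypothetical wrapper around
        // your messaging system's consumer client.
        return new MyQueueSubscriber(myConfig);
    }
}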

If you aren’t familiar with Pulsar, setting up a local standalone instance is very simple, and by default, any Pulsar topic you write to is created automatically. Setting up an instance of Bullet with Pulsar instead of REST or Kafka is just as easy. You can refer to our documentation for more details.

Plug your data into Bullet without code

While Bullet works on any data source located in any persistence layer, you still had to implement an interface to connect your data source to the Backend and convert it into the record container format that Bullet understands. For instance, your data might be located in Kafka and be in the Avro format. If you were using Bullet on Storm, you would perhaps write a Storm Spout to read from Kafka, deserialize the Avro data, and convert it into the Bullet record format. This was the only interface in Bullet that required our customers to write their own code. Not anymore! Bullet DSL is a text/configuration-based format that lets users plug their data into the Bullet Backend without having to write a single line of code.

Bullet DSL abstracts away the two major components for plugging data into the Bullet Backend: a Connector piece that reads from arbitrary data sources, and a Converter piece that converts the data it reads into the Bullet record container. We currently support and maintain a few of these - Kafka and Pulsar for Connectors, and Avro, Maps, and arbitrary Java POJOs for Converters. The Converters understand typed data and can even do a bit of minor ETL (Extract, Transform, and Load) if you need to change your data around before feeding it into Bullet. As always, the DSL components are pluggable, and you can write your own (and contribute it back!) if you need one that we don’t support.

We appreciate your feedback and contributions! Explore Bullet on GitHub, use and help contribute to the project, and chat with us on Google Groups. To get started, try our Quickstarts on Spark or Storm to set up an instance of Bullet on some fake data and play around with it.

Elide: Simplify Your CRUD

By Aaron Klish & Jon Kilroy

Mobile and client-side web applications have been reshaping the design principles of service layer APIs. Business logic traditionally implemented on the server and exposed as functions (think RPC) has morphed into exposing data and shifting that logic to the application. Object hierarchies and graphs are one of the most common and natural forms for data representation and often pair well with simple CRUD (create, read, update, & delete) operations. The main challenge with building these services is that powerful & flexible APIs that expose data are often expensive to build. There is usually only time to build the minimal set of interfaces that are needed by a single application.

Elide is a new Yahoo technology designed to significantly reduce the development cost of exposing a hierarchical data model as a production-quality REST API. Elide adopts the best standard to date for API definition targeting the concerns of front-end developers: JSON-API.

What Is JSON-API

JSON-API is a standard for representing and manipulating an entity relationship graph through a REST API.

Oversimplifying things, the path segments of URLs in JSON-API are constructed by:

  1. Referencing collections of entities at the root by type: /books
  2. Referencing individual entities: /books/1
  3. Referencing collections of entities through a relationship: /books/1/authors or /books/1/relationships/authors

Entity representations consist of a type, an ID, a set of attributes, and a set of relationships to other entities:

HTTP/1.1 200 OK
Content-Type: application/vnd.api+json

{
  "data": {
    "id": "12345678-1234-1234-1234-1234567890ab",
    "type": "author",
    "attributes": {
      "name": "Ernest Hemingway"
    },
    "relationships": {
      "books": {
        "data": [
          {
            "type": "book",
            "id": "12345678-1234-1234-1234-1234567890ac"
          },
          {
            "type": "book",
            "id": "12345678-1234-1234-1234-1234567890ad"
          }
        ]
      }
    }
  }
}

Modeling Your Data

JSON-API needs a data model to expose. Rather than starting from scratch with a new DSL, we adopted the most mature, feature-rich, and industry-proven data modeling framework for the backend: JPA (Java Persistence API). JPA allows developers to define complex models and also to leverage existing providers for a wide variety of persistence architectures - or to build their own custom solutions. We’ve done both at Yahoo. Elide exposes any JPA-annotated data model as a complete JSON-API web service.
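
As a quick, hedged illustration of what such a model can look like, here is a minimal JPA entity sketch. The JPA annotations (@Entity, @Id, @GeneratedValue, @ManyToMany) are standard; @Include is Elide's annotation for marking an entity as exposed, and the rootLevel attribute shown is our assumption about its shape, which may vary by Elide version.

// A minimal JPA entity exposed through Elide (sketch; annotation attributes may vary by version)
@Entity
@Include(rootLevel = true)
public class Book {

    @Id
    @GeneratedValue
    private long id;

    private String title;

    // Exposed as the "authors" relationship of a book
    @ManyToMany
    private Set<Author> authors = new HashSet<>();

    // Getters and setters omitted for brevity
}

Given this model (and a matching Author entity), Elide would serve the book collection, individual books, and the authors relationship - along the lines of the /books examples above - without any additional endpoint code.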

Filling the Gaps

Having a powerful API and a powerful modeling language are not enough to build a production quality service. There are four other components that are required to expose data as a service:

  1. Security - With Elide, the developer first chooses which entities to expose and which to leave hidden. Elide then provides new annotations that work alongside JPA to define completely customizable authorization policies for every exposed entity, attribute, relationship, and operation (see the sketch after this list). Like the data model itself, authorization is hierarchical. The path taken to reach an object in the hierarchy determines the security checks invoked, reducing the need to duplicate security rules. Unlike other data-as-a-service frameworks (GraphQL, Falcor, etc.), authorization of data is a first-class property of the data model itself.
  2. Data Validation - Data validation can take the form of simple constraints like a field shouldn’t be null. It can also involve complex rules where two or more entities must be changed together or not at all. Part of the rationale to leverage JPA was its powerful set of aspect oriented validation annotations and APIs.
  3. Consistency - Clients need a consistent representation of the object graph that doesn’t interleave operations across users. JSON-API has bulk read (compound documents) and write (PATCH extension) interfaces. These interfaces coupled with transactions that wrap every API request enable the client to build and maintain a consistent view of their data.
  4. Extension - Elide provides hooks for customizing the behavior of data model operations, for wiring in new persistence architectures, and for exposing functions as attributes in the model.
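
To make the Security and Data Validation points concrete, here is a hedged sketch of how these annotations can sit directly on the model. @ReadPermission and @UpdatePermission are Elide annotations and @NotNull is standard Bean Validation, but the exact way a check is referenced has changed across Elide versions, so treat the attribute syntax and the check names below as illustrative assumptions.

// Sketch only: permission attribute syntax varies by Elide version,
// and "everyone" / "isOwner" are hypothetical check names registered with Elide.
@Entity
@Include(rootLevel = true)
@ReadPermission(expression = "everyone")    // anyone may read
@UpdatePermission(expression = "isOwner")   // only the owner may update
public class Post {

    @Id
    @GeneratedValue
    private long id;

    @NotNull    // Bean Validation: a post must have a title
    private String title;

    @ManyToOne
    private Author owner;

    // Getters and setters omitted for brevity
}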

The marriage of JSON-API, JPA, and a custom set of Elide annotations provides other benefits:

  1. Round Trip Reduction - JSON-API provides the ability to query an individual entity but also to ask for its children, grandchildren, and so on in the form of compound documents (see the example request after this list). It also supports bulk write interfaces to perform a series of updates in a single request.
  2. Type Safety - While JSON-API is not strongly typed, JPA is. Elide ensures that the provided JSON conforms to the underlying data types of the model.
  3. Query Projection - JSON-API allows the client to specify the exact subset of elements it wants returned from the data model. This feature is critical for the performance of the persistence layer and also reduces the total payload of data sent back to the client.
  4. Web Compatible - Facebook has recently released GraphQL, another technology in this space with a similar set of goals. Unlike GraphQL, Elide embraces the web architecture and REST. Elide is fully compatible with traditional web caching, which is fundamental to reducing client latency.
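
For example, a single request against a hypothetical book model could fetch a book together with its authors as a compound document while trimming the payload to just the fields the client needs. The include and fields[...] query parameters are standard JSON-API, and the type names follow the author/book example above:

GET /books/1?include=authors&fields[book]=title,authors&fields[author]=name
Accept: application/vnd.api+json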

Future Work

We are working on the release of other Elide features, including:

  1. A fluent JavaScript library (although Elide already works with existing JSON-API clients).
  2. A test framework that makes security verification as simple as setting up an Elide service.
  3. Introspection capabilities that will allow powerful validation for clients, documentation, and other tools.

For now, check out our documentation at elide.io. We hope to have more posts soon with some tutorials for simple projects.

SquiDB, a SQLite database layer for Android

Introducing SquiDB

We are pleased to announce SquiDB, a SQLite database layer for Android. It is designed to make it as easy as possible to work with SQLite databases while still enabling the power and flexibility of raw SQL. SquiDB combines features of an ORM with object-oriented SQL statement builders to simplify reading and writing your data without messy and complicated SQL strings in your Java code. It also includes built-in tools and hooks to help you write database migrations as well as implement ContentProviders.

Model objects

Like most ORMs, SquiDB represents rows in your SQLite tables as objects. Unlike some other ORMs, SquiDB uses compile-time code generation to let you define your models/table schemas as minimally as possible – the actual code you will work with is generated at compile time. A DatabaseDao object mediates reading and writing these objects from the database. Setting up all these components is quick and easy. For example:

// This is a table schema
@TableModelSpec(className = "Person", tableName = "people")
public class PersonSpec {

    // A text column named "firstName"
    public String firstName;

    // A text column named "lastName"
    public String lastName;

    // A long column named "creationDate", but referred to as "birthday"
    // when working with the model
    @ColumnSpec(name = "creationDate")
    public long birthday;
}

// This is how you'd set up a database instance
public class MyDatabase extends AbstractDatabase {

    private static final int VERSION = 1;

    public MyDatabase(Context context) {
        super(context);
    }

    @Override
    protected String getName() {
        return "my_database.db";
    }

    @Override
    protected Table[] getTables() {
        return new Table[]{
            // List all tables here
            Person.TABLE,
        };
    }

    @Override
    protected int getVersion() {
        return VERSION;
    }

    // Other overridable methods exist for migrations and initialization;
    // omitted for brevity
}

DatabaseDao dao = new DatabaseDao(new MyDatabase(context));

// This is how you'd work with the generated model
Person newPerson = new Person()
    .setFirstName("Sam")
    .setLastName("Bosley")
    .setBirthday(System.currentTimeMillis());
dao.persist(newPerson);

...

String firstName = newPerson.getFirstName();
String lastName = newPerson.getLastName();
long birthday = newPerson.getBirthday();

Building queries

In addition to defining getters and setters for all the columns, the generated model class also defines constant fields you can reference for constructing queries:

long ageCutoff = System.currentTimeMillis() - (DateUtil.YEAR_IN_MILLIS * 18);

Query peopleWhoCanVote = Query.select().where(Person.BIRTHDAY.lt(ageCutoff));

// This becomes "select * from people where people.birthday < ?",
// where ? is the placeholder for the age cutoff arg
SquidCursor<Person> voters = dao.query(Person.class, peopleWhoCanVote);

The example is simple, but SquiDB’s query object and associated classes support almost the entire SQL grammar. It is much cleaner and easier to maintain, particularly for complex queries:

// This typical Android-style query...
String sql = "select " + PersonColumns.AGE + ", " + ProfileImageColumns.URL + " from "
    + PERSON_TABLE + " left join " + PROFILE_IMAGE_TABLE + " on " + PersonColumns._ID
    + " = " + ProfileImageColumns.PERSON_ID + " where " + PersonColumns.NAME + " = ?"
    + " AND " + PersonColumns.AGE + " >= ?" + " ORDER BY " + PersonColumns.AGE + " ASC";
String[] sqlArgs = new String[]{"Sam", Integer.toString(18)};

// ... becomes this:
Query query = Query.select(Person.AGE, ProfileImage.URL).from(Person.TABLE)
    .leftJoin(ProfileImage.TABLE, Person.ID.eq(ProfileImage.PERSON_ID))
    .where(Person.NAME.eq("Sam").and(Person.AGE.gte(18))).orderBy(Person.AGE.asc());

The above example with strings uses the ? character as a placeholder for arguments to the statement. The values of these arguments are placed in a separate String[]. Users of Android’s SQLiteDatabase will recognize this as the pattern used by many of its methods, including the query methods. This is good practice, but it makes the code harder to read and necessitates that extra string array for the arguments. SquiDB inserts those placeholders for you when compiling the Query object and binds the arguments automatically at query time. The raw SQL version is also prone to errors when an update to the SQL adds, removes, or reorders the contents of sqlArgs. You must always count the ?s to find the appropriate argument in the array; for large and complex queries, this can be difficult. SquiDB’s Query object makes this a non-issue and also prevents several classes of typos – you won’t ever mistype a keyword or forget a space character somewhere.

Furthermore, it becomes easier to build/compose queries or SQL clauses as objects:

public Query queryForPeopleWithName(String name, boolean includeLastName) {
    Query baseQuery = Query.select().from(Person.TABLE);
    Criterion nameCriterion = Person.FIRST_NAME.eq(name);
    if (includeLastName) {
        nameCriterion = nameCriterion.or(Person.LAST_NAME.eq(name));
    }
    baseQuery.where(nameCriterion);
    return baseQuery;
}

Working with query results

DatabaseDao can return either single rows of data represented by model objects, or a SquidCursor parametrized by a model type:

// Get the people table row with _id = 1
Person person1 = dao.fetch(Person.class, 1);

// Cursor containing all rows in the people table
SquidCursor<Person> personCursor = dao.query(Person.class, Query.select());

Model objects are designed to be efficiently reusable, so iterating through the cursor and inflating model objects to work with is cheap if you don’t need the row data to live outside of the loop:

SquidCursor<Person> personCursor = dao.query(Person.class, Query.select());
try {
    Person person = new Person();
    while (personCursor.moveToNext()) {
        person.readPropertiesFromCursor(personCursor);
        doSomethingWithCurrentRow(person);
    }
} finally {
    personCursor.close();
}

SquidCursor extends Android’s CursorWrapper, so you can use one anywhere a standard Android Cursor is expected. It also provides users a typesafe get() method that can work directly with table columns if you don’t want or need to inflate a full model object:

String firstName = personCursor.get(Person.FIRST_NAME);
Long birthday = personCursor.get(Person.BIRTHDAY);

These are simple examples that only use a single table, but it’s still easy to work with model objects even if you need to join across multiple tables.
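
For instance, reusing the Person and ProfileImage models from the query-building example above, you can run the joined query and read the projected columns straight off the cursor with the same typesafe get() calls. This sketch only uses the API calls already shown above; the column types are assumed from those earlier examples.

// Run the joined query from earlier and read columns with typesafe get()
Query query = Query.select(Person.AGE, ProfileImage.URL).from(Person.TABLE)
    .leftJoin(ProfileImage.TABLE, Person.ID.eq(ProfileImage.PERSON_ID))
    .where(Person.NAME.eq("Sam"));

SquidCursor<Person> cursor = dao.query(Person.class, query);
try {
    while (cursor.moveToNext()) {
        Integer age = cursor.get(Person.AGE);
        String imageUrl = cursor.get(ProfileImage.URL);
        doSomethingWithRow(age, imageUrl);   // placeholder for your own handling
    }
} finally {
    cursor.close();
}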

And more!

We’ve shown several simple examples here, but there’s a lot that SquiDB can do to make more complicated use cases easy too – it can help you work with SQL views using model objects, write database migrations, implement flexible ContentProviders backed by your SQLite database, and more. For a more in-depth look at all you can do with SquiDB, check out our wiki, or to clone and start working with SquiDB go to our GitHub repo. SquiDB is being actively maintained and developed, so we welcome feedback and contributions!

By Sam Bosley and Jonathan Koren

Making Storm fly with Netty

Yahoo recently submitted work to Storm that allows the messaging layer to be pluggable and provides an implementation based on Netty. This is an important step for many reasons. It enables Storm to run without dependencies on native libraries, and thus simplifies Storm deployment onto any OS and into the cloud. It also opens up a path to adding authentication and authorization to the connections between worker processes.

But most important of all, it lets me pump twice as many messages per second through the same cluster. Let me show you how we tuned Netty to be able to do this.

The Test:

At Yahoo we eat our own dog food, but before making Netty the default messaging layer for our Storm clusters I needed some numbers to see how it compared to zeromq, which is the current default. To do this I needed a benchmark that could make Storm’s messaging layer cry uncle, so I wrote one. It is a simple speed-of-light test that sees how quickly Storm can push messages between the different bolts and spouts. It allows us to launch multiple topologies of varying complexity that send fixed-size messages.

I work on a team that provides hosted Storm clusters for other teams at Yahoo to use. As such, we have a large variety of topology configurations and message sizes on our clusters. To be sure any new default was as good as the original, I tested many different configurations, ranging from very small topologies with 3 workers, 3 bolts, and 3 spouts all the way up to filling the entire cluster with 408 workers using thousands of bolts and spouts. For each of these I varied the message size as well. I have only included a subset of the topologies that were representative of the tests as a whole. See Appendix B for more details of the setup used.

In all cases acking was enabled, and each spout was allowed 1000 pending messages so as to have some form of flow control in the system. We also ran the tests multiple times. I have not included error bounds on the results because the differences between Netty and zeromq are really quite dramatic, and the differences between runs were all less than 5%. The benchmark is set up to disregard ack messages in the message counts, since they are system overhead.

Results:

The first test is a very small one.  It has 3 workers running on 3 separate nodes, so there is essentially no resource contention at all.  Each worker has 1 spout, 1 bolt and 1 acker task.

As can be seen from Figure 1, Netty in this condition is much faster than zeromq. Netty is able to send between 40% and 100% more messages per second, depending on the size of the message.


Figure 1 Small Topology: Netty vs zeromq

These results were very encouraging, but what about larger topologies with more contention?

The next topology is a much larger one with 100 workers, 100 spouts, 500 bolts spread out in 5 levels of 100 bolts each, and 100 ackers.

As can be seen from Figure 2, something very odd was happening with Netty.

Figure 2 Large Topology: Netty vs zeromq

Why in the world would messages per second go up as the message size increased? That is totally counterintuitive, so I started to look at what was happening on the individual Supervisor nodes. The worker processes in the section of the graph that slopes upward had relatively high CPU utilization, usually around 800% (all 8 physical cores maxed), but the load average was only 0.2. That is a smoking gun for context switching being the culprit. Netty is an asynchronous IO framework: as each message is sent, control is handed back to the OS and the thread waits to be woken up for something else to do. Once the data has finished being sent, some thread wakes up to see what else needs to be done with that message. This constant sleeping and waking was using all of the CPU time. As the messages got bigger there was less context switching, so, oddly enough, more messages could be sent. This trend continued until the network saturated at 2.5KB messages. The downward slope corresponds to about 4.6 GB/s of messages being sent, and meets up with zeromq at about the 4KB message mark.

So if context switching is killing us, let’s reduce the number of threads that are context switching. By default, Netty assumes that it is the only thing running on a box and creates a handler thread for every available core. When there are multiple workers on a box, this huge number of threads causes the extra context switching. To resolve this I tried two different approaches: first, reduce the number of workers so a single worker is running per node; and second, apply this pull request, which makes the number of threads configurable with a default of 1, matching the zeromq default.

As can be seen from Figure 2, Netty’s default setting is not that great for lots of small messages, even when a worker is the only one on the node. But when we restrict Netty to a single thread, we are able to get between 85% and 111% more messages per second than zeromq, until the network saturates again.

To be sure that this change would not impact the other topologies, I reran all the tests with the new configuration; as you can see from Figure 1, the difference from the default Netty configuration appears to be negligible.

Conclusion:

The new default settings allow Netty to run twice as fast as zeromq with its default settings. Netty is now the default for our Storm clusters.

Appendix A: Commands to Run the Tests

Small Topology:

storm jar ./target/storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --testTime 120 --pollFreq 5 --workers 3 --spout 3 --bolt 3 -l 1 -n 1 --ackers 3 --ack --maxSpoutPending 1000 --messageSize $MESSAGE_SIZE

Large Topology:

storm jar ./target/storm_perf_test-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.yahoo.storm.perftest.Main --testTime 120 --pollFreq 5 --workers 100 --spout 100 --bolt 100 -l 5 -n 1 --ackers 100 --ack --maxSpoutPending 1000 --messageSize $MESSAGE_SIZE


Appendix B: The Setup

Hardware:

38 nodes total: 3 nodes were dedicated to ZooKeeper, 1 to Nimbus and the UI, and 34 to Supervisor and Logviewer processes. Each node had 2 x 2.4GHz Xeon processors, each with 4 cores and Hyperthreading enabled, for a total of 16 threads of execution. Each box had 24 GB of main memory clocked at 1066 MHz, full-duplex Gigabit Ethernet, and several 7.2K SATA/300 drives. They were all in a single rack with a top-of-rack switch that supports full-duplex Gigabit connections between the nodes.


Software:

OS: Red Hat Enterprise Linux Server 6.3

Java: 64-bit Oracle Java 1.7.

I left RHEL mostly stock; for ZooKeeper I did remount the drive that held the edit logs with the -nobarrier option, because it dramatically increased the number of IOPS that ZooKeeper could handle. We feel that the added IOPS are worth the risk of data loss in this case.
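
For reference, a remount along those lines might look like the following; the mount point is an illustrative example, not the actual path we used.

# Illustrative only: remount the filesystem holding ZooKeeper's edit logs without write barriers
mount -o remount,nobarrier /zookeeper/datalog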

The version of Storm used is one that we have modified to optionally use Kerberos authentication with Nimbus, run the worker processes as the user that launched the topology, and have ZooKeeper restrict access to a topology’s data based on ACLs. This version is based on storm-0.9.0-wip21 and is in the process of being submitted back to the Storm community. In all cases I ran the tests with these extra security features disabled.

The messaging layer is more or less the same between the version under test and open source Storm, with the caveat that we are using a slightly modified version of zeromq to work around some issues we were seeing on RHEL6 under heavy load. It simply contains a backport of a fix from a newer version of zeromq.

Storm was configured to use 16 slots per box, or one per thread of execution. This is higher than a typical Storm cluster would have, but I wanted to see what would happen under extremely heavy load. Other important configurations in storm.yaml were:

worker.childopts: "-Xmx2048m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true"
supervisor.childopts: "-Xmx256m"
nimbus.childopts: "-Xmx1024m"
ui.childopts: "-Xmx768m"
nimbus.thrift.threads: 256

When running with Netty, Storm was configured with the following in storm.yaml:

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100