As we described in What is Apache Flink? A developer’s guide to getting started, the New Relic Account Experience team built our first Flink app, which we call the Usage Calculator. This app receives and processes millions of messages from New Relic APM, Infrastructure, and Synthetics agents every day. The messages come in on Kafka topics and contain data from the agents about how often they’re checking in; specifically, we’re interested in how many hours a day our customers are using New Relic.

The Usage Calculator receives messages and holds on to them for 24 hours before it sends the data off to the New Relic Database (at which point, customers can query and search it). The data we collect and hold in each 24-hour period is what we call the state: an uninterrupted record of data. For example, when an APM agent checks into New Relic, the Usage Calculator looks to see whether we already have data for that agent; if we do, it combines the new message with the messages it already has, resulting in just one record. That single record is the state. If our app is restarted, or worse, goes down, we still have that record of data. But more importantly, we store only one record, and this helps us optimize state.

Why is it so important to optimize state?

When we save fewer chunks of data in state, we can safely make more changes to our application without worrying about data loss. When we deploy the Usage Calculator, Flink automatically restores state from backup files. As long as the changes we deploy don't alter state, we can deploy without data loss. Additionally, keeping the bare minimum of data we need keeps the state store at a manageable size, which reduces the time it takes for Flink to write out the backup files (called checkpoints and savepoints). Reducing checkpoint write time is critical because each checkpoint has to complete before the next one is due to start.
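To make that concrete, here’s a minimal sketch of how checkpointing might be tuned in a Flink job. This isn’t the Usage Calculator’s actual configuration; the interval, pause, and timeout values are illustrative assumptions:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Write out a checkpoint of all operator state every 60 seconds (illustrative value).
env.enableCheckpointing(60_000);

// Leave a gap between checkpoints so a slow checkpoint can finish before the next one starts.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

// Fail a checkpoint that runs too long instead of letting it back up behind the next one.
env.getCheckpointConfig().setCheckpointTimeout(120_000);

The smaller the state we carry, the more comfortably each checkpoint fits inside that window.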

The Usage Calculator streams in data, transforms and collapses it with a map and reduce operation, and then sends it back out. Using that map and reduce operation to minimize state is the key to maintaining the flexibility and reliability of our application.

Inside our map and reduce operation

The best way to optimize state is to introduce a state model between the incoming data model and the outgoing data model. In our early attempts, we focused on directly transforming the incoming data into the outgoing data format, and then reducing it. But every time we added new fields to the outgoing data format, we had to drop state when we deployed our application.

In our final, working solution, we developed a pattern in which we accept the incoming data, implement a state model that has only the minimum attributes required for the reduction, reduce the data, and finally decorate and transform it to fit the outgoing model. This state model approach minimizes CPU usage, as the decoration and transformation happen only once per reduced message instead of on every incoming message.

Here’s how it looks in code:

public static void declareStream(StreamExecutionEnvironment streamEnv) {

    SingleOutputStreamOperator<NrDailyHost> infrastructureHostStream = streamEnv
            .addSource(generateConsumer())
            // Assign event-time timestamps; the one-minute out-of-orderness bound is illustrative.
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<AgentMessage>(Time.minutes(1)) {
                        @Override
                        public long extractTimestamp(AgentMessage message) {
                            return message.getTimestampMillisecondsSinceEpoch();
                        }
                    })
            .map(HostState::new)                           // incoming model -> minimized state model
            .keyBy("consumingAccountId", "hostname")       // one state entry per account/host pair
            .reduce(new ReduceHostUsageFunction())         // collapse each key's messages into one record
            .map(new MapHostStateToNrDailyHostFunction()); // state model -> outgoing model

    declareKafkaWriter(infrastructureHostStream);
}

First, we read in the stream and turn the message into AgentMessage, the incoming wireframe model. Then we map it into HostState, the minimized state model. We perform the reduce step before finally fleshing the message out into NrDailyHost, the outgoing wireframe model.
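We haven’t shown generateConsumer() here, but a minimal version might look something like the sketch below; the topic name, consumer properties, and Jackson-based deserialization schema are assumptions rather than the Usage Calculator’s actual wiring:

private static FlinkKafkaConsumer<AgentMessage> generateConsumer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder broker address
    props.setProperty("group.id", "usage-calculator");      // placeholder consumer group

    // Topic name is hypothetical; the schema below turns each record into an AgentMessage.
    return new FlinkKafkaConsumer<>("agent-usage", new AgentMessageSchema(), props);
}

// Deserializes each Kafka record's bytes into the incoming wireframe model.
public static class AgentMessageSchema extends AbstractDeserializationSchema<AgentMessage> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public AgentMessage deserialize(byte[] bytes) throws IOException {
        return MAPPER.readValue(bytes, AgentMessage.class);
    }
}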

Let’s examine each of these steps more closely.

The incoming wireframe model

This model matches the incoming data exactly.

public class AgentMessage {

    private long timestampMillisecondsSinceEpoch;
    private String agentVersion;
    private String agentLang;
    private int accountId;
    private String hostname;
    private Integer logicalProcessors;
    private Long totalRamBytes;
    private int metadataVersion;

    // Getters and setters omitted for brevity.
}

The state model

Next we define the state model, which is composed of only the attributes we need in the reduction. In this case, we key on consumingAccountId and hostname, and we keep track of the hours used as each new message comes in. We also hang on to the AgentMessage so that we can use it in a later step.

public class HostState {

    private int consumingAccountId;
    private HashSet<Long> epochHoursUsed = new HashSet<Long>(24);
    private AgentMessage agentMessage;
    private String hostname;

    public HostState(AgentMessage agentMessage) {
        this.agentMessage = agentMessage;
        // Track the distinct epoch hour, not the raw millisecond timestamp, so the set
        // holds at most 24 entries per day.
        epochHoursUsed.add(TimeUnit.MILLISECONDS.toHours(agentMessage.getTimestampMillisecondsSinceEpoch()));
        consumingAccountId = agentMessage.getAccountId();
        hostname = agentMessage.getHostname();
    }

    // Getters omitted for brevity.
}

Reduction using the state model

The only data we aggregate is the number of hours New Relic is used, which we keep track of with the HostState. This reduction is a stateful operator in Flink, so it’s tied to the input and output models (HostState). Isolating the reduction from the outgoing wireframe (NrDailyHost) allows us to mutate the outgoing wireframe without negatively affecting state.

public HostState reduce(HostState value1, HostState value2) throws Exception {
    if (value1 != null) {
        // Merge the hours already accumulated into the newer state record, so each
        // key keeps exactly one HostState.
        value2.getEpochHoursUsed().addAll(value1.getEpochHoursUsed());
    }
    return value2;
}

The outgoing wireframe model

Finally, we can use a map function to expand and annotate the message into the outgoing data model. At this point, we have the aggregated fields as well as everything we need to create an instance of NrDailyHost that can be written to a Kafka topic. In this example, the NrDailyHost has many other fields that need to be calculated, so we get the benefit of doing it only after the messages have been reduced instead of on every incoming message.
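A minimal sketch of that final map step might look like the following; NrDailyHost’s fields and the getter methods on HostState shown here are illustrative assumptions rather than our actual models:

public class MapHostStateToNrDailyHostFunction implements MapFunction<HostState, NrDailyHost> {

    @Override
    public NrDailyHost map(HostState hostState) throws Exception {
        AgentMessage lastMessage = hostState.getAgentMessage();

        NrDailyHost dailyHost = new NrDailyHost();
        dailyHost.setConsumingAccountId(hostState.getConsumingAccountId());
        dailyHost.setHostname(hostState.getHostname());

        // The aggregated field: one entry in the set per distinct hour the host checked in.
        dailyHost.setHoursUsed(hostState.getEpochHoursUsed().size());

        // Decoration pulled from the retained AgentMessage, computed once per reduced
        // record rather than once per incoming message.
        dailyHost.setAgentVersion(lastMessage.getAgentVersion());
        dailyHost.setAgentLang(lastMessage.getAgentLang());
        dailyHost.setLogicalProcessors(lastMessage.getLogicalProcessors());
        dailyHost.setTotalRamBytes(lastMessage.getTotalRamBytes());

        return dailyHost;
    }
}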

Making things easier down the road

For our first Flink app, using a map and reduce operation to accept the incoming data, maintain a minimal state model, and transform the data to fit the outgoing model was simple to implement and well worth the processing time it saved. Over time, we’ll likely make changes to this app, particularly the portions of the pipeline that process the incoming or outgoing models. But if we do need to make changes, we can easily deploy them without losing state.