Show Me How: Build Streaming Data Pipelines for Real-Time Data Warehousing | Register Today
Walmart’s global presence, with its vast number of retail stores plus its robust and rapidly growing e-commerce business, make it one of the most challenging retail companies on the planet for inventory-related use cases. Real-time inventory planning has become a must for Walmart in the face of rapidly changing buyer behaviors and expectations. But real-time inventory is only half of the equation.
The other half is real-time replenishment, which at a high level, we define as the way we can fulfill the inventory demand at every physical node in the supply chain network. As soon as inventory gets below a certain threshold, and based on many other supply chain parameters like sales forecast, safety stock, current availability of the item at node and its parents, we need to automatically replenish that item in a way that optimizes resources and increases customer satisfaction.
Walmart doesn’t just have physical stores, it also has hundreds of distribution centers (DCs) of varying types, in a multi-echelon ecosystem that makes accurate, reliable, real-time replenishment a very challenging proposition. Apache Kafka® is a big part of the cutting-edge IT architecture that makes it all happen.
A real-time inventory use case was one of our first successful deployments of Kafka in our ecosystem. Similar to that use case, Walmart’s real-time replenishment use case is uniquely challenging due to the sheer scale and complexity of Walmart’s operations and the amount of data and moving parts those operations involve. This scale becomes even more daunting when you consider the many IT architecture-dependent goals and expectations we have around establishing a forward-looking platform to handle all the necessary replenishment needs for the company:
Achieving all the above system and platform architecture demands while catering to ever-changing customer behavior is not trivial. This image illustrates a quick summary of our scale:
This challenge gets even bigger as we grow multifold with our data and real-time system messaging needs year after year.
On any given day, Walmart’s real-time replenishment system processes more than tens of billions of messages from close to 100 million SKUs in less than three hours. We leverage an array of processors to generate an order plan for the entire network of Walmart stores with great accuracy and at high throughputs of 85GB messages/min. While doing so, it also ensures there is no data loss through event tracking and necessary replays and retries.
Our event-processing framework uses a micro-batch architecture, with most of the inputs coming in through Kafka. From Kafka, the data gets streamed through change data capture (CDC) and brought into a denormalized view so that it’s readily and quickly accessible. The data is then processed in a planning engine that contains all the inventory positions, forecasts, safety stocks, lead times, and calendars, while considering all the constraints (logistical and otherwise) of the distribution centers, stores, and shipping methods. This is all accomplished within the three-hour daily timeframe in which Walmart processes close to 100 million SKUs. This entire plan is then published through different Kafka topics for different layers, and each Kafka topic has different consumers who listen and formulate different actions accordingly. To give you an idea of the scale, we leverage 18 Kafka brokers and manage 20+ topics, each of which has more than 500 partitions.
In addition to the aforementioned resiliency and horizontal scalability for future needs, there were many other important architectural challenges and goals around setting up our real-time replenishment system, including:
Certain architectural decisions at the platform level have helped a lot.
One is the use of active-passive data replication, which means we have mechanisms in place to switch to a secondary data center if the primary data center has issues. An important consideration with this sort of active-passive environment is if there is an issue with the primary DC, you should know what offset it stopped processing from. You can then either start from that point onward, or, if you’re unable to do that, ensure your consumer or client can identify it and avoid processing duplicate messages. Active-active is another type of configuration to consider, but it comes down to what really works for a given use case and what sort of intervention, if any, one needs to prepare for.
We’ve also designed for resiliency and have mechanisms in place where if any computer or virtual machines go down, there are automatic mechanisms that will bring them up within a certain period of time.
The other thing we’ve done is enable Kafka retries. And on top of that, we’ve developed an in-house audit and replay mechanism that will really help us to come back from any kind of failure. We send the message to the topic, and it will store it in our database, and then our replay mechanism takes care of the rest.
We’ve also set up a fallback mechanism for when we may not be able to send the message to the topic itself—a rest service that will directly go and write to the database, and then the rest is taken care of by the replay mechanism.
We also have alerts and notifications, so we know what’s going on with our system. It’s an in-house subscription-based alerting mechanism that works at both the application level and at the infrastructure level.
Of course, a key part of making Kafka work well for us was finding the right balance for our working ecosystem including the producers, Kafka cluster, and the consumers. Any lag in the producer or consumer layer can cause overall slowness and risk of missing SLAs. Having everything in balance is key. Below are some of the optimizations we made.
With the producer configuration, there are a few configs Kafka provides that have really helped us in our journey to achieve the scale we set out for. One of these was a custom partitioning strategy through which we identified a few of the key aspects of our message and assigned a murmur hash function. We then used that to get to the partition where we want to send the message, instead of using the default round-robin strategy, which would lead to all the messages from the 150 distribution centers (we are reading within 3-4 hours) to get jumbled in cases where the plan processing for a few distribution centers run slow. This allowed us to get the speed while catering to necessary data isolation and accuracy.
The other important producer config optimization was using
linger.ms and setting a batch size, both of which work in combination with each other.
Linger.ms is the delay time the producer takes before sending out the next batch of messages, and we set the default batch size to around 16,000 bytes to optimize speed.
Finally, there are
acks, which denotes the number of brokers that must receive the record before we consider the write to be successful. In our case, data consistency was important, and we wanted to make sure we didn’t lose any messages, so this was a critical setting we had to configure to “all” to ensure all the in-sync replicas receive the record from the producers.
There are a few consumer configurations that have helped us scale a lot better. The first are
Max.poll.records are the number of messages fetched in a single full poll, and
max.poll.interval.ms is the amount of time the consumer has to process these messages. Since we deal with a lot of external services, we have to make sure our contracts and SLAs are airtight.
The next configuration is
Enable.auto.commit asks your consumer to make sure they acknowledge the messages they’re processing. Once all of the messages that are part of the whole request are completed, it will commit the offset. To help ensure no data loss, we basically disabled the
enable-auto-commit flag to make sure we fully process all of the messages we receive before committing the offset.
Finally, you have
session.timeout.ms and the
heartbeat.interval.ms optimization. These two configs work in tandem with each other to help you recover from any client failures. Session timeout sets the amount of time that your consumer needs to recover from any kind of failure. Now, the typical consumer has multiple virtual machines and computes, and there’s always a possibility that a compute instance or a VM goes down. So, you need to leave a certain amount of time for your consumer to recover. Otherwise, when your consumer comes back up, or the compute instance comes back up, it’s going to need to rebalance, thus causing constant rebalance which is not an ideal scenario. The <codeheartbeat.interval.ms makes sure your client sends the heartbeat to the group coordinator to make sure it is alive and doing well.
Thanks to Kafka-backed real-time replenishment:
For retailers looking to build real-time replenishment or real-time inventory use cases, here is some advice:
This blog post shared Walmart’s journey with Kafka and highlighted the real-time replenishment use case we’re currently using it for. Being the world’s largest retailer, real-time replenishment plays a key role in catering to the needs of millions of our online and walk-in customers by ensuring timely and optimal availability of needed stock using Kafka and its ecosystem. We are excited to continue to unlock new use cases using a real-time streaming platform and to continue to lead retail industry innovation for many years to come.
Want to learn more?
Disclaimer: the views and opinions expressed in this article are those of the author and do not necessarily reflect the official policy or position of Walmart Inc. Walmart Inc. does not endorse or recommend any commercial products, processes, or services described in this blog
It is no exaggeration that a lot is going wrong in commercial buildings today. The building and construction sector consumes 36% of global final energy and accounts for almost 40% […]
Our everyday digital experiences are in the midst of a revolution. Customers increasingly expect their online experiences to be interactive, immersive, and real time by default. The need to satisfy […]