Real-Time Batching in Go

Batching is a well-known optimization technique. You see it everywhere: batch inserts in databases, MGET/MSET in Redis, various bulk APIs. The benefits are clear — it’s faster, cheaper, and less rate-limited. These benefits usually come at the cost of slightly more complex code and some boilerplate.

But how do we cleanly batch something we can’t see yet? Something that’s arriving in real time.

A Real-World Example

Imagine an application that needs to update users’ last_active_at timestamp in the database each time the user interacts with the system. To make this example more extreme, let’s assume that the app updates this timestamp on every HTTP request from a user. Our function might look like this:

func UpdateUserTimestamp(ctx context.Context, userID int) error {
	// UPDATE users SET last_active_at = NOW() WHERE id = userID
	return nil // placeholder: real code would run the query and return its error
}

With thousands of concurrent requests, we’re hitting the database just as many times per second. Each update is tiny, but together they can create unnecessary database pressure. We’d want to group together the updates happening at the same time. Like this:

UPDATE users SET last_active_at = NOW() WHERE id IN (17, 25, 31)

The challenge is that UpdateUserTimestamp only sees one user ID at a time, making batching impossible at this level.

We can solve this by having UpdateUserTimestamp send each ID to a shared queue (implemented as a Go channel). A background worker can then read from this queue, accumulate IDs into batches, and send bulk updates to the database.

Making it Developer-Friendly

While the basic concept is straightforward, the devil is in the details. We want UpdateUserTimestamp to behave exactly as before: accept a single user ID, respect context cancellation, and return appropriate errors. Let’s build a solution that handles all these requirements.

Let’s first deal with error handling. When UpdateUserTimestamp sends a user ID to the worker, how does it know if that specific update succeeded or failed? Our solution is to create a dedicated ReplyTo channel for each call. The worker uses this channel to report success or failure for that specific update. This naturally provides the blocking behavior we want — each function call waits for its own result via its unique channel. From the developer’s perspective, the function appears to simply block while performing a query.

Let’s visualize how two concurrent UpdateUserTimestamp calls flow through the system. This sequence diagram shows the key interactions:

[Figure: Real-Time Batching Diagram]

Now let’s implement the green part of the diagram in the code:

// This type represents a single request to the worker
type updateUserTimestampRequest struct {
	userID  int
	ReplyTo chan error
}

// This is the queue of user IDs to update.
var updateUserTimestampQueue = make(chan updateUserTimestampRequest)

// UpdateUserTimestamp is the public API for updating the last_active_at column in the users table
func UpdateUserTimestamp(ctx context.Context, userID int) error {
	// Prepare a request to the worker.
	// A ReplyTo channel is used by the worker to send us back a result.
	req := updateUserTimestampRequest{
		userID:  userID,
		ReplyTo: make(chan error),
	}

	// Send request to the worker
	updateUserTimestampQueue <- req

	// Block and wait for the result
	err := <-req.ReplyTo
	return err
}

What About the Context Support?

The code above ignores the context passed to the function. Let’s fix this. We won’t use this context for database query timeouts — the worker takes care of those. But we still need to respect context cancellation and let UpdateUserTimestamp return early without waiting for the worker’s response.

To achieve this, we’ll use Go’s select statement for both sending the request to the worker and receiving the result. This way, if the context is canceled or its deadline passes, we return immediately with the context’s error (context.Canceled or context.DeadlineExceeded).

Important: From now on ReplyTo channels must be buffered. This prevents the worker from blocking if the caller’s context is canceled and they don’t read the result.
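To make the buffering requirement concrete, here is a minimal sketch of what the worker sees when the caller has already given up. The trySend helper is hypothetical and exists only for this demonstration; the real worker uses a plain blocking send.

```go
package main

import (
	"errors"
	"fmt"
)

// trySend is a hypothetical helper: it reports whether a reply could be
// delivered without anyone receiving from the channel.
func trySend(replyTo chan error) bool {
	select {
	case replyTo <- errors.New("query failed"):
		return true // the reply was stored in the channel's buffer
	default:
		return false // a plain send would block here forever
	}
}

func main() {
	// In both cases nobody is receiving, as if the caller's context
	// had been canceled and the caller had already returned.
	fmt.Println("buffered (capacity 1):", trySend(make(chan error, 1)))
	fmt.Println("unbuffered:", trySend(make(chan error)))
}
```

With a capacity of 1 the reply is parked in the buffer and later garbage-collected together with the channel. With an unbuffered channel the worker's blocking send would never complete, stalling every batch behind it.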

func UpdateUserTimestamp(ctx context.Context, userID int) error {
	// Prepare a request to the worker.
	// A ReplyTo channel is used by the worker to send us back a result.
	req := updateUserTimestampRequest{
		userID:  userID,
		ReplyTo: make(chan error, 1), // must be buffered
	}

	// Send request to the worker
	select {
	case <-ctx.Done():
		return ctx.Err()
	case updateUserTimestampQueue <- req:
	}

	// Block and wait for the result
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-req.ReplyTo:
		return err
	}
}

How to Batch Data Arriving in Real-Time?

Before we look at the worker implementation, let’s think about how to batch data arriving in real time. Suppose we want to send queries to the database in batches of 10. We can just read from the queue until we have 10 items, then send them to the database. That will work fine, mostly…

What if it’s a quiet day and UpdateUserTimestamp is called only once per minute? It would take 10 minutes to fill up the batch, and our database updates would be delayed by 10 minutes, making it the world’s most patient real-time system.

To deal with this we need to introduce a timeout. If the batch isn’t full after a certain time, we send it to the database anyway. The timeout can be low, like 10ms or even 1ms. This way, during high traffic the batch fills up immediately, and during quiet periods we add at most one timeout’s worth of latency (1ms to 10ms with the values above), keeping the system as real-time as we want.

The Worker

To simplify the worker implementation, we’ll use rill, a concurrency toolkit I’ve built. It provides utilities like rill.Batch, which groups incoming items into batches with a timeout (exactly as described earlier), and rill.ForEach, which helps control the level of concurrency when processing those batches. Thanks to these tools, the worker implementation becomes remarkably compact.

Here’s the complete worker:

func updateUserTimestampWorker(batchSize int, batchTimeout time.Duration, concurrency int, dbTimeout time.Duration) {
	// Start with a stream of update requests
	requests := rill.FromChan(updateUserTimestampQueue, nil)

	// Group requests into batches with timeout
	requestBatches := rill.Batch(requests, batchSize, batchTimeout)

	// Process batches with controlled database concurrency
	_ = rill.ForEach(requestBatches, concurrency, func(batch []updateUserTimestampRequest) error {
		// Create a slice of user IDs
		ids := make([]int, len(batch))
		for i, req := range batch {
			ids[i] = req.userID
		}

		// Execute batched update
		dbCtx, cancel := context.WithTimeout(context.Background(), dbTimeout)
		defer cancel()
		err := sendQueryToDB(dbCtx,
			"UPDATE users SET last_active_at = NOW() WHERE id IN (?)",
			ids,
		)

		// Send result back to all callers in this batch
		for _, req := range batch {
			req.ReplyTo <- err
			close(req.ReplyTo)
		}
		return nil
	})
}
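One detail the worker glosses over: the "IN (?)" placeholder is shorthand. With database/sql and most drivers, a slice is not expanded into a list, so each value typically needs its own placeholder. Below is a minimal sketch of building such a query, assuming a driver that uses "?" placeholders (PostgreSQL drivers use $1, $2, ... instead). The buildInQuery helper is hypothetical, and the deduplication step is my own addition for the case where the same user lands in a batch twice.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInQuery returns an UPDATE statement with one "?" per distinct ID,
// plus the matching argument list. Hypothetical helper; it assumes ids
// is non-empty, because "IN ()" is not valid SQL.
func buildInQuery(ids []int) (string, []any) {
	seen := make(map[int]struct{}, len(ids))
	args := make([]any, 0, len(ids))
	for _, id := range ids {
		if _, dup := seen[id]; dup {
			continue // the same user may appear more than once in a batch
		}
		seen[id] = struct{}{}
		args = append(args, id)
	}

	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(args)), ", ")
	query := "UPDATE users SET last_active_at = NOW() WHERE id IN (" + placeholders + ")"
	return query, args
}

func main() {
	query, args := buildInQuery([]int{17, 25, 17, 31})
	fmt.Println(query)   // UPDATE users SET last_active_at = NOW() WHERE id IN (?, ?, ?)
	fmt.Println(args...) // 17 25 31
}
```

The result could then be passed to something like db.ExecContext(dbCtx, query, args...) in place of the simulated call.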

Putting it All Together

That’s pretty much it. We’ve built a system that transparently batches real-time updates to the database. The developer just calls UpdateUserTimestamp as usual, and the system takes care of the rest. The worker reads user IDs from the queue, groups them into batches, and sends them to the database. The system respects context cancellation and handles errors properly.

And as a bonus, we can explicitly control the level of concurrency.

To see the solution in action, let’s write a main function that spawns 100 goroutines, each calling our UpdateUserTimestamp function with a different userID.

func main() {
	ctx := context.Background()

	// Start the worker
	go updateUserTimestampWorker(
		7,                   // Batch size
		10*time.Millisecond, // Batch timeout
		2,                   // Concurrency
		10*time.Second,      // Query timeout
	)

	// Simulate many concurrent goroutines calling UpdateUserTimestamp
	var wg sync.WaitGroup

	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go func(userID int) {
			defer wg.Done()

			err := UpdateUserTimestamp(ctx, userID)
			if err != nil {
				fmt.Println("Error updating user timestamp:", err)
			}
		}(i)
	}

	wg.Wait()
}

// Simulate a database query
func sendQueryToDB(ctx context.Context, query string, args ...any) error {
	for _, arg := range args {
		query = strings.Replace(query, "?", fmt.Sprint(arg), 1)
	}
	fmt.Println("Executed:", query)
	return nil
}

The full code is available in the Go playground: https://goplay.tools/snippet/wTmEwFiYBvu

Conclusion

This pattern demonstrates how to combine batching for performance with real-time responsiveness using Go’s concurrency primitives. By using channels, goroutines, and rill, we built a system that efficiently batches database updates while staying reactive.

The same approach works well for other scenarios where batching makes sense. Even SELECT queries can benefit from it, though the worker would need additional logic to route results back to callers. Not to mention a variety of non-real-time use cases where batching can be beneficial.
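As a rough sketch of that extra routing logic for SELECT queries: the worker runs one query for the whole batch, indexes the rows by ID, and hands each caller only its own row. All names here (getUserNameRequest, userNameResult, fetchUserNames, replyToBatch) are hypothetical, and fetchUserNames merely simulates the database.

```go
package main

import "fmt"

// userNameResult is what a single caller gets back.
type userNameResult struct {
	name string
	err  error
}

// getUserNameRequest is one caller's request to the worker.
type getUserNameRequest struct {
	userID  int
	replyTo chan userNameResult // buffered with capacity 1, as before
}

// fetchUserNames stands in for one
// "SELECT id, name FROM users WHERE id IN (...)" query.
func fetchUserNames(ids []int) (map[int]string, error) {
	names := make(map[int]string, len(ids))
	for _, id := range ids {
		if id%2 == 1 { // pretend only odd IDs exist
			names[id] = fmt.Sprintf("user-%d", id)
		}
	}
	return names, nil
}

// replyToBatch runs one query for the batch and routes each row to its caller.
func replyToBatch(batch []getUserNameRequest) {
	ids := make([]int, len(batch))
	for i, req := range batch {
		ids[i] = req.userID
	}

	names, err := fetchUserNames(ids)

	for _, req := range batch {
		res := userNameResult{err: err} // a query-level error goes to everyone
		if err == nil {
			if name, ok := names[req.userID]; ok {
				res.name = name
			} else {
				res.err = fmt.Errorf("user %d not found", req.userID)
			}
		}
		req.replyTo <- res
	}
}

func main() {
	batch := make([]getUserNameRequest, 3)
	for i := range batch {
		batch[i] = getUserNameRequest{userID: i + 1, replyTo: make(chan userNameResult, 1)}
	}
	replyToBatch(batch)
	for _, req := range batch {
		res := <-req.replyTo
		fmt.Printf("id=%d name=%q err=%v\n", req.userID, res.name, res.err)
	}
}
```

The main difference from the update worker is that the reply carries a per-caller value, so a missing row becomes an error for that caller alone rather than for the whole batch.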

Of course, there are other ways to handle frequent updates, for example:

  • Store activity timestamps in Redis or another fast storage
  • Queue updates in an external queue (e.g., RabbitMQ) and batch them in the consumer, making the pattern distributed
  • Update each user at most once per time window (e.g., 4 hours) and skip the rest of the updates

This channel-based solution hits a sweet spot for systems that need the performance of batching while keeping the code simple and maintainable.