Parallel Streaming Pattern in Go: How to Scan Large S3 or GCS Buckets Significantly Faster

Somewhere in the second half of 2024, I needed to check a large Google Cloud Storage bucket to find files that weren’t referenced in the database, and delete them. A simple task – traverse all files in the bucket and remove the ones I didn’t need. The bucket was large, and it was immediately clear that the deletion operation would become the main bottleneck. No problem – it could be parallelized and made, roughly speaking, arbitrarily fast by using enough goroutines. However, when I implemented it and ran the script, I was surprised to discover that the file listing operation had itself become a new bottleneck…

Why Is Listing Slow?

Most SDKs for object storage services provide convenient iterator-like APIs for traversing files. Under the hood, files are fetched page by page, and each request needs the continuation token returned by the previous one, so the pages are fetched strictly one after another. The typical page size is 1000, meaning that the SDK needs to perform 10,000 sequential requests to traverse a bucket with 10 million files. This may not sound like a big deal, but for large buckets, it can easily become a bottleneck.
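To put these numbers into perspective, here is a back-of-the-envelope calculation. It assumes, purely for illustration, that every LIST request takes about 100 ms; listRequests and sequentialListTime are hypothetical helpers, not SDK functions.

```go
package main

import (
	"fmt"
	"time"
)

// listRequests returns how many LIST calls a sequential scan needs when each
// call returns at most pageSize objects (the last page may be partial).
func listRequests(objects, pageSize int) int {
	return (objects + pageSize - 1) / pageSize // ceiling division
}

// sequentialListTime estimates the wall-clock time of a sequential scan.
// Requests cannot overlap: each one needs the continuation token returned
// by the previous response, so the latencies simply add up.
func sequentialListTime(objects, pageSize int, perRequest time.Duration) time.Duration {
	return time.Duration(listRequests(objects, pageSize)) * perRequest
}

func main() {
	// 100ms per LIST request is an assumed latency, for illustration only.
	fmt.Println(listRequests(10_000_000, 1000))
	fmt.Println(sequentialListTime(10_000_000, 1000, 100*time.Millisecond))
}
```

Under these assumptions, a 10-million-object bucket needs 10,000 requests and roughly 17 minutes of pure listing time, however many goroutines are busy deleting files downstream.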

Here’s what my code looked like when I first encountered this problem. It’s for Google Cloud Storage, but once all the concepts are covered, I’ll show the Amazon S3 version as well.

The code, I believe, is self-explanatory. It uses rill, a concurrency toolkit I built, which works well for cases like this and removes most of the boilerplate. The only part that may need some explanation is the StreamGCSBucketFiles function. It is a wrapper around the bucket.Objects function: it takes the same arguments, but returns a stream (a Go channel) instead of an object iterator.

import (
	"context"
	"errors"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/destel/rill"
	"google.golang.org/api/iterator"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Initialize the GCS client, DB connection, etc.
	gcs, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer gcs.Close()
	bucket := gcs.Bucket("some-bucket")
	// ...

	// Get a stream of all files from the bucket
	files := StreamGCSBucketFiles(ctx, bucket, nil)

	// Keep only the files that are NOT referenced in the database.
	// Use 5 goroutines.
	unusedFiles := rill.Filter(files, 5, func(obj *storage.ObjectAttrs) (bool, error) {
		// SELECT EXISTS(SELECT 1 FROM users WHERE avatar = obj.Name)
		// Placeholder: return true only for files that are not referenced.
		return false, nil
	})

	// Delete files. Use 50 goroutines.
	err = rill.ForEach(unusedFiles, 50, func(obj *storage.ObjectAttrs) error {
		return bucket.Object(obj.Name).Delete(ctx)
	})

	fmt.Println("Error:", err)
}

// StreamGCSBucketFiles is a streaming wrapper around an objects iterator.
// It returns a stream (channel) of object attributes.
func StreamGCSBucketFiles(ctx context.Context, bucket *storage.BucketHandle, q *storage.Query) <-chan rill.Try[*storage.ObjectAttrs] {
	return rill.Generate(func(send func(*storage.ObjectAttrs), sendErr func(error)) {
		it := bucket.Objects(ctx, q)

		for {
			obj, err := it.Next()
			if errors.Is(err, iterator.Done) {
				break
			}
			if err != nil {
				sendErr(fmt.Errorf("failed to iterate objects: %w", err))
				break
			}

			send(obj)
		}
	})
}

First, we get a stream of all files, then filter out the “good” ones, and finally delete everything that remains from the bucket. Here, both filtering and deletion operations are concurrent and use different numbers of goroutines. But the listing is sequential and just streams all files in alphabetical (lexicographical) order.

Can We Make the Listing Operation Concurrent?

Yes, we can, but we need to know the bucket structure. Let’s look at an example: a bucket that stores users’ files in the users/{username}/{filename} format. For simplicity, let’s also assume that usernames consist only of lowercase Latin letters.

users/alice/file.png
users/bob/file1.png
users/bob/file2.png
users/charlie/file.png
...
users/lucy/file.png
users/mallory/file.png
...
users/wendy/file1.png
users/wendy/file2.png
users/zoe/file.png

It may look like a directory tree with folders, but it isn’t one: the slashes are just ordinary characters in the object names. For our purposes, it’s much better to think of the bucket as a lexicographically sorted list of filenames, where these names can also contain slashes.

We can pick one filename, for example, users/mallory/file.png, and make two queries: all files that come before it in the list, and all files from it onward. This filename becomes a split point that partitions the bucket into two parts.

We don’t even need to know an exact filename to create such a split point. For instance, we can take users/m as a split point. This way the first part will have all users whose names start with letters a..l, and the second part will have the m..z users.
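Because a split point works on any sorted list, the idea can be tried out without a bucket. The sketch below uses an in-memory slice as a stand-in for the bucket listing and sort.SearchStrings to find where a split point falls. splitAt is a hypothetical helper; a real listing would come from the storage API.

```go
package main

import (
	"fmt"
	"sort"
)

// splitAt partitions a lexicographically sorted list of object names at a
// split point: names < split go to the first part, names >= split to the
// second. The split point does not have to be an existing name.
func splitAt(sorted []string, split string) (before, after []string) {
	i := sort.SearchStrings(sorted, split) // first index with sorted[i] >= split
	return sorted[:i], sorted[i:]
}

func main() {
	names := []string{
		"users/alice/file.png",
		"users/bob/file1.png",
		"users/bob/file2.png",
		"users/charlie/file.png",
		"users/lucy/file.png",
		"users/mallory/file.png",
		"users/wendy/file1.png",
		"users/wendy/file2.png",
		"users/zoe/file.png",
	}
	before, after := splitAt(names, "users/m")
	fmt.Println(len(before), "names before the split,", len(after), "from it onward")
	fmt.Println("second part starts with", after[0])
}
```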

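Let’s look at an example where we, in a similar fashion, partition the bucket into 3 parts based on the first letter of the username. In the GCS API, StartOffset is inclusive and EndOffset is exclusive, so consecutive parts neither overlap nor leave gaps. Here we use the rill.Merge function to stream files from each part independently and concurrently: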

	// first_letter < 'j'
	filesAJ := StreamGCSBucketFiles(ctx, bucket, &storage.Query{
		EndOffset: "users/j",
	})

	// 'j' <= first_letter < 'p'
	filesJP := StreamGCSBucketFiles(ctx, bucket, &storage.Query{
		StartOffset: "users/j",
		EndOffset:   "users/p",
	})

	// 'p' <= first_letter
	filesPZ := StreamGCSBucketFiles(ctx, bucket, &storage.Query{
		StartOffset: "users/p",
	})

	// Merge all streams
	files := rill.Merge(filesAJ, filesJP, filesPZ)

This can make listing up to 3x faster, assuming that files are distributed more or less evenly across the partitions.
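rill.Merge does the fan-in for us. As a rough illustration of what merging several streams involves, here is a minimal plain-Go fan-in sketch. mergeStreams and fromSlice are hypothetical helpers written for this example; this is not rill’s actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeStreams forwards values from all input channels into one output
// channel, draining each input in its own goroutine. The output is closed
// once every input is exhausted. Order across inputs is not preserved.
func mergeStreams[T any](ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fromSlice turns a list of values into an already-closed buffered channel.
func fromSlice[T any](items ...T) <-chan T {
	ch := make(chan T, len(items))
	for _, v := range items {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	filesAJ := fromSlice("users/alice/a.png", "users/bob/b.png")
	filesJP := fromSlice("users/lucy/l.png", "users/mallory/m.png")
	filesPZ := fromSlice("users/wendy/w.png", "users/zoe/z.png")

	// The three parts are consumed concurrently, so names may interleave.
	for name := range mergeStreams(filesAJ, filesJP, filesPZ) {
		fmt.Println(name)
	}
}
```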

Dynamic Split Points and FlatMap

The previous example used rill.Merge to combine several parallel streams, but there’s a more convenient tool for cases where the number of split points is large or dynamic: rill.FlatMap.

First, let’s define a Range type that represents the range of filenames between two consecutive split points. An empty Start or End means that the range is unbounded on that side:

type Range struct {
	Start, End string
}

Now the previous example can be rewritten. We first generate a stream of ranges and then convert it into a unified stream of files using rill.FlatMap:

	ranges := rill.FromSlice([]Range{
		{"", "users/j"},
		{"users/j", "users/p"},
		{"users/p", ""},
	}, nil)

	files := rill.FlatMap(ranges, 3, func(r Range) <-chan rill.Try[*storage.ObjectAttrs] {
		return StreamGCSBucketFiles(ctx, bucket, &storage.Query{StartOffset: r.Start, EndOffset: r.End})
	})

This may not look like much of a benefit, but now we can:

  • Generate ranges programmatically
  • Control the level of concurrency. For instance, we can have 1000 ranges, but stream files from at most 5 of them at the same time.
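rill.FlatMap takes care of this for us. To show what capping the concurrency means mechanically, here is a plain-Go sketch that processes many ranges while never running more than n at once, using a buffered channel as a counting semaphore. processRanges and the fake list callback are hypothetical; unlike FlatMap, this sketch does not stream any results.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Range struct {
	Start, End string
}

// processRanges calls list once per range, with at most n calls running at
// the same time. It returns the peak number of calls observed in flight,
// which can never exceed n.
func processRanges(ranges []Range, n int, list func(Range)) int64 {
	sem := make(chan struct{}, n)
	var wg sync.WaitGroup
	var inFlight, peak atomic.Int64

	for _, r := range ranges {
		sem <- struct{}{} // blocks while n calls are already running
		wg.Add(1)
		go func(r Range) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this call is done

			cur := inFlight.Add(1)
			for { // raise peak to cur unless another goroutine already did
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			list(r)
			inFlight.Add(-1)
		}(r)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	ranges := make([]Range, 100) // stand-ins for real key ranges
	peak := processRanges(ranges, 5, func(Range) {
		time.Sleep(10 * time.Millisecond) // pretend to list one range
	})
	fmt.Println("peak concurrency:", peak)
}
```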

Let’s use rill.Generate to programmatically create a separate range for each lowercase letter of the alphabet, and then stream files from up to 10 ranges at the same time:

	ranges := rill.Generate(func(send func(Range), sendErr func(error)) {
		prevSplit := ""
		for c := 'a'; c <= 'z'; c++ {
			split := fmt.Sprintf("users/%c", c)
			send(Range{Start: prevSplit, End: split})
			prevSplit = split
		}
		send(Range{Start: prevSplit, End: ""})
	})

	files := rill.FlatMap(ranges, 10, func(r Range) <-chan rill.Try[*storage.ObjectAttrs] {
		return StreamGCSBucketFiles(ctx, bucket, &storage.Query{StartOffset: r.Start, EndOffset: r.End})
	})
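The generator above turns sorted split points into consecutive ranges. The same logic can be written as a small, testable function over a slice, with a check that catches unsorted split points early. rangesFromSplits is a hypothetical helper for illustration and is not part of rill.

```go
package main

import (
	"fmt"
	"sort"
)

type Range struct {
	Start, End string
}

// rangesFromSplits turns sorted split points into consecutive ranges.
// An empty Start or End means the range is unbounded on that side, so
// n split points always produce n+1 ranges that cover every possible name.
func rangesFromSplits(splits []string) []Range {
	if !sort.StringsAreSorted(splits) {
		panic("split points must be in lexicographical order")
	}
	ranges := make([]Range, 0, len(splits)+1)
	prev := ""
	for _, s := range splits {
		ranges = append(ranges, Range{Start: prev, End: s})
		prev = s
	}
	return append(ranges, Range{Start: prev, End: ""})
}

func main() {
	var splits []string
	for c := 'a'; c <= 'z'; c++ {
		splits = append(splits, fmt.Sprintf("users/%c", c))
	}
	ranges := rangesFromSplits(splits)
	fmt.Println(len(ranges), "ranges; first:", ranges[0], "last:", ranges[len(ranges)-1])
}
```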

Putting it All Together

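Above, we used 26 split points, which resulted in 27 ranges. To achieve a significant speedup, we would need a large number of goroutines (50, 100, or even more), which in turn requires an even larger number of ranges, so that every goroutine always has work to do and a few unusually large ranges don’t dominate the total time.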

Let’s put together a complete example. Here we partition based on the first two characters of the username. We also use a less restricted alphabet that allows uppercase and lowercase letters plus digits.

The 62 characters produce 62 * 62 = 3844 split points, and therefore 3845 ranges. The concurrency levels are 50 for listing, 10 for database queries, and 100 for file deletions. Here is the full code, to show how concise it is:

import (
	"context"
	"errors"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"github.com/destel/rill"
	"google.golang.org/api/iterator"
)

type Range struct {
	Start, End string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	gcs, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer gcs.Close()

	bucket := gcs.Bucket("some-bucket")

	// Create ranges based on the first two characters of the username
	ranges := rill.Generate(func(send func(Range), sendErr func(error)) {
		// characters must be in lexicographical order
		chars := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

		prevSplit := ""
		for _, c1 := range chars {
			for _, c2 := range chars {
				split := fmt.Sprintf("users/%c%c", c1, c2)
				send(Range{Start: prevSplit, End: split})
				prevSplit = split
			}
		}
		send(Range{Start: prevSplit, End: ""})
	})

	// Get all files from the bucket into a channel, listing up to 50 ranges at a time
	files := rill.FlatMap(ranges, 50, func(r Range) <-chan rill.Try[*storage.ObjectAttrs] {
		return StreamGCSBucketFiles(ctx, bucket, &storage.Query{StartOffset: r.Start, EndOffset: r.End})
	})

	// Keep only the files that are NOT referenced in the database.
	unusedFiles := rill.Filter(files, 10, func(obj *storage.ObjectAttrs) (bool, error) {
		// SELECT EXISTS(SELECT 1 FROM users WHERE avatar = obj.Name)
		// Placeholder: return true only for files that are not referenced.
		return false, nil
	})

	// Delete files.
	err = rill.ForEach(unusedFiles, 100, func(obj *storage.ObjectAttrs) error {
		return bucket.Object(obj.Name).Delete(ctx)
	})

	fmt.Println("Error:", err)
}


// StreamGCSBucketFiles is a streaming wrapper around an objects iterator.
// It returns a stream (channel) of object attributes.
func StreamGCSBucketFiles(ctx context.Context, bucket *storage.BucketHandle, q *storage.Query) <-chan rill.Try[*storage.ObjectAttrs] {
	return rill.Generate(func(send func(*storage.ObjectAttrs), sendErr func(error)) {
		it := bucket.Objects(ctx, q)

		for {
			obj, err := it.Next()
			if errors.Is(err, iterator.Done) {
				break
			}
			if err != nil {
				sendErr(fmt.Errorf("failed to iterate objects: %w", err))
				break
			}

			send(obj)
		}
	})
}
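The numbers behind the range generator in the full example (3844 split points, 3845 ranges, sorted order) are easy to double-check offline. The sketch below reproduces only the split-point logic, without any GCS or rill calls; twoCharSplits, isSorted and rangeIndex are hypothetical helpers. It also shows why one-letter usernames are not lost: in ASCII, '/' sorts before '0'.

```go
package main

import (
	"fmt"
	"sort"
)

// twoCharSplits builds the split points prefix+c1+c2 for every pair of
// characters in chars, in the same nested-loop order as the generator.
func twoCharSplits(prefix, chars string) []string {
	splits := make([]string, 0, len(chars)*len(chars))
	for _, c1 := range chars {
		for _, c2 := range chars {
			splits = append(splits, prefix+string(c1)+string(c2))
		}
	}
	return splits
}

// isSorted reports whether xs is in strictly increasing lexicographical order.
func isSorted(xs []string) bool {
	for i := 1; i < len(xs); i++ {
		if xs[i-1] >= xs[i] {
			return false
		}
	}
	return true
}

// rangeIndex returns i such that key lies in [splits[i-1], splits[i]),
// where index 0 is the first, open-ended range.
func rangeIndex(splits []string, key string) int {
	return sort.Search(len(splits), func(j int) bool { return splits[j] > key })
}

func main() {
	chars := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
	splits := twoCharSplits("users/", chars)
	fmt.Println(len(chars), len(splits), len(splits)+1) // chars, split points, ranges
	fmt.Println("sorted:", isSorted(splits))

	// A one-letter username still lands in exactly one range.
	key := "users/a/file.png"
	i := rangeIndex(splits, key)
	fmt.Println(splits[i-1], "<=", key, "<", splits[i])
}
```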

Go 1.24 update: Go 1.24 adds full support for generic type aliases. Rill v0.7 ships with the rill.Stream[T] type alias, which makes it possible to simplify the return type of the StreamGCSBucketFiles function, like this:

func StreamGCSBucketFiles(ctx context.Context, bucket *storage.BucketHandle, q *storage.Query) rill.Stream[*storage.ObjectAttrs] {
	...
}

My Bucket Has a Different Structure

No one knows your bucket structure better than you do, and there’s no single solution that fits every case. The goal is to find a set of split points that:

  • Is large enough to meet your target concurrency level
  • Partitions the bucket into more or less equal-sized parts. The partitioning doesn’t have to be perfect, though.

Let’s look at how we can partition a bucket where files follow the {team_id}/{filename} pattern and team IDs are integers. It’s not immediately clear how to do it, because in the integer world 2 < 10, but when converted to strings "2" > "10".
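A quick way to see the problem is to sort a few keys the way object storage lists them, byte by byte. objectKeys is a hypothetical helper used only for this illustration, and it assumes keys of the form {team_id}/file.png.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// objectKeys builds "{team_id}/file.png" keys and sorts them the way object
// storage lists them: lexicographically (byte-wise), not numerically.
func objectKeys(teamIDs []int) []string {
	keys := make([]string, len(teamIDs))
	for i, id := range teamIDs {
		keys[i] = strconv.Itoa(id) + "/file.png"
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(objectKeys([]int{2, 10, 1, 100, 11, 9}))
}
```

For the team IDs 2, 10, 1, 100, 11 and 9, the keys come out in the order 1, 10, 100, 11, 2, 9.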

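Let’s assume we have one million teams (IDs 1 to 1,000,000) and we want about 100 partitions. The trick here is to use all 2-digit numbers as split points, excluding the number 10: the range before "10" would contain only team 1 (because "1/..." < "10"), so dropping that split point merges this tiny range into the next one. Below is the list of resulting ranges and the team IDs that fall into each range: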

  ""-"11"  [1, 10, 100-109, 1000-1099, 10000-10999, 100000-109999, 1000000]
"11"-"12"  [11, 110-119, 1100-1199, 11000-11999, 110000-119999]
...
"19"-"20"  [2, 19, 190-199, 1900-1999, 19000-19999, 190000-199999]
"20"-"21"  [20, 200-209, 2000-2099, 20000-20999, 200000-209999]
"21"-"22"  [21, 210-219, 2100-2199, 21000-21999, 210000-219999]
...
"99"-""    [99, 990-999, 9900-9999, 99000-99999, 990000-999999]

The range generation code is quite similar to what we’ve seen before:

	ranges := rill.Generate(func(send func(Range), sendErr func(error)) {
		prevSplit := ""
		for i := 11; i <= 99; i++ {
			split := fmt.Sprintf("%d", i)
			send(Range{Start: prevSplit, End: split})
			prevSplit = split
		}
		send(Range{Start: prevSplit, End: ""})
	})

This produces 89+1 ranges with an almost even distribution of teams among them. If you need more ranges, you can use 3-digit numbers (101 to 999) instead of 2-digit ones. If you need fewer, you can increment i by 2 on each iteration. Simply put, you can be as creative as you wish; just make sure that the split points you generate are in lexicographical order.
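The claim of an almost even distribution can be checked with a small simulation. The sketch assumes team IDs 1 through 1,000,000 and half-open [Start, End) ranges, matching the inclusive StartOffset and exclusive EndOffset of the GCS API. It never touches a real bucket, and twoDigitSplits and countPerRange are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// twoDigitSplits returns the split points "11", "12", ..., "99".
func twoDigitSplits() []string {
	var splits []string
	for i := 11; i <= 99; i++ {
		splits = append(splits, strconv.Itoa(i))
	}
	return splits
}

// countPerRange works out which range the key "{id}/..." of every team
// 1..maxID falls into and returns the number of teams per range.
func countPerRange(splits []string, maxID int) []int {
	counts := make([]int, len(splits)+1)
	for id := 1; id <= maxID; id++ {
		key := strconv.Itoa(id) + "/"
		// The range index equals the number of split points <= key.
		i := sort.Search(len(splits), func(j int) bool { return splits[j] > key })
		counts[i]++
	}
	return counts
}

func main() {
	counts := countPerRange(twoDigitSplits(), 1_000_000)
	lo, hi := counts[0], counts[0]
	for _, c := range counts {
		if c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
	}
	fmt.Println("ranges:", len(counts), "smallest:", lo, "largest:", hi)
}
```

Under these assumptions, it reports 90 ranges holding between 11,111 and 11,113 teams each, which agrees with the table above.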

Tip: It can even make sense to create a constructor for the Range struct that panics if Start >= End (treating an empty string as an unbounded side). This can help catch ordering errors during development.
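Here is one way such a constructor could look. NewRange is a hypothetical name. The sketch assumes, as elsewhere in this post, that an empty string stands for an unbounded side, so empty bounds are exempt from the check.

```go
package main

import "fmt"

type Range struct {
	Start, End string
}

// NewRange builds a Range and panics if the bounds are out of order.
// Empty strings mean "unbounded", so they are exempt from the check.
func NewRange(start, end string) Range {
	if start != "" && end != "" && start >= end {
		panic(fmt.Sprintf("invalid range: %q >= %q", start, end))
	}
	return Range{Start: start, End: end}
}

func main() {
	fmt.Println(NewRange("users/a", "users/b"))
	fmt.Println(NewRange("users/z", ""))

	defer func() { fmt.Println("recovered:", recover()) }()
	NewRange("2", "10") // "2" > "10" as strings, so this panics
}
```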

How Do We Do the Same Thing for Amazon S3?

Let’s rewrite the last large Google Cloud Storage example for Amazon S3. Most parts are very similar:

  • Range generation code is the same
  • Database interaction code is the same
  • File deletion code is almost the same
  • FlatMap part is very similar

The main difference is in the streaming wrapper around the ListObjectsV2 API, the StreamS3BucketFiles function. This Amazon API has a StartAfter parameter, but nothing like the EndOffset parameter of the Google API. This is not a real problem, just a small inconvenience: we simply stop the iteration manually as soon as we reach a key that’s out of range. The full code is below:

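import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/destel/rill"
)

type Range struct {
	Start, End string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Region and credentials come from the SDK's default session configuration
	s3Client := s3.New(session.Must(session.NewSession()))
	bucket := "some-bucket"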

	// Create ranges based on the first two characters of the username
	ranges := rill.Generate(func(send func(Range), sendErr func(error)) {
		// characters must be in lexicographical order
		chars := "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

		prevSplit := ""
		for _, c1 := range chars {
			for _, c2 := range chars {
				split := fmt.Sprintf("users/%c%c", c1, c2)
				send(Range{Start: prevSplit, End: split})
				prevSplit = split
			}
		}
		send(Range{Start: prevSplit, End: ""})
	})

	// Get all files from the bucket into a channel
	files := rill.FlatMap(ranges, 50, func(r Range) <-chan rill.Try[*s3.Object] {
		input := &s3.ListObjectsV2Input{
			Bucket:     aws.String(bucket),
			StartAfter: aws.String(r.Start),
		}

		return StreamS3BucketFiles(ctx, s3Client, input, r.End)
	})

	// Filter out files that are present in the database.
	unusedFiles := rill.Filter(files, 10, func(obj *s3.Object) (bool, error) {
		// SELECT EXISTS(SELECT 1 FROM users WHERE avatar = obj.Key)
		return false, nil
	})

	// Delete files.
	err := rill.ForEach(unusedFiles, 100, func(obj *s3.Object) error {
		_, err := s3Client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
			Bucket: aws.String(bucket),
			Key:    obj.Key,
		})
		return err
	})

	fmt.Println("Error:", err)
}

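// StreamS3BucketFiles streams the objects whose keys are in the range
// (input.StartAfter, end]. StartAfter is exclusive in the S3 API, so the upper
// bound is treated as inclusive: this way a key that is exactly equal to a
// split point is listed by exactly one range. An empty end means no upper bound.
func StreamS3BucketFiles(ctx context.Context, client *s3.S3, input *s3.ListObjectsV2Input, end string) <-chan rill.Try[*s3.Object] {
	return rill.Generate(func(send func(object *s3.Object), sendErr func(error)) {
		err := client.ListObjectsV2PagesWithContext(ctx, input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, obj := range page.Contents {
				// Stop paginating manually once we move past the end of the range
				if end != "" && *obj.Key > end {
					return false
				}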
				}
				send(obj)
			}
			return true
		})

		if err != nil {
			sendErr(err)
		}

	})
}
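One subtlety is that S3’s StartAfter is exclusive, unlike the inclusive StartOffset of GCS. If each range also excluded its end, a key exactly equal to a split point would be skipped by both neighbouring ranges; treating the end as inclusive (stopping only once a key is strictly greater than it) makes the ranges tile the bucket exactly. The sketch below demonstrates this on an in-memory key list. listPage and scanRange are hypothetical stand-ins for ListObjectsV2 pagination; nothing here calls AWS.

```go
package main

import (
	"fmt"
	"sort"
)

// listPage imitates one ListObjectsV2 call over a sorted in-memory key list:
// it returns up to pageSize keys strictly greater than startAfter.
func listPage(keys []string, startAfter string, pageSize int) []string {
	i := sort.Search(len(keys), func(j int) bool { return keys[j] > startAfter })
	end := i + pageSize
	if end > len(keys) {
		end = len(keys)
	}
	return keys[i:end]
}

// scanRange pages through the keys in (startAfter, end] and stops as soon as
// a key goes past end; an empty end means "no upper bound". It also returns
// the number of simulated LIST requests.
func scanRange(keys []string, startAfter, end string, pageSize int) (got []string, requests int) {
	cursor := startAfter
	for {
		page := listPage(keys, cursor, pageSize)
		requests++
		for _, k := range page {
			if end != "" && k > end {
				return got, requests
			}
			got = append(got, k)
		}
		if len(page) < pageSize { // in this simulation, a short page is the last one
			return got, requests
		}
		cursor = page[len(page)-1]
	}
}

func main() {
	keys := []string{
		"users/alice/1", "users/bob/1", "users/f", "users/frank/1",
		"users/m", "users/mallory/1", "users/zoe/1",
	}
	// Split points "users/f", "users/m", "users/s"; two keys equal a split point.
	bounds := []string{"", "users/f", "users/m", "users/s", ""}
	for i := 0; i+1 < len(bounds); i++ {
		got, n := scanRange(keys, bounds[i], bounds[i+1], 2)
		fmt.Printf("(%q, %q]: %v in %d request(s)\n", bounds[i], bounds[i+1], got, n)
	}
}
```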

What About the Costs?

Let’s look at how parallel traversal affects the API costs. Most object storage services charge per LIST request, so it’s natural to wonder if parallelization significantly increases the costs.

The key to understanding this lies in how pagination works. When we’re scanning a bucket sequentially, we get a series of full pages (1000 items each) and usually one partial page at the end. What happens when we split the bucket into ranges? Let’s do the math.

Suppose we partition our bucket into 1000 ranges to enable very high levels of concurrency. In the worst case, each range ends with its own partial page, which means at most 1000 additional LIST requests compared to a sequential scan. This sounds like a lot, but at a typical price of about $0.005 per 1,000 LIST requests, it adds only about $0.005 to the total cost.
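The same arithmetic as a tiny sketch. The price of $0.005 per 1,000 LIST requests is an assumption used for illustration, and the worst case counts one extra partial page per range; extraListCost and sequentialListCost are hypothetical helpers.

```go
package main

import "fmt"

// extraListCost is the worst-case extra cost of splitting a scan into the
// given number of ranges: at most one additional (partial) page per range.
// pricePer1000 is the assumed price of 1,000 LIST requests, in dollars.
func extraListCost(ranges int, pricePer1000 float64) float64 {
	return float64(ranges) * pricePer1000 / 1000
}

// sequentialListCost is the LIST cost of a plain sequential scan.
func sequentialListCost(objects, pageSize int, pricePer1000 float64) float64 {
	requests := (objects + pageSize - 1) / pageSize
	return float64(requests) * pricePer1000 / 1000
}

func main() {
	const price = 0.005 // assumed $ per 1,000 LIST requests
	fmt.Printf("sequential scan of 10M objects: $%.3f\n", sequentialListCost(10_000_000, 1000, price))
	fmt.Printf("worst-case extra for 1000 ranges: $%.3f\n", extraListCost(1000, price))
}
```

With these assumptions, a sequential scan of 10 million objects costs about $0.05 in LIST requests, and splitting it into 1000 ranges adds at most about $0.005.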

So effectively, this method has the same cost efficiency as a regular sequential scan, while being significantly faster.

Performance and Conclusion

The same technique works for any object storage that supports range-based listing, or at least a StartAfter-style parameter, as demonstrated for S3. With a well-chosen set of split points, both listing and deletion can scale almost linearly with the number of goroutines.

When I applied this approach to production GCS buckets, operations that previously took hours completed in minutes. The main bottleneck typically shifts from the storage service to local hardware resources, as cloud object storage services scale remarkably well with concurrent requests.

The resulting code is concise, despite handling complex operations like parallel listing, filtering, and deletion. This is where rill’s composable concurrency model shines – it lets you focus on the core logic while abstracting away all the complexity of channel, goroutine, and error management.

Finally, the API costs of this method are practically identical to a regular sequential scan.
