Some time ago, I was introduced to the idea of Regular numbers (also called a Hamming number), which is the set of all numbers which can be expressed by the equation 2n × 3m × 5k. Generating these numbers is harder than it looks, because they quickly become sparse, so the "brute force" approach is impractical for anything beyond the first few regular numbers:
def regular_numbers_brute(n: int) -> Iterable[int]: """Yields the first n regular numbers.""" found = 0 for i in itertools.count(1): # Iterate over integers from 1 to ∞ if found >= n: break j = i for s in [2, 3, 5]: # Divide by each prime factor until it no longer evenly divides. while j % s == 0: j /= s # If we've divided by only 2, 3 and 5 and we get 1, it's regular. if j == 1: yield i found += 1
If you actually run this, you'll find that the 100th regular number is 1536, and the 200th regular number is 16200, so if you are checking every number, the second 100 numbers you generated takes 10x longer than the first 100. The 500th number is 937,500, so you need to check nearly 1M numbers to find the first 500 regular numbers.
It turns out that there's a fairly simple intuition about the structure of these numbers that makes it relatively easy to come up with an efficient algorithm for generating them: every regular number (other than 1) is the result of multiplying another regular number by 2, 3 or 5. So you start your sequence with 1, which tells you that 2, 3 and 5 are regular numbers. Pick the next one in the sequence, 2, and multiply it by 2, 3 and 5 to learn that 4, 6 and 10 are regular numbers, and repeat this indefinitely to get the full sequence — after that, you just have to put them in the right order.
Implementing this efficiently and lazily is surprisingly easy and compact in Python when you make use of generators and the nature of variable scope. First I'll show you the implementation, then I'll explain what's happening:
import heapq from typing import Iterable def regular_numbers_lazy() -> Iterable[int]: def generate() -> Iterable[int]: """Function to generate all regular numbers.""" yield 1 # Seed the algorithm with the first regular number # Keep track of what we just emitted to eliminate duplicates last_val = 1 # Note: `sequences` comes from the outer scope! for next_val in heapq.merge(*sequences): # The values come out of heapq.merge in order, so checking # whether the current # value is the same as the last value is # sufficient to eliminate # duplicates. if next_val != last_val: last_val = next_val yield next_val output, i2, i3, i5 = itertools.tee(generate(), 4) h2 = (2 * x for x in i2) # Last regular number multiplied by 2 h3 = (3 * x for x in i3) # Last regular number multiplied by 3 h5 = (5 * x for x in i5) # Last regular number multiplied by 5 sequences = (h2, h3, h5) # Yield only the first n elements from the generator return output
This may be a bit hard to disentangle on first look because there's something akin to recursion going on here. The function generate() closes over the variable sequences, which is a tuple of 3 iterators, h2, h3 and h5, each of which are a tee of the result of calling generate(). So what's happening here? How does this progress? The key is to recognize two things: generators are lazy (meaning you only execute as much of them as you need to handle the next next() call), and generate yields at least one value before it needs to access sequences. So breaking down what happens in the function call:
def generate() -> Iterable[int]: ...
First, generate() is defined but not executed. Python doesn't need to know anything about what happens inside of it except that it is syntactically correct. At this point the variable sequences is not defined, but that's OK because we don't need to access it yet.
output, i2, i3, i5 = itertools.tee(generate(), 4)
Here, we call generate() which creates a generator object, but nothing has requested any values from the generator, so no code needs to be executed. We then create 4 copies of that generator and store them in variables output, i2, i3 and i5.
h2 = (2 * x for x in i2) # Last regular number multiplied by 2 ... sequences = (h2, h3, h5)
Now we create 3 new generators from our tees of the original generate() call, and store them in a tuple called sequences. In the case of h2, it is a generator that will request a value from its copy of the generator and yield that value multiplied by 2. h3 and h5 are similar. Note that again these are generators, and we haven't asked for any values from them yet, meaning that none of the code in generate() or in these generator expressions has been executed yet, but now sequences has been defined, so when the code is executed, it will refer to this tuple (h2, h3, h5).
Finally, we return an iterable that is just the output of generate(). Note that the entire function has executed already and not a single line of generate() has been executed, so we don't have to worry that sequences is defined after generate(), which references it. Now what happens the first few times you call next() on the result of this function? Recall that what we're returning is the iterator output, which is a tee from a call to generate(), so we'll execute as much of generate() as is necessary to get the first value it yields. This is easy, since the first line of generate() is:
So the first value yielded is 1.
The next time you call next(), we start to get into the real meat of the function. The first thing that happens is that we initialize last_val to 1, then we start iterating over heapq.merge(*sequences):
for next_val in heapq.merge(*sequences): if next_val != last_val: last_val = next_val yield next_val
heapq.merge is a way to lazily sort sorted iterators — you pass it any number of iterators and it consumes one value from each, then returns whichever value comes next. So when we call heapq.merge(*sequences), it will first call next() on each of h2, h3 and h5. When next is called on h2, the generator expression calls next() on its tee of the original generate() call, getting the first value in the generate generator: 1. It multiplies this value by 2 and yields that. h3 and h5 do the same thing, and yield 3 and 5 respectively, so heapq.merge has [2, 3, 5] so far, and so the first value it yields is 2, and since 1 != 2, generate() yields 2, the next value for output, and execution pauses.
Now when we call next() on the output again, we resume the for loop: the next value for h2 is retrieved by getting the next value from generate() (which we saw from the previous paragraph was 2) and multiplying that by 2. Thus heapq.merge is choosing the next highest value from among [4, 3, 5]. It chooses 3, calls next(h3) and gets 6, and we go through another loop, choosing from [4, 6, 5], and so on.
As you can see, generate() only ever does as much work as is necessary to calculate the next element of the sequence (and it's seeded with the first element of the sequence: 1). itertools.tee manages keeping track of the minimum amount of the sequence so that only minimal memory is used (as explained in more detail in footnote ).
This sort of approach is also easy to generalize: if we have a function generate_primes(k) to generate a list of prime numbers less than or equal to k, we can easily modify this to generate k-smooth numbers for any value of k:
def k_smooth_numbers(k: int) -> Iterable[int]: def generate() -> Iterable[int]: """Function to generate all k-smooth numbers.""" yield 1 last_val = 1 for next_val in heapq.merge(*sequences): if next_val != last_val: last_val = next_val yield next_val primes : Sequence[int] = generate_primes(k) output, *output_tees = itertools.tee(generate(), len(primes)) sequences = ( (prime * value for value in output_tee) for prime, output_tee in zip(primes, output_tees)) return output
If we have this function, our original regular_numbers_lazy() can be defined as regular_numbers_lazy = functools.partial(k_smooth_numbers, 5).
In this post, we saw that we can express an efficient implementation of this algorithm quite compactly with the clever use of a generator and a closure, but the question remains: is this a good idea? Is the shorter code also easier to understand than a more verbose version that doesn't rely on recursively-defined generators? You can eliminate the closures by storing the state in an object, like so:
class SmoothGenerator: def __init__(self, k: int): self._primes : Sequence[int] = generate_primes(k) def __iter__(self) -> Iterator[int]: output, *output_tees = itertools.tee(self._generate(), len(self._primes)) self._sequences = ( (prime * value for value in output_tee) for prime, output_tee in zip(self._primes, output_tees)) return output def _generate(self) -> Iterable[int]: yield 1 last_val = 1 for next_val in heapq.merge(*self._sequences): if next_val != last_val: last_val = next_val yield next_val def k_smooth_numbers(k: int) -> Iterable[int]: return iter(SmoothGenerator(k))
This is essentially the same as the version that uses a closure, but it is more explicit about where it stores its state. I'd say it's somewhat easier to understand for people who are uncomfortable with a function referencing variables that have not yet been declared, but closing over as-yet-undeclared variables is probably the least confusing aspect of this implementation. The real challenge to readability is the laziness and recursive nature of the generator functions, and I do not think that those can be eliminated without adding enough complexity to make the whole thing harder to follow.
I'll note that a lot of the heavy lifting in reducing the cognitive overhead here is being done by heapq.merge and itertools.tee. In many ways, being comfortable using these and other "functional programming" constructs is more useful than having a good understanding of how to write clever recursive generators.
|||This problem is actually such a neat exercise for demonstrating the power of resumable functions (generators) that it also forms the basis of a very long and interesting test in the CPython test suite, though calling it a "test" sort of undersells the fact that it's actually a short article written in literate programming style that is used as a doctest — if you are interested in these kinds of topics, the whole file is worth reading.|
|||Since the initial publication, I've gotten the impression that people think that I came up with this approach. Considering that I've been meaning to write this blog post for about two years, I no longer really remember how much of it is mine, but I am fairly confident that I cribbed the general idea from the test mentioned in the previous footnote. This is consistent with the fact that this solution is exceedingly elegant and Tim Peters is far more clever about this and many other things than I could ever hope to be.|
|||(1, 2) |
I am playing a bit fast-and-loose with my terminology by calling these "copies", which unfortunately hides a useful feature of itertools.tee. itertools.tee doesn't create 4 copies of the input generator, it is more like a buffering wrapper for the underlying iterator. It consumes values from the underlying iterator upon request, and keeps a copy of each value until that value is no longer needed by any of the teed operators. This means that you store in memory only as many values as necessary to remember the subsequence of the original iterator that spans from the position of the slowest iterator to the position of the fastest iterator. Something like:
def peek_ahead(iterator: Iterator[T]) -> Iterator[tuple[T, T]]: a, b = itertools.tee(iterator) next(b, None) # Advance b to be 1 element ahead of a return zip(a, b)
Will keep a buffer of 2 elements, whereas something like this this will store the entire contents of the generator in the tee's buffer:
def non_consuming_sum(iterator: Iterator[T]) -> tuple[Iterator[T], T]: a, b = itertools.tee(iterator) iterator_sum = sum(b) # Consumes all values from the iterator b return a, iterator_sum