# Generating k-Smooth numbers in Python

Some time ago, I was introduced to the idea of Regular numbers (also called *Hamming numbers*): the set of all numbers that can be expressed as 2^{n} × 3^{m} × 5^{k} for non-negative integers n, m and k. Generating these numbers is harder than it looks, because they quickly become sparse, so the "brute force" approach is impractical for anything beyond the first few regular numbers:

```
import itertools
from typing import Iterable

def regular_numbers_brute(n: int) -> Iterable[int]:
    """Yields the first n regular numbers."""
    found = 0
    for i in itertools.count(1):  # Iterate over integers from 1 to ∞
        if found >= n:
            break
        j = i
        for s in [2, 3, 5]:
            # Divide by each prime factor until it no longer evenly divides.
            while j % s == 0:
                j //= s  # Integer division keeps j an int
        # If dividing out 2, 3 and 5 leaves 1, the number is regular.
        if j == 1:
            yield i
            found += 1
```

If you actually run this, you'll find that the 100^{th} regular number is 1536 and the 200^{th} is 16200, so if you are checking every integer, finding the second hundred regular numbers takes roughly 10x as long as finding the first hundred. The 500^{th} regular number is 937,500, meaning you need to check nearly a million integers just to find the first 500.
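You can verify those figures with a compact version of the same divisibility test (names here are my own, not from the original code):

```
def is_regular(n: int) -> bool:
    """True if n has no prime factors other than 2, 3 and 5."""
    for p in (2, 3, 5):
        while n % p == 0:
            n //= p
    return n == 1

regular = [n for n in range(1, 20000) if is_regular(n)]
print(regular[99], regular[199])  # The 100th and 200th regular numbers
```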

It turns out that there's a fairly simple intuition about the structure of these numbers that makes it relatively easy to come up with an efficient algorithm for generating them: every regular number (other than 1) is the result of multiplying another regular number by 2, 3 or 5. So you start your sequence with 1, which tells you that 2, 3 and 5 are regular numbers. Pick the next one in the sequence, 2, and multiply it by 2, 3 and 5 to learn that 4, 6 and 10 are regular numbers, and repeat this indefinitely to get the full sequence — after that, you just have to put them in the right order.
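One direct way to encode this intuition, before getting to the lazy generator version this post is really about, is a min-heap plus a set of already-seen values (a sketch, with names of my choosing):

```
import heapq

def regular_numbers_heap(n: int) -> list[int]:
    """First n regular numbers via the multiply-by-2-3-5 intuition."""
    heap = [1]      # Candidates, smallest first
    seen = {1}      # Avoid pushing the same candidate twice
    result = []
    while len(result) < n:
        x = heapq.heappop(heap)  # Smallest regular number not yet emitted
        result.append(x)
        for p in (2, 3, 5):
            if p * x not in seen:
                seen.add(p * x)
                heapq.heappush(heap, p * x)
    return result

print(regular_numbers_heap(10))  # [1, 2, 3, 4, 5, 6, 8, 9, 10, 12]
```

This works, but it is eager and keeps an explicit heap and set; the version below gets the same effect lazily.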

Implementing this efficiently and *lazily* is surprisingly easy and compact in Python when you make use of generators[1][2] and the nature of variable scope. First I'll show you the implementation, then I'll explain what's happening:

```
import heapq
import itertools
from typing import Iterable

def regular_numbers_lazy() -> Iterable[int]:
    def generate() -> Iterable[int]:
        """Generate all regular numbers."""
        yield 1  # Seed the algorithm with the first regular number
        # Keep track of what we just emitted to eliminate duplicates
        last_val = 1
        # Note: `sequences` comes from the outer scope!
        for next_val in heapq.merge(*sequences):
            # The values come out of heapq.merge in order, so checking
            # whether the current value is the same as the last value is
            # sufficient to eliminate duplicates.
            if next_val != last_val:
                last_val = next_val
                yield next_val

    output, i2, i3, i5 = itertools.tee(generate(), 4)
    h2 = (2 * x for x in i2)  # Last regular number multiplied by 2
    h3 = (3 * x for x in i3)  # Last regular number multiplied by 3
    h5 = (5 * x for x in i5)  # Last regular number multiplied by 5
    sequences = (h2, h3, h5)
    return output
```

This may be a bit hard to disentangle on first look because there's something akin to recursion going on here. The function `generate()` closes over the variable `sequences`, which is a tuple of 3 generators, `h2`, `h3` and `h5`, each of which consumes a tee of the result of calling `generate()`. So what's happening here? How does this progress? The key is to recognize two things: generators are lazy (you only execute as much of them as you need to satisfy the next `next()` call), and `generate` yields *at least one value* before it needs to access `sequences`. So breaking down what happens in the function call:

```
def generate() -> Iterable[int]:
    ...
```

First, `generate()` is defined but not executed. Python doesn't need to know anything about what happens inside of it except that it is syntactically correct. At this point the variable `sequences` is not defined, but that's OK because we don't need to access it yet.
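A tiny standalone illustration of this (the names here are hypothetical, not from the post's code): a name referenced inside a generator body is only looked up when the body actually runs, not when the function is defined or even when the generator object is created.

```
def emit():
    yield marker  # `marker` is looked up only when the body executes

gen = emit()              # Defining and calling: no body code runs yet
marker = "defined later"  # Now the name exists
print(next(gen))          # Prints "defined later"
```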

```
output, i2, i3, i5 = itertools.tee(generate(), 4)
```

Here, we call `generate()` which creates a generator object, but nothing has requested any *values* from the generator, so *no code needs to be executed*. We then create 4 copies[3] of that generator and store them in variables `output`, `i2`, `i3` and `i5`.

```
h2 = (2 * x for x in i2) # Last regular number multiplied by 2
...
sequences = (h2, h3, h5)
```

Now we create 3 new generators from our tees of the original `generate()` call, and store them in a tuple called `sequences`. In the case of `h2`, it is a generator that will request a value from its copy of the generator and yield that value multiplied by 2. `h3` and `h5` are similar. Note that again these are generators, and we haven't asked for any values from them yet, meaning that none of the code in `generate()` or in these generator expressions has been executed yet, but now `sequences` has been defined, so when the code *is* executed, it will refer to this tuple `(h2, h3, h5)`.
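A minimal standalone demonstration of that last point (hypothetical names again): building a generator expression over a generator executes none of the underlying generator's code.

```
log = []

def noisy():
    log.append("body ran")
    yield 1

doubled = (2 * x for x in noisy())  # Creating the genexp runs no body code
assert log == []                    # noisy()'s body has not executed yet
assert next(doubled) == 2           # Now the chain executes one step
assert log == ["body ran"]
```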

```
return output
```

Finally, we return an iterable that is just the output of `generate()`. Note that the entire function has executed already and not a single line of `generate()` has been executed, so we don't have to worry that `sequences` is defined *after* `generate()`, which references it. Now what happens the first few times you call `next()` on the result of this function? Recall that what we're returning is the iterator `output`, which is a tee from a call to `generate()`, so we'll execute as much of `generate()` as is necessary to get the first value it yields. This is easy, since the first line of `generate()` is:

```
yield 1
```

So the first value yielded is `1`.

The next time you call `next()`, we start to get into the real meat of the function. The first thing that happens is that we initialize `last_val` to `1`, then we start iterating over `heapq.merge(*sequences)`:

```
for next_val in heapq.merge(*sequences):
    if next_val != last_val:
        last_val = next_val
        yield next_val
```

`heapq.merge` is a way to lazily merge sorted iterators: you pass it any number of sorted iterators, it consumes one value from each, and it repeatedly yields the smallest value it is currently holding, refilling from whichever iterator that value came from. So when we call `heapq.merge(*sequences)`, it will first call `next()` on each of `h2`, `h3` and `h5`. When `next` is called on `h2`, the generator expression calls `next()` on its tee of the original `generate()` call, getting the first value in the `generate` generator: `1`. It multiplies this value by `2` and yields that. `h3` and `h5` do the same thing and yield `3` and `5` respectively, so `heapq.merge` is holding `[2, 3, 5]`, and the first value it yields is `2`. Since `2 != 1`, `generate()` yields `2`, the next value for `output`, and execution pauses.
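You can see `heapq.merge` in action on its own with two small sorted streams. Note that it does not remove duplicates, which is exactly why `generate()` compares each value against `last_val`:

```
import heapq

h2 = (2 * x for x in [1, 2, 3])  # 2, 4, 6
h3 = (3 * x for x in [1, 2, 3])  # 3, 6, 9
merged = list(heapq.merge(h2, h3))
print(merged)  # [2, 3, 4, 6, 6, 9] — the duplicate 6 survives
```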

Now when we call `next()` on the output *again*, we resume the `for` loop: the next value for `h2` is retrieved by getting the next value from `generate()` (which we saw from the previous paragraph was `2`) and multiplying that by 2. Thus `heapq.merge` is choosing the smallest value from among `[4, 3, 5]`. It chooses `3`, calls `next(h3)` to get `6`, and we go through another loop, choosing from `[4, 6, 5]`, and so on.

As you can see, `generate()` only ever does as much work as is necessary to calculate the next element of the sequence (and it's seeded with the first element of the sequence: `1`). `itertools.tee` keeps only the minimum necessary portion of the sequence buffered, so memory use stays small (as explained in more detail in footnote [3]).

This sort of approach is also easy to generalize: if we have a function `generate_primes(k)` to generate a list of prime numbers less than or equal to *k*, we can easily modify this to generate k-smooth numbers for **any** value of *k*:

```
import heapq
import itertools
from typing import Iterable, Sequence

def k_smooth_numbers(k: int) -> Iterable[int]:
    def generate() -> Iterable[int]:
        """Generate all k-smooth numbers."""
        yield 1
        last_val = 1
        for next_val in heapq.merge(*sequences):
            if next_val != last_val:
                last_val = next_val
                yield next_val

    primes: Sequence[int] = generate_primes(k)
    # One tee for the output, plus one per prime
    output, *output_tees = itertools.tee(generate(), len(primes) + 1)
    sequences = tuple(
        (prime * value for value in output_tee)
        for prime, output_tee in zip(primes, output_tees))
    return output
```
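`generate_primes` is assumed here rather than defined; one plausible implementation (a sketch of mine, not from the original post) is a simple sieve of Eratosthenes:

```
from typing import Sequence

def generate_primes(k: int) -> Sequence[int]:
    """All primes less than or equal to k, via a sieve of Eratosthenes."""
    if k < 2:
        return []
    sieve = [True] * (k + 1)
    sieve[0] = sieve[1] = False
    for i in range(2, int(k ** 0.5) + 1):
        if sieve[i]:
            # Mark every multiple of i (starting at i*i) as composite
            for j in range(i * i, k + 1, i):
                sieve[j] = False
    return [i for i, is_prime in enumerate(sieve) if is_prime]

print(generate_primes(10))  # [2, 3, 5, 7]
```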

If we have this function, our original `regular_numbers_lazy()` can be defined as `regular_numbers_lazy = functools.partial(k_smooth_numbers, 5)`.

## Thoughts

In this post, we saw that we can express an efficient implementation of this algorithm quite compactly with the clever use of a generator and a closure, but the question remains: is this a good idea? Is the shorter code also easier to understand than a more verbose version that doesn't rely on recursively-defined generators? You can eliminate the closures by storing the state in an object, like so:

```
import heapq
import itertools
from typing import Iterable, Iterator, Sequence

class SmoothGenerator:
    def __init__(self, k: int):
        self._primes: Sequence[int] = generate_primes(k)

    def __iter__(self) -> Iterator[int]:
        output, *output_tees = itertools.tee(
            self._generate(), len(self._primes) + 1)
        self._sequences = tuple(
            (prime * value for value in output_tee)
            for prime, output_tee in zip(self._primes, output_tees))
        return output

    def _generate(self) -> Iterable[int]:
        yield 1
        last_val = 1
        for next_val in heapq.merge(*self._sequences):
            if next_val != last_val:
                last_val = next_val
                yield next_val

def k_smooth_numbers(k: int) -> Iterable[int]:
    return iter(SmoothGenerator(k))
```

This is essentially the same as the version that uses a closure, but it is more explicit about where it stores its state. I'd say it's somewhat easier to understand for people who are uncomfortable with a function referencing variables that have not yet been declared, but closing over as-yet-undeclared variables is probably the least confusing aspect of this implementation. The real challenge to readability is the laziness and recursive nature of the generator functions, and I do not think that those can be eliminated without adding enough complexity to make the whole thing harder to follow.

I'll note that a lot of the heavy lifting in reducing the cognitive overhead here is being done by `heapq.merge` and `itertools.tee`. In many ways, being comfortable using these and other "functional programming" constructs is more useful than having a good understanding of how to write clever recursive generators.

## Footnotes

[1] This problem is actually such a neat exercise for demonstrating the power of resumable functions (generators) that it also forms the basis of a very long and interesting test in the CPython test suite, though calling it a "test" sort of undersells the fact that it's actually a short article written in literate programming style that is used as a doctest. If you are interested in these kinds of topics, the whole file is worth reading.

[2] Since the initial publication, I've gotten the impression that people think that I came up with this approach. Considering that I've been meaning to write this blog post for about two years, I no longer really remember how much of it is mine, but I am fairly confident that I cribbed the general idea from the test mentioned in the previous footnote. This is consistent with the fact that this solution is exceedingly elegant, and Tim Peters is far more clever about this and many other things than I could ever hope to be.

[3] I am playing a bit fast-and-loose with my terminology by calling these "copies", which unfortunately hides a useful feature of `itertools.tee`: the iterators it returns share a single buffer, which only needs to hold the values that the slowest tee has not yet consumed. So a function like this:

```
def peek_ahead(iterator: Iterator[T]) -> Iterator[tuple[T, T]]:
    a, b = itertools.tee(iterator)
    next(b, None)  # Advance b to be 1 element ahead of a
    return zip(a, b)
```

will keep a buffer of at most 2 elements, whereas something like this:

```
def non_consuming_sum(iterator: Iterator[T]) -> tuple[Iterator[T], T]:
    a, b = itertools.tee(iterator)
    iterator_sum = sum(b)  # Consumes all values from the iterator b
    return a, iterator_sum
```

will store the entire contents of the generator in the buffer.