Pub-sub Redis in Micronaut

Apr 13, 2024

Today I struggled a lot to get a simple Redis pub-sub working locally. I'm working on a private project that illustrates the Kong framework in action. For that I need some sort of queue system, and I decided to venture into one that is relatively new to me.

I started out by thinking about how to test this first, because I wanted to approach it in a more thought-out way. The tests help because, at this point, I'm not interested in having something fully running. I just want to start with testing, as it should be. Going through the Micronaut website, I found out that I would need a few dependencies, namely this one:

<dependency>
    <groupId>io.micronaut.redis</groupId>
    <artifactId>micronaut-redis-lettuce</artifactId>
    <version>5.2.0</version>
</dependency>

This is the Lettuce dependency. Lettuce is a Redis client that gives access to a lot of the functionality in Redis. Then I created this configuration in the application.yml file:

redis:
    uri: redis://localhost
    timeout: 30s

I added the 30-second timeout to make connecting more reliable. I may remove it in the future.

Because I love all sorts of tests, and in this case I was interested in seeing how Redis behaves, I resorted to Testcontainers. If you do not know it, Testcontainers is a framework that lets you start and stop containers programmatically, using your Docker environment locally or in a pipeline. Anyway, I created this abstraction to make that happen for now:
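To make the idea of starting a container from code concrete, here is a minimal standalone sketch. It is not the abstraction from my project: the `redisUri` helper and the `REDIS_PORT` constant are hypothetical names made up for this example, and it asks Testcontainers for the randomly mapped host port instead of binding a fixed one. It needs a running Docker daemon.

```kotlin
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

// Hypothetical constant for this sketch; 6379 is the default Redis port.
const val REDIS_PORT = 6379

// Hypothetical helper: builds the URI a Redis client would connect to.
fun redisUri(host: String, port: Int): String = "redis://$host:$port"

fun main() {
    // Describe the container: image plus the port to expose.
    val redis = GenericContainer(DockerImageName.parse("redis:5.0.3-alpine"))
        .withExposedPorts(REDIS_PORT)

    // Start it; this blocks until the container is up.
    redis.start()
    try {
        // Testcontainers maps the exposed port to a free port on the host.
        println(redisUri(redis.host, redis.getMappedPort(REDIS_PORT)))
    } finally {
        // Stop and remove the container again.
        redis.stop()
    }
}
```

Using the mapped port avoids clashes with a Redis that may already be running on the host, at the cost of having to pass the resulting URI to the client.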

abstract class AbstractBuyOddYuccaConcertContainerTest {
    companion object {
        @Container
        @JvmField
        val postgreSQLContainer: TestPostgresSQLContainer = TestPostgresSQLContainer("postgres:14")
            .withUsername("kong")
            .withPassword("kong")
            .withDatabaseName("yucca")
            .withExposedPorts(POSTGRESQL_PORT)
            .withCreateContainerCmdModifier { cmd ->
                cmd.withHostConfig(
                    HostConfig().withPortBindings(
                        PortBinding(
                            bindPort(POSTGRESQL_PORT),
                            ExposedPort(POSTGRESQL_PORT)
                        )
                    )
                )
            }


        @Container
        @JvmField
        val redis: GenericContainer<*> = GenericContainer(parse("redis:5.0.3-alpine"))
            .withExposedPorts(REDIS_PORT)
            .withCreateContainerCmdModifier { cmd ->
                cmd.withHostConfig(
                    HostConfig().withPortBindings(
                        PortBinding(
                            bindPort(REDIS_PORT),
                            ExposedPort(REDIS_PORT)
                        )
                    )
                )
            }

        private val config = ClassicConfiguration()

        init {
            postgreSQLContainer.start()
            redis.start()
            config.setDataSource(
                postgreSQLContainer.jdbcUrl,
                postgreSQLContainer.username,
                postgreSQLContainer.password
            )
            config.schemas = arrayOf("ticket")
            Flyway(config).migrate()
        }
    }
}

In this case I'm not only using Redis, but I'm also using Postgres. This is for other functionalities in my project, but the point here is to explain how I got to test the first Redis functionality in my project.

This is the test:

@MicronautTest
internal class ReservationsServiceTest @Inject constructor(
    private val reservationsService: ReservationsService
) : AbstractBuyOddYuccaConcertContainerTest() {

    @Test
    fun `should get all reservations`(): Unit = runBlocking {
        reservationsService.getAll().toList().shouldNotBeNull()
    }

    companion object {
        @JvmStatic
        @BeforeAll
        fun setup() {
            redis.start()
            postgreSQLContainer.start()
        }
    }
}

The reason why I'm, at this moment, forcing the start of the containers in the @BeforeAll method is that I noticed that @MicronautTest doesn't seem to work very well with the Testcontainers framework in my current setup: the Redis client seems to be configured well before the container starts.

Of course, for this part I didn't strictly adhere to the idea of writing the integration test first and then doing the implementation. What I did was to first implement the service in this way:
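One possible way around this ordering problem, which I have not applied in my project, is to let the test itself hand the container's address to Micronaut before the application context is built. The sketch below assumes Micronaut Test's `TestPropertyProvider` interface together with the per-class JUnit 5 lifecycle. The class name `RedisConnectionSketchTest` and the helper `redisProperties` are hypothetical, and it relies on the mapped port rather than a fixed port binding.

```kotlin
import io.lettuce.core.RedisClient
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import io.micronaut.test.support.TestPropertyProvider
import jakarta.inject.Inject
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName

// Hypothetical helper: the property that overrides redis.uri from application.yml.
fun redisProperties(host: String, port: Int): Map<String, String> =
    mapOf("redis.uri" to "redis://$host:$port")

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RedisConnectionSketchTest : TestPropertyProvider {

    @Inject
    lateinit var redisClient: RedisClient

    // Called before the application context starts, so the container
    // is running by the time the Redis client gets configured.
    override fun getProperties(): Map<String, String> {
        redis.start()
        return redisProperties(redis.host, redis.getMappedPort(6379))
    }

    @Test
    fun `should ping redis`() {
        redisClient.connect().use { connection ->
            assertEquals("PONG", connection.sync().ping())
        }
    }

    companion object {
        val redis: GenericContainer<*> =
            GenericContainer(DockerImageName.parse("redis:5.0.3-alpine"))
                .withExposedPorts(6379)
    }
}
```

Whether this behaves better than starting the containers in an init block depends on the rest of the setup, so treat it as something to try rather than a verified fix.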

@Singleton
class ReservationsService(
    private val receiptRepository: ReceiptRepository,
    private val redisClient: RedisClient
) {
    init {
        val statefulRedisPubSubConnection = redisClient.connectPubSub()
        statefulRedisPubSubConnection.addListener(Listener())
        val redisPubSubAsyncCommands = statefulRedisPubSubConnection.async()
        redisPubSubAsyncCommands.subscribe("channel1")
        redisClient.connectPubSub().async().publish("channel1", "test")
    }

    @OptIn(DelicateCoroutinesApi::class)
    suspend fun createTicket(ticketDto: @Valid TicketDto) = GlobalScope.launch {
        receiptRepository.save(Receipt()).toDto
    }

    fun getAll(): Flow<Receipt> = receiptRepository.findAll()
}

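class Listener : RedisPubSubAdapter<String, String>() {
    // Called for every message that arrives on a subscribed channel; intentionally empty for now
    override fun message(channel: String?, message: String?) {
    }
}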

Of course, this is meant to be where the production code starts in the future. For now, this class represents my first successful attempt at accessing Redis. Among the interfaces readily available in the Lettuce framework, the only one that suits me at the moment for implementing a Pub Sub mechanism is the RedisClient. This is the only code that works for me for now because I realized that, for the Pub Sub to work, I need one async commands instance for the publisher and another one for the subscriber. Once the RedisClient is injected, the experimental code works something like this:

  1. Create a Pub Sub connection

val statefulRedisPubSubConnection = redisClient.connectPubSub()

  2. Add a listener (It has to implement an interface of type RedisPubSubListener. RedisPubSubAdapter implements all methods and they are all empty by default)

statefulRedisPubSubConnection.addListener(Listener())

  3. Get the commands instance to be able to issue commands

val redisPubSubAsyncCommands = statefulRedisPubSubConnection.async()

  4. Create a channel by subscribing to it

redisPubSubAsyncCommands.subscribe("channel1")

  5. Send the first message in another connection

redisClient.connectPubSub().async().publish("channel1", "test")
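The steps above can also be put together in a small standalone program that shows the message actually arriving. This is only a sketch under a few assumptions: a Redis server is reachable at redis://localhost, the `CollectingListener` class is a hypothetical name invented for this example, and the code waits for the subscribe command to complete before publishing, which the experimental code in my service does not do.

```kotlin
import io.lettuce.core.RedisClient
import io.lettuce.core.pubsub.RedisPubSubAdapter
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

// Hypothetical listener for this sketch: it stores every received
// (channel, message) pair so that the caller can wait for it.
class CollectingListener : RedisPubSubAdapter<String, String>() {
    val received = LinkedBlockingQueue<Pair<String, String>>()

    override fun message(channel: String, message: String) {
        received.add(channel to message)
    }
}

fun main() {
    // Assumption: a Redis server is listening on localhost, default port.
    val redisClient = RedisClient.create("redis://localhost")
    val subscriber = redisClient.connectPubSub()
    val publisher = redisClient.connectPubSub()
    try {
        val listener = CollectingListener()
        subscriber.addListener(listener)

        // Wait for the subscribe command to complete before publishing,
        // so the message is not sent while nobody is subscribed yet.
        subscriber.async().subscribe("channel1").get(5, TimeUnit.SECONDS)

        // publish returns the number of clients that received the message.
        val receivers = publisher.async().publish("channel1", "test").get(5, TimeUnit.SECONDS)
        println("Delivered to $receivers subscriber(s)")

        // Wait up to five seconds for the listener to pick the message up.
        println("Received: ${listener.received.poll(5, TimeUnit.SECONDS)}")
    } finally {
        subscriber.close()
        publisher.close()
        redisClient.shutdown()
    }
}
```

Redis pub-sub does not store messages, so anything published before the subscription is active is simply lost. That is why the sketch blocks on the subscribe future first.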

Now I'm very happy to see this working locally. I can spin up the Redis container, connect to it and see the first Pub Sub working! If you are interested in following the changes in my project, please have a look at it here: https://github.com/jesperancinha/buy-odd-yucca-concert/tree/master/buy-oyc-api-service/src/main. My goal with this quick post was to show you how I came to understand the basics of a Pub Sub system implemented with Redis. You can find more documentation about this project here: https://github.com/jesperancinha/buy-odd-yucca-concert/tree/master/buy-oyc-api-service.
