常见的消息中间件

Posted by Hsz on April 9, 2019

简评常见的消息中间件

就像构建程序时我们往往需要一些组件用于结构不同的模块一样,在复杂的分布式系统中,我们使用消息中间件来为不同的组件解耦.

常见的消息中间件分为两种功能:

  • 消息队列,用于构建生产消费模式
  • 广播,用于构建发布订阅模式

常见的中间件有:

  • redis: 使用List数据类型实现消息队列;使用PUBLISH/subscribe构建广播

  • kafka: 同topic可以将消息广播给不同的consumer group,相同consumer groupconsumer消费消息而非广播

  • rabbitmq: 队列绑定到交换机,消息通过交换机发布到队列中,广播就是发送至绑定的指定复数队列中

接下来是简单演示,本文将使用上面3种工具实现一个分发随机数求平方和的功能.其中会用到这两种模式:

  • 生产消费模式: 生产者向sourceQ队列发送数据,消费者从sourceQ队列取数据,消费者计算完成平方后将结果放入队列resultQ,生产者接收resultQ队列中的结果更新累加结果并打印在标准输出中.
  • 广播模式: 生产者在收到KeyboardInterrupt错误时向频道exitCh发出消息,消费者订阅频道exitCh,当收到消息时退出.

我们使用python作为编程语言,项目代码在hsz1273327/message-oriented-middleware-show-how,不同的分支是不同的中间件实现,master分支(默认)是rabbitmq实现

redis实现

redis是一个高性能的key-value内存数据库.在其中实现了一些数据结构.这个小东西轻量但高效深受开发者喜爱.redis默认端口6379

  • 生产者
import sys
import time
import random
import threading
from redis import Redis

redis = Redis.from_url('redis://localhost:6379/2')

sourceQ = "sourceQ"
resultQ = "resultQ"
exitCh = "exitCh"

def push(Q, value):
    if redis.exists(Q):
        if redis.type(Q) == "list":
            redis.xlpush(Q, value)
        else:
            redis.delete(Q)
            redis.lpush(Q, value)
    else:
        redis.lpush(Q, value)
    print(f"send {value} to {Q}")

def producer():
    while True:
        data = random.randint(1, 400)
        push(sourceQ, data)
        time.sleep(1)

def collector():
    sum = 0
    while True:
        if redis.exists(resultQ) and redis.type(resultQ) == b"list":
            data = redis.rpop(resultQ)
            if data:
                print(f"received {data.decode()}")
                sum += int(data)
                print(f"get sum {sum}")
            else:
                time.sleep(1)
        else:
            time.sleep(1)

def main():
    t = threading.Thread(target=collector, daemon=True)
    t.start()
    try:
        producer()
    except KeyboardInterrupt:
        redis.publish(exitCh, 'Exit')
    except Exception as e:
        raise e
    finally:
        sys.exit()


if __name__ == "__main__":
    main()
  • 消费者
import sys
import time
import threading
from redis import Redis

redis = Redis.from_url('redis://localhost:6379/2')
sourceQ = "sourceQ"
resultQ = "resultQ"
exitCh = "exitCh"

def push(Q, value):
    if redis.exists(Q):
        if redis.type(Q) == "list":
            redis.xlpush(Q, value)
        else:
            redis.delete(Q)
            redis.lpush(Q, value)
    else:
        redis.lpush(Q, value)
    print(f"send {value} to {Q}")

def get_result():
    while True:
        if redis.exists(sourceQ) and redis.type(sourceQ) == b"list":
            data = redis.rpop(sourceQ)
            if data:
                print(f"received {data.decode()}")
                push(resultQ, int(data)**2)
            else:
                time.sleep(1)
        else:
            time.sleep(1)

def main():
    t = threading.Thread(target=get_result, daemon=True)
    t.start()
    p = redis.pubsub()
    p.subscribe('exitCh')
    while True:
        message = p.get_message()
        if message:
            if message.get("data") == b"Exit":
                p.close()
                sys.exit(0)
            else:
                time.sleep(1)
        else:
            time.sleep(1)

if __name__ == "__main__":
    main()

在作为队列时,redis适用list类型的值存储要处理的数据,由于list是一个双向链表,我们可以在左边插入也可以在右边插入,也可以在左边取出也可以在右边取出,因此可以以此构造先进先出或者先进后出队列.而List本身最大长度为$232-1$,redis对于数据需要注意redis的list是可能阻塞的,阻塞原因各种各样,可能因为数据同步,可能因为内存不足.

而redis的redis的pub/sub实现相当简单,它只是将订阅者的套接字存放在一个list中,当有消息来时顺次每个都发一遍而已.因为不会保留发布出去的内容,所以发布订阅是无法回溯的.

由于没有ack机制,redis无法保证消息的一定到达也不能很好的保证消息一致性.当然我们可以在redis的基础上通过各种手段给它加上ACK机制,加上边界检查等,但这就违背其初衷了.

redis本质上是一个内存数据库,常用于共享内存,因此执行效率很高,同时天生支持高并发,有测试对较小的的数据性能相当之高.

redis的适用场景是可以容忍数据丢失或不一致,但对实时性要求高的引用场景,比如量化系统中的报价推送.

kafka实现

kafka是一个追求高吞吐的分布式消息队列.和redis比较是另一个极端,几乎是为复杂而生:

  • 天生支持分布式,并且它必须依赖zonekeeper维护集群一致性.
  • 天生持久化,硬盘允许情况下保留所有消息.

kafka

我们在本地可以使用docker部署一个用于测试,kafka默认端口为9092.

  • docker-compose.yml
version: '3.6'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic1:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

使用docker-compose up -d启动就可以在localhost:9092访问到kafka了

  • 生产者
import sys
import time
import random
import threading
from kafka import KafkaProducer
from kafka import KafkaConsumer

kfkproducer = KafkaProducer(bootstrap_servers='localhost:9092')

sourceQ = "sourceQ"
resultQ = "resultQ"
exitCh = "exitCh"

def push(Q, value):
    future = kfkproducer.send(Q,str(value).encode())
    result = future.get(timeout=10)
    print(f"send {value} to {Q},{result}")

def producer():
    while True:
        data = random.randint(1, 400)
        push(sourceQ, data)
        time.sleep(1)

def collector():
    sum = 0
    consumer = KafkaConsumer(resultQ, group_id=resultQ, bootstrap_servers='localhost:9092')
    for msg in consumer:
        print(f"received {msg}")
        sum += int(msg.value.decode("utf-8"))
        print(f"get sum {sum}")

def main():
    t = threading.Thread(target=collector, daemon=True)
    t.start()
    try:
        print("start")
        producer()
    except KeyboardInterrupt:
        push(exitCh, "Exit")
    except Exception as e:
        print(e)
        raise e
    finally:
        sys.exit()

if __name__ == "__main__":
    main()
  • 消费者
import sys
import uuid
import time
import threading
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import TopicPartition

kfkproducer = KafkaProducer(bootstrap_servers='localhost:9092')

sourceQ = "sourceQ"
resultQ = "resultQ"
exitCh = "exitCh"

def push(Q, value):
    future = kfkproducer.send(Q, str(value).encode())
    result = future.get(timeout=10)
    print(f"send {value} to {Q},{result}")

def get_result():
    PARTITION_0 = 0
    consumer = KafkaConsumer(sourceQ, group_id=sourceQ, bootstrap_servers='localhost:9092')
    for msg in consumer:
        print(f"received {msg}")
        data = int(msg.value.decode("utf-8"))
        push(resultQ, int(data)**2)

def main():
    t = threading.Thread(target=get_result, daemon=True)
    t.start()
    group_id = str(uuid.uuid4())
    print(f"group_id:{group_id}")
    consumer = KafkaConsumer(exitCh, group_id=group_id, bootstrap_servers='localhost:9092')
    for msg in consumer:
        print(msg)
        if msg.value == b"Exit":
            sys.exit(0)

if __name__ == "__main__":
    main()

kafka是相当复杂的,但其接口还是比较简单的.只是我们得分清如何设置才能达到自己想要的效果.kafka中有3个概念:

  • topic,订阅的主题,用来区分隔离不同的消息数据,屏蔽了底层复杂的存储方式.对于大多数人来说,在开发的时候只需要关注数据写入到了哪个topic,从哪个topic取出数据.
  • consumer group,消费者组Kafka实现单播和广播两种消息模型的手段.同一个topic的数据会广播给不同的group;同一个group中的消费者,只有一个能拿到这个数据
  • consumer,消费者
  • partition分区,Kafka下数据存储的基本单元,同一个topic的数据会被分散的存储到多个partition中,这些partition可以在同一台机器上.也可以是在多台机器上.优势在于有利于水平扩展,避免单台机器在磁盘空间和性能上的限制.同时可以通过复制来增加数据冗余性以提高容灾能力.为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍.

topicpartition是1对多关系;topicconsumer group是多对多关系;consumer可以是consumer group的一员也可以不是.

一个topic的一条信息是怎么到消费者手中的呢:

  1. 生产者将信息放入topic
  2. kafka会将这个信息广播给各个consumer group
  3. 在各个consumer group中将只有1个consumer可以获得这条信息,kafka会依据partition作为分配算法的一个参数将消息分配给consumer

和传统的消息系统一样,Kafka保证消息的顺序不变.但是尽管服务器保证了消息的顺序,消息还是异步的发送给各个消费者,这也意味着并行消费将不能保证消息的先后顺序.如果只让一个消费者处理消息,又违背了并行处理的初衷.在这一点上Kafka可以采用了一种分而治之的策略:分区. 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序.所以如果你想要顺序的处理Topic的所有消息那就只提供一个partition.

kafka保证一致性和可用性:

  • 生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说如果一个消息M1和M2使用相同的producer发送,M1先发送那么M1将比M2的offset低,并且优先的出现在日志中.消费者收到的消息也是此顺序.
  • 如果一个Topic配置了复制因子(replication factor)为N,那么可以允许N-1服务器宕机而不丢失任何已经提交(committed)的消息.

kafka作为消息中间件适用的场景主要是:

  1. 强调高一致性的场景,尤其需要保证队列顺序的场景
  2. 高吞吐量场景,天生分布式带来的好处
  3. 消息需要保留的场景
  4. 对实时性要求不高的场景,盘写入速度就是kafka处理速度的极限,与基于内存的消息队列比完全不在一个数量级

做为一个网络中间件,像redis一样kafka也有一些副产品一样的功能–一定程度的时序数据库,消费者新建一个group并订阅一个topic,同时设置auto_offset_resetearliest就可以全量的获得这个topic下所有可用的数据.

rabbitmq实现

rabbitmq就是一个标准的传统消息队列了.它虽然不及redis实时性好,不及kafka吞吐量好,但它可以做复杂的路由,性能稳定,就像数据库领域中关系数据库之于非关系数据库一样,是大多数场景下最应该考虑使用的消息队列.

rabbitmq

rabbitmq默认的用户名和密码是guest:guest,可以在端口15672查看到管理页面.

它支持两种协议

  • AMQP Advanced Message Queuing Protocol,专业的消息队列应用层协议,默认端口56725671
  • MQTT 物联网领域常用的消息队列协议,只有发布订阅模式,专为硬件性能低下的远程设备以及网络状况糟糕的情况而设计,默认端口18838883,这种协议本文不介绍以后如果谈到IOT领域我们再回来谈

虽然官方推荐的工具是pika,但这个AMQP实现太过陈旧,都是基于回调的而且其堵塞接口不是线程安全的用起来很恶心,本文使用aio-pika,一个基于python3.5+异步工具的客户端. 我们的实现如下:

  • 生产者
import random
import asyncio
import aio_pika

AMQP_URL = 'amqp://guest:guest@localhost:5672/'

async def producer(channel):
    while True:
        data = random.randint(1, 400)
        await channel.default_exchange.publish(
            aio_pika.Message(body=str(data).encode()),
            routing_key="sourceQ"
        )
        print(f"send {data} to sourceQ")
        await asyncio.sleep(1)

async def collector(Q):
    sum = 0
    async with Q.iterator() as queue_iter:
            # Cancel consuming after __aexit__
        async for message in queue_iter:
            async with message.process():
                print(f"received {message.body}")
                sum += int(message.body)
                print(f"get sum {sum}")

async def pubExt():
    print("exit publishing")
    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        exitCh = await connection.channel(2)
        ex = await exitCh.declare_exchange('exitCh', type=aio_pika.exchange.ExchangeType.FANOUT)
        await ex.publish(aio_pika.Message(body=b"Exit"), routing_key="")
    print("exit published")

async def main():
    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        # Creating channel
        channel = await connection.channel(1)    # type: aio_pika.Channel
        await channel.declare_queue("sourceQ")
        resultQ = await channel.declare_queue("resultQ")
        prodcor = producer(channel)
        collcor = collector(resultQ)
        await asyncio.gather(prodcor, collcor)

if __name__ == "__main__":
    print("producer start")
    try:
        asyncio.get_event_loop().run_until_complete(main())
    except KeyboardInterrupt:
        asyncio.get_event_loop().run_until_complete(pubExt())
    finally:
        print("done")

  • 消费者
import uuid
import asyncio
import aio_pika

AMQP_URL = 'amqp://guest:guest@localhost:5672/'

async def consumer(sourceQ, channel):
    async with sourceQ.iterator() as queue_iter:
            # Cancel consuming after __aexit__
        async for message in queue_iter:
            async with message.process():
                print(f"received {message.body}")
                result = int(message.body.decode("utf-8"))**2
            await channel.default_exchange.publish(
                aio_pika.Message(body=str(result).encode()),
                routing_key="resultQ"
            )
            print(f"send {result} to resultQ")

async def main():
    connection = await aio_pika.connect_robust(AMQP_URL)

    async with connection:
            # Creating channel
        channel = await connection.channel(1)    # type: aio_pika.Channel
        sourceQ = await channel.declare_queue("sourceQ")
        await channel.declare_queue("resultQ")
        exitCh = await connection.channel(2)
        extE = await exitCh.declare_exchange('exitCh', type=aio_pika.exchange.ExchangeType.FANOUT)
        q_id = str(uuid.uuid4())
        extQ = await exitCh.declare_queue(q_id, auto_delete=True)
        await extQ.bind(extE)
        task = asyncio.ensure_future(consumer(sourceQ, channel))
        async with extQ.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(f"received {message.body}")
                    if message.body == b"Exit":
                        task.cancel()
                        return

if __name__ == "__main__":
    print("consumer start")
    asyncio.run(main())

rabbitmq有这么几个关键字:

  1. channel每个频道由交换机和队列组成,可以定制多个交换机和多个队列,通常用于区分业务

  2. exchange交换机,生产者生产的消息的发送入口,用于定义消息分发的规则–即一条消息发布出来后应该放入哪些队列,有3种常用的模式:

    • Direct Exchange处理路由键需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配.如果一个队列绑定到该交换机上要求路由键’dog’则只有被标记为’dog’的消息才被转发.
    • Fanout Exchange不处理路由键.只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上.很像子网广播,每台子网内的主机都获得了一份复制的消息.Fanout交换机转发消息是最快的.
    • Topic Exchange将路由键和某模式进行匹配.此时队列需要绑定要一个模式上.符号#匹配一个或多个词;符号*匹配不多不少一个词.比如audit.#能够匹配到audit.irs.corporate.audit.*只会匹配到audit.irs.

    另外每个channel都会有一个特殊的默认exchange,当你手动创建一个队列时后台会自动将这个队列绑定到一个名称为空的Direct类型交换机上,绑定路由名称与队列名称相同.

  3. queue消费者订阅消息的入口,每个queue需要定义一个路由键用于让交换机做匹配.每个队列中一条消息只能被一个订阅者消费掉

rabbitmq在各个方面都很均衡,几乎可以使用在任意场景,是这三种中最常用的默认选择.

下面是rabbitmq的几个最佳实践:

  • 保证队列中有尽量少的消息堆积(redis也适用)

    当队列中堆积过多消息时会给内存带来极大的压力,为了释放内存Rabbit会将消息刷至磁盘,当有大量消息需要刷新至磁盘时,会造成队列的阻塞进而影响系统吞吐量.

  • 启用懒惰队列功能

    懒惰队列会将生产者的消息持久化到磁盘中而不驻留在内存,只有当消息需要被消费时才会重新加载到内存. 这可以很好的避免内存过高.当然也是有代价的,引入磁盘必然会降低实时性并增加io负担因此如果要求实时性就不要开了. 如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失. 当设置了队列最大长度时也应该禁用队列的lazy属性.

  • 持久化消息与队列

    如果你期望消息不丢失,那么需要将队列声明为durable的,并且发送的消息也要声明为persistent(二者必须同时设置)这样在broke发生故障重启后消息就不会丢失.

  • 设置max-length或者TTL

    对于那些吞吐量优先,允许消息丢失的应用程序最好设置这两个属性(队列最大长度和消息过期时间)避免消息堆积来保证内存正常.

  • 队列数量

    rabbit中的队列是单线程的,适当使用多个队列和消费者可以增加吞吐量.但不要设置太多的队列.如果底层节点上的队列与内核一样多则可以实现最佳吞吐量.

  • 自动删除不使用的队列

    有如下几种方式自动删除不使用了的队列

    1. 可以在队列上设置TTL策略.
    2. 设置了自动删除属性的队列.最后一个消费者取消或连接关闭设置了自动关闭的队列就会被删除
    3. 设置独占队列.独占队列只能被声明他的连接所使用.当声明连接关闭或消失时将删除独占队列.
  • 设置优先级队列的优先级上限

    在大多数情况下,不超过5个优先级就足够了

  • 消息确认机制

    如果不需要确保消息不丢失,把消息确认机制关掉可以提高吞吐量;如果开启了消息确认,接收到重要消息的消费应用程序在完成对这些消息的任何处理(工作程序崩溃,异常等)之前都不应该确认消息.发布确认类似于消息确认机制,服务器在收到来自发布者的消息时进行ack.发布确认对性能也有影响.但是如果生产者需要确保消息到达broker,那么它是必需的.由于所有未确认的消息必须驻留在服务器的内存中.如果有太多未确认的消息就会耗尽内存.限制未确认消息的有效方法是设置客户端预取值,即prefetch_count的值.

  • Prefetch预取值

    prefetch属性用来定义一次可以给消费者发送多少条消息,设置合适的值会使消费者得到有效利用.默认情况下预取值为无限大,这意味着消费者尽可能的接收生产者的消息.预取值限制了消费者在确认消息之前可以接收多少消息.所有预获取的消息都将从队列中删除,并且对其他消费者不可见.假如prefetch_count值设为10且消费者开启acknowledge,获取10个消息后rabbitmq不再将队列中的消息推送过来.当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息. 预取值过小会降低rabbit的性能;预取值过大时当存在多个消费者,可能会导致某个消费者负载过高,而其他的消费者处于空闲状态.

    关于预取值的建议:如果消费者数量不多并处理迅速可以将预取值设的大一些,保证消费者充分使用;如果消费者数量较多,且处理时间较短,可以将值取的比上个情况下稍小一些.如果消费者较多且处理时间较长,建议将预取值设为1以便各个消费者”负载均衡”.具体的数目可以在机器上进行测试,然后选取合适的值.总之尽量不使用系统默认的预取策略.最后最重要的一点是预取值只有在手动确认模式下才生效.