
Python_RabbitMQ: simple inter-process communication

程序员文章站 2023-11-24 12:06:58

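RabbitMQ message queue

  1. Python
    threading Queue: exchanges data between the threads of a single process
    process Queue (multiprocessing.Queue): lets a parent process talk to its child processes, or lets several child processes of the same parent talk to each other
    Drawback: two independent Python scripts cannot talk to each other through either of these queues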
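To make the in-process case concrete, here is a minimal sketch using only the standard library. The helper name run_demo and the None sentinel are assumptions made for this illustration. multiprocessing.Queue offers the same put/get interface for a parent and its child processes, but neither queue is reachable from a second, separately started script, which is the gap RabbitMQ fills.

```python
import queue
import threading


def run_demo(items):
    """Pass items from a producer thread to a consumer thread via queue.Queue."""
    q = queue.Queue()
    received = []

    def producer():
        for item in items:
            q.put(item)
        q.put(None)  # sentinel (an assumption of this sketch): tells the consumer to stop

    def consumer():
        while True:
            item = q.get()  # blocks until something is available
            if item is None:
                break
            received.append(item)

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_demo(["a", "b", "c"]))
```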

  2. Erlang
    RabbitMQ is a message broker (a middleman) written in this language
    On Windows, Erlang must be installed before RabbitMQ can be used
    rabbitmq-server start

  3. Install the Python module
    pip install pika
    or
    easy_install pika
    or
    install from source

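  4. RabbitMQ listens for AMQP clients on port 5672 by default; 15672 is the port of the management web UI
    List the queues that currently exist:
    rabbitmqctl.bat list_queues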
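A quick way to check whether a broker is reachable before running the scripts is to probe its TCP ports. The sketch below uses only the standard library; the helper name port_is_open is an assumption of this example. It assumes the usual defaults: 5672 for AMQP clients and 15672 for the management web UI (only when the management plugin is enabled).

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, or unreachable
        return False


if __name__ == "__main__":
    for name, port in (("AMQP", 5672), ("management UI", 15672)):
        state = "open" if port_is_open("localhost", port) else "closed"
        print(name, port, state)
```

An open port only shows that something is listening there; it does not prove that the listener is RabbitMQ.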

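  5. exchange
    An exchange has a type, fixed when it is declared, that decides which queues qualify to receive a message
    fanout: every queue bound to the exchange receives the message
    direct: only queues whose binding key is exactly equal to the message's routing key receive the message
    topic: every queue whose binding key (which may be a pattern) matches the message's routing key receives the message

          Pattern symbols (a routing key is a list of words separated by dots):
          # matches zero or more words     * matches exactly one word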
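The three routing rules can be sketched in plain Python. This is a toy model of the matching logic only, not how the broker implements it; the function names topic_matches and route and the (queue_name, binding_key) tuple layout are assumptions of this example.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    Keys are dot-separated words: '*' matches exactly one word,
    '#' matches zero or more words.
    """
    def match(p, k):
        if not p:
            return not k  # pattern exhausted: match only if the key is exhausted too
        head, rest = p[0], p[1:]
        if head == "#":
            # let '#' absorb 0, 1, ..., len(k) words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names that would receive the message.

    bindings is a list of (queue_name, binding_key) tuples.
    """
    if exchange_type == "fanout":
        matched = [q for q, _ in bindings]  # the routing key is ignored
    elif exchange_type == "direct":
        matched = [q for q, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [q for q, key in bindings if topic_matches(key, routing_key)]
    else:
        raise ValueError("unsupported exchange type: %r" % exchange_type)
    return sorted(set(matched))


if __name__ == "__main__":
    bindings = [("q_info", "*.info"), ("q_all", "#"), ("q_kern", "kern.#")]
    print(route("topic", bindings, "app.info"))
    print(route("topic", bindings, "kern.disk.error"))
```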
    
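  6. RPC
    remote procedure call: two-way transfer, command <--------> result of executing the command
    Implementation: create two queues, one that receives commands and one that sends back the results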
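The two-queue idea can be tried without a broker. The sketch below uses two in-memory queue.Queue objects in place of RabbitMQ queues. The name rpc_demo and the "double the input" procedure are assumptions chosen only for illustration. As in the RabbitMQ version, a random correlation ID ties each result to the command that produced it.

```python
import queue
import threading
import uuid


def rpc_demo(n):
    """Send n to a worker over one queue and read the result from another."""
    request_q = queue.Queue()  # carries commands to the server
    reply_q = queue.Queue()    # carries results back to the client

    def server():
        corr_id, value = request_q.get()
        # hypothetical remote procedure: double the input
        reply_q.put((corr_id, value * 2))

    worker = threading.Thread(target=server)
    worker.start()

    corr_id = str(uuid.uuid4())          # identifies this particular call
    request_q.put((corr_id, n))
    reply_id, result = reply_q.get(timeout=5)
    worker.join()

    if reply_id != corr_id:
        raise RuntimeError("reply does not belong to this request")
    return result


if __name__ == "__main__":
    print(rpc_demo(21))
```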

  7. A simple producer/consumer model with RabbitMQ
    1) rabbit_producer.py

# Author : Xuefeng

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()

# Create the queue named "hello".
# durable=True makes the queue survive a restart of the RabbitMQ service.
channel.queue_declare(queue="hello", durable=True)

# In RabbitMQ a message is never sent directly to a queue; it always goes through an exchange.
# exchange="" selects the default exchange, which routes to the queue named by routing_key.
channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="Hello world!",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # mark the message as persistent
                      )
                      )
print("[x] Sent 'Hello world!'")
connection.close()

2) rabbit_consumer.py

# Author : Xuefeng
# Written for pika >= 1.0 (basic_consume takes on_message_callback and auto_ack).

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)

def callback(ch, method, properties, body):
    '''
    Handle a received message
    :param ch: the channel the message arrived on
    :param method: delivery information (delivery tag, exchange, routing key)
    :param properties: message properties set by the publisher
    :param body: message payload (bytes)
    :return:
    '''
    print("------>", ch, method, properties)
    print("[x] Received %r" % body)
    # acknowledge the message ourselves
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Fair dispatch: give this consumer at most one unacknowledged message at a time,
# so faster consumers end up handling more messages.
channel.basic_qos(prefetch_count=1)
# auto_ack=False (manual ack): if this consumer stops unexpectedly before acknowledging,
# the message is redelivered to another consumer. With auto_ack=True it would be lost.
channel.basic_consume(queue="hello",
                      on_message_callback=callback,  # called for every received message
                      auto_ack=False)

print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
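Why the acknowledgement matters can be shown with a small in-memory model. This is a toy stand-in, not RabbitMQ's implementation; the class ToyQueue and its method names are assumptions of this sketch. A message stays on record as "unacked" after delivery, and it goes back to the front of the queue if the consumer disappears before acknowledging it.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue with manual acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting for delivery
        self.unacked = {}             # delivery tag -> delivered, unacknowledged body
        self.next_tag = 1

    def deliver(self):
        """Hand out the next message as (tag, body), or None if nothing is ready."""
        if not self.ready:
            return None
        tag = self.next_tag
        self.next_tag += 1
        body = self.ready.popleft()
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        """The consumer finished processing: forget the message."""
        del self.unacked[tag]

    def consumer_died(self):
        """Requeue everything delivered but never acknowledged, in original order."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


if __name__ == "__main__":
    q = ToyQueue(["task-1", "task-2"])
    tag, body = q.deliver()
    q.ack(tag)            # task-1 is done for good
    q.deliver()           # task-2 is delivered but never acknowledged
    q.consumer_died()
    print(q.deliver())    # task-2 is handed out again under a new tag
```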
  8. Broadcasting with the fanout exchange type
    1) fanout_rabbit_publish.py
# Author : Xuefeng

import pika
import sys

# Broadcast mode:
# the producer sends one message and every consumer that is currently connected receives it

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")
message = ' '.join(sys.argv[1:]) or "info:Hello world!"
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message
)
print("[x] Sent %r" % message)

connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefeng
# Written for pika >= 1.0.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()
# declare the exchange here too, so the consumer can start before the publisher
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")
# queue="" lets the server generate a random queue name;
# exclusive=True restricts the queue to this connection and deletes it when the connection closes
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

channel.queue_bind(exchange="logs",
                   queue=queue_name)


def callback(ch, method, properties, body):
    '''
    Handle a received message
    :param ch: the channel the message arrived on
    :param method: delivery information (delivery tag, exchange, routing key)
    :param properties: message properties set by the publisher
    :param body: message payload (bytes)
    :return:
    '''
    print("------>", ch, method, properties)
    print("[x] Received %r" % body)
    # acknowledge the message ourselves
    ch.basic_ack(delivery_tag=method.delivery_tag)

# auto_ack=False (manual ack): the callback above acknowledges each message itself.
# Consume from the randomly named queue bound above, not from "hello".
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,  # called for every received message
                      auto_ack=False)

print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
  9. Message filtering with the direct exchange type
    1) direct_rabbit_publisher.py
# Author : Xuefeng
import pika
import sys

# Message filtering mode:
# the producer sends a message with a severity level as its routing key;
# only consumers bound to that severity receive it

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
    exchange="direct_logs",
    routing_key=severity,
    body=message
)
print("[x] Sent %r:%r" % (severity, message))

connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefeng
# Written for pika >= 1.0.

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")

# queue="" lets the server generate a random queue name;
# exclusive=True restricts the queue to this connection and deletes it when the connection closes
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# one binding per severity this consumer wants to receive
for severity in severities:
    channel.queue_bind(exchange="direct_logs",
                       queue=queue_name,
                       routing_key=severity)


def callback(ch, method, properties, body):
    '''
    Handle a received message
    :param ch: the channel the message arrived on
    :param method: delivery information (delivery tag, exchange, routing key)
    :param properties: message properties set by the publisher
    :param body: message payload (bytes)
    :return:
    '''
    print("------>", ch, method, properties)
    print("[x] Received %r" % body)
    # acknowledge the message ourselves
    ch.basic_ack(delivery_tag=method.delivery_tag)

# auto_ack=False (manual ack): the callback above acknowledges each message itself.
# Consume from the randomly named queue bound above, not from "hello".
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,  # called for every received message
                      auto_ack=False)

print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
  10. Fine-grained message filtering with the topic exchange type
    1) topic_rabbit_publisher.py
# Author : Xuefeng

import pika
import sys

# Fine-grained filtering mode:
# the producer sends a message with a routing key; each consumer is started with
# patterns such as *.info and receives only the messages whose routing key matches
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")
routing_key = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
    exchange="topic_logs",
    routing_key=routing_key,
    body=message
)
print("[x] Sent %r:%r" % (routing_key, message))

connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefeng
# Written for pika >= 1.0.

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")

# queue="" lets the server generate a random queue name;
# exclusive=True restricts the queue to this connection and deletes it when the connection closes
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# one binding per pattern, e.g. "*.info" or "kern.#"
for binding_key in binding_keys:
    channel.queue_bind(exchange="topic_logs",
                       queue=queue_name,
                       routing_key=binding_key)


def callback(ch, method, properties, body):
    '''
    Handle a received message
    :param ch: the channel the message arrived on
    :param method: delivery information (delivery tag, exchange, routing key)
    :param properties: message properties set by the publisher
    :param body: message payload (bytes)
    :return:
    '''
    print("------>", ch, method, properties)
    print("[x] Received %r" % body)
    # acknowledge the message ourselves
    ch.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack=False (manual ack): the callback above acknowledges each message itself.
# Consume from the randomly named queue bound above, not from "hello".
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,  # called for every received message
                      auto_ack=False)

print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
  11. RPC with RabbitMQ
    1) Rpc_rabbit_client.py
# Author : Xuefeng
# Written for pika >= 1.0.

import pika
import time
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            "localhost"))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue       # randomly named queue that receives the results
        self.response = None
        self.corr_id = None
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,    # called whenever a message arrives
                                   auto_ack=True)

    def on_response(self, ch, method, props, body):
        # accept only the reply that belongs to the pending request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id         # random ID used to match a result to its request
            ),
            body=str(n)
        )
        while self.response is None:
            # non-blocking counterpart of start_consuming: returns whether or not a message arrived
            self.connection.process_data_events()
            if self.response is None:
                print("no message...")
                time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[x] Got %r" % response)



2) Rpc_rabbit_server.py

# Author : Xuefeng
# Written for pika >= 1.0.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    "localhost"
))
# open a channel
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1)+fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print("[.] fib(%s)" % n)
    response = fib(n)
    # send the result to the queue named by the client, echoing its correlation ID
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# handle one request at a time per server process
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()
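The recursive fib in the server recomputes the same values many times, so a request such as fib(30) keeps the server busy for a noticeable moment while the client polls. If that delay is not wanted, an iterative version returns the same numbers in linear time. This is an optional substitution sketched here, not part of the original scripts, and the name fib_iterative is an assumption.

```python
def fib_iterative(n):
    """Return the n-th Fibonacci number (fib(0) = 0, fib(1) = 1) in O(n) steps."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


if __name__ == "__main__":
    print(fib_iterative(30))
```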




Original article: https://blog.csdn.net/zuefeng/article/details/107072600