Scrapy+rabbitMQ消息队列

Scrapy+rabbitMQ消息队列,第1张

使用RabbitMQ作为任务队列的轮子很少,基本都已停止更新(17年),这里推荐一个国人的修改版,最近才开始发布,但是经本人实测已经可以正常使用

项目地址(GitHub) scrapy-rabbitmq-scheduler

因是国人写的,所以README文件写的通俗易懂

pip install scrapy-rabbitmq-scheduler

在settingspy最后加入

<pre style="margin: 10px 0px; padding: 0px; white-space: pre !important; overflow-wrap: break-word; position: relative !important;">

Copy

`# 指定项目的调度器

SCHEDULER = "scrapy_rabbitmq_schedulerschedulerSaaS"

RABBITMQ_CONNECTION_PARAMETERS = ' amqp://admin:pwd@xxxx:5672/'

DOWNLOADER_MIDDLEWARES = {

'scrapy_rabbitmq_schedulermiddlewareRabbitMQMiddleware': 999

}

ITEM_PIPELINES = {

'scrapy_rabbitmq_schedulerpipelinesRabbitmqPipeline': 300,

}` </pre>

这里与Scrapy原来的方式稍有不同

构造发送请求/接收RabbitMQ数据的方法名为 _make_request

我们必须重构该方法才可正常运行使用爬虫

该方法起到每次从队列中拿取数据后的解析数据并进行请求的作用

通常我们存放在队列中的一个数据为一个JSON/msgpack格式,里面包含了要请求的URl/该条数据所属ID等多个信息

必须要注意的是如果遇到跳转或你在setting中设置了返回状态码为xxx重新爬取,那么Scrapy会将需要重新爬取的url存放至你的队列中,此时队列中有两种格式的数据

当时被这个问题卡了一小时,网上是没有解决方法的,他生成的数据也是不能使用常规方法进行解码的,这常常令人一头雾水

该组件默认RabbitMQ持久化为True,因此请注意建立通道的时候将设置对齐否则会出现因为设置错误导致无法连接的问题

转自: >

生产者先将消息投递一个叫队列的容器中,然后再从这个容器中取出消息,最后再转发给消费者。

消息队列是 Microsoft 的消息处理技术,它在任何安装 Microsoft Windows 的计算机组合中,为任何应用程序提供消息处理和消息队列功能,无论这些计算机是否在同一个网络上或者是否同时联机。

消息队列网络是能够相互间来回发送消息的任何一组计算机。网络中的不同计算机在确保消息顺利处理的过程中扮演不同的角色。它们中有些提供路由信息以确定如何发送消息,有些保存整个网络的重要信息,而有些只是发送和接收消息。

消息队列的类型介绍:

消息队列目前主要有两种类型:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息。包括消息队列键值、用户ID、组ID、消息队列中消息数目等等。

消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核持续的。

在使用SpringBoot中,笔者使用到了RabbitMQ,其中踩了不少地雷,经过些许的刻版终于把它调通了,

笔者主要说的是从生产者生产数据并发送给消费者到后者接收并处理数据这么一个全过程,我这里的数据指的是实体对象生产者和消费者是处在两个不同的项目中的

首先说明下整个过程

在 Spring-AMQP 中比较重要的类就是 Message ,因为要发送的消息必须要构造成一个 Message 对象来进行传输。Message 对象包括两部分 Body 和 Properties

消息生产者构造好 Message 之后,就会将 Message 发送到指定的 Exchange (交换机),再根据 Exchange 的类型及 routing-key 将消息路由到相应的 queue 中,最后被监听该 queue 的消费者消费

RabbitMQ报数据帧错误通常是由于消息传输过程中发生了数据损坏或丢失导致的。数据帧是RabbitMQ中传输消息的基本单位,它包含了消息的头部和内容,如果数据帧损坏或丢失,就会导致消息传输失败。

这种错误可能是由于以下原因导致的:

1 网络问题:网络不稳定、延迟高、带宽不足等都可能导致数据帧的丢失或损坏。

2 硬件问题:硬件故障也可能导致数据帧损坏或丢失,如网卡故障、服务器故障等。

3 应用程序问题:如果应用程序没有正确地处理消息,也可能导致数据帧损坏或丢失。

针对这种错误,可以采取以下措施:

1 检查网络环境,保证网络稳定,带宽足够,并且网络延迟不过高。

2 检查硬件设备,确保网卡、服务器等硬件设备没有故障。

3 检查应用程序代码,确保代码编写正确,没有处理消息时发生错误或异常。

4 采用一些技术手段,如错误重试、消息重发等来解决数据帧错误问题。例如,在RabbitMQ中,可以使用消息确认机制和重试机制来确保消息传输的可靠性。

一般情况下,是配置的原因,应该是你配置了自动确认,又写了代码进行手动确认。当你配置了自动确认时,调用basicConsume方法时rabbitmq服务端返回的确认码不是唯一的,会重复,所以又进行了手动确认,就容易导致消息丢失。

如果你用的是java+spring,配置手动确认如下:

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">

<rabbit:listener ref="ackMessageListener" queue-names="firstqueue"/>

</rabbit:listener-container>

其中,ackMessageListener是实现了ChannelAwareMessageListener接口的实现类实例。

前置文章:

RabbitMQ-消息可靠性&延迟消息

一、MQ常见问题

二、消息堆积-惰性队列

1、消息堆积问题

2、解决消息堆积方法

3、惰性队列

三、高可用-MQ集群

1、集群分类

2、普通集群

3、镜像集群

4、冲裁队列

确保发送的消息至少被消费一次;

实现消息的延迟投递;

处理消息无法及时消费的问题;

避免单点MQ故障导致整体不可用;

1、消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会 成为死信 ,会 被丢弃 ,这就是消息堆积问题。

2、解决消息堆积方法

3、惰性队列

从RabbitMQ的360版本开始,就增加了Lazy Queues的概念,也就是惰性队列。

Ⅰ 接收到消息后直接 存入磁盘 而非内存;

Ⅱ 消费者要消费消息时才会 从磁盘中读取 并加载到内存;

Ⅲ 支持 数百万条 的消息存储。

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

Ⅰ 可以通过命令行将一个运行中的队列修改为惰性队列,如下:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

Ⅱ 用SpringAMQP声明惰性队列,如下:

@Bean注解的形式,如下:

@RabbitListener注解的形式,如下:

Ⅰ 优点

基于磁盘存储,消息上限高;

没有间歇性的page-out,性能比较稳定;

Ⅱ 缺点

基于磁盘存储,消息时效性会降低;

性能受限于磁盘的IO。

官方文档: Clustering Guide — RabbitMQ 。

1、集群分类

是一种 分布式集群 ,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

是一种 主从集群 ,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

注意:镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。

在RabbitMQ的38版本以后推出的,底层采用Raft协议确保主从的数据一致性。

2、普通集群

Ⅰ 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息;

Ⅱ 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回;

Ⅲ 队列所在节点宕机,队列中的消息就会丢失。

Ⅰ 获取Cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为 Erlang cookie 。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有 相同的 cookie 。实例之间也需要它来相互通信。

首先获取Cookie,指令如下:

其中 YYNCLCJEKVNUFYQFPNZH 这一串就是生成的Cookie,如下:

Ⅱ 删除现有mq容器

Ⅲ 准备rabbitmqconf配置文件

此处选择在tmp目录下创建,如下:

配置文件内容如下:

Ⅳ 准备Cookie记录文件

Ⅴ 准备集群目录

Ⅵ 拷贝配置文件、Cookie文件到目录

echo :用于字符串的输出,输出字符串到 | 后面;

-t :表示先打印命令,再执行;

-n 1 :表示执行命令时用的args个数为1个。

Ⅶ 创建集群网络

Ⅷ 运行容器

集群中的节点标示默认都是: rabbit@[hostname] 。

Ⅰ 往rabbit@mq1添加队列

在mq2、mq3中也可以查看到该队列,因为元信息共享。

Ⅱ 往simplequeue添加数据

在mq2、mq3中可以查看到消息,如下:

Ⅲ 让mq1宕机

mq2、mq3无法读取到数据,因为只共享元信息,没有同步备份数据,如下:

3、镜像集群

镜像集群官方文档: Classic Queue Mirroring — RabbitMQ 。

普通集群不具备高可用的特性,使用镜像集群可以解决这个问题。

Ⅰ 镜像队列结构是一主多从(从就是镜像);

Ⅱ 所有 *** 作都是主节点完成,然后同步给镜像节点;

Ⅲ 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失);

Ⅳ 不具备负载均衡功能,因为所有 *** 作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)。

Ⅰ 设置exactly模式

Ⅱ 创建队列

Ⅲ 发送消息

Ⅳ 让mq1宕机

注意:mq1恢复后,该队列的主节点仍然为mq3。

4、冲裁队列

Ⅰ 与镜像队列一样,都是主从模式,支持主从数据同步;

Ⅱ 使用非常简单,没有复杂的配置;

Ⅲ 主从同步基于Raft协议,强一致。

注意:仲裁队列是38版本以后才有的新功能。

+2表示有2个镜像节点,仲裁队列默认镜像数为5,集群节点不足5则都是镜像。

@Bean注解配置

修改配置文件

以上即为RabbitMQ-消息堆积&高可用的全部内容,感谢阅读。

RabbitMQ,遵循AMQP协议,由内在高并发的erlang语言开发,用在实时的对可靠性要求比较高的消息传递上。

kafka是Linkedin于2010年12月份开源的消息发布订阅系统,它主要用于处理活跃的流式数据,大数据量的数据处理上。

1)在架构模型方面,

RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。

kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。

2)在吞吐量,

kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量 *** 作,具有O(1)的复杂度,消息处理的效率很高。

rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的 *** 作;基于存储的可靠性的要求存储可以采用内存或者硬盘。

3)在可用性方面,

rabbitMQ支持miror的queue,主queue失效,miror queue接管。

kafka的broker支持主备模式。

4)在集群负载均衡方面,

kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。

rabbitMQ的负载均衡需要单独的loadbalancer进行支持。

所以关于这两个选择,我们还是了解了这4个大致的区别。关于高吞吐,以及我们队日志的特定场景分析,任然选择了,kafka。当然设计理念不一样,rabbitMQ用于可靠的消息传递,智齿事物,不支持批量的 *** 作,可用性差不多,只是实现不一样。在集群方面,kafka胜一筹,通过topic注册zookeeper,调用机制,实现语义指定分片,然而rabbitMQ的负载需要单独loadbalancer支持

————————————————

版权声明:本文为CSDN博主「大壮vip」的原创文章,遵循 CC 40 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:>

以上就是关于Scrapy+rabbitMQ消息队列全部的内容,包括:Scrapy+rabbitMQ消息队列、即时通信之 - RabbitMQ(基于socket)基础概念详细介绍、消息队列(mq)是什么等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/9318786.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-27
下一篇2023-04-27

发表评论

登录后才能评论

评论列表(0条)

    保存