信使:同步和排队消息处理

编辑该页面

警告:你浏览的文档欧宝官网下载appob娱乐下载Symfony 5.2,不再维护。

这个页面的更新版本Symfob娱乐下载ony 6.2(当前的稳定版本)。

信使:同步和排队消息处理

信使与发送消息的能力提供了一个消息总线,然后立即处理它们在您的应用程序或送往传输处理后(如队列)。更深入地了解它,阅读信使组件文档

安装

在应用程序中使用ob娱乐下载Symfony Flex运行这个命令安装信使:

1
美元作曲家需要symfony /信使ob娱乐下载

创建一个消息和处理程序

信使围绕着两个不同的类,您将创建:(1)消息类保存数据和(2)一个处理程序(s)将调用类的消息。处理程序类将读取消息类和执行一些任务。

没有特定的要求消息类,除了它可以序列化:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ SmsNotification.php名称空间应用程序\消息;SmsNotification{私人美元内容;公共函数__construct(字符串美元内容){美元- >内容=美元内容;}公共函数getContent():字符串{返回美元- >内容;}}

消息处理程序是一个PHP调用,推荐的方法是创建一个类,实现创建它MessageHandlerInterface和有一个__invoke ()方法的type-hinted消息类(或消息接口):

1 2 3 4 5 6 7 8 9 10 11 12 13
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\SmsNotification;使用ob娱乐下载\组件\信使\处理程序\MessageHandlerInterface;SmsNotificationHandler实现了MessageHandlerInterface{公共函数__invoke(SmsNotification美元消息){/ /……做一些工作,比如发送SMS消息!}}

多亏了自动配置SmsNotificationtype-hint, ob娱乐下载Symfony知道这个处理程序时应该调用SmsNotification消息被派遣。大多数时候,这是你所需要做的。但是你也可以手动配置消息处理程序。看到所有的配置处理程序,运行:

1
美元php bin /控制台调试:信使

发送的消息

你准备好了!分派消息(调用处理程序),注入messenger.default_bus服务(通过MessageBusInterface),就像在一个控制器:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/ / src /控制器/ DefaultController.php名称空间应用程序\控制器;使用应用程序\消息\SmsNotification;使用ob娱乐下载\\FrameworkBundle\控制器\AbstractController;使用ob娱乐下载\组件\信使\MessageBusInterface;DefaultController扩展AbstractController{公共函数指数(MessageBusInterface美元公共汽车){/ /将导致SmsNotificationHandler被称为美元公共汽车- >调度(SmsNotification (“看!我创建了一个消息!”));/ /或使用快捷键美元- >dispatchMessage (SmsNotification (“看!我创建了一个消息!”));/ /……}}

传输:异步/排队消息

默认情况下,消息就派来处理。如果你想处理异步消息,您可以配置一个交通工具。传输能够发送消息(如排队系统)通过一个工人接收它们。信使支持多种传输

请注意

如果你想使用一个不支持的运输,检查排队的交通支持诸如卡夫卡和谷歌Pub / Sub。

运输登记使用“DSN”。多亏了信使的Flex食谱,你.env文件已经有几个例子。

1 2 3
# MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息# MESSENGER_TRANSPORT_DSN =学说:/ /违约# MESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息

你想要取消哪个运输(或一组.env.local)。看到信使:同步和排队消息处理为更多的细节。

接下来,在配置/包/ messenger.yaml,让我们定义一个运输异步使用这种配置:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”#或扩充配置更多的选择#异步:# dsn:“% env (MESSENGER_TRANSPORT_DSN) %”#选择:[]

路由信息传输

现在您已经传输配置,而不是立即处理一条消息,您可以配置他们发送到运输:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:传输:异步:“% env (MESSENGER_TRANSPORT_DSN) %”路由:#异步是什么名字你给你运输“消息应用\ \ SmsNotification”:异步

由于这个,应用\ \ SmsNotification消息会发送到吗异步运输和处理程序(s)将立即被称为。任何消息不匹配路由仍将立即处理。

您还可以路由类的父类或接口。或发送消息到多个传输:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9
#配置/包/ messenger.yaml框架:信使:路由:#所有消息路由扩展这个示例的基类或者接口“消息应用\ \ AbstractAsyncMessage”:异步“消息应用\ \ AsyncMessageInterface”:异步“我的消息\ \ ToBeSentToTwoSenders”:(异步审计)

请注意

如果你为孩子和父类,配置路由两个规则使用。例如,如果你有一个SmsNotification对象,从通知,路由通知SmsNotification就会被使用。

教义实体信息

如果你需要通过一个教义实体在一个消息,最好是通过实体的主键(或其他相关信息处理程序实际需要,电子邮件等),而不是对象:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/ / src /信息/ NewUserWelcomeEmail.php名称空间应用程序\消息;NewUserWelcomeEmail{私人美元用户标识;公共函数__construct(int美元用户标识){美元- >用户id =美元用户标识;}公共函数getUserId():int{返回美元- >用户标识;}}

然后,在处理程序,您可以查询一个全新的对象:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21日22日23日
/ / src / MessageHandler / NewUserWelcomeEmailHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\NewUserWelcomeEmail;使用应用程序\存储库\UserRepository;使用ob娱乐下载\组件\信使\处理程序\MessageHandlerInterface;NewUserWelcomeEmailHandler实现了MessageHandlerInterface{私人美元userRepository;公共函数__construct(UserRepository美元userRepository){美元- >userRepository =美元userRepository;}公共函数__invoke(NewUserWelcomeEmail美元welcomeEmail){美元用户=美元- >userRepository- >找到(美元welcomeEmail- >getUserId ());/ /……发送一封电子邮件!}}

这保证了实体包含最新数据。

同步处理消息

如果消息没有匹配任何路由规则,它不会立即发送到任何运输和处理。在某些情况下(如当处理程序绑定到不同的传输),更容易或更灵活的处理这明确:通过创建一个同步运输和“发送”消息立即处理:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:#……其他传输同步:“同步:/ /”路由:应用程序消息\ \ SmsNotification:同步

创建自己的运输

您还可以创建自己的运输,如果你需要发送或接收消息不支持的东西。看到如何创建自己的信使运输吗

消费消息(运行工人)

一旦你的消息路由,在大多数情况下,你需要“消耗”。你可以这样做的信使:消费命令:

1 2 3 4
美元php bin /控制台信使:使用异步#使用vv查看细节发生了什么美元php bin /控制台信使:异步vv消费

第一个参数是接收者的名字(或服务id如果你路由到一个自定义的服务)。默认情况下,该命令将一直运行下去:寻找新的消息在你的运输和处理它们。这个命令被称为“工人”。

部署到生产环境

在生产中,有一些重要的事情要考虑:

使用主管来保持你的工人(s)运行
你需要的是一个或多个“工人”。要做到这一点,使用过程控制系统主管
不要让工人一直运行下去
一些服务(如教义的EntityManager随着时间的推移)将会消耗更多的内存。所以,而不是让你的工人永远运行,使用一个标记信使:消费——限制= 10告诉你的员工只有在退出前处理10条消息(主管就会创建一个新的进程)。也有其他的选择——内存限制= 128——期限= 3600
重启工人部署
每次部署,您将需要重新启动你的工作进程,这样他们看到新部署代码。要做到这一点,跑了信使:stop-workers在部署。这将信号每个工人,它应该完成消息目前优雅地处理和关闭。然后,主管将创造新的工作进程。使用的命令应用程序内部缓存,所以确保你喜欢这是配置为使用一个适配器。
使用相同的缓存之间部署
如果您的部署策略涉及到创建新目标目录,你应该设定一个值cache.prefix.seed配置选项以使用相同的缓存名称空间之间部署。否则,cache.app池将使用的价值kernel.project_dir参数名称空间的基础,这将导致不同的名称空间每次新的部署。
< /dl>

优先传输

有时某些类型的消息之前应该有一个更高的优先级和处理。为做到这一点,您可以创建多个不同的信息传输和路线。例如:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21日22日23日24
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:# queue_name是特定于运输原则queue_name:# AMQP发送到一个单独的交流然后队列#交换:#名称:高#队列:# messages_high: ~#或复述,尝试“集团”async_priority_low:dsn:' % env (MESSENGER_TRANSPORT_DSN) %选项:queue_name:路由:“消息应用\ \ SmsNotification”:async_priority_low“消息应用\ \ NewUserWelcomeEmail”:async_priority_high

然后您可以运行每个运输工人或指导一个工人来处理消息优先级顺序:

1
美元php bin /控制台信使:消费async_priority_high async_priority_low

工人总是第一个寻找等待消息async_priority_high。如果没有,然后它将使用消息async_priority_low

主管配置

主管是一个伟大的工具来保证你的工作进程(es)总是运行(即使它关闭由于失败,限制或由于触及消息信使:stop-workers)。例如,您可以将它安装在Ubuntu通过:

1
美元sudo apt-get安装主管

主管通常生活在一个配置文件/etc/supervisor/conf.d目录中。例如,您可以创建一个新的messenger-worker.conf文件以确保有2的实例信使:消费正在运行。

1 2 3 4 5 6 7 8 9
、/ etc /主管/ conf.d / messenger-worker.conf(项目:messenger-consume)命令= php /道路/ /你/ app / bin /控制台信使:使用异步——期限=3600年用户= ubuntunumprocs=2startsecs=0自动启动=真正的autorestart=真正的process_name= % s_ (program_name) % (process_num)02d

改变异步参数使用的名称(或传输)和运输用户Unix用户在您的服务器上。

如果你使用复述,运输,请注意每个工人需要一个唯一的使用者名称,以避免相同的消息由多个工人。实现这一目标的一个方法是设置一个环境变量的主管配置文件,您可以参考messenger.yaml(见上面的复述,部分):

1
环境= MESSENGER_CONSUMER_NAME = % s_ (program_name) % (process_num)02d

接下来,告诉主管阅读你的配置和启动你的工人:

1 2 3 4 5
美元sudo supervisorctl重读美元sudo supervisorctl更新美元sudo supervisorctl开始messenger-consume: *

看到主管医生为更多的细节。

优雅的关闭

如果你安装PCNTLPHP扩展项目中,工人们将会处理SIGTERMPOSIX信号在退出前完成处理当前的消息。

在某些情况下,SIGTERM信号发送由主管本身(如停止一个码头工人容器有主管作为它的入口点)。在这些情况下,您需要添加一个stopwaitsecs关键程序配置(价值所需的宽限期以秒为单位),以执行一个优雅的关闭:

1 2
(项目:x)stopwaitsecs=20.

重试和失败

如果抛出一个异常消费消息的传输时它会自动被发送到运输再尝试。默认情况下,消息将被丢弃或前3次重试发送到运输失败。每个重试也将被推迟,如果失败是由于一个暂时的问题。所有的这些都是为每个运输可配置:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#配置/包/ messenger.yaml框架:信使:传输:async_priority_high:dsn:' % env (MESSENGER_TRANSPORT_DSN) %#默认配置retry_strategy:max_retries:3#毫秒延迟延迟:1000年#每次重试前使延迟高#例如延迟1秒,2秒,4秒乘数:2max_delay:0#覆盖所有这些服务#组件实现Symfony ob娱乐下载\ \信使\ \ RetryStrategyInterface重试#服务:零

提示

ob娱乐下载Symfony触发WorkerMessageRetriedEvent消息时你可以重试运行你自己的逻辑。

5.2

WorkerMessageRetriedEvent类是在Symfony 5.2中引入的。ob娱乐下载

避免错误

有时候处理一条消息可能会失败的方式知道是永久性的,不应重试。如果你扔UnrecoverableMessageHandlingException,消息不会重试。

强制重试

5.1

RecoverableMessageHandlingException是在Symfony 5.1中引入的。ob娱乐下载

有时候处理一条消息,你必须失败知道是暂时的,必须重试。如果你扔RecoverableMessageHandlingException,消息总是会重试。

储蓄和重试失败的消息

如果一条消息失败多次重试(max_retries),然后将被丢弃。为了避免这种情况发生,您可以配置一个failure_transport:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:#重试后,消息将被发送到“失败”运输failure_transport:失败的传输:#……其他传输失败:“教义:/ /违约? queue_name =失败'

在这个例子中,如果消息处理失败(默认3倍max_retries),它将被发送到失败的交通工具。当你可以使用信使:消费失败消费这像一个正常的运输,通常需要手动查看消息的传输和选择重试失败:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#看到所有消息在传输失败美元php bin /控制台信使:失败:节目#看到特定的失败的详细信息美元php bin /控制台信使:失败:显示20 vv#查看和重试的消息一个接一个美元php bin /控制台信使:失败:重试vv#重试特定消息美元php bin /控制台信使:失败:重试20 30 -力量#删除消息然后绕过它美元php bin /控制台信使:失败:删除20#删除消息然后绕过他们,显示每个消息然后再删除它美元php bin /控制台信使:失败:删除20 30——显示消息

5.1

——显示消息选项是在Symfony 5.1中引入的。ob娱乐下载

如果消息再次失败,它将被发送回失败运输由于正常重试规则。一旦最大重试,永久的消息将被丢弃。

传输配置

信使支持许多不同的传输类型,每个都有自己的选择。选项可以被传送到传输通过DSN字符串或配置。

1 2
# .envMESSENGER_TRANSPORT_DSN = amqp: / / localhost / % 2 f /消息? auto_setup =
  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8
#配置/包/ messenger.yaml框架:信使:传输:my_transport:dsn:“% env (MESSENGER_TRANSPORT_DSN) %”选项:auto_setup:

下定义的选项选项优先于中定义的DSN。

AMQP运输

AMQP运输使用AMQP PHP扩展将消息发送到队列RabbitMQ之类。

5.1

从5.1 Symfony开始ob娱乐下载,AMQP运输已经搬到一个单独的包。安装运行:

1
美元作曲家需要symfony / amob娱乐下载qp-messenger

AMQP运输DSN可能看起来像这样:

1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost: 5672 / % 2 f /消息#或使用amqp协议MESSENGER_TRANSPORT_DSN = amqp: / /客人:guest@localhost / % 2 f /消息

5.2

amqp协议支持是在Symfony 5.2中引入的。ob娱乐下载

如果你想使用TLS / SSL加密AMQP,您还必须提供一个CA证书。定义的证书路径amqp.cacertPHP。在i setting (e.g.amqp.cacert=/etc/ssl/certs< /code>)或cacert参数的DSN(如amqp: / / localhost ? cacert = / etc / ssl /证书/)。

默认端口使用TLS / SSL加密AMQP是5671,但是你可以覆盖它港口参数的DSN(如。amqp: / / localhost ? cacert = / etc / ssl /证书/端口= 12345)。

请注意

默认情况下,运输会自动创建任何交往,队列和绑定键是必要的。可以禁用,但有些功能不能正常工作(如延迟队列)。

运输有其他一些选项,包括配置交换方式,队列绑定键和更多。看文档欧宝官网下载app连接

运输有很多选项:

选项 描述 默认的
auto_setup 是否应该发送时自动创建表/。 真正的
cacert CA证书路径PEM格式的文件。
cert 路径PEM格式的客户端证书。
channel_max 指定频道数最高的服务器许可证。0表示标准扩展限制
confirm_timeout 超时秒确认;如果没有指定,传输不会等待消息确认。注:0或更大的秒。可能是部分。
connect_timeout 连接超时。注:0或更大的秒。可能是部分。
frame_max 最大的帧大小的服务器提出了连接,包括帧头和end-byte。0意味着标准扩展限制(取决于librabbimq默认的帧大小限制)
心跳 秒的延迟,心跳连接服务器。0表示服务器不希望一个心跳。注意,librabbitmq心跳有限支持,这意味着心跳检查只在阻塞调用。
主机 AMQP服务的主机名
关键 客户关键路径在PEM格式。
密码 密码使用连接到AMQP服务
持续的 “假”
港口 AMQP港服务
prefetch_count
read_timeout 收入活动的超时。注:0或更大的秒。可能是部分。
重试
sasl_method
用户 用户名使用连接AMQP服务
验证 启用或禁用同行验证。如果启用同行验证服务器证书的普通名称必须匹配服务器名称。同行验证默认情况下是启用的。
vhost 虚拟主机使用与AMQP服务
write_timeout 超时的结果的活动。注:0或更大的秒。可能是部分。
延迟(queue_name_pattern) 模式使用创建队列 delay_ % exchange_name % _ % routing_key % _ %延迟%
延迟(exchange_name) 交换的名称用于延迟/重试的消息 延迟
队列[名称](参数) 额外的参数
队列[名字][binding_arguments] 参数而使用绑定的队列。
队列[名字][binding_keys] 绑定键(如果有的话)绑定到此队列
队列[名字][标记] 队列的旗帜 AMQP_DURABLE
交换(参数) 为交换(如额外的参数。alternate-exchange)
交换(default_publish_routing_key) 路由在出版时使用的关键,如果没有指定在消息
交换(旗帜) 交换的旗帜 AMQP_DURABLE
交换(名字) 交换的名称
交换(类型) 类型的交换 扇出

5.2

confirm_timeout选项是在Symfony 5.2中引入的。ob娱乐下载

你也可以通过添加配置AMQP-specific设置你的消息AmqpStamp你的信封:

1 2 3 4 5 6 7
使用ob娱乐下载\组件\信使\\Amqp\运输\AmqpStamp;/ /……美元属性= [];美元公共汽车- >调度(SmsNotification (), (AmqpStamp (“custom-routing-key”AMQP_NOPARAM,美元属性)));

谨慎

消费者不要出现在一个管理小组,这样传输不依赖\ AmqpQueue:消费()这是阻碍。拥有一个阻塞接收机使——时限/内存限制选项的信使:消费命令以及信使:stop-workers命令效率低下,如它们都依靠这一事实接收者立即返回不管它是否发现一个消息。消费工人负责迭代,直到它收到一个消息处理和/或直到到达的停止条件之一。因此,工人不能达成停止逻辑陷入阻塞调用。

教义运输

运输原则可用于信息存储在一个数据库表中。

5.1

从5.1 Symfony开始ob娱乐下载,教义运输已经搬到一个单独的包。安装运行:

1
美元作曲家需要symfony / doob娱乐下载ctrine-messenger

教义运输DSN可能看起来像这样:

1 2
# .envMESSENGER_TRANSPORT_DSN =学说:/ /违约

的格式是原则:/ / < connection_name >,如果你有多个连接和想使用另一个比“默认”。运输将自动创建一个表命名messenger_messages

5.1

自动生成的迁移的能力messenger_messages表是在Symfony DoctrineBundlob娱乐下载e 5.1和2.1中引入的。

或者自己创建表,设置auto_setup选项生成一个迁移

谨慎

的datetime属性信息存储在数据库中使用当前系统的时区。这可能会导致问题如果多个机器有不同的时区配置使用相同的存储。

运输有很多选项:

选项 描述 默认的
table_name 表的名称 messenger_messages
queue_name 队列的名称(列在表中,使用多个传输一个表) 默认的
redeliver_timeout 请在重试前超时消息队列中,但在“处理”状态(如果一个工人停止出于某种原因,这将发生,最终你应该重试的消息)——在几秒钟内。 3600年
auto_setup 是否应该发送时自动创建表/。 真正的

5.1

利用PostgreSQL的听的能力/通知是在Symfony 5.1中引入的。ob娱乐下载

当使用PostgreSQL,你可以访问以下选项来利用听/通知特性。这允许一个更好的性能比默认的投票行为主义方法运输因为PostgreSQL将直接通知工人当一个新的消息插入到表中。

选项 描述 默认的
use_notify 是否使用听/通知。 真正的
check_delayed_interval 间隔检查延迟消息,以毫秒为单位。设置为0禁用检查。 1000年
get_notify_timeout 等待响应的时间当调用PDO: pgsqlGetNotify”以毫秒为单位。 0

Beanstalkd运输

5.2

Beanstalkd传输是在Symfony 5.2中引入的。ob娱乐下载

Beanstalkd运输直接向Beanstalkd工作队列发送消息。安装运行:

1
美元作曲家需要symfony / beob娱乐下载anstalkd-messenger

Beanstalkd运输DSN可能看起来像这样:

1 2 3 4 5
# .envMESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost: 11300 ? tube_name = foo&timeout = 4竞技场队伍= 120#如果没有港口,它将默认为11300MESSENGER_TRANSPORT_DSN = beanstalkd: / / localhost

运输有很多选项:

选项 描述 默认的
tube_name 队列的名称 默认的
超时 预订信息超时,在几秒钟内。 0(将导致服务器立即返回响应或将抛出TransportException)
竞技场队伍 留言时间运行之前放回就绪队列,在几秒钟内。 90年

复述,运输

复述,运输使用队列消息。这种运输要求复述,PHP扩展(> = 4.3)和复述,运行服务器(^ 5.0)。

5.1

从5.1 Symfony开始ob娱乐下载,复述,运输已经搬到一个单独的包。安装运行:

1
美元作曲家需要symfony / reob娱乐下载dis-messenger

复述,运输DSN可能看起来像这样:

1 2 3 4 5 6
# .envMESSENGER_TRANSPORT_DSN =复述:/ / localhost: 6379 /消息#完整DSN的例子MESSENGER_TRANSPORT_DSN =复述:/ / password@localhost: 6379 /信息/ symob娱乐下载fony /消费者? auto_setup =真正的序列化器&dbindex = 0 = 1 &stream_max_entries = 0# Unix Socket的例子MESSENGER_TRANSPORT_DSN =复述:/ / / var /运行/ redis.sock

5.1

Unix socket DSN是在Symfony 5.1中引入的。ob娱乐下载

许多选项可以通过DSN或通过配置选项关键在运输messenger.yaml:

选项 描述 默认的
复述,流的名字 消息
集团 复述,消费者团体名称 ob娱乐下载
消费者 消费者的名字用于复述 消费者
auto_setup 自动创建复述组吗? 真正的
身份验证 复述,密码
delete_after_ack 如果真正的,消息后自动删除处理
delete_after_reject 如果真正的如果他们拒绝了,消息自动删除 真正的
懒惰的 连接只有当真正需要的连接
序列化器 如何序列化的最后有效载荷复述(复述:OPT_SERIALIZER选项) 复述:SERIALIZER_PHP
stream_max_entries 流的最大条目数将会被削减。将其设置为一个足够大的数量,以避免失去等待消息 0(这意味着“没有削减”)
tls 支持TLS连接的支持
redeliver_timeout 再审超时之前等待消息隶属一个废弃的使用者(如果一个工人死了因为某些原因,这将发生,最终你应该重试的消息)——在几秒钟内。 3600年
claim_interval 间隔的等待/废弃的消息应该检查要求——以毫秒为单位 60000年(1分钟)

谨慎

不应超过一个信使:消费命令运行相同的组合,集团消费者,或消息最终被不止一次处理。如果你运行多个队列的工人,消费者可以设置一个环境变量(如% env (MESSENGER_CONSUMER_NAME) %)由主管(在下面的例子)或任何其他服务用于管理工作进程。在一个容器环境中,主机名可以作为消费者的名字,由于只有一个工人/集装箱/主机。如果使用Kubernetes安排集装箱,可以考虑使用StatefulSet有稳定的名字。

提示

delete_after_ack真正的(如果你使用一个组)或定义stream_max_entries(如果你可以估计多少max条目是可以接受的在你的情况下),以避免内存泄漏。否则,将永远在复述,所有消息。

5.1

delete_after_ack,redeliver_timeoutclaim_interval选择在Symfony 5.1中引入的。ob娱乐下载

5.2

delete_after_reject懒惰的选择在Symfony 5.2中引入的。ob娱乐下载

在内存中运输

内存中交通并不实际交付消息。相反,它认为在记忆的请求,并将其用于测试。例如,如果你有一个async_priority_normal运输,你可以覆盖它测试环境使用这个运输:

  • YAML
  • XML
  • PHP
1 2 3 4 5
#配置/包/测试/ messenger.yaml框架:信使:传输:async_priority_normal:的内存:/ /

然后,在测试时,将消息被送到真正的运输。更好的是,在一个测试中,您可以检查一个消息发送时的要求:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/ /测试/控制器/ DefaultControllerTest.php名称空间应用程序\测试\控制器;使用ob娱乐下载\\FrameworkBundle\测试\WebTestCase;使用ob娱乐下载\组件\信使\运输\InMemoryTransport;DefaultControllerTest扩展WebTestCase{公共函数testSomething(){美元客户端=静态::createClient ();/ /……美元- >assertSame (200年,美元客户端- >getResponse ()- >getStatusCode ());/ *@varInMemoryTransport美元运输* /美元运输=自我::$容器- >get (“messenger.transport.async_priority_normal”);美元- >assertCount (1,美元运输- >getSent ());}}

请注意

所有内存中传输每个测试后将自动重置测试类扩展KernelTestCaseWebTestCase

Amazon SQS

5.1

Amazon SQS运输是在Symfony 5.1中引入的。ob娱乐下载

Amazon SQS运输是完美的应用运行在AWS上。安装运行:

1
美元作曲家需要symfony / amob娱乐下载azon-sqs-messenger

SQS运输DSN可能看起来像这样:

1 2 3
# .envMESSENGER_TRANSPORT_DSN = https://sqs.eu -西方- 3. amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a MESSENGER_TRANSPORT_DSN=sqs://localhost:9494/messages?sslmode=禁用

请注意

需要的运输将自动创建队列。这可以设置禁用auto_setup选项

提示

在发送或接收消息,Symfony需要队列名称转换为一个AWS队列通过调用URLob娱乐下载GetQueueUrl在AWS API。这种额外的API调用可以避免通过提供一个DSN队列的URL。

5.2

DSN的特性来提供队列URL是在Symfony 5.2中引入的。ob娱乐下载

运输有很多选项:

选项 描述 默认的
access_key AWS访问密钥 必须urlencoded
账户 标识符的AWS帐户 的所有者凭证
auto_setup 是否应该自动创建队列中发送/。 真正的
buffer_size 预取的消息数量 9
端点 绝对URL到SQS服务 https://sqs.eu -西方- 1. amazonaws.com
poll_timeout 等待新消息持续时间以秒为单位 0.1
queue_name 队列的名称 消息
地区 AWS地区的名称 一来就
secret_key AWS密钥 必须urlencoded
visibility_timeout 秒的消息是不可见的(可见性超时值) 队列的配置
wait_time 长轮询时间以秒为单位 20.

请注意

wait_time参数定义的最大持续时间Amazon SQS应该等到消息在队列之前发送一个响应。它帮助减少使用Amazon SQS的成本通过消除空响应的数量。

poll_timeout参数定义了接收方之前应该等待时间返回null。它能避免阻止其他接收器被称为。

请注意

如果队列的名字是后缀.fifo,AWS将创建一个先进先出队列。使用邮票AmazonSqsFifoStamp定义消息组ID消息重复数据删除ID

FIFO队列不支持设置每条消息延迟,的值延迟:0重试策略中需要设置。

序列化消息

当消息被发送到(收到)传输,它们序列化使用PHP的家乡serialize ()&unserialize ()功能。你可以改变这一全球(或每个运输)实现的服务SerializerInterface:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:序列化器:default_serializer:messenger.transport.ob娱乐下载symfony_serializerob娱乐下载symfony_serializer:格式:json背景:{}传输:async_priority_normal:dsn:#……序列化器:messenger.transport.ob娱乐下载symfony_serializer

messenger.transport.ob娱乐下载symfony_serializer是一个内置的服务,使用了吗序列化器组件在一些方面和可配置。如果你选择使用Symfony序列化器,您可ob娱乐下载以控制上下文在个案基础上通过SerializerStamp(见信封和邮票)。

提示

当发送/接收消息/从另一个应用程序中,您可能需要更多的控制序列化过程。使用自定义序列化器提供控制。看到ob娱乐下载SymfonyCasts”消息序列化器教程获取详细信息。

自定义处理程序

手动配置处理程序

ob娱乐下载Symfony通常会自动发现和注册您的处理程序。但是,你还可以手动配置一个处理程序,并将其传递给一些额外的配置,通过标记处理程序服务messenger.message_handler

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11
#配置/ services.yaml服务:应用MessageHandler \ \ SmsNotificationHandler:标签:(messenger.message_handler)#或配置选项标签:- - - - - -名称:messenger.message_handler#只需要通过type-hint如果不能猜处理:应用\ \ SmsNotification消息

可能的选项来配置标签:

  • 公共汽车
  • from_transport
  • 处理
  • 方法
  • 优先级

处理程序用户&选项

一个处理程序类可以处理多个消息或配置本身通过实现MessageSubscriberInterface:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17日18 19 20 21日22日23日24日25日26日27 28 29 30 31 32
/ / src / MessageHandler / SmsNotificationHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\OtherSmsNotification;使用应用程序\消息\SmsNotification;使用ob娱乐下载\组件\信使\处理程序\MessageSubscriberInterface;SmsNotificationHandler实现了MessageSubscriberInterface{公共函数__invoke(SmsNotification美元消息){/ /……}公共函数handleOtherSmsNotification(OtherSmsNotification美元消息){/ /……}公共静态函数getHandledMessages():iterable{/ / __invoke处理此消息收益率SmsNotification::类;/ /还handleOtherSmsNotification处理此消息收益率OtherSmsNotification::类= > [“方法”= >“handleOtherSmsNotification”,/ /“优先级”= > 0,/ /“总线”= >“messenger.bus.default”,];}}

处理程序绑定到不同的传输

每个消息可以有多个处理程序,当消息被消耗所有被称为它的处理程序。但是你也可以只配置一个处理程序被称为当它收到一个具体的交通工具。这可以让你有一个消息,每个处理程序调用不同的“工人”的消费不同的交通工具。

假设你有一个UploadedImage消息有两个处理程序:

  • ThumbnailUploadedImageHandler:你想这是由传输image_transport
  • NotifyAboutNewUploadedImageHandler:你想这是由传输async_priority_normal

为此,添加from_transport每个处理程序的选项。例如:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/ / src / MessageHandler / ThumbnailUploadedImageHandler.php名称空间应用程序\MessageHandler;使用应用程序\消息\UploadedImage;使用ob娱乐下载\组件\信使\处理程序\MessageSubscriberInterface;ThumbnailUploadedImageHandler实现了MessageSubscriberInterface{公共函数__invoke(UploadedImage美元uploadedImage){/ /做一些缩略图}公共静态函数getHandledMessages():iterable{收益率UploadedImage::类= > [“from_transport”= >“image_transport”,);}}

类似的:

1 2 3 4 5 6 7 8 9 10 11 12 13 14
/ / src / MessageHandler / NotifyAboutNewUploadedImageHandler.php/ /……NotifyAboutNewUploadedImageHandler实现了MessageSubscriberInterface{/ /……公共静态函数getHandledMessages():iterable{收益率UploadedImage::类= > [“from_transport”= >“async_priority_normal”,);}}

然后,确保“路线”你的信息这两个传输:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10
#配置/包/ messenger.yaml框架:信使:传输:async_priority_normal:#……image_transport:#……路由:#……“消息应用\ \ UploadedImage”:[image_transport,async_priority_normal]

就是这样!您现在可以使用每个运输:

1 2 3 4
#只会叫ThumbnailUploadedImageHandler在处理消息美元php bin /控制台信使:消费image_transport vv美元php bin /控制台信使:消费async_priority_normal vv

谨慎

如果一个处理程序from_transport配置,它将被执行每一个收到消息的传输。

延长信使

信封和邮票

消息可以是任何PHP对象。有时,您可能需要配置一些额外的消息——就像它应该的方式处理内部AMQP或添加消息处理的延迟时间。你可以通过添加一个“邮票”,您的消息:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
使用ob娱乐下载\组件\信使\信封;使用ob娱乐下载\组件\信使\MessageBusInterface;使用ob娱乐下载\组件\信使\邮票\DelayStamp;公共函数指数(MessageBusInterface美元公共汽车){美元公共汽车- >调度(SmsNotification (“……”),(/ /等待5秒前处理DelayStamp (5000年)));/ /或显式地创建一个信封美元公共汽车- >调度(信封(SmsNotification (“……”),(DelayStamp (5000年)));/ /……}

在内部,每条消息都包裹在一个信封,消息和邮票。您可以手动创建此或允许消息总线。不同的用途有不同的邮票,他们内部使用跟踪信息的消息,比如消息总线处理它或如果它是在失败后重试。

中间件

当你发送一个消息到消息总线取决于其收藏的中间件和他们的订单。默认情况下,每个总线的中间件配置是这样的:

  1. add_bus_name_stamp_middleware——添加一个邮票记录哪个总线这个消息被派到;
  2. dispatch_after_current_bus——看事务性消息:处理后新消息处理完成;
  3. failed_message_processing_middleware——过程的消息在通过重试传输失败让他们适当的功能,如果他们被收到原始运输;
  4. 自己的组中间件;
  5. 的send_message——如果运输路由配置,这将消息发送给运输和停止中间件链;
  6. handle_message——调用消息处理程序(s)为给定的消息。

请注意

这些中间件名称实际上是快捷方式名称。真正的服务id是前缀messenger.middleware。(如。messenger.middleware.handle_message)。

执行中间件消息时派遣再次通过工人当接收到消息(消息被发送到一个传输异步处理)。牢记这一点如果你创建自己的中间件。

您可以添加自己的中间件这个列表,或完全禁用默认的中间件和只有包括你自己的:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11 12 13
#配置/包/ messenger.yaml框架:信使:公共汽车:messenger.bus.default:#禁用默认的中间件default_middleware:#和/或添加您自己的中间件:#服务id实现Symfony \中间件组件\信使\ \ Mob娱乐下载iddlewareInterface- - - - - -“应用程序、中间件、MyMiddleware”- - - - - -“应用程序、中间件、AnotherMiddleware”

请注意

如果一个中间件服务是抽象的,将会创建一个不同的实例服务总线。

中间件为原则

1.11

以下原则DoctrineBundle 1.11中介绍了中间件。

如果你在你的应用中使用原则,存在许多可选的中间件,您可能想要使用:

  • YAML
  • XML
  • PHP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21日22日23日24
#配置/包/ messenger.yaml框架:信使:公共汽车:command_bus:中间件:#每次处理一条消息,教义连接#“灵感”和重新连接如果关闭。有用的#如果你的工人很长一段时间和数据库运行#连接有时丢失- - - - - -doctrine_ping_connection#处理后,连接关闭,#可以释放数据库连接在一个工人,#而不是让他们永远开放- - - - - -doctrine_close_connection#封装在单个教义事务处理程序#处理程序不需要调用flush()和一个错误#在任何处理程序将导致一个回滚- - - - - -doctrine_transaction#或通过一个不同的实体管理器#——doctrine_transaction(“自定义”):

多个公共汽车,命令与事件的公交车

信使给你一个默认消息总线服务。但是,您可以配置多达你想要的,创建“命令”、“查询”或“事件”的公共汽车和控制他们的中间件。看到多个公共汽车

这项工作,包括代码示例,许可下Creative Commons冲锋队3.0许可证。