如何构建应用程序引擎以及使用 Windows Azure Storage 实现异步消息传送
现在,我们需要一个简单的包装来与我们的队列交互。从本质上说,我们需要能够将消息插入队列,获取任何挂起的消息并清除该队列(请参见图 3)。本文引用地址:http://www.eepw.com.cn/article/149634.htm
图 3 用于与队列交互的包装
namespace HollywoodHackers.Storage.Queue
{
public class StdQueueT> :
StorageBase where T : QueueMessageBase, new()
{
protected CloudQueue queue;
protected CloudQueueClient client;
public StdQueue(string queueName)
{
client = new CloudQueueClient
(StorageBase.QueueBaseUri, StorageBase.Credentials);
queue = client.GetQueueReference(queueName);
queue.CreateIfNotExist();
}
public void AddMessage(T message)
{
CloudQueueMessage msg =
new CloudQueueMessage(message.ToBinary());
queue.AddMessage(msg);
}
public void DeleteMessage(CloudQueueMessage msg)
{
queue.DeleteMessage(msg);
}
public CloudQueueMessage GetMessage()
{
return queue.GetMessage(TimeSpan.FromSeconds(60));
}
}
public class ToastQueue : StdQueueToastQueueMessage>
{
public ToastQueue()
: base(toasts)
{
}
}
}
我们还需要为表存储设置一个包装,以便在用户登录到站点之前可以存储用户通知。可以使用 PartitionKey(行集合的标识符)和 RowKey(可唯一标识特定分区中的每个单独行)组织表数据。选择 PartitionKey 和 RowKey 使用的数据是在使用表存储时所做的最重要的设计决策之一。
这些特点允许跨存储节点进行负载平衡,并在应用程序中提供内置的可伸缩性选项。不考虑数据的数据中心关联性,使用同一分区键的表存储中的行将保留在相同的物理数据存储中。因为针对每个用户存储对应的消息,所以分区键将是 UserName,而 RowKey 则成为标识每行的 GUID(请参见图 4)。
图 4 表存储的包装
namespace HollywoodHackers.Storage.Repositories
{
public class UserTextNotificationRepository : StorageBase
{
public const string EntitySetName =
UserTextNotifications;
CloudTableClient tableClient;
UserTextNotificationContext notificationContext;
public UserTextNotificationRepository()
: base()
{
tableClient = new CloudTableClient
(StorageBase.TableBaseUri, StorageBase.Credentials);
notificationContext = new UserTextNotificationContext
(StorageBase.TableBaseUri,StorageBase.Credentials);
tableClient.CreateTableIfNotExist(EntitySetName);
}
public UserTextNotification[]
GetNotificationsForUser(string userName)
{
var q = from notification in
notificationContext.UserNotifications
where notification.TargetUserName ==
userName select notification;
return q.ToArray();
}
public void AddNotification
(UserTextNotification notification)
{
notification.RowKey = Guid.NewGuid().ToString();
notificationContext.AddObject
(EntitySetName, notification);
notificationContext.SaveChanges();
}
}
}
因为我们的存储机制已经确定,所以我们需要一个工作者角色作为引擎;以便在我们的电子商务站点的后台处理消息。为此,我们定义了一个从 Microsoft.ServiceHosting.ServiceRuntime.RoleEntryPoint 类继承的类,并将其与云服务项目中的工作者角色关联(请参见图 5)。
图 5 作为引擎的工作者角色
public class WorkerRole : RoleEntryPoint
{
ShoppingCartQueue cartQueue;
ToastQueue toastQueue;
UserTextNotificationRepository toastRepository;
public override void Run()
{
// This is a sample worker implementation.
//Replace with your logic.
Trace.WriteLine(WorkerRole1 entry point called,
Information);
toastRepository = new UserTextNotificationRepository();
InitQueue();
while (true)
{
Thread.Sleep(10000);
Trace.WriteLine(Working, Information);
ProcessNewTextNotifications();
ProcessShoppingCarts();
}
}
private void InitQueue()
{
cartQueue = new ShoppingCartQueue();
toastQueue = new ToastQueue();
}
private void ProcessNewTextNotifications()
{
CloudQueueMessage cqm = toastQueue.GetMessage();
while (cqm != null)
{
ToastQueueMessage message =
QueueMessageBase.FromMessageToastQueueMessage>(cqm);
toastRepository.AddNotification(new
UserTextNotification()
{
MessageText = message.MessageText,
MessageDate = DateTime.Now,
TargetUserName = message.TargetUserName,
Title = message.Title
});
toastQueue.DeleteMessage(cqm);
cqm = toastQueue.GetMessage();
}
}
private void ProcessShoppingCarts()
{
// We will add this later in the article!
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
DiagnosticMonitor.Start(DiagnosticsConnectionString);
// For information on handling configuration changes
// see the MSDN topic at
//http://go.microsoft.com/fwlink/?LinkId=166357.
RoleEnvironment.Changing += RoleEnvironmentChanging;
return base.OnStart();
}
private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
{
// If a configuration setting is changing
if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
{
// Set e.Cancel to true to restart this role instance
e.Cancel = true;
}
}
}
让我们看一下工作者角色代码。在初始化和设置所需的队列和表存储之后,此代码将进入一个循环。每 10 秒钟,它就会处理一次队列中的消息。每次我们通过处理循环时都将获取队列中的消息,直到最终返回 null,这表示队列为空。
您从队列中看到的消息永远不会超过 20 个,如果不信,您可以反复尝试来验证一下。对队列进行的任何处理都有时间限制,必须在该时间范围内对每个队列消息执行有意义的操作,否则队列消息将被视为超时,并在队列中显示备份,以便可以由其他工作者来处理此消息。每个消息都会作为用户通知添加到表存储中。关于工作者角色需要记住的重要一点是:一旦入口点方法完成,工作者角色也就结束了。这就是您需要在一个循环中保持逻辑运行的原因。
从客户端的角度来说,我们需要能够以 JSON 形式返回消息,以便 jQuery 可以异步轮询并显示新的用户通知。为此,我们会将一些代码添加到消息控制器中,以便可以访问这些通知(请参见图 6)。
图 6 以 JSON 形式返回消息
public JsonResult GetMessages()
{
if (User.Identity.IsAuthenticated)
{
UserTextNotification[] userToasts =
toastRepository.GetNotifications(User.Identity.Name);
object[] data =
(from UserTextNotification toast in userToasts
select new { title = toast.Title ?? Notification,
text = toast.MessageText }).ToArray();
return Json(data, JsonRequestBehavior.AllowGet);
}
else
return Json(null);
}
在 Visual Studio 2010 beta 2 下的 ASP.NET MVC 2(我们用于撰写本文的环境)中,如果没有 JsonRequestBehavior.AllowGet 选项,您无法将 JSON 数据返回到 jQuery 或其他客户端。在 ASP.NET MVC 1 中,不需要此选项。现在,我们可以编写 JavaScript,它每 15 秒将调用一次 GetMessages 方法并将以 toast 形式消息显示通知(请参见图 7)。
评论