Node.js 13.4.0 版本现已发布。其具体更新内容如下:

Progressive Web Applications take advantage of new technologies to
bring the best of mobile sites and native applications to users.
They’re reliable, fast, and engaging.

重要说明:本方是翻译自https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#getting-started这个文章,感谢Sabby
AnandanMarius BogoeviciEric BottardMark FisherIlayaperumal

Changes
deps:

基础知识

用户首次访问service worker控制的网站或页面时,service
worker会立刻被下载。
ServiceWorker(web worker 的一种)接口
Cache:表示对request/response对象的存储,一个域可以有多个 Cache 对象.
你将在你的代码中处理和更新缓存 . 在 Cache 除非显示地更新缓存,
否则缓存将不会被更新; 缓存数据不会过期, 除非删除它
Cache.match(request, options)返回一个Promise,查找cache中匹配的request
Cache.match(request, options)匹配一个数组对象中的request
Cache.add(request)发送请求并将请求放入cache中,
Cache.put(request, response)将request和response都添加到cache中
Cache.delete(request, options)
才cache中查找乡音的值,并删除返回一个promise,resoleve为true,如果找不到返回false
Cache,keys(request, options)返回一个promise,resolve为所有的cache键值

CacheStorage:
对Cache对象的存储,提供命名缓存的主目录,sw可以通过访问并维护名字字符串到Cache对象的映射
caches.open(cacheName).then(names){};//打开一个cache对象

Client: 表示sw client的作用域。

sw.js中的self:这个关键字表示的是一个service worker
的执行上下文的一个全局属性(ServiceWorkerGlobalScope),类似于window对象,不过这个self是作用于service
worker的全局作用域中。

GopinathanGunnar HillertMark PollackPatrick PeraltaGlenn RenfroThomas

  • 将 npm 更新到 6.13.4
  • 更新 uvwasi(Anna Henningsen)
  • 升级到 libuv 1.34.0(Colin Ihrig)

sw生命周期

澳门新葡萄京官网首页 1

image.png

RisbergDave SyerDavid TuranskiJanne ValkealahtiOleg
Zhurakousky这些原文作者的写作成果,让我们能更好地入门学习Spring Cloud
Data
Flow的相关技术,考虑到国内中文的文档资料比较欠缺,而且基本不太成体系,所以自己希望来翻译该文章,方便学习使用。

doc:

覆盖率

澳门新葡萄京官网首页 2

image.png

以下是原文内容的翻译,一些关键性的技术术语,本文考虑不进行翻译,保持原来的英文词汇,加强技术术语印象,而且这些技术术语强行翻译成中文的话,感觉很别扭。另一方面水平有限,文本可能有翻译不太贴切或是错误的地方,还请大家体谅,看到后多多指正。

  • docs 不赞成使用 http 完成

注意点

  1. 基于https
    可以使用http-server+ngrok配合,当然更简单的使用github。
    2.Service worker是一个注册在指定源和路径下的事件驱动worker。实际上
    SW
    在你网页加载完成同样也能捕获已经发出的请求,所以,为了减少性能损耗,我们一般直接在
    onload 事件里面注册 SW 即可。
  2. 作用域问题
    SW 的作用域不同,监听的 fetch 请求也是不一样的。
    例如,我们将注册路由换成: /example/sw.js,那么,SW 后面只会监听
    /example 路由下的所有 fetch 请求,而不会去监听其他

目录

events:

register

if(navigator.serviceWorker){
    navigator.serviceWorder.register('sw.js')
        .then(registration  =>  {
              console.log(`registered event at scope:${registration.scope}`);
        })
        .cache(err => {
               throw err;
         })
}

入门…
2

  • 添加 captureRejection 选项

install

self.addEventListener('install', function(event) {
  // Perform install steps
});

缓存文件

const cacheVersion = 'v1';
const cacheList = [
    '/',
    'index.html',
    'logo.png',
    'manifest.json',
    '/dist/build.js'
];
self.addEventListener('install', function(event) {
  event.waitUntil(
    caches.open(cacheVersion).then(function(cache) {
      return cache.addAll([cacheList]);
    })
  );
});

event.waitUntil()参数必须为promise,它可以延长一个事件的作用时间,因为我们在打开缓存或者更新的时候很有可能会有延迟,而event.waitUntil()可以防止事件终端。另外它会监听所有的异步promise,一旦有一个reject那么该次event便是失败的,也就是说sw启动失败。当然如果有些文件比较大不好缓存的话别让它返回就好了:

cache.addAll([cachelist1]);
return cache.addAll([cachelist2]);

1. 系统要求…
2

http:

fetchEvent

缓存捕获,当发起请求的时候将request和response缓存下来(缓存一开始定义的缓存列表)。

self.addEventListener('fetch', (event) => {
    event.respondWith(
        caches.match(event.request)
            .then(response => {
                if(response){
                    return reponse;
                 }
                  return fetch(event.request);
             })
    )
})

这是个比较简单的格式,event.respondWith(r),包含请求响应代码,可以设置一个参数r,r是一个promise,resolve之后是一个response对象。整段代码意思就是当请求一个文件时,如果缓存中已经有了,那么就直接返回缓存结果,否则发起请求。

2. 部署Spring Cloud Data Flow 本地服务器…
2

  • 添加 captureRejection 支持
  • llhttp 选择加入不安全的HTTP标头解析
问题:如果没有缓存我们怎么处理?
  1. 等下次sw根据路由去缓存;
  2. 手动缓存

2.1. Maven 配置…
4

http2:

手动缓存
self.addEventListener('fetch', event => {
    event.respondWith(
        caches.match(event.request)
            .then(response => {
                if(response){
                    return response;
                }
                //fetch请求的request、response都定义为stream对象,所以只能读一次这里需要clone一个新的
                let requestObj = event.request.clone();

                return fetch(requestObj)
                            .then(response => {
                                //检测是否成功
                                if(!response || response.status !== 200 || response.type !== 'basic') {
                                    return response;
                                }
                                //如果请求成功,第一要去渲染,第二要缓存
                                //cache.put()也使用stream,所以这里也需要复制一份
                                let responseObj = response.clone();

                                caches.open(cacheVersion)
                                    .then(cache => {
                                        cache.put(event.request, responseObj);
                                    });
                                return response;

                            })
            })
    )
})

3. 应用程序设置…
5

  • 为“request”和“stream”事件实施捕获指令
为什么stream只能读一次?

当可读流读取一次之后可能已经读到stream结尾或者stream已经close了,这里request和response都实现了clone接口来复制一份,所以在需要二次使用stream的时候就需要用副本来实现了。

应用程序…
5

net:

删除旧的缓存

self.addEventListener('activate', evnet => {
    event.waitUntil(
        caches.keys().then(cacheNames => {
            return Promise.all(
                cacheNames.filter(cachename => {
                    if(cachename == cacheVersion){
                        return caches.delete(cachename);
                    }
                })
            ).then(() => {
                return self.clients.claim()
            })
        })
    )
})

我们检查之前保存的sw缓存,还要注意一点就是Promise.all()中不能有undefined,所以我们对于相同的版本要过滤,因而不使用map,避免返回undefined。
通过调用 self.clients.claim() 取得页面的控制权,
这样之后打开页面都会使用版本更新的缓存。

系统架构…
6

  • 实施“connection”事件的捕获拒绝

更新

当你更新了你的sw文件,并修改了cacheVersion之后,刷新浏览器,期待的变化并没有发生,因为虽然你改变了缓存版本,但是此时旧的sw还在控制整个应用,新的sw并没有生效。这时就需要更新一下sw,有以下方法

  1. registration.update() ,也就是在注册的时候选择合适方式更新

navigator.serviceWorker.register('/sw.js').then(reg => {
  // sometime later…
    reg.update();
});
  1. 使用self.skipWaiting();
    在install阶段使用这个可以使得新的sw立即生效。

self.addEventListener('install', event => {
  self.skipWaiting();

  event.waitUntil(
    // caching
  );
});
  1. 调试手动更新
![](https://upload-images.jianshu.io/upload_images/454462-a63d53b8a30f3e01.png)

image.png

直接点击update即可。
注意,我们更新了某些文件的时候也要同时更新sw中的缓存版本(cacheVersion)

4. 简介…
6

repl:

manifest文件

这个文件主要是配置添加到桌面的一些基本信息,比如图标启动页等。详细可以看这个https://developer.mozilla.org/zh-CN/docs/Web/Manifest
下面是我写的一个示例https://github.com/Stevenzwzhai/vue2.0-elementUI-axios-vueRouter/blob/master/pwa/sw.js
或者拉取这个项目https://github.com/Stevenzwzhai/PWA-demo

5. Microservice 微服务架构风格…
8

  • 通过渴望评估输入来支持预览

5.1. 和其它平台架构的比较…
9

stream:

6.流式(stream)应用程序…
9

  • 添加对 captureRejection 选项的支持

6.1. 指令式编程模型…
10

tls:

6.2. 函数式编程模型…
10

  • 对 ‘secureConnection’ 事件实施捕获拒绝
  • 公开当前密码套件的 IETF 名称

7. 数据流…
10

worker:

7.1. 拓扑…
10

  • 添加 argv 构造函数选项

7.2. 并发…
11

详细信息:

7.3. 分区…
11

(文/开源中国)    

7.4. 消息传输保障…
12

8. 分析…
13

9. 任务式(task)应用程序…
14

10. 数据流服务器…
14

10.1. 端点(Endpoint).
14

10.2. 自定义…
15

10.3. 安全…
15

11. 运行时环境…
16

11.1. 容错…
16

11.2. 资源管理…
16

11.3. 运行时调整规模…
16

11.4. 应用程序版本化…
16

********************************以下是原文内容的翻译**********************

Version 1.3.0.M2

© 2012-2017 Pivotal Software, Inc.

你可以自己使用这个文章或是分发给其它人,但前提是你不以赚取费用为目标,同时每个转发的文章必须包含此版权通知,无论是印刷品版本还是电子发行版本。

入门

如果您刚刚开始使用Spring

Cloud Data
Flow,这部分将非常适合你!这里我们回答的基本问题是“这是什么”,“怎么用”,“为什么用”的问题。按着本文的介绍说明,大家就可以构建第一个Spring

Cloud Data Flow的应用程序并了解到一些核心原则。

1.

系统要求

你需要安装java运行环境(java 8或更高版本),同时需要安装好Maven环境。

您需要有一个关系型数据库系统来存储stream、task和应用程序状态。默认情况下,本地数据流服务器会使用嵌入式的H2数据库。

如果您运行的是任何涉及stream分析的应用程序,Redis需要安装好,以便支持相应程序运行。

同时RabbitMQ或者Kafka也是需要安装的。

2.

部署Spring Cloud Data Flow 本地服务器

  1. 下载Spring Cloud Data Flow Server 和Shell 应用程序:

wget

http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-server-local/1.3.0.M2/spring-cloud-dataflow-server-local-1.3.0.M2.jar

wget

http://repo.spring.io/milestone/org/springframework/cloud/spring-cloud-dataflow-shell/1.3.0.M2/spring-cloud-dataflow-shell-1.3.0.M2.jar

  1. 运行 Data Flow Server

Data Flow Server 是一个 基于Spring

Boot 的应用,

你可以直接用java

–jar这样的命令行来运行它.

$

java -jar spring-cloud-dataflow-server-local-1.3.0.M2.jar

  1. 启动 shell:

$

java -jar spring-cloud-dataflow-shell-1.3.0.M2.jar

如果Data Flow Server 和 shell程序不是运行在同一个机器上,那就需要设置

shell程序,让它指向 Data Flow server URL地址

server-unknown:>dataflow

config
serverhttp://198.51.100.0

Successfully

targetedhttp://198.51.100.0

dataflow:>

默认情况下,服务平台当中的应用程序注册信息是空的,如果你想批量地把所有外部基于kafka绑定的stream应用程序注册到服务平台当中,那么你可以具体查看注册应用程序这个章节,链接是,https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#spring-cloud-dataflow-register-stream-apps

$

dataflow:>app import –uri

http://bit.ly/Bacon-RELEASE-stream-applications-kafka-10-maven

取决于你机器上的运行环境,你可能需要配置Data Flow

Server使用你自身的Maven仓库或者是配置代理服务,具体可查看以下链接https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#getting-started-maven-configuration

4.
你可以用shell命令行来罗列出有效的应用程序(source/processors/sink),也可以用它来创建stream,例如:

dataflow:>

stream create –name httptest –definition “http –server.port=9000 |

log” –deploy

在推送数据给应用程序之前,您需要等待一段时间,直到应用程序实际部署成功。通过日志文件的位置,HTTP或是日志应用程序,查看Data
Flow server的日志文

。用tail命令查看每个应用程序的日志文件末尾,以验证应用程序是否已经启动。

接下来,可以尝试推送一些数据

dataflow:>

http post –target –data “hello world”

查看下,hello world是否出现在日志程序的日志文件末尾了。

当你配置一个本地化的程序的时候(每一个应用程序的实例数量大于一个)平台会给应用程序动态分配一个服务器的端口号,除非你明确的手工指定了–server.port=x这样的参数。在这两种情况下,这样的设置将覆盖任何较低级别的设置(如application.yml文件当中的配置)。

当你使用shell程序的时候希望获得更详细的异常信息,那么你可以把异常信息日志的级别设置为WARNING

,这个设置在logback.xml 这个配置文件当中:

name=”org.springframework.shell.core.JLineShellComponent.exceptions”

level=”WARNING”/>

2.1. Maven 配置

如果你想覆盖maven配置当中的一些设置(如remote

repositories, proxies等),或者是让Data Flow
Server运行时使用代理服务,你可以在运行Data Flow

Server时,在命令行参数当中加入相应的参数,例如:

$

java -jar spring-cloud-dataflow-server-local-1.3.0.M2.jar

–maven.localRepository=mylocal

–maven.remote-repositories.repo1.url=https://repo1

–maven.remote-repositories.repo1.auth.username=user1

–maven.remote-repositories.repo1.auth.password=pass1

–maven.remote-repositories.repo1.snapshot-policy.update-policy=daily

–maven.remote-repositories.repo1.snapshot-policy.checksum-policy=warn

–maven.remote-repositories.repo1.release-policy.update-policy=never

–maven.remote-repositories.repo1.release-policy.checksum-policy=fail

–maven.remote-repositories.repo2.url=https://repo2

–maven.remote-repositories.repo2.policy.update-policy=always

–maven.remote-repositories.repo2.policy.checksum-policy=fail

–maven.proxy.host=proxy1

–maven.proxy.port=9010

–maven.proxy.auth.username=proxyuser1

–maven.proxy.auth.password=proxypass1

默认情况下,应用程序使用的是
HTTP协议。你可以省略代理服务的认证信息,如果代理服务本身是不需要用户名和密码的。另外,Maven
的localrepository 默认设置参数是 ${user.home}/.m2/repository/ 。
和上面的示例一样,如果maven的远程存储库需要认证的话,可以指定它们的认证信息。如果远程存储库启用了代理服务,则也可以指定代理的认证信息。

这里Spring Boot的

@ configurationproperties
可以作为环境变量来设置,例如maven_remote_repositories_repo1_url。
spring_application_json
也可以是用环境变量来设置。下面给出了JSON结构的一个示例:

$

SPRING_APPLICATION_JSON='{ “maven”: { “local-repository”:

null,

“remote-repositories”:

{ “repo1”: { “url”:
“https://repo1“,

“auth”: { “username”: “repo1user”, “password”:

“repo1pass” } }, “repo2”: { “url”:

“https://repo2
} },

“proxy”:

{ “host”: “proxyhost”, “port”: 9018,

“auth”: { “username”: “proxyuser”,

“password”: “proxypass” } } } }’ java -jar

spring-cloud-dataflow-server-local-{project-version}.jar

3.

应用程序设置

你可以使用以下的配置信息来自定义Data Flow server的部署。

spring.cloud.deployer.local.workingDirectoriesRoot=java.io.tmpdir

# Directory in which all created processes will run and create log
files.

spring.cloud.deployer.local.deleteFilesOnExit=true

# Whether to delete created files and directories on JVM exit.

spring.cloud.deployer.local.envVarsToInherit=TMP,LANG,LANGUAGE,”LC_.*.

# Array of regular expression patterns for environment variables that
will be

passed to launched applications.

spring.cloud.deployer.local.javaCmd=java

# Command to run java.

spring.cloud.deployer.local.shutdownTimeout=30

# Max number of seconds to wait for app shutdown.

spring.cloud.deployer.local.javaOpts=

# The Java options to pass to the JVM

部署应用程序的时候可以设置部署属性的前缀
,通过deployer.这样的形式,例如设置ticktock
stream这个应用程序的java选项,使用以下stream部署属性。

dataflow:>

stream create –name ticktock –definition “time –server.port=9000 |

log”

dataflow:>

stream deploy –name ticktock –properties

“deployer.time.local.javaOpts=-Xmx2048m -Dtest=foo”

为了方便起见,你可以设置属性deployer.memory 来变相设置java的选项-Xmx

dataflow:>

stream deploy –name ticktock –properties

“deployer.time.memory=2048m”

在部署的时候, 除了.local.memory 选项外,如果你在deployer..local.javaOpts
选项里你设置了-Xmx 这个选项, javaOpts 选项里的值是具有优先权的,同样的,
javaOpts 属性也是比 Data

Flow server的spring.cloud.deployer.local.javaOpts属性具有优先权的。

应用程序

应用程序指的是一系列预先编译好的关于使用stream和task/batch技术来处理数据集成或是流程处理的入门级程序,这些程序是为了让大家学习和增长经验而准备的,更多具体的信息可以查看以下链接:

http://cloud.spring.io/spring-cloud-stream-app-starters/

http://cloud.spring.io/spring-cloud-task-app-starters/

https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#spring-cloud-dataflow-register-stream-apps

系统架构

4.

简介

Spring Cloud Data
Flow简化了专注于数据流处理的应用程序的开发和部署。它体系结构包含的主要概念有,应用程序、Data
Flow Server和运行时环境。

应用程序有两种风格:

• 长时间运行的流应用程序,通过消息中间件来消耗或产生无限数量的数据。

• 短时间运行的任务应用程序,处理完有限的数据集,然后就终止运行。

根据运行时不同,应用程序有两种方式打包

• Spring Boot的JAR包,
这些JAR包托管在Maven库、文件系统、HTTP资源网站或其他Spring资源容器。

• Docker。

运行时环境是应用程序执行的容器,支持的运行时环境有:

· Cloud Foundry

· Apache YARN

· Kubernetes

· Apache Mesos

· 开发用的Local Server

由于系统提供了部署服务提供者接口(SPI),你可以将Data
Flow部署到其他运行时环境当中,例如Docker Swarm。同时还有针对Hashicorp’s
Nomad 和 RedHat Openshift的实现,支持相应的部署。

Data Flow
Server是负责将应用程序部署到运行时环境的组件。每个运行时环境都有一个Data
Flow Server的可执行jar包 。Data Flow Server负责以下工作:

• 解释并执行一个stream
DSL,它描述了数据流经多个长时间运行的应用程序的逻辑流程。

• 启动一个执行长时间任务的应用程序

• 解释并执行一个Task
DSL,它描述了数据流经多个短时间运行的应用程序的逻辑流程。


将应用程序的部署清单属性应用到运行时环境当中。例如,设置实例的初始数量、内存需求和数据分区。

• 提供部署应用程序的运行时状态。

举个例子,stream DSL描述了数据流从HTTP 的source到Apache Cassandra
的sink将被写为“http | cassandra”。DSL中的这些名字会让 Data Flow
Server注册到运行时环境当中,同时映射到应用程序的配置信息,这些应用程序本身可以托管在Maven或Docker仓库。许多常用的source,
processor,
sink类型的应用程序(例如JDBC的,HDFS的,HTTP的,router的),Spring Cloud
Data

Flow团队已经提供了,可以直接使用。两个应用程序间管道式的处理过程是通过消息中间件完成的。目前支持的两个消息中间件是:

· Apache Kafka

· RabbitMQ

使用Kafka的时候,当部署stream,时,the Data Flow
server负责在Kafka中创建相应的topic,并配置每个应用程序来生成或消费topic中的数据,从而实现所需的数据流程。

主要组件的相互流程如下

Spring

Cloud Data 抽像架构图

图中,一个stream的DSL描述被发布到Data Flow Server上。基于Maven
或Docke配置信息当中DSL的应用名称,基于http的source应用程序
和基于cassandra的sink应用程序部署在了目标运行时环境当中。

  1. Microservice 微服务架构风格

Data Flow Server将应用程序部署到符合微服务架构风格的运行时环境当中
。例如,一个stream应用程序是一个抽象的应用程序组合,它本身由多个小的微服务应用程序组成,这些微服务应用程序在自己的进程当中运行。每个微服务应用程序可以扩大或缩小自己的规模,相互独立,每个都有自己版本化的生命周期。

基于Stream 和 Task的微服务应用程序建立在以Spring
Boot为基础的功能库之上。它给所有的微服务应用程序提供了想关的基础功能,如运行状态检查、安全保障、可配置的日志记录、监控和功能管理,还有可执行jar包的打包功能。

有一个非常重要的技术点是需要强调的,所有的这些微服务应用程序都只是标准的应用程序,称为‘just
apps’,你完全可以用‘java -jar’这样的命令行和传递适当的配置属性来运行它们
。我们提供了许多常规功能的微服务应用程序,所以你不必完全从零开始来建立常用的基于Spring
项目生态系统的应用程序, 例如Spring Integration,

Spring Data, Spring Hadoop and Spring
Batch。创建您自己的应用程序类似于创建其他Spring
Boot的应用程序,你可以使用Spring
Initialzr网站或UI图形化界面来创建基本的代码框架或基于Stream 或
Task的微服务程序。

除了向应用程序传递适当的配置之外,Data Flow
server还负责准备目标平台的基础结构设施,以便部署应用程序。例如,在Cloud
Foundry中,它将绑定特定的服务到应用程序上,并为每个应用程序执行‘cf
push’命令。在Kubernetes中,它将复制控制器,服务,和负载均衡器。

Data Flow
Server有助于简化部署多个应用程序到运行环境,但也可以选择手工部署每个微服务应用程序而不使用Data
Flow
Server。这种方法可能更适合于小规模部署,在开发更多应用程序时,再采用Data
Flow的便利性和一致性。手动部署基于Stream和Task的微服务应用程序也是一种有用的学习实践,可以帮助你更好地了解一些Data
Flow Server提供的自动化应用程序配置和平台操作步骤 。

5.1. 和其它平台架构的比较

Spring Cloud Data Flow的体系结构风格和其他Stream 和
Batch的处理平台是不一样的。例如,Apache Spark, Apache

Flink, Google Cloud
Dataflow的应用程序运行在一个专用计算机集群当中。计算引擎的性质使这些平台与Spring
Cloud Data

Flow相比,在数据处理上可以执行更为丰富的复杂计算,但它们同时也引入了更为复杂的运行环境,这种运行环境,对于以数据为中心的应用程序来讲通常是不需要的。这并不意味着在使用Spring
Cloud Data

Flow不能进行实时数据计算。具体可以参考本文分析部分的那个章节,它介绍了集成Redis来处理常见计数器功能的使用案例,以及集成功能API驱动的rxjava的分析使用案例,如时间滑动窗口和移动平均等。

同样的,Spring XD作为Apache Storm,

Hortonworks DataFlow 和Spring Cloud Data

Flow的前身,使用专门的应用程序执行集群,
决定了代码运行在群集当中的哪个位置,和执行健康检查,以确保长期运行的应用程序在失败后会重新启动
。通常情况下,应用程序需要使用特定于框架的接口,以便正确地“插入”到集群的执行框架中。

正如我们在Spring
XD的演进过程中发现的那样,2015年之后兴起许多容器框架,我们再创建自己的运行框架的话那就是多余的了
。当有许多运行时平台都提供此功能时,我们完全没有必要再去构建自己的资源管理机制。考虑到这些因素之后,那么是什么让我们切换到目前的框架,这个你可能已经在用作它用的运行时框架的呢?因为它减少了创建和管理以数据为中心的应用程序的难度,而且原来针对终端用户或
Web应用程序所使用的技巧在这也都是适用的。

6.流式(stream)应用程序

Spring
Boot提供了创建适用于DevOps的微服务应用程序的基础,Spring生态系统的其它功能库则可以帮助创建基于Stream的微服务应用程序,这其中最重要的一项技术是是Spring
Cloud Stream。

Spring Cloud
Stream编程模型的本质是,提供一种简单的方法来描述应用程序的多个输入和输出,这些输入和输出是在消息中间件上进行通信的。这些输入和输出映射到Kafka
的topic 或者是 Rabbit的 exchange 和 queue。针对用于生成数据的Source 、消
费和生成数据的Process以及消费数据的Sink(接收器),功能库还提供了通用的应用程序配置。

6.1. 指令式编程模型

Spring Cloud Stream与Spring Integration的 “event at a
time”命令式编程模型,紧密地结合在一起。这意味着你可以直接写处理一个事件回调的代码。例如,

@EnableBinding(Sink.class)public class LoggingSink { 
@StreamListener(Sink.INPUT)  public void log(String message) {     
System.out.println(message);  }}

例子当中,输入通道上的消息字符串message被传递给了log这个方法。 @

enablebinding 注释用来绑定输入通道与外部中间件的关联关系。

6.2. 函数式编程模型

当然,Spring Cloud Stream还可以支持其他的编程风格。使用reactive
API将传入和传出数据作为连续数据流处理,并定义每个消息应如何处理。还可以使用对入站、出站数据流进行功能转换的运算符。即将推出的版本将支持Apache
Kafka的KStream API编程模型。

  1. 数据流

7.1. 拓扑

Stream DSL描述了数据流在系统中流转过程的线性序列。例如,stream定义为http
|

transformer |
cassandra,每个管道符号连接应用程序的左右两边。命名通道可用于路由和将数据分发到多个消息传递目的地。

水龙头(Taps)可以用来监听流经管道符号的数据。水龙头也可以作为另一个拥有独立生命周期的stream应用程序的source。

7.2. 并发

对于消费事件消息的应用程序,Spring Cloud
stream提供了并发设置,用于设置线程池的大小,这个线程池用于管理调度传入消息的线程。具体信息可以查看以下链接:

http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_consumer_properties

7.3. 分区

流处理的常见模式是,将从一个应用程序移动到下一个应用程序的数据进行分区。分区,是无状态处理模式中的关键概念,出于性能或是一致性的原因,需要确保所有相关数据是在一起处理的。例如,在一个基于时间窗口进行平均数计算的案例里,要确保来自任何给定传感器的所有测量数量都是由相同的应用程序实例进行处理。同时,您还可能希望缓存一些与传入事件相关的数据,以便在不进行外部远程过程调用的情况下对相关数据进行获取。

Spring Cloud Data
Flow通过配置输出和输入绑定来支持分区。为了在不同类型的中间件上以统一的方式实现分区处理,Spring
Cloud Data

Flow提供了一个公共抽象。分区功能可以基于本身就是支持分区的中间件
(如Kafka的
topic),也可以是本身不支持分区的中间件(如RabbitMQ)。下图显示了如何将数据划分到两个bucket中,使得每个计算平均数的processor应用程序实例都消耗一组唯一的数据,而不会重复处理同一个数据。

Spring

Cloud Stream 分区

在Spring Cloud Data

Flow里启用简单的分区策略,你只需要为每个应用程序在部署时设置实例计数和
partitionkeyexpression 生产者属性。 partitionkeyexpression
标识作为消息的一部分,将被用来作为划分数据分区的关键信息。一个ingest
stream应用程序可以被定义为 http |

averageprocessor | cassandra (注意,cassandra
不显示在上面的图中)。假设数据发送到http
source,是JSON格式的,有一个字段称为 sensorid。用shell命令来部署应用程序
stream deploy

ingest –propertiesFile ingestStream.properties
。ingestStream.properties 文件的内容是

deployer.http.count=3deployer.averageprocessor.count=2app.http.producer.partitionKeyExpression=payload.sensorId

该命令将部署应用程序,为应用程序配置好了所有的输入和输出,也确保总是不同的数据传递给每一个averageprocessor实例,而不会重复。在这种情况下,默认的算法就是计算
payload.sensorid

% partitioncount, 在使用RabbitMQ的时候
partitioncount是应用程序实例的个数,
在使用Kafka的时候partitioncount是topic的分区数。

其它的分区策略,具体请查看:

https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#passing_stream_partition_properties

分区参数是如何映射到分区属性上的,请查看:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_partitioning

还要注意,部署后,现在你已经不能修改分区的规模,具体请查看:https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#arch-runtime-scaling

7.4. 消息传输保障

Stream应用程序 是由一系列的小应用程序组合而成,这些小应用程序以Spring
Cloud Stream功能库作为基础与底层消息传递中间件进行通信。Spring Cloud
Stream还提供了来自多个供应商中间件的现成的配置,特别是针对持久化的发布订阅模式的。具体可以查看:

http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_persistent_publish_subscribe_support

Spring Cloud Stream中Binder
抽象类是用来连接应用程序到中间件的。Binder当中有几个配置属性是所有的Binder实现都具备的,也有一些配置属性只是针对特定的中间件实现才具备。具体可以查看:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_binders

针对用户应用程序在消息处理期间产生异常的重试策略,重试策略配置使用
共同的消费配置属性 maxattempts, backoffinitialinterval,
backoffmaxinterval,和
backoffmultiplier。这些属性的默认值将重试回调方法调用3次,然后对第一次重试等待1秒,对于第二次和第三次重试等待2秒。具体可以查看:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/index.html#_consumer_properties

当重试数已超过 maxattempts
属性设置的值、异常和失败的信息将成为消息的内容被发送到应用程序的错误通道当中。默认情况下,此错误通道的默认消息处理程序会记录下这个消息。你可以通过创建自己的消息处理程序订阅错误通道的消息来改变应用程序的默认行为。

Spring Cloud
Stream针对Kafka和RabbitMQ的binder实现有一个配置属性,支持将发送失败的消息和堆栈跟踪发送到他们的死信队列当中。死信队列的性质取决于消息传递中间件的特性(例如Kafka它有一个专用的topic)。为了使RabbitMQ的这一特性起作用,在部署stream的时候可以设置消费者属性

republishtodlq 和 autobinddlq 和 生产者属性 autobinddlq 为true
。在部署stream的时候应该总是将这些生产者和消费者特性部配置好,在启动Data
Flow server将它们配置为常见的应用程序属性 。

消息传输保障是由生产应用程序和消费应用程序选择的底层消息中间件提供的。查看Kafka的生产者和消费者详情文档和
Rabbit的生产者和消费者详情文档,你会找到所有QoS选项的相关说明。

  1. 分析

Spring Cloud Data
Flow包含了一系列的Sink应用程序,这些应用程序会将计数器数据写入到Redis当中,同时还提供了一个REST风格的端点可以用来读取计数器数据。目前支持的计数器类型有:

•总数计数器:统计接收到的消息个数,建议在一个单独的存贮系统里存贮这些数据,如redis。

•字段值计数器:统计消息当中某一个特定字段的某一个值的出现次数。

•聚合计数器:除了统计总数外,还保留了按分钟,小时,天,月统计的总数。

需要注意的是,聚合计数器中使用的时间戳可以是来自消息本身的字段,以便正确地统计无序的消息。

  1. 任务式(task)应用程序

Spring Cloud的任务编程模型提供了以下特点:

•持久性任务的生命周期事件和退出代码状态。

•任务执行前后,生命周期可以挂钩执行代码。

•在任务生命周期里提交任务事件到数据流当中(作为source) 。

• 集成了Spring Batch。

  1. 数据流服务器

10.1. 端点(Endpoint)

数据流服务器采用嵌入式servlet容器,暴露出REST风格的端点,支持创建、部署和取消部署,消毁stream和task,查询运行状态,分析等。数据流服务器使用了SpringMVC框架和
Spring

HATEOAS 功能库来创建遵循HATEOAS原理的REST风格的端点。

Spring

Cloud Data Flow 服务器

10.2. 自定义

每个数据流服务器的可执行jar包
目标都是提供一个单独的运行时环境,这是通过调用在类路径中部署器的SPI接口实现来做到的。

我们提供了一个数据流服务器可执行jar
。在当前版本中,没有提供专门针对运行时环境的的端点,但在将来的版本中可能会提供,以方便使用运行时环境的一些功能。

而我们为每个运行时环境提供了对应的服务器运行JAR包,你也可以使用Spring
Initialzr定制自己的服务器应用程序
。你可以针对我们提供的JAR包添加或删除功能相。例如,添加额外的安全实现、自定义端点或删除任务或分析端点。您还可以通过功能切换来启用或禁用某些功能。

10.3. 安全

数据流服务器的可执行JAR包,支持基本的HTTP,LDAP(S),基于文件系统,和OAuth
2认证来访问它的端点。具体信息可以查看:

https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#configuration-security

通过角色组来授权的功能计划在未来的版本里实现。

  1. 运行时环境

11.1. 容错

Data
Flow的运行时环境会重启一个长周期运行的应用程序,如果它运行过程中失败了的话。Spring
Cloud Data

Flow在部署应用程序时,会要求设置运行时所使用的状态探测器。

构成stream的所有应用程序的状态集合用于确定stream的运行状态。如果应用程序运行出现问题,stream的状态将从‘deployed’更改为‘partial’。

11.2. 资源管理

每个运行时环境允许您控制分配给每个应用程序的内存、磁盘和CPU的数量,可以设置部署清单中的属性值传递给每个运行时环境。通过查看每个平台服务器的说明文档可以了解更多信息。

11.3. 运行时调整规模

部署stream时,可以为包含stream的每个单独应用程序设置实例个数。一旦部署了流,运行时环境也允许你为每个单独的应用程序调整实例的数量。通过使用API,用户界面,或运行命令行,你可以增加或减少实例个数。后续版本将在数据流服务器中提供一个可移植的命令来执行此操作。

目前这项功能还不支持Kafka的
binder(0.8版本的Kafka)以及分区的stream,建议的解决方法是重新部署更新了实例个数的stream应用程序。这两种情况都需要基于总实例计数和当前实例索引的信息建立静态的消费者数量,这一限制将在将来的版本中加以解决。例如,Kafka
0.9和更高版本为动态扩展应用程序提供了良好的基础结构,在不久的将来将作为当前Kafka
0.8
的binder的替代品。本地状态是决定伸缩分区化stream应用程序的关键,典型的依据是应用程序实例数量的变化。这也将在未来的版本中加以解决,为本地状态管理提供一流的支持。

11.4. 应用程序版本化

应用程序的版本化,就是升级或降级应用程序,从一个版本迁移到另一个版本,Spring
Cloud Data

Flow并不直接提供支持。您必须依赖特定的运行时环境来支持这些功能。

Spring Cloud Data
Flow的发展路线图规划,将可以支持部署兼容Spinnaker管理生命周期的应用程序。还将包括基于应用程序度量的自动化分析。用数据流服务器的便携式命令触发Spinnaker的管道功能也在计划中。