StreamingPro 简化流式计算配置

前言

前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。

另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。

配置

首先需要配置源:

代码语言:javascript
复制
 {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"
      }
    ]
  }</code></pre></div></div><p>我们配置了一个Kafka流,一个普通的CSV文件。目前StreamingPro只允许配置一个Kafka流,但是支持多个topic,按逗号分隔即可。你可以配置多个其他非流式源,比如从MySQL,Parquet,CSV同时读取数据并且映射成表。</p><p>之后你就可以写SQL进行处理了。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">{
    &#34;name&#34;: &#34;stream.sql&#34;,
    &#34;params&#34;: [
      {
        &#34;sql&#34;: &#34;select abc.content,&#39;abc&#39; as dd from abc left join test on test.content = abc.content&#34;,
        &#34;outputTableName&#34;: &#34;finalOutputTable&#34;
      }
    ]
  },</code></pre></div></div><p>我这里做了简单的join。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">{
    &#34;name&#34;: &#34;stream.outputs&#34;,
    &#34;params&#34;: [
      {
        &#34;format&#34;: &#34;jdbc&#34;,
        &#34;path&#34;: &#34;-&#34;,
        &#34;driver&#34;:&#34;com.mysql.jdbc.Driver&#34;,
        &#34;url&#34;:&#34;jdbc:mysql://127.0.0.1/~?characterEncoding=utf8&#34;,
        &#34;inputTableName&#34;: &#34;finalOutputTable&#34;,
        &#34;user&#34;:&#34;~&#34;,
        &#34;password&#34;:&#34;~&#34;,
        &#34;dbtable&#34;:&#34;aaa&#34;,
        &#34;mode&#34;:&#34;Append&#34;
      }
    ]
  }</code></pre></div></div><p>然后把数据追加到Mysql里去。其实你也可以配置多个输出。</p><h3 id="96and" name="%E5%AE%8C%E6%95%B4%E9%85%8D%E7%BD%AE">完整配置</h3><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">{

"example": {
"desc": "测试",
"strategy": "spark",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "stream.sources.kafka",
"params": [
{
"path": "file:///tmp/sample.csv",
"format": "com.databricks.spark.csv",
"outputTable": "test",
"header": "true"
},
{
"topics":"test",
"zk":"127.0.0.1",
"groupId":"kk3",
"outputTable": "abc"

      }
    ]
  },
  {
    &#34;name&#34;: &#34;stream.sql&#34;,
    &#34;params&#34;: [
      {
        &#34;sql&#34;: &#34;select abc.content,&#39;abc&#39; as dd from abc left join test on test.content = abc.content&#34;,
        &#34;outputTableName&#34;: &#34;finalOutputTable&#34;
      }
    ]
  },
  {
    &#34;name&#34;: &#34;stream.outputs&#34;,
    &#34;params&#34;: [
      {
        &#34;format&#34;: &#34;jdbc&#34;,
        &#34;path&#34;: &#34;-&#34;,
        &#34;driver&#34;:&#34;com.mysql.jdbc.Driver&#34;,
        &#34;url&#34;:&#34;jdbc:mysql://127.0.0.1/~?characterEncoding=utf8&#34;,
        &#34;inputTableName&#34;: &#34;finalOutputTable&#34;,
        &#34;user&#34;:&#34;~&#34;,
        &#34;password&#34;:&#34;~&#34;,
        &#34;dbtable&#34;:&#34;aaa&#34;,
        &#34;mode&#34;:&#34;Append&#34;
      }
    ]
  }
],
&#34;configParams&#34;: {
}

}
}

你可以在StreamingPro-0.4.11 下载到包,然后用命令启动:

代码语言:javascript
复制
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp
--master local[2]
--name test
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar
-streaming.name test
-streaming.platform spark
-streaming.job.file.path file://$SHome/batch.json