• <menu id="w2i4a"></menu>
  • logo 慧都大數(shù)據(jù)(一)

    文檔首頁>>慧都大數(shù)據(jù)(一)>>用Java開發(fā)IBM Streams應(yīng)用

    用Java開發(fā)IBM Streams應(yīng)用


    InfoSphere Streams 概述

    IBM InfoSphere Streams是一個高級計算平臺,幫助用戶開發(fā)的應(yīng)用程序快速攝取、分析和關(guān)聯(lián)來自數(shù)千個實時源的信息。該解決方案可處理非常高的數(shù)據(jù)吞吐率,最高可達每秒數(shù)百萬個事件或消息。該平臺支持流數(shù)據(jù)的實時處理,支持不斷更新持續(xù)查詢的結(jié)果,可在仍在移動的數(shù)據(jù)流中檢測洞察。Streams旨在從一個幾分鐘到幾小時的窗口中的移動信息(數(shù)據(jù)流)中揭示有意義的模式。該平臺能夠獲取低延遲洞察,并為注重時效的應(yīng)用程序(比如欺詐檢測或網(wǎng)絡(luò)管理)獲取更好的成果,從而提供業(yè)務(wù)價值。流處理的演示如下圖所示:

    Streams 的主要設(shè)計目的是:

    • 快速響應(yīng)事件和不斷變化的業(yè)務(wù)條件與需求。
    • 支持以比現(xiàn)有系統(tǒng)快幾個數(shù)量級的數(shù)據(jù)處理速度對數(shù)據(jù)執(zhí)行持續(xù)分析。
    • 快速適應(yīng)不斷變化的數(shù)據(jù)形式和類型。
    • 管理流模式的高可用性、異構(gòu)性和分布。
    • 為共享的信息提供安全性和信息機密性。

    Streams 提供了一種編程模型和 IDE 來定義數(shù)據(jù)來源,還提供了已融合到處理執(zhí)行單元中的稱為運算符的軟件分析模塊。它還提供了基礎(chǔ)架構(gòu)來支持從這些組件合成可擴展的流處理應(yīng)用程序。主要平臺組件包括:

    • 運行時環(huán)境:這包括平臺服務(wù),以及一個用于在單個主機或一組集成的主機上部署和監(jiān)視 Streams 應(yīng)用程序的調(diào)度程序。
    • 編程模型:您可使用 SPL(Streams Processing Language,流處理語言,一種聲明性語言)來編寫 Streams 應(yīng)用程序??墒褂迷撜Z言陳述您的需求,運行時環(huán)境會承擔確定如何最佳地服務(wù)該請求的責任。在此模型中,一個 Streams 應(yīng)用程序表示為一個由運算符和連接它們的流組成的圖表。
    • 監(jiān)視工具和管理接口:Streams 應(yīng)用程序處理數(shù)據(jù)的速度比普通的操作系統(tǒng)監(jiān)視實用程序快得多。InfoSphere Streams 提供了可處理此環(huán)境的工具。

    Streams原生編程語言--SPL

    Streams Processing Language (SPL),Streams 的編程語言,是一種分布式數(shù)據(jù)流合成語言。它是一種類似 C++ 或 Java™ 的可擴展且全功能的語言,支持用戶定義的數(shù)據(jù)類型。您可以使用 SPL 或原生語言(C++ 或Java)編寫自定義函數(shù)。也可以使用 C++ 或 Java 編寫用戶定義的運算符。

    Streams 通過SPL將應(yīng)用程序會描述一個導向圖,該圖由各個互聯(lián)且處理多個數(shù)據(jù)流的運算符組成。數(shù)據(jù)流可來自系統(tǒng)外部,或者在應(yīng)用程序內(nèi)部生成。SPL 程序的基本構(gòu)建塊包括:

    • 流:一個無限的結(jié)構(gòu)化元組序列。它可逐個元組地由運算符使用或通過一個窗口的定義來使用。
    • 元組:屬性及其類型的一個結(jié)構(gòu)化列表。流上的每個元組擁有由其流類型指定的形式。
    • 流類型:指定元組中每個屬性的名稱和數(shù)據(jù)類型。
    • 窗口:一個有限、有序的元組分組。它可以基于計數(shù)、時間、屬性值或標點符號。
    • 運算符:SPL 的基礎(chǔ)構(gòu)建塊,它的運算符會處理來自流的數(shù)據(jù)并可生成新流。
    • 處理元素 (PE):基礎(chǔ)執(zhí)行單元。一個 PE 可封裝單個運算符或多個合并的運算符。
    • 作業(yè):一個已部署好的用來執(zhí)行的 Streams 應(yīng)用程序。它由一個或多個 PE 組成。除了一組 PE 之外,SPL 編譯器還會生成一個 ADL(Application Description Language,應(yīng)用程序描述語言)文件來描述應(yīng)用程序的結(jié)構(gòu)。該 ADL 文件包含每個 PE 的詳細信息,比如要加載和執(zhí)行哪個二進制文件,調(diào)度限制、流格式和一個內(nèi)部運算符數(shù)據(jù)流圖。

    Streams編程語言的另一選擇--Java

    Java 作為面向?qū)ο蟮母呒壘幊陶Z言,以其使用簡單、完全面象對象、平臺可移植性、健壯的沙盒安全機制、動態(tài)性,以及大量可用的開發(fā)包等一系列優(yōu)勢,在互聯(lián)網(wǎng)分布式環(huán)境下得到了極其廣泛的應(yīng)用,具有廣泛的用戶基礎(chǔ)。為了Streams用戶重用已有的Java開發(fā)技能、保護已有的Java資產(chǎn),IBM Streams平臺提供了使用 Java 編程語言來構(gòu)建 Streams 應(yīng)用程序的框架,具體包括 Java 運算符模型描述文件以及 Java 運算符 API(JavaOp)兩種方式。這兩種方式在一定程度上讓開發(fā)人員集成Java功能模塊。

    streamsx.topology項目

    雖然Streams所提供的Java運算符模型描述文件以及Java運算符API(JavaOp)方式支持了Java代碼調(diào)用,但是,傳統(tǒng)的Java是面向?qū)ο蟮木幊陶Z言,它只能幫助開發(fā)人員實現(xiàn)業(yè)務(wù)邏輯或重用Java代碼,但它無法以“流處理”的思維,直接進行類似SPL的流應(yīng)用開發(fā)。

    streamsx.topology開源項目的出現(xiàn),豐富了Streams的開發(fā)方式,為流應(yīng)用的開發(fā)者提供更多的語言選擇。streamsx.topology項目提供Java Application API,面向流處理應(yīng)用的將Java封裝成一套類庫,使得開發(fā)者完全使用Java和Scala語言并按照“流處理”的思維創(chuàng)建IBM Streams流處理應(yīng)用。

    streamsx.topology開源項目參考網(wǎng)址:

    http://ibmstreams.github.io/streamsx.topology/

    運行streamx.topology的的sample程序

    1. 從www.ibm.com/software/data/infosphere/stream-computing/trials.html下載“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一個免費的、可下載的非生產(chǎn)版本,它沒有數(shù)據(jù)或時間限制,支持您在自己的獨特環(huán)境中試驗流計算,構(gòu)建一個強大的分析平臺。該平臺能夠處理難以置信的高數(shù)據(jù)吞吐量,高達每秒數(shù)百萬個事件或消息。InfoSphere Streams QuickStart Edition 沒有提供支持選項,僅適用于非生產(chǎn)用途。要獲得相應(yīng)的支持,請購買 InfoSphereStreams。

    2.解壓VM鏡像,并在VMPlayer啟動VM。

    該VM已經(jīng)安裝com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:

    • com.ibm.streamsx.topology- 拓撲工具箱,讓您能采用Java開發(fā)您的Streams應(yīng)用
    • samples- 演示JavaApplication API的示例集合。
    • 運行示例應(yīng)用

    1)在桌面雙擊InfoSphere Streams Studio (Eclipse)圖標啟動Streams Studio.

    2)指定workspace為:/home/streamsadmin/Workspaces/topology/

    3) 運行"Hello World" 示例程序:在Project Explorer標簽, 打開src->simple->HelloWorld->HelloWorld.java,代碼如下:

    package simple;

    import com.ibm.streamsx.topology.TStream;

    import com.ibm.streamsx.topology.Topology;

    import com.ibm.streamsx.topology.context.StreamsContextFactory;

    publicclass HelloWorld {

    publicstaticvoid main(String[] args) throws Exception {

    /*

    * Create the container for the topology that will

    * hold the streams of tuples.

    */

    Topology topology = new Topology("HelloWorld");

    /*

    * Declare a source stream (hw) with String tuples containing two tuples,

    * "Hello" and "World!".

    */

    TStream<String> hw = topology.strings("Hello", "World!");

    /*

    * Sink hw by printing each of its tuples to System.out.

    */

    hw.print();

    if (args.length == 0)

    StreamsContextFactory.getEmbedded().submit(topology).get();

    else

    StreamsContextFactory.getStreamsContext(args[0]).submit(topology)

    .get();

    }

    }

    4) 運行"Hello World" 示例程序:右擊HelloWorld.java,選擇Run As-> Run Configurations. 在Run Configurations 'Main' 標簽頁面,確保Main class填 simple.HelloWorld. 在 arguments標簽頁面, 設(shè)置Program arguments為EMBEDDED (EMBEDDED表示程序獨立編譯并嵌入到JVM運行,而不依賴Streams運行時環(huán)境)。

    5) 設(shè)置必要參數(shù)后,運行該應(yīng)用您會看到以下的輸出:

    Hello

    world!

    使用Java Application API開發(fā)Streams應(yīng)用

    我們創(chuàng)建一個名叫MyGrep的Sample應(yīng)用,用于指導關(guān)鍵字搜索某個文件夾下的文件,搜索到則顯示相應(yīng)內(nèi)容所在的行數(shù)和內(nèi)容。具體步驟如下:

    1)創(chuàng)建Java項目: File->New->Project->Java->JavaProject,點擊Next,在Create a Java Project填寫MySamples,點擊Next。

    2)在Libraies標簽頁:

    點擊External Jar按鈕,選擇com.ibm.streams.topology.jar

    點擊Add Library按鈕,選擇IBM InfoSphere Streams

    點擊Next和Finish完成項目的創(chuàng)建。新創(chuàng)建項目視圖如下圖所示:

    3)創(chuàng)建命名空間:右擊src->New->Package->JavaPackage的Name填寫:mysapce

    4)創(chuàng)建Java主類:src->右擊myspace->New->Class,在Name填寫:mysapce,確保勾選“public static void main(String[]args)”。確定后生成MyGrep.java。

    5)創(chuàng)建Java類:src->右擊myspace->New->Class,在Name填寫:GrepInfo,不要勾選“public static void main(String[]args)”,確定后生成GrepInfo.java。

    6)MyGrep.java和GrepInfo.java的代碼內(nèi)容如下:

    MyGrep.java

    package myspace;

    import java.io.ObjectStreamException;

    import java.util.Arrays;

    import java.util.concurrent.Future;

    import com.ibm.streamsx.topology.TStream;

    import com.ibm.streamsx.topology.Topology;

    import com.ibm.streamsx.topology.context.StreamsContextFactory;

    import com.ibm.streamsx.topology.file.FileStreams;

    import com.ibm.streamsx.topology.function7.Function;

    publicclass MyGrep {

    publicstaticvoid main(String[] args) throws Exception {

    String contextType = args[0];

    String directory = args[1];

    final String term = args[2];

    Topology topology = new Topology("MyGrep");

    TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory);

    TStream<String> lines = FileStreams.textFileReader(filePaths);

    TStream<GrepInfo> grepInfo = lines.multiTransform(

    new Function<String, Iterable<GrepInfo>>() {

    privatestaticfinallongserialVersionUID = 1L;

    privateintlineNum = 0;

    @Override

    public Iterable<GrepInfo> apply(String line) {

    ++lineNum;

    if(line.contains(term)){

    return Arrays.asList(new GrepInfo(lineNum, line));

    }

    else

    returnnull;

    }

    private Object readResolve() throws ObjectStreamException {

    returnthis;

    }

    }, GrepInfo.class);

    grepInfo.print();

    Future<?> future = StreamsContextFactory.getStreamsContext(contextType)

    .submit(topology);

    Thread.sleep(30 * 1000);

    future.cancel(true);

    }

    }

    GrepInfo.java

    package myspace;

    import java.io.Serializable;

    import com.ibm.streamsx.topology.tuple.Keyable;

    publicclass GrepInfo implements Keyable<GrepInfo>, Serializable {

    privatestaticfinallongserialVersionUID = 1L;

    intlineNum;

    String lineStr;

    public GrepInfo(int ln, String ls) {

    this.lineNum = ln;

    this.lineStr = ls;

    }

    @Override

    public String toString() {

    return"Line Num " + lineNum + " : " + lineStr;

    }

    @Override

    public GrepInfo getKey() {

    // TODO Auto-generated method stub

    returnnull;

    }

    }

     

    7)運行MyGrep之前,請確保Streams Instance已經(jīng)啟動,并在/home/streamsadmin/test創(chuàng)建一個文本文件并寫如若干內(nèi)容。

    8)運行程序:右擊MyGrep.java,選擇Run As -> RunConfigurations. 在Run Configurations 'Main' 標簽頁面,確保Project填寫MySamples和Main class填 myspace.MyGrep。

    在 arguments標簽頁面, 設(shè)置Program arguments為DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams運行時環(huán)境,/home/streamsadmin/test是程序搜索關(guān)鍵的目錄;China是搜索關(guān)鍵字)。

    9)查看結(jié)果:

    在Streams Exploere -> StreamsInstances ->右擊default:<instance>@<Domain>,選擇Show Instance Graph

    在Instance Graph窗口,我們能看到MyGrep最終運行圖。右擊最后的Print PE->Show Log->Show PEConsole

    在Console將會顯現(xiàn)MyGrep運行的結(jié)果

    總結(jié)

    streams.topology開源項目所提供的Java Application API使得Streams開發(fā)者對流應(yīng)用的編程語言有了新的選擇,它能幫助開發(fā)者重用Java編程能力,并按照“流處理”的思路簡化流應(yīng)用的開發(fā)過程,讓開發(fā)者更專注于業(yè)務(wù)的處理邏輯而不是流處理的框架。然而,該項目還處于早期階段,很多功能和接口尚未實現(xiàn);對比成熟的、完善的SPL,Java Application API的功能和成熟性還有很大差距。相信在不久的將來,streams.topology將會逐漸完善并成為IBM Streams平臺的一個重要補充。

    更多大數(shù)據(jù)與分析相關(guān)行業(yè)資訊、解決方案、案例、教程等請點擊查看>>>

    詳情請咨詢在線客服!

    客服熱線:023-66090381

    掃碼咨詢


    添加微信 立即咨詢

    電話咨詢

    客服熱線
    023-68661681

    TOP
    三级成人熟女影院,欧美午夜成人精品视频,亚洲国产成人乱色在线观看,色中色成人论坛 (function(){ var bp = document.createElement('script'); var curProtocol = window.location.protocol.split(':')[0]; if (curProtocol === 'https') { bp.src = 'https://zz.bdstatic.com/linksubmit/push.js'; } else { bp.src = 'http://push.zhanzhang.baidu.com/push.js'; } var s = document.getElementsByTagName("script")[0]; s.parentNode.insertBefore(bp, s); })();