用Java開發(fā)IBM Streams應(yīng)用
InfoSphere Streams 概述
IBM InfoSphere Streams是一個高級計算平臺,幫助用戶開發(fā)的應(yīng)用程序快速攝取、分析和關(guān)聯(lián)來自數(shù)千個實時源的信息。該解決方案可處理非常高的數(shù)據(jù)吞吐率,最高可達每秒數(shù)百萬個事件或消息。該平臺支持流數(shù)據(jù)的實時處理,支持不斷更新持續(xù)查詢的結(jié)果,可在仍在移動的數(shù)據(jù)流中檢測洞察。Streams旨在從一個幾分鐘到幾小時的窗口中的移動信息(數(shù)據(jù)流)中揭示有意義的模式。該平臺能夠獲取低延遲洞察,并為注重時效的應(yīng)用程序(比如欺詐檢測或網(wǎng)絡(luò)管理)獲取更好的成果,從而提供業(yè)務(wù)價值。流處理的演示如下圖所示:
Streams 的主要設(shè)計目的是:
- 快速響應(yīng)事件和不斷變化的業(yè)務(wù)條件與需求。
- 支持以比現(xiàn)有系統(tǒng)快幾個數(shù)量級的數(shù)據(jù)處理速度對數(shù)據(jù)執(zhí)行持續(xù)分析。
- 快速適應(yīng)不斷變化的數(shù)據(jù)形式和類型。
- 管理流模式的高可用性、異構(gòu)性和分布。
- 為共享的信息提供安全性和信息機密性。
Streams 提供了一種編程模型和 IDE 來定義數(shù)據(jù)來源,還提供了已融合到處理執(zhí)行單元中的稱為運算符的軟件分析模塊。它還提供了基礎(chǔ)架構(gòu)來支持從這些組件合成可擴展的流處理應(yīng)用程序。主要平臺組件包括:
- 運行時環(huán)境:這包括平臺服務(wù),以及一個用于在單個主機或一組集成的主機上部署和監(jiān)視 Streams 應(yīng)用程序的調(diào)度程序。
- 編程模型:您可使用 SPL(Streams Processing Language,流處理語言,一種聲明性語言)來編寫 Streams 應(yīng)用程序??墒褂迷撜Z言陳述您的需求,運行時環(huán)境會承擔確定如何最佳地服務(wù)該請求的責任。在此模型中,一個 Streams 應(yīng)用程序表示為一個由運算符和連接它們的流組成的圖表。
- 監(jiān)視工具和管理接口:Streams 應(yīng)用程序處理數(shù)據(jù)的速度比普通的操作系統(tǒng)監(jiān)視實用程序快得多。InfoSphere Streams 提供了可處理此環(huán)境的工具。
Streams原生編程語言--SPL
Streams Processing Language (SPL),Streams 的編程語言,是一種分布式數(shù)據(jù)流合成語言。它是一種類似 C++ 或 Java™ 的可擴展且全功能的語言,支持用戶定義的數(shù)據(jù)類型。您可以使用 SPL 或原生語言(C++ 或Java)編寫自定義函數(shù)。也可以使用 C++ 或 Java 編寫用戶定義的運算符。
Streams 通過SPL將應(yīng)用程序會描述一個導向圖,該圖由各個互聯(lián)且處理多個數(shù)據(jù)流的運算符組成。數(shù)據(jù)流可來自系統(tǒng)外部,或者在應(yīng)用程序內(nèi)部生成。SPL 程序的基本構(gòu)建塊包括:
- 流:一個無限的結(jié)構(gòu)化元組序列。它可逐個元組地由運算符使用或通過一個窗口的定義來使用。
- 元組:屬性及其類型的一個結(jié)構(gòu)化列表。流上的每個元組擁有由其流類型指定的形式。
- 流類型:指定元組中每個屬性的名稱和數(shù)據(jù)類型。
- 窗口:一個有限、有序的元組分組。它可以基于計數(shù)、時間、屬性值或標點符號。
- 運算符:SPL 的基礎(chǔ)構(gòu)建塊,它的運算符會處理來自流的數(shù)據(jù)并可生成新流。
- 處理元素 (PE):基礎(chǔ)執(zhí)行單元。一個 PE 可封裝單個運算符或多個合并的運算符。
- 作業(yè):一個已部署好的用來執(zhí)行的 Streams 應(yīng)用程序。它由一個或多個 PE 組成。除了一組 PE 之外,SPL 編譯器還會生成一個 ADL(Application Description Language,應(yīng)用程序描述語言)文件來描述應(yīng)用程序的結(jié)構(gòu)。該 ADL 文件包含每個 PE 的詳細信息,比如要加載和執(zhí)行哪個二進制文件,調(diào)度限制、流格式和一個內(nèi)部運算符數(shù)據(jù)流圖。
Streams編程語言的另一選擇--Java
Java 作為面向?qū)ο蟮母呒壘幊陶Z言,以其使用簡單、完全面象對象、平臺可移植性、健壯的沙盒安全機制、動態(tài)性,以及大量可用的開發(fā)包等一系列優(yōu)勢,在互聯(lián)網(wǎng)分布式環(huán)境下得到了極其廣泛的應(yīng)用,具有廣泛的用戶基礎(chǔ)。為了Streams用戶重用已有的Java開發(fā)技能、保護已有的Java資產(chǎn),IBM Streams平臺提供了使用 Java 編程語言來構(gòu)建 Streams 應(yīng)用程序的框架,具體包括 Java 運算符模型描述文件以及 Java 運算符 API(JavaOp)兩種方式。這兩種方式在一定程度上讓開發(fā)人員集成Java功能模塊。
streamsx.topology項目
雖然Streams所提供的Java運算符模型描述文件以及Java運算符API(JavaOp)方式支持了Java代碼調(diào)用,但是,傳統(tǒng)的Java是面向?qū)ο蟮木幊陶Z言,它只能幫助開發(fā)人員實現(xiàn)業(yè)務(wù)邏輯或重用Java代碼,但它無法以“流處理”的思維,直接進行類似SPL的流應(yīng)用開發(fā)。
streamsx.topology開源項目的出現(xiàn),豐富了Streams的開發(fā)方式,為流應(yīng)用的開發(fā)者提供更多的語言選擇。streamsx.topology項目提供Java Application API,面向流處理應(yīng)用的將Java封裝成一套類庫,使得開發(fā)者完全使用Java和Scala語言并按照“流處理”的思維創(chuàng)建IBM Streams流處理應(yīng)用。
streamsx.topology開源項目參考網(wǎng)址:
http://ibmstreams.github.io/streamsx.topology/
運行streamx.topology的的sample程序
1. 從www.ibm.com/software/data/infosphere/stream-computing/trials.html下載“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一個免費的、可下載的非生產(chǎn)版本,它沒有數(shù)據(jù)或時間限制,支持您在自己的獨特環(huán)境中試驗流計算,構(gòu)建一個強大的分析平臺。該平臺能夠處理難以置信的高數(shù)據(jù)吞吐量,高達每秒數(shù)百萬個事件或消息。InfoSphere Streams QuickStart Edition 沒有提供支持選項,僅適用于非生產(chǎn)用途。要獲得相應(yīng)的支持,請購買 InfoSphereStreams。
2.解壓VM鏡像,并在VMPlayer啟動VM。
該VM已經(jīng)安裝com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:
- com.ibm.streamsx.topology- 拓撲工具箱,讓您能采用Java開發(fā)您的Streams應(yīng)用
- samples- 演示JavaApplication API的示例集合。
- 運行示例應(yīng)用
1)在桌面雙擊InfoSphere Streams Studio (Eclipse)圖標啟動Streams Studio.
2)指定workspace為:/home/streamsadmin/Workspaces/topology/
3) 運行"Hello World" 示例程序:在Project Explorer標簽, 打開src->simple->HelloWorld->HelloWorld.java,代碼如下:
package simple; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; publicclass HelloWorld { publicstaticvoid main(String[] args) throws Exception { /* * Create the container for the topology that will * hold the streams of tuples. */ Topology topology = new Topology("HelloWorld"); /* * Declare a source stream (hw) with String tuples containing two tuples, * "Hello" and "World!". */ TStream<String> hw = topology.strings("Hello", "World!"); /* * Sink hw by printing each of its tuples to System.out. */ hw.print(); if (args.length == 0) StreamsContextFactory.getEmbedded().submit(topology).get(); else StreamsContextFactory.getStreamsContext(args[0]).submit(topology) .get(); } } |
4) 運行"Hello World" 示例程序:右擊HelloWorld.java,選擇Run As-> Run Configurations. 在Run Configurations 'Main' 標簽頁面,確保Main class填 simple.HelloWorld. 在 arguments標簽頁面, 設(shè)置Program arguments為EMBEDDED (EMBEDDED表示程序獨立編譯并嵌入到JVM運行,而不依賴Streams運行時環(huán)境)。
5) 設(shè)置必要參數(shù)后,運行該應(yīng)用您會看到以下的輸出:
Hello
world!
使用Java Application API開發(fā)Streams應(yīng)用
我們創(chuàng)建一個名叫MyGrep的Sample應(yīng)用,用于指導關(guān)鍵字搜索某個文件夾下的文件,搜索到則顯示相應(yīng)內(nèi)容所在的行數(shù)和內(nèi)容。具體步驟如下:
1)創(chuàng)建Java項目: File->New->Project->Java->JavaProject,點擊Next,在Create a Java Project填寫MySamples,點擊Next。
2)在Libraies標簽頁:
點擊External Jar按鈕,選擇com.ibm.streams.topology.jar
點擊Add Library按鈕,選擇IBM InfoSphere Streams
點擊Next和Finish完成項目的創(chuàng)建。新創(chuàng)建項目視圖如下圖所示:
3)創(chuàng)建命名空間:右擊src->New->Package->JavaPackage的Name填寫:mysapce
4)創(chuàng)建Java主類:src->右擊myspace->New->Class,在Name填寫:mysapce,確保勾選“public static void main(String[]args)”。確定后生成MyGrep.java。
5)創(chuàng)建Java類:src->右擊myspace->New->Class,在Name填寫:GrepInfo,不要勾選“public static void main(String[]args)”,確定后生成GrepInfo.java。
6)MyGrep.java和GrepInfo.java的代碼內(nèi)容如下:
MyGrep.java
package myspace; import java.io.ObjectStreamException; import java.util.Arrays; import java.util.concurrent.Future; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; import com.ibm.streamsx.topology.file.FileStreams; import com.ibm.streamsx.topology.function7.Function; publicclass MyGrep { publicstaticvoid main(String[] args) throws Exception { String contextType = args[0]; String directory = args[1]; final String term = args[2]; Topology topology = new Topology("MyGrep"); TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory); TStream<String> lines = FileStreams.textFileReader(filePaths); TStream<GrepInfo> grepInfo = lines.multiTransform( new Function<String, Iterable<GrepInfo>>() { privatestaticfinallongserialVersionUID = 1L; privateintlineNum = 0; @Override public Iterable<GrepInfo> apply(String line) { ++lineNum; if(line.contains(term)){ return Arrays.asList(new GrepInfo(lineNum, line)); } else returnnull; } private Object readResolve() throws ObjectStreamException { returnthis; } }, GrepInfo.class); grepInfo.print(); Future<?> future = StreamsContextFactory.getStreamsContext(contextType) .submit(topology); Thread.sleep(30 * 1000); future.cancel(true); } } |
GrepInfo.java
package myspace; import java.io.Serializable; import com.ibm.streamsx.topology.tuple.Keyable; publicclass GrepInfo implements Keyable<GrepInfo>, Serializable { privatestaticfinallongserialVersionUID = 1L; intlineNum; String lineStr; public GrepInfo(int ln, String ls) { this.lineNum = ln; this.lineStr = ls; } @Override public String toString() { return"Line Num " + lineNum + " : " + lineStr; } @Override public GrepInfo getKey() { // TODO Auto-generated method stub returnnull; } } |
7)運行MyGrep之前,請確保Streams Instance已經(jīng)啟動,并在/home/streamsadmin/test創(chuàng)建一個文本文件并寫如若干內(nèi)容。
8)運行程序:右擊MyGrep.java,選擇Run As -> RunConfigurations. 在Run Configurations 'Main' 標簽頁面,確保Project填寫MySamples和Main class填 myspace.MyGrep。
在 arguments標簽頁面, 設(shè)置Program arguments為DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams運行時環(huán)境,/home/streamsadmin/test是程序搜索關(guān)鍵的目錄;China是搜索關(guān)鍵字)。
9)查看結(jié)果:
在Streams Exploere -> StreamsInstances ->右擊default:<instance>@<Domain>,選擇Show Instance Graph
在Instance Graph窗口,我們能看到MyGrep最終運行圖。右擊最后的Print PE->Show Log->Show PEConsole
在Console將會顯現(xiàn)MyGrep運行的結(jié)果
總結(jié)
streams.topology開源項目所提供的Java Application API使得Streams開發(fā)者對流應(yīng)用的編程語言有了新的選擇,它能幫助開發(fā)者重用Java編程能力,并按照“流處理”的思路簡化流應(yīng)用的開發(fā)過程,讓開發(fā)者更專注于業(yè)務(wù)的處理邏輯而不是流處理的框架。然而,該項目還處于早期階段,很多功能和接口尚未實現(xiàn);對比成熟的、完善的SPL,Java Application API的功能和成熟性還有很大差距。相信在不久的將來,streams.topology將會逐漸完善并成為IBM Streams平臺的一個重要補充。
更多大數(shù)據(jù)與分析相關(guān)行業(yè)資訊、解決方案、案例、教程等請點擊查看>>>
詳情請咨詢在線客服!
客服熱線:023-66090381