更新時(shí)間:2019-10-16 來源:黑馬程序員 瀏覽量:
1、簡介
Flink CEP是在flink之上實(shí)現(xiàn)的復(fù)雜事件處理(CEP)庫,它允許我們在事件流中檢測事件的模式,讓我們有機(jī)會(huì)掌握數(shù)據(jù)中重要的事項(xiàng)。
本文章主要是介紹了flink cep中可用的api調(diào)用,首先介紹Pattern
API,它允許你指定要在事件流中檢測的模式,并介紹匹配事件并對其進(jìn)行操作。最后分析下CEP庫在處理事件時(shí)間延遲問題?!?strong>推薦了解大數(shù)據(jù)培訓(xùn)課程】
2、使用步驟
(1)首先我們需要引入cep的依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.5.0</version> </dependency>
(2)確定equals()和hashcode()方法
如果使用CEP,需要我們在datastream中的事件實(shí)現(xiàn)正確的equals()和hashcode()方法,因?yàn)镕link CEP使用他們來比較和匹配事件。
簡單demo代碼:
al input: DataStream[Event] = ... val pattern = Pattern.begin[Event]("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(input, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))
3、Pattern API
Pattern API允許你定義要從輸入流中提取的復(fù)雜模式序列。
每個(gè)復(fù)雜模式序列都是由多個(gè)簡單模式組成,簡單模式就是尋找具有相同屬性的單個(gè)事件的模式,我們可以先定義一些簡單的模式,然后組合成復(fù)雜的序列模式。
可以將模式序列視為此類模式的結(jié)構(gòu)圖,基于用戶指定的條件從一個(gè)模式轉(zhuǎn)換到下一個(gè)模式,例如:
event.getName().equals(“start”).
匹配的是一系列輸入事件,通過一系列有效的模式轉(zhuǎn)換訪問復(fù)雜模式圖中的所有模式。注意每個(gè)模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來標(biāo)識匹配的事件。模式名稱中不能包含字符”:”。
下面我們首先介紹如何定義單個(gè)模式,然后再將各個(gè)模式組合到復(fù)雜模式中。
單個(gè)模式
Pattern可以是單個(gè),也可以是循環(huán)模式,單個(gè)模式接收單個(gè)事件,而循環(huán)模式可以接收多個(gè)事件,在模式匹配符號中,模式“a b + c?d”(或“a”,后跟一個(gè)或多個(gè)“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。
默認(rèn)情況下,模式是單個(gè)模式,可以使用Quantifiers將其轉(zhuǎn)換為循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件,基于它接收的事件。
Quantifiers
在FlinkCEP中,可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個(gè)或多個(gè)事件發(fā)生的模式(例如之前提到的b+);用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,對于名為start的模式,以下是有效的Quantifiers:
// expecting 4 occurrences start.times(4); // expecting 0 or 4 occurrences start.times(4).optional(); // expecting 2, 3 or 4 occurrences start.times(2, 4); // expecting 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).greedy(); // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional(); // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).optional().greedy(); // expecting 1 or more occurrences start.oneOrMore(); // expecting 1 or more occurrences and repeating as many as possible start.oneOrMore().greedy(); // expecting 0 or more occurrences start.oneOrMore().optional(); // expecting 0 or more occurrences and repeating as many as possible start.oneOrMore().optional().greedy(); // expecting 2 or more occurrences start.timesOrMore(2); // expecting 2 or more occurrences and repeating as many as possible start.timesOrMore(2).greedy(); // expecting 0, 2 or more occurrences and repeating as many as possible start.timesOrMore(2).optional().greedy();
Conditions條件
每個(gè)模式中,從一個(gè)模式轉(zhuǎn)到下一個(gè)模式,可以指定其他條件,我們可以使用下面這些條件:
1.傳入事件的屬性,例如其值應(yīng)大于5,或者大于先前接收的事件的平均值;
2.匹配事件的連續(xù)性,例如檢測模式a,b,c序列中不能有任何非匹配事件。
Conditions on Properties關(guān)于屬性的條件
可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件,條件可以是iterativeConditions或SimpleConditions。
1.迭代條件
這是最常見的條件類型,你可以指定一個(gè)條件,該條件基于先前接收的事件的屬性或器子集的統(tǒng)計(jì)信息來接收后續(xù)事件。
下面代碼說的是:如果名稱以”foo”開頭同時(shí)如果該模式的先前接收的事件的價(jià)格總和加上當(dāng)前事件的價(jià)格不超過該值5.0,則迭代條件接收名為”middle”的模式的下一個(gè)事件:迭代條件可以很強(qiáng)大,尤其是與循環(huán)模式相結(jié)合,例如:oneOrMore();
middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) => { lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } )
注意對context.getEventsForPattern()的調(diào)用將為給定潛在匹配項(xiàng)查找所有先前接收的事件,此操作代價(jià)可能會(huì)變化巨大,因此應(yīng)盡量減少其使用。
2.簡單條件
這種類型的條件時(shí)擴(kuò)展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接收事件:
start.where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) { return value.getName().startsWith("foo"); }});
此外還可以通過pattern.subtype(subclass)方法將接收事件的類型限定為初始事件類型的子類型:
start.where(event => event.getName.startsWith("foo"))
組合條件:
如上所示,可以將子類型條件與其他條件組合使用,這適用于所有條件。我們可以通過順序調(diào)用where()來任意組合條件。最終結(jié)果將是各個(gè)條件的結(jié)果的邏輯and,要使用or組合條件,可以使用or()方法,如下所示:
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
停止條件
在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如:接收值大于5的事件,直到其值的總和小于50。
我們看個(gè)例子來更好的理解:
給定模式:(a+ until b),b之前,要出現(xiàn)一個(gè)或者多個(gè)a,
給定輸入的序列:a1,c,a2,b,a3
輸出結(jié)果:{a1 a2}{a1}{a2}{a3}
我們可以看到{a1,a2,a3},{a2,a3}兩個(gè)并沒有輸出,這就是停止條件的作用。
連續(xù)事件的條件
Flink CEP支持事件之間以一下形式連續(xù):
嚴(yán)格連續(xù)性:希望所有匹配事件一個(gè)接一個(gè)的出現(xiàn),中間沒有任何不匹配的事件;
寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)不匹配事件,不能忽略兩個(gè)事件之間的匹配事件。
非確定性輕松連續(xù)性:進(jìn)一步放寬連續(xù)性,允許忽略某些匹配事件的其它匹配。
為了解釋上面的內(nèi)容,我們舉個(gè)例子。假如有個(gè)模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:
嚴(yán)格連續(xù)性:{a2 b} - 由于c的存在導(dǎo)致a1被廢棄
寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略
非確定性寬松連續(xù)性:{a1 b}, {a2 b}, 和 {a1 a2 b}
對于循環(huán)模式(例如oneOrMore()和times()),默認(rèn)是寬松的連續(xù)性。 如果你想要嚴(yán)格的連續(xù)性,你必須使用consecutive()顯式指定它, 如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。
組合模式
簡介
已經(jīng)了解了單個(gè)模式的樣子,現(xiàn)在是時(shí)候看看如何將它們組合成一個(gè)完整的模式序列。
模式序列必須以初始模式開始,如下所示:
Patternstart = Pattern.begin("start");
接下來,您可以通過指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。
在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴(yán)格,寬松和非確定性寬松,以及如何在循環(huán)模式中應(yīng)用它們。 要在連續(xù)模式之間應(yīng)用它們,可以使用:
next() 對應(yīng)嚴(yán)格, followedBy() 對應(yīng)寬松連續(xù)性 followedByAny() 對應(yīng)非確定性寬松連續(xù)性亦或
notNext() 如果不希望一個(gè)事件類型緊接著另一個(gè)類型出現(xiàn)。 notFollowedBy() 不希望兩個(gè)事件之間任何地方出現(xiàn)該事件。 注意
模式序列不能以notFollowedBy()結(jié)束。 注意 NOT模式前面不能有可選模式。
// strict contiguity Pattern<Event, ?> strict = start.next("middle").where(...); // relaxed contiguity Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // non-deterministic relaxed contiguity Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // NOT pattern with strict contiguity Pattern<Event, ?> strictNot = start.notNext("not").where(...); // NOT pattern with relaxed contiguity Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
寬松連續(xù)性指的是僅第一個(gè)成功匹配的事件會(huì)被匹配到,然而非確定性寬松連續(xù)性,相同的開始會(huì)有多個(gè)匹配結(jié)果發(fā)出。距離,如果一個(gè)模式是"a b",給定輸入序列是"a c b1 b2"。對于不同連續(xù)性會(huì)有不同輸出。
a和b之間嚴(yán)格連續(xù)性,將會(huì)返回{},也即是沒有匹配。因?yàn)閏的出現(xiàn)導(dǎo)致a,拋棄了。
a和b之間寬松連續(xù)性,返回的是{a,b1},因?yàn)閷捤蛇B續(xù)性將會(huì)拋棄為匹配成功的元素,直至匹配到下一個(gè)要匹配的事件。
a和b之間非確定性寬松連續(xù)性,返回的是{a,b1},{a,b2}。
也可以為模式定義時(shí)間約束。 例如,可以通過pattern.within()方法定義模式應(yīng)在10秒內(nèi)發(fā)生。 時(shí)間模式支持處理時(shí)間和事件時(shí)間。 注意模式序列只能有一個(gè)時(shí)間約束。 如果在不同的單獨(dú)模式上定義了多個(gè)這樣的約束,則應(yīng)用最小的約束。
next.within(Time.seconds(10));
可以為begin,followBy,followByAny和next定義一個(gè)模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且 可對GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。