Flume interceptors

行者花雕, published 2022-12-24 in Beijing

This post builds a user-defined Flume interceptor. Start by creating a Maven project named flume-demo.

Create the project file POM.xml and add the Flume core dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

Then write the interceptor class:

package com.kpwong.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

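    // Intercept a single event: tag it with a "topic" header based on its body
    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        // Decode explicitly as UTF-8 rather than relying on the platform default charset
        String body = new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);

        if (body.contains("hello")) {
            headers.put("topic", "letter");
        } else {
            headers.put("topic", "number");
        }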

        return event;
    }

    // Intercept a batch of events by applying the single-event method to each one
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

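    // Flume creates the interceptor through this Builder; the agent config refers to it as CustomInterceptor$Builder
    public static class Builder implements Interceptor.Builder {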

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
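
The tagging rule inside intercept(Event) can be checked on its own, without a running Flume agent. The following is a minimal sketch, assuming a hypothetical helper class named TopicTagger (it is not part of the original project or of Flume) that mirrors the rule above: a body containing "hello" gets the topic "letter", anything else gets "number".

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; it only mirrors the header rule of CustomInterceptor.
final class TopicTagger {

    private TopicTagger() {
    }

    // Returns "letter" when the body contains "hello", otherwise "number".
    static String topicFor(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        return text.contains("hello") ? "letter" : "number";
    }

    // Writes the topic into the given header map, the way intercept(Event)
    // writes into the map returned by event.getHeaders().
    static Map<String, String> tag(Map<String, String> headers, byte[] body) {
        headers.put("topic", topicFor(body));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();

        tag(headers, "hello flume".getBytes(StandardCharsets.UTF_8));
        System.out.println(headers.get("topic")); // prints "letter"

        tag(headers, "12345".getBytes(StandardCharsets.UTF_8));
        System.out.println(headers.get("topic")); // prints "number"
    }
}
```

Keeping the rule in a plain static method like this is one possible design choice: the interceptor itself would then only need to call it and return the event.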

Package the project as a jar file and copy it to the /flume/lib directory.

 

 

Next, write the conf files. Prepare three machines (hadoop202, hadoop203, hadoop204).

On hadoop202, configure flume2.conf:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Interceptor and multiplexing channel selector
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop203
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop204
a2.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

The interceptor and selector part of this configuration is:

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
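
To make the effect of the selector lines concrete, here is a minimal sketch of the lookup they describe: read the "topic" header, then pick the channel mapped to its value. The class TopicRouter and its methods are hypothetical names introduced for illustration. This is not Flume's selector implementation, which is driven purely by the configuration above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for illustration only; it is not Flume's multiplexing selector.
final class TopicRouter {

    private final String headerName;
    private final Map<String, String> mapping = new LinkedHashMap<>();

    // headerName plays the role of "selector.header".
    TopicRouter(String headerName) {
        this.headerName = headerName;
    }

    // Registers one entry in the style of "selector.mapping.<value> = <channel>".
    TopicRouter map(String headerValue, String channel) {
        mapping.put(headerValue, channel);
        return this;
    }

    // Looks up the channel for the event's header value; empty when no mapping matches.
    Optional<String> route(Map<String, String> headers) {
        return Optional.ofNullable(mapping.get(headers.get(headerName)));
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter("topic")
                .map("letter", "c1")
                .map("number", "c2");

        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "letter");
        System.out.println(router.route(headers).orElse("(no channel)")); // prints "c1"
    }
}
```

Because the interceptor always sets topic to either letter or number, every event matches one of the two mappings here. Flume also offers a selector.default property for unmatched values, which this configuration does not set.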

On hadoop203, configure flume3.conf:

a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop203
a3.sources.r1.port = 4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

On hadoop204, configure flume4.conf:

a4.sources = r1
a4.sinks = k1
a4.channels = c1
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop204
a4.sources.r1.port = 4142
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1

Run on hadoop204:

bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf -n a4 -Dflume.root.logger=INFO,console

Run on hadoop203:

bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console

Run on hadoop202:

bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2

Then, also on hadoop202, connect to the netcat source and type some test lines:

nc localhost 44444
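
For repeatable tests, the lines can also be sent from a small program. The following is a minimal sketch using only the JDK, assuming a hypothetical class named LineSender. It writes newline-terminated lines over TCP, half-closes the connection, and counts the reply lines it gets back. It assumes the server closes the connection once the client stops sending, and it gives up after a five-second read timeout if that does not happen.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical test client for illustration; it is not part of Flume.
final class LineSender {

    private LineSender() {
    }

    // Sends each line terminated by '\n' and returns the number of reply lines read back.
    static int send(String host, int port, List<String> lines) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(5000); // bound the wait for replies
            OutputStream out = socket.getOutputStream();
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
            socket.shutdownOutput(); // signal end of input but keep reading replies

            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            int replies = 0;
            try {
                while (in.readLine() != null) {
                    replies++;
                }
            } catch (SocketTimeoutException e) {
                // The server kept the connection open; stop waiting.
            }
            return replies;
        }
    }

    public static void main(String[] args) {
        try {
            int replies = send("localhost", 44444, Arrays.asList("hello flume", "12345"));
            System.out.println("reply lines received: " + replies);
        } catch (IOException e) {
            System.out.println("could not reach the agent: " + e.getMessage());
        }
    }
}
```

With the agents above running, "hello flume" should be routed like any other line containing "hello", and "12345" like any line without it.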

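Experiment results:

Lines containing "hello" are tagged topic=letter and go through c1 to the logger sink on hadoop203. All other lines are tagged topic=number and go through c2 to the logger sink on hadoop204.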

 
