
How to set up an HTTP Source for testing a Flume setup?

  • I am a newbie to Flume and Hadoop. We are developing a BI module where we store all the logs from different servers in HDFS.

    For this I am using Flume and have just started trying it out. I successfully created a node, and now I want to set up an HTTP source and a sink that writes incoming HTTP requests to a local file.

    Any suggestions?

    Thanks in advance!

      December 30, 2020 1:31 PM IST
  • Hopefully this helps you get started. I'm having some problems testing this on my machine and don't have time to fully troubleshoot it right now, but I'll get to that...

    Assuming you have Flume up and running, this is roughly what your flume.conf file needs to look like to use an HTTP POST source and a local file sink (note: this writes to a local file, not HDFS).

    ########## NEW AGENT ########## 
    # flume-ng agent -f /etc/flume/conf/flume.httptest.conf -n httpagent
    # 
    
    # httpagent = HTTP test agent
    ###############################
    httpagent.sources = http-source
    httpagent.sinks = local-file-sink
    httpagent.channels = ch3
    
    # Define / configure the HTTP source
    ###############################
    httpagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
    httpagent.sources.http-source.channels = ch3
    httpagent.sources.http-source.port = 81
    
    
    # Local File Sink
    ###############################
    httpagent.sinks.local-file-sink.type = file_roll
    httpagent.sinks.local-file-sink.channel = ch3
    httpagent.sinks.local-file-sink.sink.directory = /root/Desktop/http_test
    httpagent.sinks.local-file-sink.sink.rollInterval = 5
    
    # Channels
    ###############################
    httpagent.channels.ch3.type = memory
    httpagent.channels.ch3.capacity = 1000

     

    Start Flume with the command shown in the comment on the second line of the config, tweaking it for your needs (the port, sink.directory, and sink.rollInterval especially). This is a pretty bare-minimum config file; there are many more options available, so check out the Flume User Guide. As far as that goes, the agent starts and runs fine for me.

    Here's the part I haven't had time to test. The HTTP source, by default, accepts data in JSON format. You should be able to test this agent by sending a cURL request along these lines:

    curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '{"username":"xyz","password":"123"}' http://yourdomain.com:81/


    -X sets the request method to POST, -H sends headers, -d sends the data (valid JSON), and then comes the host:port. The problem for me is that I get an error:

    WARN http.HTTPSource: Received bad request from client. org.apache.flume.source.http.HTTPBadRequestException: Request has invalid JSON Syntax.


    in my Flume agent's log. Invalid JSON? So something about the payload is being sent wrong. The fact that an error pops up at all shows that the Flume source is receiving data, though. Whatever you have that's POSTing should work, as long as the payload is in the format the handler expects.
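
    The likely culprit is the payload shape: the default JSONHandler expects a JSON array of events, each with a "headers" map and a String "body", rather than a flat JSON object. A sketch of a corrected request against the config above (localhost and the header value are just placeholders; port 81 matches the config):

    curl -X POST -H 'Content-Type: application/json; charset=UTF-8' \
         -d '[{"headers":{"source":"curl-test"},"body":"{\"username\":\"xyz\",\"password\":\"123\"}"}]' \
         http://localhost:81/

    If the request is accepted, the event should show up shortly as a rolled file in /root/Desktop/http_test.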

      August 2, 2021 2:57 PM IST
  • It's a bit hard to tell exactly what you want from the way the question is worded, but I'm operating on the assumption that you want to send JSON to Flume using HTTP POST requests and then have Flume dump those JSON events to HDFS (not the local file system). If so, here's what you need to do.

    1. First, make sure you create a directory in HDFS for Flume to write the events to. For example, if you want to send events to /user/flume/events in HDFS, you'll probably have to run the following commands:

    $ su - hdfs
    $ hdfs dfs -mkdir /user/flume
    $ hdfs dfs -mkdir /user/flume/events
    $ hdfs dfs -chmod -R 777 /user/flume
    $ hdfs dfs -chown -R flume /user/flume

    2. Configure Flume to use an HTTP Source and an HDFS Sink. Make sure to add the Host and Timestamp interceptors; otherwise your events will cause exceptions in the HDFS Sink, because that sink expects a host and timestamp in the event headers. Also make sure to expose the port on the server that the Flume HTTPSource is listening on.

    Here's a sample Flume config that works in the Cloudera Quickstart Docker container for CDH 5.7.0:

    # Sources, channels, and sinks are defined per agent name, in this case 'tier1'.
    tier1.sources  = source1
    tier1.channels = channel1
    tier1.sinks    = sink1
    tier1.sources.source1.interceptors = i1 i2 
    tier1.sources.source1.interceptors.i1.type = host
    tier1.sources.source1.interceptors.i1.preserveExisting = false
    tier1.sources.source1.interceptors.i1.hostHeader = host
    tier1.sources.source1.interceptors.i2.type = timestamp
    
    # For each source, channel, and sink, set standard properties.
    tier1.sources.source1.type     = http
    tier1.sources.source1.bind     = 0.0.0.0
    tier1.sources.source1.port     = 5140
    # JSONHandler is the default handler for the HTTP source
    tier1.sources.source1.handler = org.apache.flume.source.http.JSONHandler
    tier1.sources.source1.channels = channel1
    tier1.channels.channel1.type   = memory
    tier1.sinks.sink1.type         = hdfs
    tier1.sinks.sink1.hdfs.path = /user/flume/events/%y-%m-%d/%H%M/%S
    tier1.sinks.sink1.hdfs.filePrefix = event-file-prefix-
    tier1.sinks.sink1.hdfs.round = false
    tier1.sinks.sink1.channel      = channel1
    
    # Other properties are specific to each type of source, channel, or sink. In this case, we specify the capacity of the memory channel.
    tier1.channels.channel1.capacity = 1000
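
    With that config saved, you'd start the agent with something like the following (the config path and --conf directory are assumptions about your install layout, so adjust them to match your setup):

    flume-ng agent --conf /etc/flume-ng/conf -f /etc/flume-ng/conf/flume.conf -n tier1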


    3. It's necessary to create a Flume client that can send JSON events to the Flume HTTP source in the format that it expects (this client could be as simple as a curl request; see the sketch after the JSON example below). The most important thing about the format is that the "body" key must have a value that is a String. "body" cannot be a JSON object; if it is, the Gson library that the Flume JSONHandler uses to parse the events will throw exceptions, because it is expecting a String.

    This is the JSON format you need:

    [
      {
        "headers": {
          "timestamp": "434324343",
          "host": "localhost"
        },
        "body": "No matter what, this must be a String, not a list or a JSON object"
      },
      { ... following events take the same format as the one above ... }
    ]
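
    For a quick test against the config above, you could save one event in that format to a file and POST it with curl (localhost and the file name are just placeholders; port 5140 matches the source config):

    cat > events.json <<'EOF'
    [
      {
        "headers": { "timestamp": "434324343", "host": "localhost" },
        "body": "hello from curl"
      }
    ]
    EOF
    curl -X POST -H 'Content-Type: application/json' -d @events.json http://localhost:5140/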


    Troubleshooting

    If Flume is sending your client (such as curl) 200 OK responses but you don't see any files on HDFS, check the Flume logs. An issue I ran into early on was that my Flume channel didn't have enough capacity and couldn't accept any events as a result. If that happens, the channel or the HTTPSource will throw exceptions that you will be able to see in the Flume logs (probably in /var/log/flume-ng/). To fix this, increase tier1.channels.channel1.capacity.

    If you see exceptions in the Flume logs indicating either that Flume couldn't write to HDFS because of permissions, or that the destination directory couldn't be found, make sure you created the destination directory in HDFS and opened up its permissions as detailed in Step 1 above.
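
    As a rough illustration of the capacity fix, the snippet below bumps the memory channel settings (the numbers are only examples; size them for your expected event rate), and the hdfs command lists the destination directory so you can confirm files are landing:

    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000

    $ hdfs dfs -ls /user/flume/events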
      August 4, 2021 2:40 PM IST