EMR PHP streaming produces a different output file than a run on the local machine

Posted by epggiuax on 2021-05-30 in Hadoop

I have written PHP code that converts JSON to Avro.


#!/usr/bin/php
<?php
require_once 'vendor/autoload.php';

error_reporting(E_ALL);
ini_set('display_errors', 1);

//$outputFile = __DIR__ . '/test_avro_out.avr';
$avroJsonSchema = file_get_contents(__DIR__ . '/HttpRequestEvent.avsc');
// Open stdout for writing, using the given writer's schema
$avroWriter = AvroDataIO::open_file('php://stdout', 'w', $avroJsonSchema);
$counter = 1;
while (($buf = fgets(STDIN)) !== false) {
    try {
        //replace ,null: with ,"null": to prevent map keys which are not strings.
        $original = array("null:","userIp","[null]");
        $replaceWith   = array("\"null\":", "userIP","[]");
        $data = json_decode(str_replace($original, $replaceWith, $buf), true);
        if ($data === false || $data == null ) {
            throw new InvalidArgumentException("Unable to parse JSON line");
        }

        $mapped = map_request_event($data);
        $avroWriter->append($mapped);

    } catch (Exception $ex) {
        fprintf(STDERR, "Caught exception: %s\n", $ex->getMessage());
        fprintf(STDERR, "Line num: %s\n",$counter);
        fprintf(STDERR, "buf: %s\n", $buf);
    }
    $counter++;
}
$avroWriter->close();

function map_request_event($data)
{
    if (empty($data['data']['__props']) || empty($data['data']['__props']['eventData']) || !isset($data['data']['__props']['eventData']['__props'])) {
        throw new InvalidArgumentException("Got invalid event object");
    }

    $tsData = $data['data']['__props'];
    $data = $data['data']['__props']['eventData']['__props'];
    $reqData = $data['generalInformation']['__props'];

    $mapped = array(
        'request_id' => safe_read_key($reqData,'requestId'),
        'timestamp' => (double)safe_read_key($tsData,'timeStamp'),
        'account_id' => intval(safe_read_key($reqData,'accountId')),
        'site_id' => intval(safe_read_key($reqData,'siteId')),
        'user_id' => (string)safe_read_key($reqData,'userId'),
        'http_return_code' => (string)safe_read_key($reqData,'httpReturnCode'),
        'request_headers' => safe_read_key($reqData,'requestHeaders'), // FIXME: Make sure header values do not contain NULL
        'session_id' => (string)safe_read_key($reqData,'sessionId'),
        'request_method' => safe_read_key($reqData,'httpRequestMethod'),
        'request_time' => (double)safe_read_key($reqData,'requestTime'),
        'user_agent' => (string)safe_read_key($reqData,'userAgent'),
        'referrer_uri' => (string)safe_read_key($reqData,'referrerURI'),
        'request_uri' => safe_read_key($reqData,'requestURI'),
        //'url_params_get' => safe_read_key($reqData,'urlParamsGet'),
        //'url_params_post' => safe_read_key($reqData,'urlParamsPost'),
        'server' => safe_read_key($reqData,'server'),
        'user_ip' => safe_read_key($reqData,'userIP'),
        'charset' => safe_read_key($reqData,'charset'),
        'language' => safe_read_key($reqData,'language'),
        //'full_url' => safe_read_key($reqData,'fullUrl'),
        'activities' => array_map('map_activity', safe_read_key($data,'logEventArray')),
    );

    return $mapped;
}

function map_activity($data)
{
    $activity = $data['__props'];

    $mapped = array(
        'activity_time' => $activity['dateTime'],
        'activity_id' => intval($activity['activityId']),
        'scenario' => (string)$activity['scenario'],
        'count' => intval($activity['count']),
    );

    return $mapped;
}

function safe_read_key($data, $key, $default = null)
{
    if (!isset($data[$key]) && !array_key_exists($key, $data)) {
        if ($default !== null) {
            return $default;
        } else {
            throw new Exception('Key Does not exist');
        }
    }

    return $data[$key];
}
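
To make the string replacement in the read loop easier to check in isolation, here is a small sketch of the same substitution applied to a made-up input line. `normalize_json_line` is a hypothetical wrapper name and the sample line is invented for illustration; only the replacement pairs come from the code above.

```php
<?php
// Hypothetical wrapper around the str_replace() call used in the read loop.
// The search/replace pairs are copied from the mapper; the name is an assumption.
function normalize_json_line(string $line): string
{
    // Unquoted null map keys, the userIp spelling, and [null] arrays.
    $original    = array("null:", "userIp", "[null]");
    $replaceWith = array("\"null\":", "userIP", "[]");

    // str_replace() applies the pairs in order, left to right.
    return str_replace($original, $replaceWith, $line);
}

// Invented sample line, not taken from the real input.
$sample = '{"a":{null:1},"userIp":"1.2.3.4","x":[null]}';
$fixed  = normalize_json_line($sample);

// json_decode() returns null if the line is still not valid JSON.
$decoded = json_decode($fixed, true);
fwrite(STDERR, $fixed . "\n");
```

The replacement is purely textual, so it would also rewrite these substrings if they appeared inside string values; that may be worth keeping in mind when checking individual records.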

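I am using EMR streaming with S3 as input and output. This code is set as the mapper, and the reducer is set to none.
When I run this code on the same input file:
On my local machine -> I get a valid Avro file.
On EMR -> the file is corrupted and cannot be read.
Here is a link to an archive containing both result files: http://speedy.sh/p9fhd/test.7z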

part-00000 is the EMR Avro result file.
test.avro is the local one.

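I tried comparing the two files. There is a pattern in the differences, but not enough for me to understand what went wrong.
Is there anything wrong with the way I am using the mapper? What can be deduced from comparing the two binary files?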
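
One way to make the comparison more systematic is to report where the two files first diverge and whether the EMR file gained bytes. A minimal sketch follows; `first_difference` and `compare_outputs` are hypothetical helper names, not part of the mapper. The focus on TAB, LF and CR is an assumption: Hadoop Streaming treats mapper stdout as newline-delimited text records by default, so bytes added or changed around those characters would be one thing worth ruling out.

```php
<?php
// Hypothetical helper: offset of the first differing byte, or -1 if identical.
// If one input is a prefix of the other, the shorter length is returned.
function first_difference(string $a, string $b): int
{
    $limit = min(strlen($a), strlen($b));
    for ($i = 0; $i < $limit; $i++) {
        if ($a[$i] !== $b[$i]) {
            return $i;
        }
    }
    return strlen($a) === strlen($b) ? -1 : $limit;
}

// Hypothetical helper: summarize how the EMR output differs from the local one.
function compare_outputs(string $local, string $emr): array
{
    // Mode 0 returns a count for every byte value 0..255.
    $localCounts = count_chars($local, 0);
    $emrCounts   = count_chars($emr, 0);

    // Assumption: line-oriented handling would show up in these bytes.
    $extra = array();
    foreach (array(0x09 => 'TAB', 0x0A => 'LF', 0x0D => 'CR') as $byte => $name) {
        $extra[$name] = $emrCounts[$byte] - $localCounts[$byte];
    }

    return array(
        'size_delta'       => strlen($emr) - strlen($local),
        'first_difference' => first_difference($local, $emr),
        'extra_bytes'      => $extra,
    );
}

// Example call with the two files from the archive:
//   print_r(compare_outputs(file_get_contents('test.avro'),
//                           file_get_contents('part-00000')));
```

If `size_delta` turned out to match one of the `extra_bytes` counts, that would suggest the bytes were inserted after the mapper wrote them rather than produced by the PHP code itself; a mismatch would point elsewhere.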
