Fate On Spark 大文件上传异常

Fate On Spark 大文件上传异常,第1张

Fate On Spark 大文件上传异常

目录
  • 背景
  • 复现
    • 代码
    • 内存信息
  • 总结
  • 解决方案
  • pipe详解
    • 介绍:
    • pipe 容量

背景

Fate On Spark在测试大文件上传的时候,遇到当文件大小超过fate所在机器内存大小的时候,上传任务最终状态为canceled,经过多次测试发现整个文件其实是已经完整上传到HDFS上,但是Fate在文件上传完毕之后会调用count()函数,当fate底层storage为hdfs时:count()执行逻辑其实就是将刚刚上传到hdfs的数据重新read一遍,读一行就做一下计数。问题就在于fate读取hdfs上数据使用的方法上面:python的第三方库pyarrow,

机器信息: 32核128G
hadoop文件 323.03G

复现 代码
import io
from pyarrow import fs

# path = "hdfs://fate-cluster//fate/input_data/experiment/student_homo_host"
path = "hdfs://fate-cluster//fate/input_data/experiment/idCard_1000_tag_10000w_*_20w_guest"

try:
    from pyarrow import HadoopFileSystem

    HadoopFileSystem(path)
except Exception as e:
    print("error")
_hdfs_client = fs.HadoopFileSystem.from_uri(path)


def _as_generator():
    with io.TextIOWrapper(buffer=_hdfs_client.open_input_stream, encoding="utf-8", newline=hdfs_utils.newline,
                          line_buffering=True, write_through=True) as reader:
        print("-----准备流结束------")
        for line in reader:
            yield line


def count():
    count = 0
    for _ in _as_generator():
        count += 1
    return count


if __name__ == '__main__':
    print(count())
内存信息

总结

直接用python直接调用时会出现Killed的系统异常信息(该信息无法在python层面被捕获)。运行在spark环境中时不会有系统的kill信息显示,生产环境中出现问题后并不好排查。目前推测HadoopFileSystem.open_input_stream()打开的文件流是整个文件大小。

解决方案

目前使用hadoop命令来替换之前的流处理,通过子进程执行hadoop fs -cat ${path}返回的标准输出来逐行读取内容。

  with subprocess.Popen(["hadoop", "fs", "-cat", self._path], universal_newlines=True, stdout=subprocess.PIPE) as pipe:
                for _ in pipe.stdout:
                    yield _

subprocess.Popen 是异步、subprocess.run是同步,

pipe详解

linux pipe详解

介绍:

Pipe 就是一个管道,一端可以将数据写入到pi pe,一端可以将数据从管道中取出

读写情况分析pipe IO and FIFOS

  1. 消费者读取一个空的pi pe,这个消费者会一直阻塞,知道pi pe中数据可用
  2. 生产者向一个满的pi pe中写数据,这个生产者也会一直阻塞,直到消费者读取了数据并足够消费者写入(非阻塞IO也可以设置O_NONBLOCK)
  3. 所有指向pi pe的生产者都已经关闭,访问此pi pe的消费者会会读取完毕整个文件,没有读取到则return 0
  4. 所有指向pi pe的消费者都已经关闭,访问此pi pe的生产者将生成一个SIGPIPE信号,如果忽略这个信号,则会报错,错误为EPIPE
pipe 容量
  1. Linux 2.6.11之前pi pe大小和页大小一致4KB
  2. Linux 2.6.11之后pi pe容量为16个页大小为64KB
  3. Linux2.6.35之后可以查看和设置F_GETPIPE_SZ and F_SETPIPE_SZ

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5687998.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存