This commit is contained in:
2025-12-05 16:57:52 -05:00
commit 2f3d42cf5f
23 changed files with 1264 additions and 0 deletions

View File

@@ -0,0 +1,88 @@
defmodule FileProcessor do
@moduledoc """
Documentation for `FileProcessor`.
"""
use GenServer
# Client
@server FileProcessor.Server
def start_link(initial_values) do
GenServer.start_link(__MODULE__, initial_values, name: @server)
end
def put_file({filename, contents}) do
GenServer.call(@server, {:put_file, filename, contents})
end
def get_files() do
GenServer.call(@server, :get_files)
end
def delete_file(filename) do
GenServer.call(@server, {:delete_file, filename})
end
def put_rfp({filename, contents}) do
GenServer.call(@server, {:put_rfp, filename, contents})
end
def get_rfp() do
GenServer.call(@server, :get_rfp)
end
def delete_rfp(filename) do
GenServer.call(@server, {:delete_rfp, filename})
end
def process_files() do
GenServer.call(@server, :process_files)
end
# Server
alias FileProcessor.Impl
@impl true
def init(initial_values) do
Impl.do_init()
{:ok, initial_values}
end
@impl true
def handle_call({:put_file, {filename, file}}, _from, current_state) do
{:reply, Impl.put_file(filename, file), current_state}
end
@impl true
def handle_call(:get_files, _from, current_state) do
{:reply, Impl.get_files(), current_state}
end
@impl true
def handle_call({:delete_file, filename}, _from, current_state) do
{:reply, Impl.delete_file(filename), current_state}
end
@impl true
def handle_call({:put_rfp, {filename, file}}, _from, current_state) do
{:reply, Impl.put_rfp(filename, file), current_state}
end
@impl true
def handle_call(:get_rfp, _from, current_state) do
{:reply, Impl.get_rfp(), current_state}
end
@impl true
def handle_call({:delete_rfp, filename}, _from, current_state) do
{:reply, Impl.delete_rfp(filename), current_state}
end
@impl true
def handle_call(:process_files, _from, current_state) do
{:reply, Impl.process_files(), current_state}
end
end

View File

@@ -0,0 +1,23 @@
defmodule FileProcessor.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
@impl true
def start(_type, _args) do
children = [
# Starts a worker by calling: FileProcessor.Worker.start_link(arg)
{FileProcessor, %{}},
PdfExtractor,
{ResultsServer, %{:results=>%{}, :good=>[], :bad=>[]}},
{Phoenix.PubSub, name: TrustedEdgeServer.PubSub}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: FileProcessor.Supervisor]
Supervisor.start_link(children, opts)
end
end

View File

@@ -0,0 +1,230 @@
defmodule FileProcessor.Impl do
require Logger
@output_dir Application.fetch_env!(:file_processor, :filepath)
@rfp_dir Application.fetch_env!(:file_processor, :rfppath)
@gpu_node :gpu@kittykat
def do_init() do
Logger.debug("Using dir: #{@output_dir} ")
res = File.mkdir_p(@output_dir)
Logger.debug("Result = #{res}")
Logger.debug("Using rfp dir: #{@rfp_dir} ")
res = File.mkdir_p(@rfp_dir)
Logger.debug("Result = #{res}")
end
def put_file(filename, contents) do
Logger.debug("Using dir: #{@output_dir} ")
Logger.debug("Handling #{filename}...")
full_path = Path.join(@output_dir, filename)
case File.write(full_path, contents) do
:ok ->
# Return all files so liveview can update
get_files()
{:error, reason} ->
{:ok, files} = get_files()
{:error, reason, files}
end
end
def get_files() do
File.ls(@output_dir)
end
def delete_file(filename) do
Logger.debug("Deleting #{filename}...")
full_path = Path.join(@output_dir, filename)
case File.rm(full_path) do
:ok ->
get_files()
{:error, reason} ->
{:ok, files} = get_files()
{:error, reason, files}
end
end
def put_rfp(filename, contents) do
Logger.debug("Using dir: #{@rfp_dir} ")
Logger.debug("Handling #{filename}...")
# Remove old one(s)
files_to_delete = Path.wildcard(Path.join(@rfp_dir, "*"))
Enum.each(files_to_delete, fn file -> File.rm(file) end)
# Write new one
full_path = Path.join(@rfp_dir, filename)
case File.write(full_path, contents) do
:ok ->
get_rfp()
{:error, reason} ->
{:ok, rfps} = get_rfp()
{:error, reason, rfps}
end
end
def get_rfp() do
File.ls(@rfp_dir)
end
def delete_rfp(filename) do
Logger.debug("Deleting RFP #{filename}...")
full_path = Path.join(@rfp_dir, filename)
case File.rm(full_path) do
:ok ->
get_rfp()
{:error, reason} ->
{:ok, rfps} = get_rfp()
{:error, reason, rfps}
end
end
def process_files() do
# Only do work if RFP uploaded
{:ok, rfp} = get_rfp()
if Enum.empty?(rfp) do
{:error, :rfp_missing}
else
# Start work for all files
# Reset results since new job started
ResultsServer.reset()
res = Node.connect(@gpu_node)
Logger.debug("GPU Node connection: #{inspect(res)}")
case get_files() do
{:ok, files} ->
Enum.each(files, fn filepath ->
full_path = Path.join(@output_dir, filepath)
spawn(fn -> FileProcessor.Impl.handle_file(full_path) end)
end)
{:error, reason} ->
Logger.error("Failed to read files dir: #{reason}")
end
# Start work for RFP
case get_rfp() do
{:ok, files} ->
Enum.each(files, fn filepath ->
full_path = Path.join(@rfp_dir, filepath)
spawn(fn -> FileProcessor.Impl.handle_rfp(full_path) end)
end)
{:error, reason} ->
Logger.error("Failed to read rfp file dir: #{reason}")
end
{:ok}
end
end
def handle_file(filepath) do
Logger.debug("Processing #{filepath}")
filetype = get_filetype(filepath)
text = extract_text(filetype, filepath)
#Logger.debug("Got text from #{filepath}: #{inspect(text)}")
chunks =
case text do
{:error} ->
# Send failure to results server
[]
good_text ->
chunks = TextChunker.split(good_text, chunk_size: 100, chunk_overlap: 10)
#Logger.debug("Chunks: #{inspect(chunks)}")
chunks
end
all_chunks =
Enum.map(chunks, fn chunk ->
# Logger.debug("Chunk: #{chunk.text}")
embedding = GenServer.call({BertEmbedding, @gpu_node}, {:embed, chunk.text}, :infinity)
# Logger.debug("Embedding: #{inspect(embedding)}")
%ChunkResult{chunk: chunk, embedding: embedding}
end)
results = %FileResult{filename: Path.basename(filepath), chunks: all_chunks}
#Logger.debug("Result done struct = #{inspect(results)}")
ResultsServer.put_result(results)
end
def handle_rfp(filepath) do
Logger.debug("Processing RFP #{filepath}")
end
defp get_filetype(filepath) do
res = FileType.from_path(filepath)
Logger.debug("Filetype = #{inspect(res)} for #{filepath}")
case res do
{:ok, {type, _}} ->
type
{:error, _} ->
if String.ends_with?(filepath, "txt") do
"txt"
else
{:error}
end
end
end
#############################################
# Extract text functions for each filetype
#############################################
defp extract_text("txt", filepath) do
Logger.debug("Extracting from text file: #{filepath}")
case File.read(filepath) do
{:ok, content} ->
content
{:error, reason} ->
Logger.error("Error reading file: #{reason}")
{:error}
end
end
defp extract_text("pdf", filepath) do
Logger.debug("Extracting from pdf file: #{filepath}")
case PdfExtractor.extract_text_timeout(filepath, :infinity) do
{:ok, res} ->
values = Map.values(res)
Enum.join(values, " ")
{:error, reason} ->
Logger.error("Error extracting pdf: #{filepath}, #{inspect(reason)}")
{:error}
_ ->
Logger.error("Error surprsing value from pdf extraction: #{filepath}")
{:error}
end
end
defp extract_text(unk, filepath) do
Logger.error("Unknown filetype for file: #{filepath}, received: #{inspect(unk)}")
{:error}
end
#######################################
end

View File

@@ -0,0 +1,7 @@
defmodule FileResult do
defstruct filename: "", chunks: []
end
defmodule ChunkResult do
defstruct chunk: %TextChunker.Chunk{}, embedding: %{}
end

View File

@@ -0,0 +1,89 @@
defmodule ResultsServer do
require Logger
use GenServer
@moduledoc """
Genserver with data:
%{results => %{filename->FileResult, filename2->FileResult},
good => [filename1, filename2],
bad => [badfiles]}
"""
# Client
#
def start_link(initial_values) do
GenServer.start_link(__MODULE__, initial_values, name: ResultsServer)
end
@impl true
def init(initial_values) do
{:ok, initial_values}
end
def put_result(file_result) do
GenServer.cast(ResultsServer, {:put_result, file_result})
end
def print_results() do
GenServer.cast(ResultsServer, :print_results)
end
def reset() do
GenServer.cast(ResultsServer, :reset)
end
# Server
@impl true
def handle_cast({:put_result, file_result}, current_state) do
filename = file_result.filename
chunks = file_result.chunks
Logger.debug("Got result: #{inspect(filename)}")
{new_state, status} =
case chunks do
[] ->
{Map.put(current_state, :bad, current_state.bad ++ [filename]), :bad}
_ ->
new_state = Map.put(current_state, :good, current_state.good ++ [filename])
results = new_state.results
new_results = Map.put(results, filename, file_result)
{Map.put(new_state, :results, new_results), :good}
end
done_files = new_state.bad ++ new_state.good
{:ok, all_files} = FileProcessor.get_files()
remaining = all_files -- done_files
p = length(done_files)/length(all_files)
Phoenix.PubSub.broadcast(TrustedEdgeServer.PubSub, "the_topic", {:new_result, {filename, status}})
Phoenix.PubSub.broadcast(TrustedEdgeServer.PubSub, "the_topic", {:new_percent_done, p})
Logger.info("Percent done: #{p}")
Logger.info("Remaining: #{inspect(remaining)}")
{:noreply, new_state}
end
@impl true
def handle_cast(:print_results, current_state) do
good = current_state.good
bad = current_state.bad
results = current_state.results
Logger.debug("Good: #{inspect(good)}")
Logger.debug("Bad: #{inspect(bad)}")
Enum.each(results, fn {key, value} ->
Logger.debug("File: #{key}")
Logger.debug("Chunks: #{inspect(value.chunks)}")
end)
{:noreply, current_state}
end
@impl true
def handle_cast(:reset, current_state) do
{:noreply, %{:results=>%{}, :good=>[], :bad=>[]}}
end
end